b61711cc25cf74b771506f661de904232104664d
[jalview.git] / src / jalview / ws2 / actions / AbstractPollableTask.java
1 package jalview.ws2.actions;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
11
12 import jalview.bin.Cache;
13 import jalview.bin.Console;
14 import jalview.util.ArrayUtils;
15 import jalview.util.MathUtils;
16 import jalview.ws.params.ArgumentI;
17 import jalview.ws2.actions.api.TaskEventListener;
18 import jalview.ws2.actions.api.TaskI;
19 import jalview.ws2.api.Credentials;
20 import jalview.ws2.api.JobStatus;
21 import jalview.ws2.api.WebServiceJobHandle;
22 import jalview.ws2.client.api.WebServiceClientI;
23 import jalview.ws2.helpers.DelegateJobEventListener;
24 import jalview.ws2.helpers.TaskEventSupport;
25 import static java.lang.String.format;
26
27 /**
28  * An abstract base class for non-interactive tasks which implements common
29  * tasks methods. Additionally, it manages task execution in a polling loop.
30  * Subclasses are only required to implement {@link #prepare()} and
31  * {@link #done()} methods.
32  * 
33  * @author mmwarowny
34  *
35  * @param <T>
36  *          the type of jobs managed by the task
37  * @param <R>
38  *          the type of result provided by the task
39  */
40 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
41 {
42   private final long uid = MathUtils.getUID();
43
44   protected final WebServiceClientI client;
45
46   protected final List<ArgumentI> args;
47
48   protected final Credentials credentials;
49
50   private final TaskEventSupport<R> eventHandler;
51
52   protected JobStatus taskStatus = JobStatus.CREATED;
53
54   private Future<?> future = null;
55
56   protected List<T> jobs = Collections.emptyList();
57
58   protected R result;
59
60   protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
61       Credentials credentials, TaskEventListener<R> eventListener)
62   {
63     this.client = client;
64     this.args = args;
65     this.credentials = credentials;
66     this.eventHandler = new TaskEventSupport<R>(this, eventListener);
67   }
68
69   public long getUid()
70   {
71     return uid;
72   }
73
74   /**
75    * Start the task using provided scheduled executor service. It creates a
76    * polling loop running at set intervals.
77    * 
78    * @param executor
79    *          executor to run the polling loop with
80    */
81   public void start(ScheduledExecutorService executor)
82   {
83     if (future != null)
84       throw new IllegalStateException("task already started");
85     var runnable = new Runnable()
86     {
87       private int stage = STAGE_PREPARE;
88
89       private static final int STAGE_PREPARE = 0;
90
91       private static final int STAGE_START = 1;
92
93       private static final int STAGE_POLL = 2;
94
95       private static final int STAGE_FINALIZE = 3;
96
97       private static final int STAGE_DONE = 4;
98
99       private int retryCount = 0;
100
101       private static final int MAX_RETRY = 5;
102
103       /**
104        * A polling loop run periodically which carries the task through its
105        * consecutive execution stages.
106        */
107       @Override
108       public void run()
109       {
110         if (stage == STAGE_PREPARE)
111         {
112           // first stage - the input data is collected and the jobs are created
113           try
114           {
115             jobs = prepare();
116           } catch (ServiceInputInvalidException e)
117           {
118             stage = STAGE_DONE;
119             setStatus(JobStatus.INVALID);
120             eventHandler.fireTaskException(e);
121             throw new CompletionException(e);
122           }
123           stage = STAGE_START;
124           setStatus(JobStatus.READY);
125           eventHandler.fireTaskStarted(jobs);
126           var jobListener = new DelegateJobEventListener<>(eventHandler);
127           for (var job : jobs)
128           {
129             job.addPropertyChagneListener(jobListener);
130           }
131         }
132         try
133         {
134           if (stage == STAGE_START)
135           {
136             // second stage - jobs are submitted to the server
137             startJobs();
138             stage = STAGE_POLL;
139             setStatus(JobStatus.SUBMITTED);
140           }
141           if (stage == STAGE_POLL)
142           {
143             // third stage - jobs are poolled until all of them are completed
144             if (pollJobs())
145             {
146               stage = STAGE_FINALIZE;
147             }
148             updateGlobalStatus();
149           }
150           if (stage == STAGE_FINALIZE)
151           {
152             // final stage - results are collected and stored
153             result = done();
154             eventHandler.fireTaskCompleted(result);
155             stage = STAGE_DONE;
156           }
157           retryCount = 0;
158         } catch (IOException e)
159         {
160           eventHandler.fireTaskException(e);
161           if (++retryCount > MAX_RETRY)
162           {
163             stage = STAGE_DONE;
164             cancelJobs();
165             setStatus(JobStatus.SERVER_ERROR);
166             throw new CompletionException(e);
167           }
168         }
169         if (stage == STAGE_DONE)
170         {
171           // finalization - terminating the future task
172           throw new CancellationException("task terminated");
173         }
174       }
175     };
176     if (taskStatus != JobStatus.CANCELLED)
177       future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
178   }
179
180   @Override
181   public JobStatus getStatus()
182   {
183     return taskStatus;
184   }
185
186   /**
187    * Set the status of the task and notify the event handler.
188    * 
189    * @param status
190    *          new task status
191    */
192   protected void setStatus(JobStatus status)
193   {
194     if (this.taskStatus != status)
195     {
196       this.taskStatus = status;
197       eventHandler.fireTaskStatusChanged(status);
198     }
199   }
200
201   /**
202    * Update task status according to the overall status of its jobs. The rules
203    * of setting the status are following:
204    * <ul>
205    * <li>task is invalid if all jobs are invalid</li>
206    * <li>task is completed if all but invalid jobs are completed</li>
207    * <li>task is ready, submitted or queued if at least one job is ready,
208    * submitted or queued an none proceeded to the next stage excluding
209    * completed.</li>
210    * <li>task is running if at least one job is running and none are failed or
211    * cancelled</li>
212    * <li>task is cancelled if at least one job is cancelled and none failed</li>
213    * <li>task is failed or server error if at least one job is failed or server
214    * error</li>
215    * </ul>
216    */
217   private void updateGlobalStatus()
218   {
219     int precedence = -1;
220     for (BaseJob job : jobs)
221     {
222       JobStatus status = job.getStatus();
223       int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
224       if (precedence < jobPrecedence)
225         precedence = jobPrecedence;
226     }
227     if (precedence >= 0)
228     {
229       setStatus(JobStatus.statusPrecedence[precedence]);
230     }
231   }
232
233   @Override
234   public void cancel()
235   {
236     setStatus(JobStatus.CANCELLED);
237     if (future != null)
238       future.cancel(false);
239     cancelJobs();
240   }
241
242   @Override
243   public List<? extends BaseJob> getSubJobs()
244   {
245     return jobs;
246   }
247
248   /**
249    * Collect and process input sequences for submission and return the list of
250    * jobs to be submitted.
251    * 
252    * @return list of jobs to be submitted
253    * @throws ServiceInputInvalidException
254    *           input is invalid and the task should not be started
255    */
256   protected abstract List<T> prepare() throws ServiceInputInvalidException;
257
258   /**
259    * Submit all valid jobs to the server and store their job handles.
260    * 
261    * @throws IOException
262    *           if server error occurred
263    */
264   protected void startJobs() throws IOException
265   {
266     for (BaseJob job : jobs)
267     {
268       if (job.isInputValid() && job.getStatus() == JobStatus.READY)
269       {
270         WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
271             args, credentials);
272         job.setServerJob(serverJob);
273         job.setStatus(JobStatus.SUBMITTED);
274       }
275     }
276   }
277
278   /**
279    * Poll all running jobs and update their status and logs. Polling is repeated
280    * periodically until this method return true when all jobs are done.
281    * 
282    * @return {@code true] if all jobs are done @throws IOException if server
283    *         error occurred
284    */
285   protected boolean pollJobs() throws IOException
286   {
287     boolean allDone = true;
288     for (BaseJob job : jobs)
289     {
290       if (job.isInputValid() && !job.getStatus().isDone())
291       {
292         WebServiceJobHandle serverJob = job.getServerJob();
293         job.setStatus(client.getStatus(serverJob));
294         job.setLog(client.getLog(serverJob));
295         job.setErrorLog(client.getErrorLog(serverJob));
296       }
297       allDone &= job.isCompleted();
298     }
299     return allDone;
300   }
301
302   /**
303    * Fetch and process the outputs produced by jobs and return the final result
304    * of the task. The method is called once all jobs have finished execution. If
305    * this method raises {@link IOException} it will be called again after a
306    * delay. All IO operations should happen before data processing, so
307    * potentially expensive computation is avoided in case of an error.
308    * 
309    * @return final result of the computation
310    * @throws IOException
311    *           if server error occurred
312    */
313   protected abstract R done() throws IOException;
314
315   /**
316    * Cancel all running jobs. Used in case of task failure to cleanup the
317    * resources or when the task has been cancelled.
318    */
319   protected void cancelJobs()
320   {
321     for (BaseJob job : jobs)
322     {
323       if (!job.isCompleted())
324       {
325         try
326         {
327           if (job.getServerJob() != null)
328           {
329             client.cancel(job.getServerJob());
330           }
331           job.setStatus(JobStatus.CANCELLED);
332         } catch (IOException e)
333         {
334           Console.error(format("failed to cancel job %s", job.getServerJob()), e);
335         }
336       }
337     }
338   }
339
340   @Override
341   public R getResult()
342   {
343     return result;
344   }
345
346   @Override
347   public String toString()
348   {
349     var status = taskStatus != null ? taskStatus.name() : "UNSET";
350     return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status);
351   }
352 }