JAL-3878 Fix task status precedence calculation
[jalview.git] / src / jalview / ws2 / actions / AbstractPollableTask.java
1 package jalview.ws2.actions;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
11
12 import jalview.bin.Cache;
13 import jalview.util.ArrayUtils;
14 import jalview.ws.params.ArgumentI;
15 import jalview.ws2.actions.api.TaskEventListener;
16 import jalview.ws2.actions.api.TaskI;
17 import jalview.ws2.api.Credentials;
18 import jalview.ws2.api.JobStatus;
19 import jalview.ws2.api.WebServiceJobHandle;
20 import jalview.ws2.client.api.WebServiceClientI;
21 import jalview.ws2.helpers.DelegateJobEventListener;
22 import jalview.ws2.helpers.TaskEventSupport;
23 import static java.lang.String.format;
24
25 /**
26  * An abstract base class for non-interactive tasks which implements common
27  * tasks methods. Additionally, it manages task execution in a polling loop.
28  * Subclasses are only required to implement {@link #prepare()} and
29  * {@link #done()} methods.
30  * 
31  * @author mmwarowny
32  *
33  * @param <T>
34  *          the type of jobs managed by the task
35  * @param <R>
36  *          the type of result provided by the task
37  */
38 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
39 {
40   protected final WebServiceClientI client;
41
42   protected final List<ArgumentI> args;
43
44   protected final Credentials credentials;
45
46   private final TaskEventSupport<R> eventHandler;
47
48   protected JobStatus taskStatus = null;
49
50   private Future<?> future = null;
51
52   protected List<T> jobs = Collections.emptyList();
53
54   protected R result;
55
56   protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
57       Credentials credentials, TaskEventListener<R> eventListener)
58   {
59     this.client = client;
60     this.args = args;
61     this.credentials = credentials;
62     this.eventHandler = new TaskEventSupport<R>(this, eventListener);
63   }
64
65   /**
66    * Start the task using provided scheduled executor service. It creates a
67    * polling loop running at set intervals.
68    * 
69    * @param executor
70    *          executor to run the polling loop with
71    */
72   public void start(ScheduledExecutorService executor)
73   {
74     if (future != null)
75       throw new IllegalStateException("task already started");
76     var runnable = new Runnable()
77     {
78       private int stage = STAGE_PREPARE;
79
80       private static final int STAGE_PREPARE = 0;
81
82       private static final int STAGE_START = 1;
83
84       private static final int STAGE_POLL = 2;
85
86       private static final int STAGE_FINALIZE = 3;
87
88       private static final int STAGE_DONE = 4;
89
90       private int retryCount = 0;
91
92       private static final int MAX_RETRY = 5;
93
94       /**
95        * A polling loop run periodically which carries the task through its
96        * consecutive execution stages.
97        */
98       @Override
99       public void run()
100       {
101         if (stage == STAGE_PREPARE)
102         {
103           // first stage - the input data is collected and the jobs are created
104           try
105           {
106             jobs = prepare();
107           } catch (ServiceInputInvalidException e)
108           {
109             stage = STAGE_DONE;
110             setStatus(JobStatus.INVALID);
111             eventHandler.fireTaskException(e);
112             throw new CompletionException(e);
113           }
114           stage = STAGE_START;
115           setStatus(JobStatus.READY);
116           eventHandler.fireTaskStarted(jobs);
117           var jobListener = new DelegateJobEventListener<>(eventHandler);
118           for (var job : jobs)
119           {
120             job.addPropertyChagneListener(jobListener);
121           }
122         }
123         try
124         {
125           if (stage == STAGE_START)
126           {
127             // second stage - jobs are submitted to the server
128             startJobs();
129             stage = STAGE_POLL;
130             setStatus(JobStatus.SUBMITTED);
131           }
132           if (stage == STAGE_POLL)
133           {
134             // third stage - jobs are poolled until all of them are completed
135             if (pollJobs())
136             {
137               stage = STAGE_FINALIZE;
138             }
139             updateGlobalStatus();
140           }
141           if (stage == STAGE_FINALIZE)
142           {
143             // final stage - results are collected and stored
144             result = done();
145             eventHandler.fireTaskCompleted(result);
146             stage = STAGE_DONE;
147           }
148           retryCount = 0;
149         } catch (IOException e)
150         {
151           eventHandler.fireTaskException(e);
152           if (++retryCount > MAX_RETRY)
153           {
154             stage = STAGE_DONE;
155             cancelJobs();
156             setStatus(JobStatus.SERVER_ERROR);
157             throw new CompletionException(e);
158           }
159         }
160         if (stage == STAGE_DONE)
161         {
162           // finalization - terminating the future task
163           throw new CancellationException("task terminated");
164         }
165       }
166     };
167     if (taskStatus != JobStatus.CANCELLED)
168       future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
169   }
170
171   @Override
172   public JobStatus getStatus()
173   {
174     return taskStatus;
175   }
176
177   /**
178    * Set the status of the task and notify the event handler.
179    * 
180    * @param status
181    *          new task status
182    */
183   protected void setStatus(JobStatus status)
184   {
185     if (this.taskStatus != status)
186     {
187       this.taskStatus = status;
188       eventHandler.fireTaskStatusChanged(status);
189     }
190   }
191
192   /**
193    * Update task status according to the overall status of its jobs. The rules
194    * of setting the status are following:
195    * <ul>
196    * <li>task is invalid if all jobs are invalid</li>
197    * <li>task is completed if all but invalid jobs are completed</li>
198    * <li>task is ready, submitted or queued if at least one job is ready,
199    * submitted or queued an none proceeded to the next stage excluding
200    * completed.</li>
201    * <li>task is running if at least one job is running and none are failed or
202    * cancelled</li>
203    * <li>task is cancelled if at least one job is cancelled and none failed</li>
204    * <li>task is failed or server error if at least one job is failed or server
205    * error</li>
206    * </ul>
207    */
208   private void updateGlobalStatus()
209   {
210     int precedence = -1;
211     for (BaseJob job : jobs)
212     {
213       JobStatus status = job.getStatus();
214       int jobPrecedence = ArrayUtils.indexOf(statusPrecedence, status);
215       if (precedence < jobPrecedence)
216         precedence = jobPrecedence;
217     }
218     if (precedence >= 0)
219     {
220       setStatus(statusPrecedence[precedence]);
221     }
222   }
223
224   /**
225    * A precedence order of job statuses used to compute the overall task status.
226    */
227   private static JobStatus[] statusPrecedence = {
228       JobStatus.INVALID, // all must be invalid for task to be invalid
229       JobStatus.COMPLETED, // all but invalid must be completed for task to be
230                            // completed
231       JobStatus.UNKNOWN, // unknown prevents successful completion but not
232                          // running or failure
233       JobStatus.READY,
234       JobStatus.SUBMITTED,
235       JobStatus.QUEUED,
236       JobStatus.RUNNING,
237       JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
238                            // failed
239       JobStatus.FAILED,
240       JobStatus.SERVER_ERROR
241   };
242
243   @Override
244   public void cancel()
245   {
246     setStatus(JobStatus.CANCELLED);
247     if (future != null)
248       future.cancel(false);
249     cancelJobs();
250   }
251
252   @Override
253   public List<? extends BaseJob> getSubJobs()
254   {
255     return jobs;
256   }
257
258   /**
259    * Collect and process input sequences for submission and return the list of
260    * jobs to be submitted.
261    * 
262    * @return list of jobs to be submitted
263    * @throws ServiceInputInvalidException
264    *           input is invalid and the task should not be started
265    */
266   protected abstract List<T> prepare() throws ServiceInputInvalidException;
267
268   /**
269    * Submit all valid jobs to the server and store their job handles.
270    * 
271    * @throws IOException
272    *           if server error occurred
273    */
274   protected void startJobs() throws IOException
275   {
276     for (BaseJob job : jobs)
277     {
278       if (job.isInputValid() && job.getStatus() == JobStatus.READY)
279       {
280         WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
281             args, credentials);
282         job.setServerJob(serverJob);
283         job.setStatus(JobStatus.SUBMITTED);
284       }
285     }
286   }
287
288   /**
289    * Poll all running jobs and update their status and logs. Polling is repeated
290    * periodically until this method return true when all jobs are done.
291    * 
292    * @return {@code true] if all jobs are done @throws IOException if server
293    *         error occurred
294    */
295   protected boolean pollJobs() throws IOException
296   {
297     boolean allDone = true;
298     for (BaseJob job : jobs)
299     {
300       if (job.isInputValid() && !job.getStatus().isDone())
301       {
302         WebServiceJobHandle serverJob = job.getServerJob();
303         job.setStatus(client.getStatus(serverJob));
304         job.setLog(client.getLog(serverJob));
305         job.setErrorLog(client.getErrorLog(serverJob));
306       }
307       allDone &= job.isCompleted();
308     }
309     return allDone;
310   }
311
312   /**
313    * Fetch and process the outputs produced by jobs and return the final result
314    * of the task. The method is called once all jobs have finished execution. If
315    * this method raises {@link IOException} it will be called again after a
316    * delay. All IO operations should happen before data processing, so
317    * potentially expensive computation is avoided in case of an error.
318    * 
319    * @return final result of the computation
320    * @throws IOException
321    *           if server error occurred
322    */
323   protected abstract R done() throws IOException;
324
325   /**
326    * Cancel all running jobs. Used in case of task failure to cleanup the
327    * resources or when the task has been cancelled.
328    */
329   protected void cancelJobs()
330   {
331     for (BaseJob job : jobs)
332     {
333       if (!job.isCompleted())
334       {
335         try
336         {
337           if (job.getServerJob() != null)
338           {
339             client.cancel(job.getServerJob());
340           }
341           job.setStatus(JobStatus.CANCELLED);
342         } catch (IOException e)
343         {
344           Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
345         }
346       }
347     }
348   }
349
350   @Override
351   public R getResult()
352   {
353     return result;
354   }
355 }