JAL-3878 Create abstract base classes for non-interactive tasks
[jalview.git] / src / jalview / ws2 / actions / AbstractPollableTask.java
1 package jalview.ws2.actions;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
11
12 import jalview.bin.Cache;
13 import jalview.ws.params.ArgumentI;
14 import jalview.ws2.actions.api.TaskEventListener;
15 import jalview.ws2.actions.api.TaskI;
16 import jalview.ws2.api.Credentials;
17 import jalview.ws2.api.JobStatus;
18 import jalview.ws2.api.WebServiceJobHandle;
19 import jalview.ws2.client.api.WebServiceClientI;
20 import jalview.ws2.helpers.DelegateJobEventListener;
21 import jalview.ws2.helpers.TaskEventSupport;
22 import static java.lang.String.format;
23
24 /**
25  * An abstract base class for non-interactive tasks which implements common
26  * tasks methods. Additionally, it manages task execution in a polling loop.
27  * Subclasses are only required to implement {@link #prepare()} and
28  * {@link #done()} methods.
29  * 
30  * @author mmwarowny
31  *
32  * @param <T>
33  *          the type of jobs managed by the task
34  * @param <R>
35  *          the type of result provided by the task
36  */
37 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
38 {
39   protected final WebServiceClientI client;
40
41   protected final List<ArgumentI> args;
42
43   protected final Credentials credentials;
44
45   private final TaskEventSupport<R> eventHandler;
46
47   protected JobStatus taskStatus = null;
48
49   private Future<?> future = null;
50
51   protected List<T> jobs = Collections.emptyList();
52
53   protected R result;
54
55   protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
56       Credentials credentials, TaskEventListener<R> eventListener)
57   {
58     this.client = client;
59     this.args = args;
60     this.credentials = credentials;
61     this.eventHandler = new TaskEventSupport<R>(this, eventListener);
62   }
63
64   /**
65    * Start the task using provided scheduled executor service. It creates a
66    * polling loop running at set intervals.
67    * 
68    * @param executor
69    *          executor to run the polling loop with
70    */
71   public void start(ScheduledExecutorService executor)
72   {
73     if (future != null)
74       throw new IllegalStateException("task already started");
75     var runnable = new Runnable()
76     {
77       private int stage = STAGE_PREPARE;
78
79       private static final int STAGE_PREPARE = 0;
80
81       private static final int STAGE_START = 1;
82
83       private static final int STAGE_POLL = 2;
84
85       private static final int STAGE_FINALIZE = 3;
86
87       private static final int STAGE_DONE = 4;
88
89       private int retryCount = 0;
90
91       private static final int MAX_RETRY = 5;
92
93       /**
94        * A polling loop run periodically which carries the task through its
95        * consecutive execution stages.
96        */
97       @Override
98       public void run()
99       {
100         if (stage == STAGE_PREPARE)
101         {
102           // first stage - the input data is collected and the jobs are created
103           try
104           {
105             jobs = prepare();
106           } catch (ServiceInputInvalidException e)
107           {
108             stage = STAGE_DONE;
109             setStatus(JobStatus.INVALID);
110             eventHandler.fireTaskException(e);
111             throw new CompletionException(e);
112           }
113           stage = STAGE_START;
114           setStatus(JobStatus.READY);
115           eventHandler.fireTaskStarted(jobs);
116           var jobListener = new DelegateJobEventListener<>(eventHandler);
117           for (var job : jobs)
118           {
119             job.addPropertyChagneListener(jobListener);
120           }
121         }
122         try
123         {
124           if (stage == STAGE_START)
125           {
126             // second stage - jobs are submitted to the server
127             startJobs();
128             stage = STAGE_POLL;
129             setStatus(JobStatus.SUBMITTED);
130           }
131           if (stage == STAGE_POLL)
132           {
133             // third stage - jobs are poolled until all of them are completed
134             if (pollJobs())
135             {
136               stage = STAGE_FINALIZE;
137             }
138             updateGlobalStatus();
139           }
140           if (stage == STAGE_FINALIZE)
141           {
142             // final stage - results are collected and stored
143             result = done();
144             eventHandler.fireTaskCompleted(result);
145             stage = STAGE_DONE;
146           }
147           retryCount = 0;
148         } catch (IOException e)
149         {
150           eventHandler.fireTaskException(e);
151           if (++retryCount > MAX_RETRY)
152           {
153             stage = STAGE_DONE;
154             cancelJobs();
155             setStatus(JobStatus.SERVER_ERROR);
156             throw new CompletionException(e);
157           }
158         }
159         if (stage == STAGE_DONE)
160         {
161           // finalization - terminating the future task
162           throw new CancellationException("task terminated");
163         }
164       }
165     };
166     if (taskStatus != JobStatus.CANCELLED)
167       future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
168   }
169
170   @Override
171   public JobStatus getStatus()
172   {
173     return taskStatus;
174   }
175
176   /**
177    * Set the status of the task and notify the event handler.
178    * 
179    * @param status
180    *          new task status
181    */
182   protected void setStatus(JobStatus status)
183   {
184     if (this.taskStatus != status)
185     {
186       this.taskStatus = status;
187       eventHandler.fireTaskStatusChanged(status);
188     }
189   }
190
191   /**
192    * Update task status according to the overall status of its jobs. The rules
193    * of setting the status are following:
194    * <ul>
195    * <li>task is invalid if all jobs are invalid</li>
196    * <li>task is completed if all but invalid jobs are completed</li>
197    * <li>task is ready, submitted or queued if at least one job is ready,
198    * submitted or queued an none proceeded to the next stage excluding
199    * completed.</li>
200    * <li>task is running if at least one job is running and none are failed or
201    * cancelled</li>
202    * <li>task is cancelled if at least one job is cancelled and none failed</li>
203    * <li>task is failed or server error if at least one job is failed or server
204    * error</li>
205    * </ul>
206    */
207   private void updateGlobalStatus()
208   {
209     JobStatus newStatus = taskStatus;
210     int currentPrecedence = getPrecedenceOf(newStatus);
211     for (BaseJob job : jobs)
212     {
213       JobStatus status = job.getStatus();
214       int jobPrecedence = getPrecedenceOf(status);
215       if (currentPrecedence < jobPrecedence)
216       {
217         currentPrecedence = jobPrecedence;
218         newStatus = status;
219       }
220     }
221     setStatus(newStatus);
222   }
223
224   /**
225    * A precedence order of job statuses used to compute the overall task status.
226    */
227   private static JobStatus[] statusPrecedence = {
228       JobStatus.INVALID, // all must be invalid for task to be invalid
229       JobStatus.COMPLETED, // all but invalid must be completed for task to be
230                            // completed
231       JobStatus.UNKNOWN, // unknown prevents successful completion but not
232                          // running or failure
233       JobStatus.READY,
234       JobStatus.SUBMITTED,
235       JobStatus.QUEUED,
236       JobStatus.RUNNING,
237       JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
238                            // failed
239       JobStatus.FAILED,
240       JobStatus.SERVER_ERROR
241   };
242
243   /**
244    * @param status
245    *          status to find the precedence of
246    * @return precedence of the status
247    */
248   private static int getPrecedenceOf(JobStatus status)
249   {
250     final int len = statusPrecedence.length;
251     for (int i = 0; i < len; i++)
252     {
253       if (statusPrecedence[i] == status)
254       {
255         return i;
256       }
257     }
258     return -1;
259   }
260
261   @Override
262   public void cancel()
263   {
264     setStatus(JobStatus.CANCELLED);
265     if (future != null)
266       future.cancel(false);
267     cancelJobs();
268   }
269
270   @Override
271   public List<? extends BaseJob> getSubJobs()
272   {
273     return jobs;
274   }
275
276   /**
277    * Collect and process input sequences for submission and return the list of
278    * jobs to be submitted.
279    * 
280    * @return list of jobs to be submitted
281    * @throws ServiceInputInvalidException
282    *           input is invalid and the task should not be started
283    */
284   protected abstract List<T> prepare() throws ServiceInputInvalidException;
285
286   /**
287    * Submit all valid jobs to the server and store their job handles.
288    * 
289    * @throws IOException
290    *           if server error occurred
291    */
292   protected void startJobs() throws IOException
293   {
294     for (BaseJob job : jobs)
295     {
296       if (job.isInputValid() && job.getStatus() == JobStatus.READY)
297       {
298         WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
299             args, credentials);
300         job.setServerJob(serverJob);
301         job.setStatus(JobStatus.SUBMITTED);
302       }
303     }
304   }
305
306   /**
307    * Poll all running jobs and update their status and logs. Polling is repeated
308    * periodically until this method return true when all jobs are done.
309    * 
310    * @return {@code true] if all jobs are done @throws IOException if server
311    *         error occurred
312    */
313   protected boolean pollJobs() throws IOException
314   {
315     boolean allDone = true;
316     for (BaseJob job : jobs)
317     {
318       if (job.isInputValid() && !job.getStatus().isDone())
319       {
320         WebServiceJobHandle serverJob = job.getServerJob();
321         job.setStatus(client.getStatus(serverJob));
322         job.setLog(client.getLog(serverJob));
323         job.setErrorLog(client.getErrorLog(serverJob));
324       }
325       allDone &= job.isCompleted();
326     }
327     return allDone;
328   }
329
330   /**
331    * Fetch and process the outputs produced by jobs and return the final result
332    * of the task. The method is called once all jobs have finished execution. If
333    * this method raises {@link IOException} it will be called again after a
334    * delay. All IO operations should happen before data processing, so
335    * potentially expensive computation is avoided in case of an error.
336    * 
337    * @return final result of the computation
338    * @throws IOException
339    *           if server error occurred
340    */
341   protected abstract R done() throws IOException;
342
343   /**
344    * Cancel all running jobs. Used in case of task failure to cleanup the
345    * resources or when the task has been cancelled.
346    */
347   protected void cancelJobs()
348   {
349     for (BaseJob job : jobs)
350     {
351       if (!job.isCompleted())
352       {
353         try
354         {
355           if (job.getServerJob() != null)
356           {
357             client.cancel(job.getServerJob());
358           }
359           job.setStatus(JobStatus.CANCELLED);
360         } catch (IOException e)
361         {
362           Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
363         }
364       }
365     }
366   }
367
368   @Override
369   public R getResult()
370   {
371     return result;
372   }
373 }