1e57556b2b8733696d817b736865a90ff9a9c2fc
[jalview.git] / src / jalview / ws2 / actions / AbstractPollableTask.java
1 package jalview.ws2.actions;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
11
12 import jalview.bin.Cache;
13 import jalview.util.ArrayUtils;
14 import jalview.ws.params.ArgumentI;
15 import jalview.ws2.actions.api.TaskEventListener;
16 import jalview.ws2.actions.api.TaskI;
17 import jalview.ws2.api.Credentials;
18 import jalview.ws2.api.JobStatus;
19 import jalview.ws2.api.WebServiceJobHandle;
20 import jalview.ws2.client.api.WebServiceClientI;
21 import jalview.ws2.helpers.DelegateJobEventListener;
22 import jalview.ws2.helpers.TaskEventSupport;
23 import static java.lang.String.format;
24
25 /**
26  * An abstract base class for non-interactive tasks which implements common
27  * tasks methods. Additionally, it manages task execution in a polling loop.
28  * Subclasses are only required to implement {@link #prepare()} and
29  * {@link #done()} methods.
30  * 
31  * @author mmwarowny
32  *
33  * @param <T>
34  *          the type of jobs managed by the task
35  * @param <R>
36  *          the type of result provided by the task
37  */
38 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
39 {
40   protected final WebServiceClientI client;
41
42   protected final List<ArgumentI> args;
43
44   protected final Credentials credentials;
45
46   private final TaskEventSupport<R> eventHandler;
47
48   protected JobStatus taskStatus = null;
49
50   private Future<?> future = null;
51
52   protected List<T> jobs = Collections.emptyList();
53
54   protected R result;
55
56   protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
57       Credentials credentials, TaskEventListener<R> eventListener)
58   {
59     this.client = client;
60     this.args = args;
61     this.credentials = credentials;
62     this.eventHandler = new TaskEventSupport<R>(this, eventListener);
63   }
64
65   /**
66    * Start the task using provided scheduled executor service. It creates a
67    * polling loop running at set intervals.
68    * 
69    * @param executor
70    *          executor to run the polling loop with
71    */
72   public void start(ScheduledExecutorService executor)
73   {
74     if (future != null)
75       throw new IllegalStateException("task already started");
76     var runnable = new Runnable()
77     {
78       private int stage = STAGE_PREPARE;
79
80       private static final int STAGE_PREPARE = 0;
81
82       private static final int STAGE_START = 1;
83
84       private static final int STAGE_POLL = 2;
85
86       private static final int STAGE_FINALIZE = 3;
87
88       private static final int STAGE_DONE = 4;
89
90       private int retryCount = 0;
91
92       private static final int MAX_RETRY = 5;
93
94       /**
95        * A polling loop run periodically which carries the task through its
96        * consecutive execution stages.
97        */
98       @Override
99       public void run()
100       {
101         if (stage == STAGE_PREPARE)
102         {
103           // first stage - the input data is collected and the jobs are created
104           try
105           {
106             jobs = prepare();
107           } catch (ServiceInputInvalidException e)
108           {
109             stage = STAGE_DONE;
110             setStatus(JobStatus.INVALID);
111             eventHandler.fireTaskException(e);
112             throw new CompletionException(e);
113           }
114           stage = STAGE_START;
115           setStatus(JobStatus.READY);
116           eventHandler.fireTaskStarted(jobs);
117           var jobListener = new DelegateJobEventListener<>(eventHandler);
118           for (var job : jobs)
119           {
120             job.addPropertyChagneListener(jobListener);
121           }
122         }
123         try
124         {
125           if (stage == STAGE_START)
126           {
127             // second stage - jobs are submitted to the server
128             startJobs();
129             stage = STAGE_POLL;
130             setStatus(JobStatus.SUBMITTED);
131           }
132           if (stage == STAGE_POLL)
133           {
134             // third stage - jobs are poolled until all of them are completed
135             if (pollJobs())
136             {
137               stage = STAGE_FINALIZE;
138             }
139             updateGlobalStatus();
140           }
141           if (stage == STAGE_FINALIZE)
142           {
143             // final stage - results are collected and stored
144             result = done();
145             eventHandler.fireTaskCompleted(result);
146             stage = STAGE_DONE;
147           }
148           retryCount = 0;
149         } catch (IOException e)
150         {
151           eventHandler.fireTaskException(e);
152           if (++retryCount > MAX_RETRY)
153           {
154             stage = STAGE_DONE;
155             cancelJobs();
156             setStatus(JobStatus.SERVER_ERROR);
157             throw new CompletionException(e);
158           }
159         }
160         if (stage == STAGE_DONE)
161         {
162           // finalization - terminating the future task
163           throw new CancellationException("task terminated");
164         }
165       }
166     };
167     if (taskStatus != JobStatus.CANCELLED)
168       future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
169   }
170
171   @Override
172   public JobStatus getStatus()
173   {
174     return taskStatus;
175   }
176
177   /**
178    * Set the status of the task and notify the event handler.
179    * 
180    * @param status
181    *          new task status
182    */
183   protected void setStatus(JobStatus status)
184   {
185     if (this.taskStatus != status)
186     {
187       this.taskStatus = status;
188       eventHandler.fireTaskStatusChanged(status);
189     }
190   }
191
192   /**
193    * Update task status according to the overall status of its jobs. The rules
194    * of setting the status are following:
195    * <ul>
196    * <li>task is invalid if all jobs are invalid</li>
197    * <li>task is completed if all but invalid jobs are completed</li>
198    * <li>task is ready, submitted or queued if at least one job is ready,
199    * submitted or queued an none proceeded to the next stage excluding
200    * completed.</li>
201    * <li>task is running if at least one job is running and none are failed or
202    * cancelled</li>
203    * <li>task is cancelled if at least one job is cancelled and none failed</li>
204    * <li>task is failed or server error if at least one job is failed or server
205    * error</li>
206    * </ul>
207    */
208   private void updateGlobalStatus()
209   {
210     JobStatus newStatus = taskStatus;
211     int currentPrecedence = ArrayUtils.indexOf(statusPrecedence, newStatus);
212     for (BaseJob job : jobs)
213     {
214       JobStatus status = job.getStatus();
215       int jobPrecedence = ArrayUtils.indexOf(statusPrecedence, status);
216       if (currentPrecedence < jobPrecedence)
217       {
218         currentPrecedence = jobPrecedence;
219         newStatus = status;
220       }
221     }
222     setStatus(newStatus);
223   }
224
225   /**
226    * A precedence order of job statuses used to compute the overall task status.
227    */
228   private static JobStatus[] statusPrecedence = {
229       JobStatus.INVALID, // all must be invalid for task to be invalid
230       JobStatus.COMPLETED, // all but invalid must be completed for task to be
231                            // completed
232       JobStatus.UNKNOWN, // unknown prevents successful completion but not
233                          // running or failure
234       JobStatus.READY,
235       JobStatus.SUBMITTED,
236       JobStatus.QUEUED,
237       JobStatus.RUNNING,
238       JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
239                            // failed
240       JobStatus.FAILED,
241       JobStatus.SERVER_ERROR
242   };
243
244   @Override
245   public void cancel()
246   {
247     setStatus(JobStatus.CANCELLED);
248     if (future != null)
249       future.cancel(false);
250     cancelJobs();
251   }
252
253   @Override
254   public List<? extends BaseJob> getSubJobs()
255   {
256     return jobs;
257   }
258
259   /**
260    * Collect and process input sequences for submission and return the list of
261    * jobs to be submitted.
262    * 
263    * @return list of jobs to be submitted
264    * @throws ServiceInputInvalidException
265    *           input is invalid and the task should not be started
266    */
267   protected abstract List<T> prepare() throws ServiceInputInvalidException;
268
269   /**
270    * Submit all valid jobs to the server and store their job handles.
271    * 
272    * @throws IOException
273    *           if server error occurred
274    */
275   protected void startJobs() throws IOException
276   {
277     for (BaseJob job : jobs)
278     {
279       if (job.isInputValid() && job.getStatus() == JobStatus.READY)
280       {
281         WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
282             args, credentials);
283         job.setServerJob(serverJob);
284         job.setStatus(JobStatus.SUBMITTED);
285       }
286     }
287   }
288
289   /**
290    * Poll all running jobs and update their status and logs. Polling is repeated
291    * periodically until this method return true when all jobs are done.
292    * 
293    * @return {@code true] if all jobs are done @throws IOException if server
294    *         error occurred
295    */
296   protected boolean pollJobs() throws IOException
297   {
298     boolean allDone = true;
299     for (BaseJob job : jobs)
300     {
301       if (job.isInputValid() && !job.getStatus().isDone())
302       {
303         WebServiceJobHandle serverJob = job.getServerJob();
304         job.setStatus(client.getStatus(serverJob));
305         job.setLog(client.getLog(serverJob));
306         job.setErrorLog(client.getErrorLog(serverJob));
307       }
308       allDone &= job.isCompleted();
309     }
310     return allDone;
311   }
312
313   /**
314    * Fetch and process the outputs produced by jobs and return the final result
315    * of the task. The method is called once all jobs have finished execution. If
316    * this method raises {@link IOException} it will be called again after a
317    * delay. All IO operations should happen before data processing, so
318    * potentially expensive computation is avoided in case of an error.
319    * 
320    * @return final result of the computation
321    * @throws IOException
322    *           if server error occurred
323    */
324   protected abstract R done() throws IOException;
325
326   /**
327    * Cancel all running jobs. Used in case of task failure to cleanup the
328    * resources or when the task has been cancelled.
329    */
330   protected void cancelJobs()
331   {
332     for (BaseJob job : jobs)
333     {
334       if (!job.isCompleted())
335       {
336         try
337         {
338           if (job.getServerJob() != null)
339           {
340             client.cancel(job.getServerJob());
341           }
342           job.setStatus(JobStatus.CANCELLED);
343         } catch (IOException e)
344         {
345           Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
346         }
347       }
348     }
349   }
350
351   @Override
352   public R getResult()
353   {
354     return result;
355   }
356 }