JAL-3878 Add uid to WS Tasks
[jalview.git] / src / jalview / ws2 / actions / AbstractPollableTask.java
1 package jalview.ws2.actions;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
11
12 import jalview.bin.Cache;
13 import jalview.util.ArrayUtils;
14 import jalview.util.MathUtils;
15 import jalview.ws.params.ArgumentI;
16 import jalview.ws2.actions.api.TaskEventListener;
17 import jalview.ws2.actions.api.TaskI;
18 import jalview.ws2.api.Credentials;
19 import jalview.ws2.api.JobStatus;
20 import jalview.ws2.api.WebServiceJobHandle;
21 import jalview.ws2.client.api.WebServiceClientI;
22 import jalview.ws2.helpers.DelegateJobEventListener;
23 import jalview.ws2.helpers.TaskEventSupport;
24 import static java.lang.String.format;
25
26 /**
27  * An abstract base class for non-interactive tasks which implements common
28  * tasks methods. Additionally, it manages task execution in a polling loop.
29  * Subclasses are only required to implement {@link #prepare()} and
30  * {@link #done()} methods.
31  * 
32  * @author mmwarowny
33  *
34  * @param <T>
35  *          the type of jobs managed by the task
36  * @param <R>
37  *          the type of result provided by the task
38  */
39 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
40 {
41   private final long uid = MathUtils.getUID();
42
43   protected final WebServiceClientI client;
44
45   protected final List<ArgumentI> args;
46
47   protected final Credentials credentials;
48
49   private final TaskEventSupport<R> eventHandler;
50
51   protected JobStatus taskStatus = null;
52
53   private Future<?> future = null;
54
55   protected List<T> jobs = Collections.emptyList();
56
57   protected R result;
58
59   protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
60       Credentials credentials, TaskEventListener<R> eventListener)
61   {
62     this.client = client;
63     this.args = args;
64     this.credentials = credentials;
65     this.eventHandler = new TaskEventSupport<R>(this, eventListener);
66   }
67
68   public long getUid()
69   {
70     return uid;
71   }
72
73   /**
74    * Start the task using provided scheduled executor service. It creates a
75    * polling loop running at set intervals.
76    * 
77    * @param executor
78    *          executor to run the polling loop with
79    */
80   public void start(ScheduledExecutorService executor)
81   {
82     if (future != null)
83       throw new IllegalStateException("task already started");
84     var runnable = new Runnable()
85     {
86       private int stage = STAGE_PREPARE;
87
88       private static final int STAGE_PREPARE = 0;
89
90       private static final int STAGE_START = 1;
91
92       private static final int STAGE_POLL = 2;
93
94       private static final int STAGE_FINALIZE = 3;
95
96       private static final int STAGE_DONE = 4;
97
98       private int retryCount = 0;
99
100       private static final int MAX_RETRY = 5;
101
102       /**
103        * A polling loop run periodically which carries the task through its
104        * consecutive execution stages.
105        */
106       @Override
107       public void run()
108       {
109         if (stage == STAGE_PREPARE)
110         {
111           // first stage - the input data is collected and the jobs are created
112           try
113           {
114             jobs = prepare();
115           } catch (ServiceInputInvalidException e)
116           {
117             stage = STAGE_DONE;
118             setStatus(JobStatus.INVALID);
119             eventHandler.fireTaskException(e);
120             throw new CompletionException(e);
121           }
122           stage = STAGE_START;
123           setStatus(JobStatus.READY);
124           eventHandler.fireTaskStarted(jobs);
125           var jobListener = new DelegateJobEventListener<>(eventHandler);
126           for (var job : jobs)
127           {
128             job.addPropertyChagneListener(jobListener);
129           }
130         }
131         try
132         {
133           if (stage == STAGE_START)
134           {
135             // second stage - jobs are submitted to the server
136             startJobs();
137             stage = STAGE_POLL;
138             setStatus(JobStatus.SUBMITTED);
139           }
140           if (stage == STAGE_POLL)
141           {
142             // third stage - jobs are poolled until all of them are completed
143             if (pollJobs())
144             {
145               stage = STAGE_FINALIZE;
146             }
147             updateGlobalStatus();
148           }
149           if (stage == STAGE_FINALIZE)
150           {
151             // final stage - results are collected and stored
152             result = done();
153             eventHandler.fireTaskCompleted(result);
154             stage = STAGE_DONE;
155           }
156           retryCount = 0;
157         } catch (IOException e)
158         {
159           eventHandler.fireTaskException(e);
160           if (++retryCount > MAX_RETRY)
161           {
162             stage = STAGE_DONE;
163             cancelJobs();
164             setStatus(JobStatus.SERVER_ERROR);
165             throw new CompletionException(e);
166           }
167         }
168         if (stage == STAGE_DONE)
169         {
170           // finalization - terminating the future task
171           throw new CancellationException("task terminated");
172         }
173       }
174     };
175     if (taskStatus != JobStatus.CANCELLED)
176       future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
177   }
178
179   @Override
180   public JobStatus getStatus()
181   {
182     return taskStatus;
183   }
184
185   /**
186    * Set the status of the task and notify the event handler.
187    * 
188    * @param status
189    *          new task status
190    */
191   protected void setStatus(JobStatus status)
192   {
193     if (this.taskStatus != status)
194     {
195       this.taskStatus = status;
196       eventHandler.fireTaskStatusChanged(status);
197     }
198   }
199
200   /**
201    * Update task status according to the overall status of its jobs. The rules
202    * of setting the status are following:
203    * <ul>
204    * <li>task is invalid if all jobs are invalid</li>
205    * <li>task is completed if all but invalid jobs are completed</li>
206    * <li>task is ready, submitted or queued if at least one job is ready,
207    * submitted or queued an none proceeded to the next stage excluding
208    * completed.</li>
209    * <li>task is running if at least one job is running and none are failed or
210    * cancelled</li>
211    * <li>task is cancelled if at least one job is cancelled and none failed</li>
212    * <li>task is failed or server error if at least one job is failed or server
213    * error</li>
214    * </ul>
215    */
216   private void updateGlobalStatus()
217   {
218     int precedence = -1;
219     for (BaseJob job : jobs)
220     {
221       JobStatus status = job.getStatus();
222       int jobPrecedence = ArrayUtils.indexOf(statusPrecedence, status);
223       if (precedence < jobPrecedence)
224         precedence = jobPrecedence;
225     }
226     if (precedence >= 0)
227     {
228       setStatus(statusPrecedence[precedence]);
229     }
230   }
231
232   /**
233    * A precedence order of job statuses used to compute the overall task status.
234    */
235   private static JobStatus[] statusPrecedence = {
236       JobStatus.INVALID, // all must be invalid for task to be invalid
237       JobStatus.COMPLETED, // all but invalid must be completed for task to be
238                            // completed
239       JobStatus.UNKNOWN, // unknown prevents successful completion but not
240                          // running or failure
241       JobStatus.READY,
242       JobStatus.SUBMITTED,
243       JobStatus.QUEUED,
244       JobStatus.RUNNING,
245       JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
246                            // failed
247       JobStatus.FAILED,
248       JobStatus.SERVER_ERROR
249   };
250
251   @Override
252   public void cancel()
253   {
254     setStatus(JobStatus.CANCELLED);
255     if (future != null)
256       future.cancel(false);
257     cancelJobs();
258   }
259
260   @Override
261   public List<? extends BaseJob> getSubJobs()
262   {
263     return jobs;
264   }
265
266   /**
267    * Collect and process input sequences for submission and return the list of
268    * jobs to be submitted.
269    * 
270    * @return list of jobs to be submitted
271    * @throws ServiceInputInvalidException
272    *           input is invalid and the task should not be started
273    */
274   protected abstract List<T> prepare() throws ServiceInputInvalidException;
275
276   /**
277    * Submit all valid jobs to the server and store their job handles.
278    * 
279    * @throws IOException
280    *           if server error occurred
281    */
282   protected void startJobs() throws IOException
283   {
284     for (BaseJob job : jobs)
285     {
286       if (job.isInputValid() && job.getStatus() == JobStatus.READY)
287       {
288         WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
289             args, credentials);
290         job.setServerJob(serverJob);
291         job.setStatus(JobStatus.SUBMITTED);
292       }
293     }
294   }
295
296   /**
297    * Poll all running jobs and update their status and logs. Polling is repeated
298    * periodically until this method return true when all jobs are done.
299    * 
300    * @return {@code true] if all jobs are done @throws IOException if server
301    *         error occurred
302    */
303   protected boolean pollJobs() throws IOException
304   {
305     boolean allDone = true;
306     for (BaseJob job : jobs)
307     {
308       if (job.isInputValid() && !job.getStatus().isDone())
309       {
310         WebServiceJobHandle serverJob = job.getServerJob();
311         job.setStatus(client.getStatus(serverJob));
312         job.setLog(client.getLog(serverJob));
313         job.setErrorLog(client.getErrorLog(serverJob));
314       }
315       allDone &= job.isCompleted();
316     }
317     return allDone;
318   }
319
320   /**
321    * Fetch and process the outputs produced by jobs and return the final result
322    * of the task. The method is called once all jobs have finished execution. If
323    * this method raises {@link IOException} it will be called again after a
324    * delay. All IO operations should happen before data processing, so
325    * potentially expensive computation is avoided in case of an error.
326    * 
327    * @return final result of the computation
328    * @throws IOException
329    *           if server error occurred
330    */
331   protected abstract R done() throws IOException;
332
333   /**
334    * Cancel all running jobs. Used in case of task failure to cleanup the
335    * resources or when the task has been cancelled.
336    */
337   protected void cancelJobs()
338   {
339     for (BaseJob job : jobs)
340     {
341       if (!job.isCompleted())
342       {
343         try
344         {
345           if (job.getServerJob() != null)
346           {
347             client.cancel(job.getServerJob());
348           }
349           job.setStatus(JobStatus.CANCELLED);
350         } catch (IOException e)
351         {
352           Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
353         }
354       }
355     }
356   }
357
358   @Override
359   public R getResult()
360   {
361     return result;
362   }
363 }