JAL-3878 Move status precedence list to JobStatus class
[jalview.git] / src / jalview / ws2 / actions / AbstractPollableTask.java
1 package jalview.ws2.actions;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
11
12 import jalview.bin.Cache;
13 import jalview.util.ArrayUtils;
14 import jalview.util.MathUtils;
15 import jalview.ws.params.ArgumentI;
16 import jalview.ws2.actions.api.TaskEventListener;
17 import jalview.ws2.actions.api.TaskI;
18 import jalview.ws2.api.Credentials;
19 import jalview.ws2.api.JobStatus;
20 import jalview.ws2.api.WebServiceJobHandle;
21 import jalview.ws2.client.api.WebServiceClientI;
22 import jalview.ws2.helpers.DelegateJobEventListener;
23 import jalview.ws2.helpers.TaskEventSupport;
24 import static java.lang.String.format;
25
26 /**
27  * An abstract base class for non-interactive tasks which implements common
28  * tasks methods. Additionally, it manages task execution in a polling loop.
29  * Subclasses are only required to implement {@link #prepare()} and
30  * {@link #done()} methods.
31  * 
32  * @author mmwarowny
33  *
34  * @param <T>
35  *          the type of jobs managed by the task
36  * @param <R>
37  *          the type of result provided by the task
38  */
39 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
40 {
41   private final long uid = MathUtils.getUID();
42
43   protected final WebServiceClientI client;
44
45   protected final List<ArgumentI> args;
46
47   protected final Credentials credentials;
48
49   private final TaskEventSupport<R> eventHandler;
50
51   protected JobStatus taskStatus = null;
52
53   private Future<?> future = null;
54
55   protected List<T> jobs = Collections.emptyList();
56
57   protected R result;
58
59   protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
60       Credentials credentials, TaskEventListener<R> eventListener)
61   {
62     this.client = client;
63     this.args = args;
64     this.credentials = credentials;
65     this.eventHandler = new TaskEventSupport<R>(this, eventListener);
66   }
67
68   public long getUid()
69   {
70     return uid;
71   }
72
73   /**
74    * Start the task using provided scheduled executor service. It creates a
75    * polling loop running at set intervals.
76    * 
77    * @param executor
78    *          executor to run the polling loop with
79    */
80   public void start(ScheduledExecutorService executor)
81   {
82     if (future != null)
83       throw new IllegalStateException("task already started");
84     var runnable = new Runnable()
85     {
86       private int stage = STAGE_PREPARE;
87
88       private static final int STAGE_PREPARE = 0;
89
90       private static final int STAGE_START = 1;
91
92       private static final int STAGE_POLL = 2;
93
94       private static final int STAGE_FINALIZE = 3;
95
96       private static final int STAGE_DONE = 4;
97
98       private int retryCount = 0;
99
100       private static final int MAX_RETRY = 5;
101
102       /**
103        * A polling loop run periodically which carries the task through its
104        * consecutive execution stages.
105        */
106       @Override
107       public void run()
108       {
109         if (stage == STAGE_PREPARE)
110         {
111           // first stage - the input data is collected and the jobs are created
112           try
113           {
114             jobs = prepare();
115           } catch (ServiceInputInvalidException e)
116           {
117             stage = STAGE_DONE;
118             setStatus(JobStatus.INVALID);
119             eventHandler.fireTaskException(e);
120             throw new CompletionException(e);
121           }
122           stage = STAGE_START;
123           setStatus(JobStatus.READY);
124           eventHandler.fireTaskStarted(jobs);
125           var jobListener = new DelegateJobEventListener<>(eventHandler);
126           for (var job : jobs)
127           {
128             job.addPropertyChagneListener(jobListener);
129           }
130         }
131         try
132         {
133           if (stage == STAGE_START)
134           {
135             // second stage - jobs are submitted to the server
136             startJobs();
137             stage = STAGE_POLL;
138             setStatus(JobStatus.SUBMITTED);
139           }
140           if (stage == STAGE_POLL)
141           {
142             // third stage - jobs are poolled until all of them are completed
143             if (pollJobs())
144             {
145               stage = STAGE_FINALIZE;
146             }
147             updateGlobalStatus();
148           }
149           if (stage == STAGE_FINALIZE)
150           {
151             // final stage - results are collected and stored
152             result = done();
153             eventHandler.fireTaskCompleted(result);
154             stage = STAGE_DONE;
155           }
156           retryCount = 0;
157         } catch (IOException e)
158         {
159           eventHandler.fireTaskException(e);
160           if (++retryCount > MAX_RETRY)
161           {
162             stage = STAGE_DONE;
163             cancelJobs();
164             setStatus(JobStatus.SERVER_ERROR);
165             throw new CompletionException(e);
166           }
167         }
168         if (stage == STAGE_DONE)
169         {
170           // finalization - terminating the future task
171           throw new CancellationException("task terminated");
172         }
173       }
174     };
175     if (taskStatus != JobStatus.CANCELLED)
176       future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
177   }
178
179   @Override
180   public JobStatus getStatus()
181   {
182     return taskStatus;
183   }
184
185   /**
186    * Set the status of the task and notify the event handler.
187    * 
188    * @param status
189    *          new task status
190    */
191   protected void setStatus(JobStatus status)
192   {
193     if (this.taskStatus != status)
194     {
195       this.taskStatus = status;
196       eventHandler.fireTaskStatusChanged(status);
197     }
198   }
199
200   /**
201    * Update task status according to the overall status of its jobs. The rules
202    * of setting the status are following:
203    * <ul>
204    * <li>task is invalid if all jobs are invalid</li>
205    * <li>task is completed if all but invalid jobs are completed</li>
206    * <li>task is ready, submitted or queued if at least one job is ready,
207    * submitted or queued an none proceeded to the next stage excluding
208    * completed.</li>
209    * <li>task is running if at least one job is running and none are failed or
210    * cancelled</li>
211    * <li>task is cancelled if at least one job is cancelled and none failed</li>
212    * <li>task is failed or server error if at least one job is failed or server
213    * error</li>
214    * </ul>
215    */
216   private void updateGlobalStatus()
217   {
218     int precedence = -1;
219     for (BaseJob job : jobs)
220     {
221       JobStatus status = job.getStatus();
222       int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
223       if (precedence < jobPrecedence)
224         precedence = jobPrecedence;
225     }
226     if (precedence >= 0)
227     {
228       setStatus(JobStatus.statusPrecedence[precedence]);
229     }
230   }
231
232   @Override
233   public void cancel()
234   {
235     setStatus(JobStatus.CANCELLED);
236     if (future != null)
237       future.cancel(false);
238     cancelJobs();
239   }
240
241   @Override
242   public List<? extends BaseJob> getSubJobs()
243   {
244     return jobs;
245   }
246
247   /**
248    * Collect and process input sequences for submission and return the list of
249    * jobs to be submitted.
250    * 
251    * @return list of jobs to be submitted
252    * @throws ServiceInputInvalidException
253    *           input is invalid and the task should not be started
254    */
255   protected abstract List<T> prepare() throws ServiceInputInvalidException;
256
257   /**
258    * Submit all valid jobs to the server and store their job handles.
259    * 
260    * @throws IOException
261    *           if server error occurred
262    */
263   protected void startJobs() throws IOException
264   {
265     for (BaseJob job : jobs)
266     {
267       if (job.isInputValid() && job.getStatus() == JobStatus.READY)
268       {
269         WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
270             args, credentials);
271         job.setServerJob(serverJob);
272         job.setStatus(JobStatus.SUBMITTED);
273       }
274     }
275   }
276
277   /**
278    * Poll all running jobs and update their status and logs. Polling is repeated
279    * periodically until this method return true when all jobs are done.
280    * 
281    * @return {@code true] if all jobs are done @throws IOException if server
282    *         error occurred
283    */
284   protected boolean pollJobs() throws IOException
285   {
286     boolean allDone = true;
287     for (BaseJob job : jobs)
288     {
289       if (job.isInputValid() && !job.getStatus().isDone())
290       {
291         WebServiceJobHandle serverJob = job.getServerJob();
292         job.setStatus(client.getStatus(serverJob));
293         job.setLog(client.getLog(serverJob));
294         job.setErrorLog(client.getErrorLog(serverJob));
295       }
296       allDone &= job.isCompleted();
297     }
298     return allDone;
299   }
300
301   /**
302    * Fetch and process the outputs produced by jobs and return the final result
303    * of the task. The method is called once all jobs have finished execution. If
304    * this method raises {@link IOException} it will be called again after a
305    * delay. All IO operations should happen before data processing, so
306    * potentially expensive computation is avoided in case of an error.
307    * 
308    * @return final result of the computation
309    * @throws IOException
310    *           if server error occurred
311    */
312   protected abstract R done() throws IOException;
313
314   /**
315    * Cancel all running jobs. Used in case of task failure to cleanup the
316    * resources or when the task has been cancelled.
317    */
318   protected void cancelJobs()
319   {
320     for (BaseJob job : jobs)
321     {
322       if (!job.isCompleted())
323       {
324         try
325         {
326           if (job.getServerJob() != null)
327           {
328             client.cancel(job.getServerJob());
329           }
330           job.setStatus(JobStatus.CANCELLED);
331         } catch (IOException e)
332         {
333           Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
334         }
335       }
336     }
337   }
338
339   @Override
340   public R getResult()
341   {
342     return result;
343   }
344 }