b5a51588eb6c022a87919a42d226181aaf92ed7f
[jalview.git] / src / jalview / ws2 / actions / BaseTask.java
1 package jalview.ws2.actions;
2
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.Collections;
6 import java.util.List;
7 import java.util.Objects;
8
9 import jalview.bin.Console;
10 import jalview.util.ArrayUtils;
11 import jalview.util.MathUtils;
12 import jalview.ws.params.ArgumentI;
13 import jalview.ws2.actions.api.TaskEventListener;
14 import jalview.ws2.actions.api.TaskI;
15 import jalview.ws2.api.Credentials;
16 import jalview.ws2.api.JobStatus;
17 import jalview.ws2.client.api.WebServiceClientI;
18 import jalview.ws2.helpers.DelegateJobEventListener;
19 import jalview.ws2.helpers.TaskEventSupport;
20
21 import static java.lang.String.format;
22
23 public abstract class BaseTask<T extends BaseJob, R> implements TaskI<R>
24 {
25   protected final long uid = MathUtils.getUID();
26
27   protected final WebServiceClientI webClient;
28
29   protected final List<ArgumentI> args;
30
31   protected final Credentials credentials;
32
33   private final TaskEventSupport<R> eventHandler;
34
35   protected JobStatus status = JobStatus.CREATED;
36
37   protected List<T> jobs = Collections.emptyList();
38
39   protected R result = null;
40
41   protected Runnable cancelAction = () -> {
42   };
43
44   protected BaseTask(WebServiceClientI webClient, List<ArgumentI> args,
45       Credentials credentials)
46   {
47     this.webClient = webClient;
48     this.args = args;
49     this.credentials = credentials;
50     this.eventHandler = new TaskEventSupport<>(this);
51   }
52
53   @Override
54   public final long getUid()
55   {
56     return uid;
57   }
58
59   @Override
60   public final JobStatus getStatus()
61   {
62     return status;
63   }
64
65   @Override
66   public final List<? extends BaseJob> getSubJobs()
67   {
68     return jobs;
69   }
70
71   @Override
72   public final void addTaskEventListener(TaskEventListener<R> listener)
73   {
74     eventHandler.addListener(listener);
75   }
76
77   @Override
78   public final void removeTaskEventListener(TaskEventListener<R> listener)
79   {
80     eventHandler.addListener(listener);
81   }
82
83   @Override
84   public final R getResult()
85   {
86     return result;
87   }
88
89   @Override
90   public final void init() throws Exception
91   {
92     try
93     {
94       jobs = prepareJobs();
95     } catch (ServiceInputInvalidException e)
96     {
97       setStatus(JobStatus.INVALID);
98       eventHandler.fireTaskException(e);
99       throw e;
100     }
101     setStatus(JobStatus.READY);
102     eventHandler.fireTaskStarted(jobs);
103     var jobListener = new DelegateJobEventListener<>(eventHandler);
104     for (var job : jobs)
105       job.addPropertyChangeListener(jobListener);
106     submitJobs(jobs);
107   }
108
109   static final int MAX_SUBMIT_RETRY = 5;
110
111   protected final void submitJobs(List<T> jobs) throws IOException
112   {
113     var retryCounter = 0;
114     while (true)
115     {
116       try
117       {
118         submitJobs0(jobs);
119         setStatus(JobStatus.SUBMITTED);
120         break;
121       } catch (IOException e)
122       {
123         eventHandler.fireTaskException(e);
124         if (++retryCounter > MAX_SUBMIT_RETRY)
125         {
126           cancel();
127           setStatus(JobStatus.SERVER_ERROR);
128           throw e;
129         }
130       }
131     }
132   }
133
134   private final void submitJobs0(List<T> jobs) throws IOException
135   {
136     IOException exception = null;
137     for (BaseJob job : jobs)
138     {
139       if (job.getStatus() != JobStatus.READY || !job.isInputValid())
140         continue;
141       try
142       {
143         var jobRef = webClient.submit(job.getInputSequences(), args, credentials);
144         job.setServerJob(jobRef);
145         job.setStatus(JobStatus.SUBMITTED);
146       } catch (IOException e)
147       {
148         exception = e;
149       }
150     }
151     if (exception != null)
152       throw exception;
153   }
154
155   /**
156    * Poll all running jobs and update their status and logs. Polling is repeated
157    * periodically until this method return true when all jobs are done.
158    * 
159    * @return {@code true] if all jobs are done @throws IOException if server
160    *         error occurred
161    */
162   @Override
163   public final boolean poll() throws IOException
164   {
165     boolean allDone = true;
166     IOException exception = null;
167     for (BaseJob job : jobs)
168     {
169       if (job.isInputValid() && !job.getStatus().isDone())
170       {
171         var serverJob = job.getServerJob();
172         try
173         {
174           job.setStatus(webClient.getStatus(serverJob));
175           job.setLog(webClient.getLog(serverJob));
176           job.setErrorLog(webClient.getErrorLog(serverJob));
177         } catch (IOException e)
178         {
179           exception = e;
180         }
181       }
182       allDone &= job.isCompleted();
183     }
184     updateGlobalStatus();
185     if (exception != null)
186       throw exception;
187     return allDone;
188   }
189
190   @Override
191   public final void complete() throws IOException
192   {
193     for (var job : jobs)
194     {
195       if (!job.isCompleted())
196       {
197         // a fallback in case the executor decides to finish prematurely
198         cancelJob(job);
199         job.setStatus(JobStatus.SERVER_ERROR);
200       }
201     }
202     updateGlobalStatus();
203     try {
204       result = collectResult(jobs);
205       eventHandler.fireTaskCompleted(result);
206     }
207     catch (Exception e)
208     {
209       eventHandler.fireTaskException(e);
210       setStatus(JobStatus.SERVER_ERROR);
211       throw e;
212     }
213   }
214
215   /**
216    * Cancel all running jobs. Used in case of task failure to cleanup the
217    * resources or when the task has been cancelled.
218    */
219   @Override
220   public final void cancel()
221   {
222     cancelAction.run();
223     for (T job : jobs)
224     {
225       cancelJob(job);
226     }
227     setStatus(JobStatus.CANCELLED);
228   }
229
230   private final void cancelJob(T job)
231   {
232     if (!job.isCompleted())
233     {
234       try
235       {
236         if (job.getServerJob() != null)
237           webClient.cancel(job.getServerJob());
238         job.setStatus(JobStatus.CANCELLED);
239       } catch (IOException e)
240       {
241         Console.error(format("failed to cancel job %s", job.getServerJob()), e);
242       }
243     }
244   }
245
246   protected final void setStatus(JobStatus status)
247   {
248     Objects.requireNonNull(status);
249     if (this.status != status)
250     {
251       this.status = status;
252       eventHandler.fireTaskStatusChanged(status);
253     }
254   }
255
256   protected abstract List<T> prepareJobs() throws ServiceInputInvalidException;
257
258   protected abstract R collectResult(List<T> jobs) throws IOException;
259
260   /**
261    * Update task status according to the overall status of its jobs. The rules
262    * of setting the status are following:
263    * <ul>
264    * <li>task is invalid if all jobs are invalid</li>
265    * <li>task is completed if all but invalid jobs are completed</li>
266    * <li>task is ready, submitted or queued if at least one job is ready,
267    * submitted or queued an none proceeded to the next stage excluding
268    * completed.</li>
269    * <li>task is running if at least one job is running and none are failed or
270    * cancelled</li>
271    * <li>task is cancelled if at least one job is cancelled and none failed</li>
272    * <li>task is failed or server error if at least one job is failed or server
273    * error</li>
274    * </ul>
275    */
276   protected final void updateGlobalStatus()
277   {
278     int precedence = -1;
279     for (BaseJob job : jobs)
280     {
281       JobStatus status = job.getStatus();
282       int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
283       if (precedence < jobPrecedence)
284         precedence = jobPrecedence;
285     }
286     if (precedence >= 0)
287     {
288       setStatus(JobStatus.statusPrecedence[precedence]);
289     }
290   }
291
292   /**
293    * Set the action that will be run when the {@link #cancel()} method is
294    * invoked. The action should typically stop the executor polling the task and
295    * release resources and threads running the task.
296    * 
297    * @param action
298    *          runnable to be executed when the task is cancelled
299    */
300   public void setCancelAction(Runnable action)
301   {
302     Objects.requireNonNull(action);
303     this.cancelAction = action;
304   }
305
306   @Override
307   public String toString()
308   {
309     var statusName = status != null ? status.name() : "UNSET";
310     return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, statusName);
311   }
312 }