1 package jalview.ws2.actions;
3 import java.io.IOException;
4 import java.util.Collections;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
12 import jalview.bin.Cache;
13 import jalview.util.ArrayUtils;
14 import jalview.ws.params.ArgumentI;
15 import jalview.ws2.actions.api.TaskEventListener;
16 import jalview.ws2.actions.api.TaskI;
17 import jalview.ws2.api.Credentials;
18 import jalview.ws2.api.JobStatus;
19 import jalview.ws2.api.WebServiceJobHandle;
20 import jalview.ws2.client.api.WebServiceClientI;
21 import jalview.ws2.helpers.DelegateJobEventListener;
22 import jalview.ws2.helpers.TaskEventSupport;
23 import static java.lang.String.format;
26 * An abstract base class for non-interactive tasks which implements common
27 * tasks methods. Additionally, it manages task execution in a polling loop.
28 * Subclasses are only required to implement {@link #prepare()} and
29 * {@link #done()} methods.
34 * the type of jobs managed by the task
36 * the type of result provided by the task
38 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
40 protected final WebServiceClientI client;
42 protected final List<ArgumentI> args;
44 protected final Credentials credentials;
46 private final TaskEventSupport<R> eventHandler;
48 protected JobStatus taskStatus = null;
50 private Future<?> future = null;
52 protected List<T> jobs = Collections.emptyList();
56 protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
57 Credentials credentials, TaskEventListener<R> eventListener)
61 this.credentials = credentials;
62 this.eventHandler = new TaskEventSupport<R>(this, eventListener);
66 * Start the task using provided scheduled executor service. It creates a
67 * polling loop running at set intervals.
70 * executor to run the polling loop with
72 public void start(ScheduledExecutorService executor)
75 throw new IllegalStateException("task already started");
76 var runnable = new Runnable()
78 private int stage = STAGE_PREPARE;
80 private static final int STAGE_PREPARE = 0;
82 private static final int STAGE_START = 1;
84 private static final int STAGE_POLL = 2;
86 private static final int STAGE_FINALIZE = 3;
88 private static final int STAGE_DONE = 4;
90 private int retryCount = 0;
92 private static final int MAX_RETRY = 5;
95 * A polling loop run periodically which carries the task through its
96 * consecutive execution stages.
101 if (stage == STAGE_PREPARE)
103 // first stage - the input data is collected and the jobs are created
107 } catch (ServiceInputInvalidException e)
110 setStatus(JobStatus.INVALID);
111 eventHandler.fireTaskException(e);
112 throw new CompletionException(e);
115 setStatus(JobStatus.READY);
116 eventHandler.fireTaskStarted(jobs);
117 var jobListener = new DelegateJobEventListener<>(eventHandler);
120 job.addPropertyChagneListener(jobListener);
125 if (stage == STAGE_START)
127 // second stage - jobs are submitted to the server
130 setStatus(JobStatus.SUBMITTED);
132 if (stage == STAGE_POLL)
134 // third stage - jobs are poolled until all of them are completed
137 stage = STAGE_FINALIZE;
139 updateGlobalStatus();
141 if (stage == STAGE_FINALIZE)
143 // final stage - results are collected and stored
145 eventHandler.fireTaskCompleted(result);
149 } catch (IOException e)
151 eventHandler.fireTaskException(e);
152 if (++retryCount > MAX_RETRY)
156 setStatus(JobStatus.SERVER_ERROR);
157 throw new CompletionException(e);
160 if (stage == STAGE_DONE)
162 // finalization - terminating the future task
163 throw new CancellationException("task terminated");
167 if (taskStatus != JobStatus.CANCELLED)
168 future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
172 public JobStatus getStatus()
178 * Set the status of the task and notify the event handler.
183 protected void setStatus(JobStatus status)
185 if (this.taskStatus != status)
187 this.taskStatus = status;
188 eventHandler.fireTaskStatusChanged(status);
193 * Update task status according to the overall status of its jobs. The rules
194 * of setting the status are following:
196 * <li>task is invalid if all jobs are invalid</li>
197 * <li>task is completed if all but invalid jobs are completed</li>
198 * <li>task is ready, submitted or queued if at least one job is ready,
199 * submitted or queued an none proceeded to the next stage excluding
201 * <li>task is running if at least one job is running and none are failed or
203 * <li>task is cancelled if at least one job is cancelled and none failed</li>
204 * <li>task is failed or server error if at least one job is failed or server
208 private void updateGlobalStatus()
211 for (BaseJob job : jobs)
213 JobStatus status = job.getStatus();
214 int jobPrecedence = ArrayUtils.indexOf(statusPrecedence, status);
215 if (precedence < jobPrecedence)
216 precedence = jobPrecedence;
220 setStatus(statusPrecedence[precedence]);
225 * A precedence order of job statuses used to compute the overall task status.
227 private static JobStatus[] statusPrecedence = {
228 JobStatus.INVALID, // all must be invalid for task to be invalid
229 JobStatus.COMPLETED, // all but invalid must be completed for task to be
231 JobStatus.UNKNOWN, // unknown prevents successful completion but not
232 // running or failure
237 JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
240 JobStatus.SERVER_ERROR
246 setStatus(JobStatus.CANCELLED);
248 future.cancel(false);
253 public List<? extends BaseJob> getSubJobs()
259 * Collect and process input sequences for submission and return the list of
260 * jobs to be submitted.
262 * @return list of jobs to be submitted
263 * @throws ServiceInputInvalidException
264 * input is invalid and the task should not be started
266 protected abstract List<T> prepare() throws ServiceInputInvalidException;
269 * Submit all valid jobs to the server and store their job handles.
271 * @throws IOException
272 * if server error occurred
274 protected void startJobs() throws IOException
276 for (BaseJob job : jobs)
278 if (job.isInputValid() && job.getStatus() == JobStatus.READY)
280 WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
282 job.setServerJob(serverJob);
283 job.setStatus(JobStatus.SUBMITTED);
289 * Poll all running jobs and update their status and logs. Polling is repeated
290 * periodically until this method return true when all jobs are done.
292 * @return {@code true] if all jobs are done @throws IOException if server
295 protected boolean pollJobs() throws IOException
297 boolean allDone = true;
298 for (BaseJob job : jobs)
300 if (job.isInputValid() && !job.getStatus().isDone())
302 WebServiceJobHandle serverJob = job.getServerJob();
303 job.setStatus(client.getStatus(serverJob));
304 job.setLog(client.getLog(serverJob));
305 job.setErrorLog(client.getErrorLog(serverJob));
307 allDone &= job.isCompleted();
313 * Fetch and process the outputs produced by jobs and return the final result
314 * of the task. The method is called once all jobs have finished execution. If
315 * this method raises {@link IOException} it will be called again after a
316 * delay. All IO operations should happen before data processing, so
317 * potentially expensive computation is avoided in case of an error.
319 * @return final result of the computation
320 * @throws IOException
321 * if server error occurred
323 protected abstract R done() throws IOException;
326 * Cancel all running jobs. Used in case of task failure to cleanup the
327 * resources or when the task has been cancelled.
329 protected void cancelJobs()
331 for (BaseJob job : jobs)
333 if (!job.isCompleted())
337 if (job.getServerJob() != null)
339 client.cancel(job.getServerJob());
341 job.setStatus(JobStatus.CANCELLED);
342 } catch (IOException e)
344 Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);