1 package jalview.ws2.actions;
3 import java.io.IOException;
4 import java.util.Collections;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
12 import jalview.bin.Cache;
13 import jalview.util.ArrayUtils;
14 import jalview.util.MathUtils;
15 import jalview.ws.params.ArgumentI;
16 import jalview.ws2.actions.api.TaskEventListener;
17 import jalview.ws2.actions.api.TaskI;
18 import jalview.ws2.api.Credentials;
19 import jalview.ws2.api.JobStatus;
20 import jalview.ws2.api.WebServiceJobHandle;
21 import jalview.ws2.client.api.WebServiceClientI;
22 import jalview.ws2.helpers.DelegateJobEventListener;
23 import jalview.ws2.helpers.TaskEventSupport;
24 import static java.lang.String.format;
27 * An abstract base class for non-interactive tasks which implements common
28 * task methods. Additionally, it manages task execution in a polling loop.
29 * Subclasses are only required to implement {@link #prepare()} and
30 * {@link #done()} methods.
35 * the type of jobs managed by the task
37 * the type of result provided by the task
39 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
41 private final long uid = MathUtils.getUID();
43 protected final WebServiceClientI client;
45 protected final List<ArgumentI> args;
47 protected final Credentials credentials;
49 private final TaskEventSupport<R> eventHandler;
51 protected JobStatus taskStatus = null;
53 private Future<?> future = null;
55 protected List<T> jobs = Collections.emptyList();
59 protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
60 Credentials credentials, TaskEventListener<R> eventListener)
64 this.credentials = credentials;
65 this.eventHandler = new TaskEventSupport<R>(this, eventListener);
74 * Start the task using provided scheduled executor service. It creates a
75 * polling loop running at set intervals.
78 * executor to run the polling loop with
80 public void start(ScheduledExecutorService executor)
83 throw new IllegalStateException("task already started");
84 var runnable = new Runnable()
86 private int stage = STAGE_PREPARE;
88 private static final int STAGE_PREPARE = 0;
90 private static final int STAGE_START = 1;
92 private static final int STAGE_POLL = 2;
94 private static final int STAGE_FINALIZE = 3;
96 private static final int STAGE_DONE = 4;
98 private int retryCount = 0;
100 private static final int MAX_RETRY = 5;
103 * A polling loop run periodically which carries the task through its
104 * consecutive execution stages.
109 if (stage == STAGE_PREPARE)
111 // first stage - the input data is collected and the jobs are created
115 } catch (ServiceInputInvalidException e)
118 setStatus(JobStatus.INVALID);
119 eventHandler.fireTaskException(e);
120 throw new CompletionException(e);
123 setStatus(JobStatus.READY);
124 eventHandler.fireTaskStarted(jobs);
125 var jobListener = new DelegateJobEventListener<>(eventHandler);
128 job.addPropertyChagneListener(jobListener);
133 if (stage == STAGE_START)
135 // second stage - jobs are submitted to the server
138 setStatus(JobStatus.SUBMITTED);
140 if (stage == STAGE_POLL)
142 // third stage - jobs are polled until all of them are completed
145 stage = STAGE_FINALIZE;
147 updateGlobalStatus();
149 if (stage == STAGE_FINALIZE)
151 // final stage - results are collected and stored
153 eventHandler.fireTaskCompleted(result);
157 } catch (IOException e)
159 eventHandler.fireTaskException(e);
160 if (++retryCount > MAX_RETRY)
164 setStatus(JobStatus.SERVER_ERROR);
165 throw new CompletionException(e);
168 if (stage == STAGE_DONE)
170 // finalization - terminating the future task
171 throw new CancellationException("task terminated");
175 if (taskStatus != JobStatus.CANCELLED)
176 future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
180 public JobStatus getStatus()
186 * Set the status of the task and notify the event handler.
191 protected void setStatus(JobStatus status)
193 if (this.taskStatus != status)
195 this.taskStatus = status;
196 eventHandler.fireTaskStatusChanged(status);
201 * Update task status according to the overall status of its jobs. The rules
202 * of setting the status are following:
204 * <li>task is invalid if all jobs are invalid</li>
205 * <li>task is completed if all but invalid jobs are completed</li>
206 * <li>task is ready, submitted or queued if at least one job is ready,
207 * submitted or queued and none proceeded to the next stage excluding
209 * <li>task is running if at least one job is running and none are failed or
211 * <li>task is cancelled if at least one job is cancelled and none failed</li>
212 * <li>task is failed or server error if at least one job is failed or server
216 private void updateGlobalStatus()
219 for (BaseJob job : jobs)
221 JobStatus status = job.getStatus();
222 int jobPrecedence = ArrayUtils.indexOf(statusPrecedence, status);
223 if (precedence < jobPrecedence)
224 precedence = jobPrecedence;
228 setStatus(statusPrecedence[precedence]);
233 * A precedence order of job statuses used to compute the overall task status.
235 private static JobStatus[] statusPrecedence = {
236 JobStatus.INVALID, // all must be invalid for task to be invalid
237 JobStatus.COMPLETED, // all but invalid must be completed for task to be
239 JobStatus.UNKNOWN, // unknown prevents successful completion but not
240 // running or failure
245 JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
248 JobStatus.SERVER_ERROR
254 setStatus(JobStatus.CANCELLED);
256 future.cancel(false);
261 public List<? extends BaseJob> getSubJobs()
267 * Collect and process input sequences for submission and return the list of
268 * jobs to be submitted.
270 * @return list of jobs to be submitted
271 * @throws ServiceInputInvalidException
272 * input is invalid and the task should not be started
274 protected abstract List<T> prepare() throws ServiceInputInvalidException;
277 * Submit all valid jobs to the server and store their job handles.
279 * @throws IOException
280 * if server error occurred
282 protected void startJobs() throws IOException
284 for (BaseJob job : jobs)
286 if (job.isInputValid() && job.getStatus() == JobStatus.READY)
288 WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
290 job.setServerJob(serverJob);
291 job.setStatus(JobStatus.SUBMITTED);
297 * Poll all running jobs and update their status and logs. Polling is repeated
298 * periodically until this method return true when all jobs are done.
300 * @return {@code true] if all jobs are done @throws IOException if server
303 protected boolean pollJobs() throws IOException
305 boolean allDone = true;
306 for (BaseJob job : jobs)
308 if (job.isInputValid() && !job.getStatus().isDone())
310 WebServiceJobHandle serverJob = job.getServerJob();
311 job.setStatus(client.getStatus(serverJob));
312 job.setLog(client.getLog(serverJob));
313 job.setErrorLog(client.getErrorLog(serverJob));
315 allDone &= job.isCompleted();
321 * Fetch and process the outputs produced by jobs and return the final result
322 * of the task. The method is called once all jobs have finished execution. If
323 * this method raises {@link IOException} it will be called again after a
324 * delay. All IO operations should happen before data processing, so
325 * potentially expensive computation is avoided in case of an error.
327 * @return final result of the computation
328 * @throws IOException
329 * if server error occurred
331 protected abstract R done() throws IOException;
334 * Cancel all running jobs. Used in case of task failure to cleanup the
335 * resources or when the task has been cancelled.
337 protected void cancelJobs()
339 for (BaseJob job : jobs)
341 if (!job.isCompleted())
345 if (job.getServerJob() != null)
347 client.cancel(job.getServerJob());
349 job.setStatus(JobStatus.CANCELLED);
350 } catch (IOException e)
352 Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);