1 package jalview.ws2.actions;
3 import java.io.IOException;
4 import java.util.Collections;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
12 import jalview.bin.Cache;
13 import jalview.ws.params.ArgumentI;
14 import jalview.ws2.actions.api.TaskEventListener;
15 import jalview.ws2.actions.api.TaskI;
16 import jalview.ws2.api.Credentials;
17 import jalview.ws2.api.JobStatus;
18 import jalview.ws2.api.WebServiceJobHandle;
19 import jalview.ws2.client.api.WebServiceClientI;
20 import jalview.ws2.helpers.DelegateJobEventListener;
21 import jalview.ws2.helpers.TaskEventSupport;
22 import static java.lang.String.format;
25 * An abstract base class for non-interactive tasks which implements common
26 * task methods. Additionally, it manages task execution in a polling loop.
27 * Subclasses are only required to implement {@link #prepare()} and
28 * {@link #done()} methods.
33 * the type of jobs managed by the task
35 * the type of result provided by the task
37 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
39 protected final WebServiceClientI client;
41 protected final List<ArgumentI> args;
43 protected final Credentials credentials;
45 private final TaskEventSupport<R> eventHandler;
47 protected JobStatus taskStatus = null;
49 private Future<?> future = null;
51 protected List<T> jobs = Collections.emptyList();
55 protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
56 Credentials credentials, TaskEventListener<R> eventListener)
60 this.credentials = credentials;
61 this.eventHandler = new TaskEventSupport<R>(this, eventListener);
65 * Start the task using provided scheduled executor service. It creates a
66 * polling loop running at set intervals.
69 * executor to run the polling loop with
71 public void start(ScheduledExecutorService executor)
74 throw new IllegalStateException("task already started");
75 var runnable = new Runnable()
77 private int stage = STAGE_PREPARE;
79 private static final int STAGE_PREPARE = 0;
81 private static final int STAGE_START = 1;
83 private static final int STAGE_POLL = 2;
85 private static final int STAGE_FINALIZE = 3;
87 private static final int STAGE_DONE = 4;
89 private int retryCount = 0;
91 private static final int MAX_RETRY = 5;
94 * A polling loop run periodically which carries the task through its
95 * consecutive execution stages.
100 if (stage == STAGE_PREPARE)
102 // first stage - the input data is collected and the jobs are created
106 } catch (ServiceInputInvalidException e)
109 setStatus(JobStatus.INVALID);
110 eventHandler.fireTaskException(e);
111 throw new CompletionException(e);
114 setStatus(JobStatus.READY);
115 eventHandler.fireTaskStarted(jobs);
116 var jobListener = new DelegateJobEventListener<>(eventHandler);
119 job.addPropertyChagneListener(jobListener);
124 if (stage == STAGE_START)
126 // second stage - jobs are submitted to the server
129 setStatus(JobStatus.SUBMITTED);
131 if (stage == STAGE_POLL)
133 // third stage - jobs are polled until all of them are completed
136 stage = STAGE_FINALIZE;
138 updateGlobalStatus();
140 if (stage == STAGE_FINALIZE)
142 // final stage - results are collected and stored
144 eventHandler.fireTaskCompleted(result);
148 } catch (IOException e)
150 eventHandler.fireTaskException(e);
151 if (++retryCount > MAX_RETRY)
155 setStatus(JobStatus.SERVER_ERROR);
156 throw new CompletionException(e);
159 if (stage == STAGE_DONE)
161 // finalization - terminating the future task
162 throw new CancellationException("task terminated");
166 if (taskStatus != JobStatus.CANCELLED)
167 future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
171 public JobStatus getStatus()
177 * Set the status of the task and notify the event handler.
182 protected void setStatus(JobStatus status)
184 if (this.taskStatus != status)
186 this.taskStatus = status;
187 eventHandler.fireTaskStatusChanged(status);
192 * Update task status according to the overall status of its jobs. The rules
193 * for setting the status are as follows:
195 * <li>task is invalid if all jobs are invalid</li>
196 * <li>task is completed if all but invalid jobs are completed</li>
197 * <li>task is ready, submitted or queued if at least one job is ready,
198 * submitted or queued and none proceeded to the next stage excluding
200 * <li>task is running if at least one job is running and none are failed or
202 * <li>task is cancelled if at least one job is cancelled and none failed</li>
203 * <li>task is failed or server error if at least one job is failed or server
207 private void updateGlobalStatus()
209 JobStatus newStatus = taskStatus;
210 int currentPrecedence = getPrecedenceOf(newStatus);
211 for (BaseJob job : jobs)
213 JobStatus status = job.getStatus();
214 int jobPrecedence = getPrecedenceOf(status);
215 if (currentPrecedence < jobPrecedence)
217 currentPrecedence = jobPrecedence;
221 setStatus(newStatus);
225 * A precedence order of job statuses used to compute the overall task status.
227 private static JobStatus[] statusPrecedence = {
228 JobStatus.INVALID, // all must be invalid for task to be invalid
229 JobStatus.COMPLETED, // all but invalid must be completed for task to be
231 JobStatus.UNKNOWN, // unknown prevents successful completion but not
232 // running or failure
237 JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
240 JobStatus.SERVER_ERROR
245 * status to find the precedence of
246 * @return precedence of the status
248 private static int getPrecedenceOf(JobStatus status)
250 final int len = statusPrecedence.length;
251 for (int i = 0; i < len; i++)
253 if (statusPrecedence[i] == status)
264 setStatus(JobStatus.CANCELLED);
266 future.cancel(false);
271 public List<? extends BaseJob> getSubJobs()
277 * Collect and process input sequences for submission and return the list of
278 * jobs to be submitted.
280 * @return list of jobs to be submitted
281 * @throws ServiceInputInvalidException
282 * input is invalid and the task should not be started
284 protected abstract List<T> prepare() throws ServiceInputInvalidException;
287 * Submit all valid jobs to the server and store their job handles.
289 * @throws IOException
290 * if server error occurred
292 protected void startJobs() throws IOException
294 for (BaseJob job : jobs)
296 if (job.isInputValid() && job.getStatus() == JobStatus.READY)
298 WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
300 job.setServerJob(serverJob);
301 job.setStatus(JobStatus.SUBMITTED);
307 * Poll all running jobs and update their status and logs. Polling is repeated
308 * periodically until this method returns true when all jobs are done.
310 * @return {@code true} if all jobs are done @throws IOException if server
313 protected boolean pollJobs() throws IOException
315 boolean allDone = true;
316 for (BaseJob job : jobs)
318 if (job.isInputValid() && !job.getStatus().isDone())
320 WebServiceJobHandle serverJob = job.getServerJob();
321 job.setStatus(client.getStatus(serverJob));
322 job.setLog(client.getLog(serverJob));
323 job.setErrorLog(client.getErrorLog(serverJob));
325 allDone &= job.isCompleted();
331 * Fetch and process the outputs produced by jobs and return the final result
332 * of the task. The method is called once all jobs have finished execution. If
333 * this method raises {@link IOException} it will be called again after a
334 * delay. All IO operations should happen before data processing, so
335 * potentially expensive computation is avoided in case of an error.
337 * @return final result of the computation
338 * @throws IOException
339 * if server error occurred
341 protected abstract R done() throws IOException;
344 * Cancel all running jobs. Used in case of task failure to clean up the
345 * resources or when the task has been cancelled.
347 protected void cancelJobs()
349 for (BaseJob job : jobs)
351 if (!job.isCompleted())
355 if (job.getServerJob() != null)
357 client.cancel(job.getServerJob());
359 job.setStatus(JobStatus.CANCELLED);
360 } catch (IOException e)
362 Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);