--- /dev/null
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import jalview.bin.Cache;
+import jalview.util.ArrayUtils;
+import jalview.util.MathUtils;
+import jalview.ws.params.ArgumentI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.actions.api.TaskI;
+import jalview.ws2.api.Credentials;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.api.WebServiceJobHandle;
+import jalview.ws2.client.api.WebServiceClientI;
+import jalview.ws2.helpers.DelegateJobEventListener;
+import jalview.ws2.helpers.TaskEventSupport;
+import static java.lang.String.format;
+
+/**
+ * An abstract base class for non-interactive tasks which implements common
+ * tasks methods. Additionally, it manages task execution in a polling loop.
+ * Subclasses are only required to implement {@link #prepare()} and
+ * {@link #done()} methods.
+ *
+ * @author mmwarowny
+ *
+ * @param <T>
+ * the type of jobs managed by the task
+ * @param <R>
+ * the type of result provided by the task
+ */
+public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
+{
+ private final long uid = MathUtils.getUID();
+
+ protected final WebServiceClientI client;
+
+ protected final List<ArgumentI> args;
+
+ protected final Credentials credentials;
+
+ private final TaskEventSupport<R> eventHandler;
+
+ protected JobStatus taskStatus = null;
+
+ private Future<?> future = null;
+
+ protected List<T> jobs = Collections.emptyList();
+
+ protected R result;
+
+ protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
+ Credentials credentials, TaskEventListener<R> eventListener)
+ {
+ this.client = client;
+ this.args = args;
+ this.credentials = credentials;
+ this.eventHandler = new TaskEventSupport<R>(this, eventListener);
+ }
+
+ public long getUid()
+ {
+ return uid;
+ }
+
+ /**
+ * Start the task using provided scheduled executor service. It creates a
+ * polling loop running at set intervals.
+ *
+ * @param executor
+ * executor to run the polling loop with
+ */
+ public void start(ScheduledExecutorService executor)
+ {
+ if (future != null)
+ throw new IllegalStateException("task already started");
+ var runnable = new Runnable()
+ {
+ private int stage = STAGE_PREPARE;
+
+ private static final int STAGE_PREPARE = 0;
+
+ private static final int STAGE_START = 1;
+
+ private static final int STAGE_POLL = 2;
+
+ private static final int STAGE_FINALIZE = 3;
+
+ private static final int STAGE_DONE = 4;
+
+ private int retryCount = 0;
+
+ private static final int MAX_RETRY = 5;
+
+ /**
+ * A polling loop run periodically which carries the task through its
+ * consecutive execution stages.
+ */
+ @Override
+ public void run()
+ {
+ if (stage == STAGE_PREPARE)
+ {
+ // first stage - the input data is collected and the jobs are created
+ try
+ {
+ jobs = prepare();
+ } catch (ServiceInputInvalidException e)
+ {
+ stage = STAGE_DONE;
+ setStatus(JobStatus.INVALID);
+ eventHandler.fireTaskException(e);
+ throw new CompletionException(e);
+ }
+ stage = STAGE_START;
+ setStatus(JobStatus.READY);
+ eventHandler.fireTaskStarted(jobs);
+ var jobListener = new DelegateJobEventListener<>(eventHandler);
+ for (var job : jobs)
+ {
+ job.addPropertyChagneListener(jobListener);
+ }
+ }
+ try
+ {
+ if (stage == STAGE_START)
+ {
+ // second stage - jobs are submitted to the server
+ startJobs();
+ stage = STAGE_POLL;
+ setStatus(JobStatus.SUBMITTED);
+ }
+ if (stage == STAGE_POLL)
+ {
+ // third stage - jobs are poolled until all of them are completed
+ if (pollJobs())
+ {
+ stage = STAGE_FINALIZE;
+ }
+ updateGlobalStatus();
+ }
+ if (stage == STAGE_FINALIZE)
+ {
+ // final stage - results are collected and stored
+ result = done();
+ eventHandler.fireTaskCompleted(result);
+ stage = STAGE_DONE;
+ }
+ retryCount = 0;
+ } catch (IOException e)
+ {
+ eventHandler.fireTaskException(e);
+ if (++retryCount > MAX_RETRY)
+ {
+ stage = STAGE_DONE;
+ cancelJobs();
+ setStatus(JobStatus.SERVER_ERROR);
+ throw new CompletionException(e);
+ }
+ }
+ if (stage == STAGE_DONE)
+ {
+ // finalization - terminating the future task
+ throw new CancellationException("task terminated");
+ }
+ }
+ };
+ if (taskStatus != JobStatus.CANCELLED)
+ future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public JobStatus getStatus()
+ {
+ return taskStatus;
+ }
+
+ /**
+ * Set the status of the task and notify the event handler.
+ *
+ * @param status
+ * new task status
+ */
+ protected void setStatus(JobStatus status)
+ {
+ if (this.taskStatus != status)
+ {
+ this.taskStatus = status;
+ eventHandler.fireTaskStatusChanged(status);
+ }
+ }
+
+ /**
+ * Update task status according to the overall status of its jobs. The rules
+ * of setting the status are following:
+ * <ul>
+ * <li>task is invalid if all jobs are invalid</li>
+ * <li>task is completed if all but invalid jobs are completed</li>
+ * <li>task is ready, submitted or queued if at least one job is ready,
+ * submitted or queued an none proceeded to the next stage excluding
+ * completed.</li>
+ * <li>task is running if at least one job is running and none are failed or
+ * cancelled</li>
+ * <li>task is cancelled if at least one job is cancelled and none failed</li>
+ * <li>task is failed or server error if at least one job is failed or server
+ * error</li>
+ * </ul>
+ */
+ private void updateGlobalStatus()
+ {
+ int precedence = -1;
+ for (BaseJob job : jobs)
+ {
+ JobStatus status = job.getStatus();
+ int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
+ if (precedence < jobPrecedence)
+ precedence = jobPrecedence;
+ }
+ if (precedence >= 0)
+ {
+ setStatus(JobStatus.statusPrecedence[precedence]);
+ }
+ }
+
+ @Override
+ public void cancel()
+ {
+ setStatus(JobStatus.CANCELLED);
+ if (future != null)
+ future.cancel(false);
+ cancelJobs();
+ }
+
+ @Override
+ public List<? extends BaseJob> getSubJobs()
+ {
+ return jobs;
+ }
+
+ /**
+ * Collect and process input sequences for submission and return the list of
+ * jobs to be submitted.
+ *
+ * @return list of jobs to be submitted
+ * @throws ServiceInputInvalidException
+ * input is invalid and the task should not be started
+ */
+ protected abstract List<T> prepare() throws ServiceInputInvalidException;
+
+ /**
+ * Submit all valid jobs to the server and store their job handles.
+ *
+ * @throws IOException
+ * if server error occurred
+ */
+ protected void startJobs() throws IOException
+ {
+ for (BaseJob job : jobs)
+ {
+ if (job.isInputValid() && job.getStatus() == JobStatus.READY)
+ {
+ WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
+ args, credentials);
+ job.setServerJob(serverJob);
+ job.setStatus(JobStatus.SUBMITTED);
+ }
+ }
+ }
+
+ /**
+ * Poll all running jobs and update their status and logs. Polling is repeated
+ * periodically until this method return true when all jobs are done.
+ *
+ * @return {@code true] if all jobs are done @throws IOException if server
+ * error occurred
+ */
+ protected boolean pollJobs() throws IOException
+ {
+ boolean allDone = true;
+ for (BaseJob job : jobs)
+ {
+ if (job.isInputValid() && !job.getStatus().isDone())
+ {
+ WebServiceJobHandle serverJob = job.getServerJob();
+ job.setStatus(client.getStatus(serverJob));
+ job.setLog(client.getLog(serverJob));
+ job.setErrorLog(client.getErrorLog(serverJob));
+ }
+ allDone &= job.isCompleted();
+ }
+ return allDone;
+ }
+
+ /**
+ * Fetch and process the outputs produced by jobs and return the final result
+ * of the task. The method is called once all jobs have finished execution. If
+ * this method raises {@link IOException} it will be called again after a
+ * delay. All IO operations should happen before data processing, so
+ * potentially expensive computation is avoided in case of an error.
+ *
+ * @return final result of the computation
+ * @throws IOException
+ * if server error occurred
+ */
+ protected abstract R done() throws IOException;
+
+ /**
+ * Cancel all running jobs. Used in case of task failure to cleanup the
+ * resources or when the task has been cancelled.
+ */
+ protected void cancelJobs()
+ {
+ for (BaseJob job : jobs)
+ {
+ if (!job.isCompleted())
+ {
+ try
+ {
+ if (job.getServerJob() != null)
+ {
+ client.cancel(job.getServerJob());
+ }
+ job.setStatus(JobStatus.CANCELLED);
+ } catch (IOException e)
+ {
+ Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public R getResult()
+ {
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ var status = taskStatus != null ? taskStatus.name() : "UNSET";
+ return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status);
+ }
+}