--- /dev/null
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import jalview.bin.Cache;
+import jalview.ws.params.ArgumentI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.actions.api.TaskI;
+import jalview.ws2.api.Credentials;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.api.WebServiceJobHandle;
+import jalview.ws2.client.api.WebServiceClientI;
+import jalview.ws2.helpers.DelegateJobEventListener;
+import jalview.ws2.helpers.TaskEventSupport;
+import static java.lang.String.format;
+
+/**
+ * An abstract base class for non-interactive tasks which implements common
+ * tasks methods. Additionally, it manages task execution in a polling loop.
+ * Subclasses are only required to implement {@link #prepare()} and
+ * {@link #done()} methods.
+ *
+ * @author mmwarowny
+ *
+ * @param <T>
+ * the type of jobs managed by the task
+ * @param <R>
+ * the type of result provided by the task
+ */
+public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
+{
+ protected final WebServiceClientI client;
+
+ protected final List<ArgumentI> args;
+
+ protected final Credentials credentials;
+
+ private final TaskEventSupport<R> eventHandler;
+
+ protected JobStatus taskStatus = null;
+
+ private Future<?> future = null;
+
+ protected List<T> jobs = Collections.emptyList();
+
+ protected R result;
+
+ protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
+ Credentials credentials, TaskEventListener<R> eventListener)
+ {
+ this.client = client;
+ this.args = args;
+ this.credentials = credentials;
+ this.eventHandler = new TaskEventSupport<R>(this, eventListener);
+ }
+
+ /**
+ * Start the task using provided scheduled executor service. It creates a
+ * polling loop running at set intervals.
+ *
+ * @param executor
+ * executor to run the polling loop with
+ */
+ public void start(ScheduledExecutorService executor)
+ {
+ if (future != null)
+ throw new IllegalStateException("task already started");
+ var runnable = new Runnable()
+ {
+ private int stage = STAGE_PREPARE;
+
+ private static final int STAGE_PREPARE = 0;
+
+ private static final int STAGE_START = 1;
+
+ private static final int STAGE_POLL = 2;
+
+ private static final int STAGE_FINALIZE = 3;
+
+ private static final int STAGE_DONE = 4;
+
+ private int retryCount = 0;
+
+ private static final int MAX_RETRY = 5;
+
+ /**
+ * A polling loop run periodically which carries the task through its
+ * consecutive execution stages.
+ */
+ @Override
+ public void run()
+ {
+ if (stage == STAGE_PREPARE)
+ {
+ // first stage - the input data is collected and the jobs are created
+ try
+ {
+ jobs = prepare();
+ } catch (ServiceInputInvalidException e)
+ {
+ stage = STAGE_DONE;
+ setStatus(JobStatus.INVALID);
+ eventHandler.fireTaskException(e);
+ throw new CompletionException(e);
+ }
+ stage = STAGE_START;
+ setStatus(JobStatus.READY);
+ eventHandler.fireTaskStarted(jobs);
+ var jobListener = new DelegateJobEventListener<>(eventHandler);
+ for (var job : jobs)
+ {
+ job.addPropertyChagneListener(jobListener);
+ }
+ }
+ try
+ {
+ if (stage == STAGE_START)
+ {
+ // second stage - jobs are submitted to the server
+ startJobs();
+ stage = STAGE_POLL;
+ setStatus(JobStatus.SUBMITTED);
+ }
+ if (stage == STAGE_POLL)
+ {
+ // third stage - jobs are poolled until all of them are completed
+ if (pollJobs())
+ {
+ stage = STAGE_FINALIZE;
+ }
+ updateGlobalStatus();
+ }
+ if (stage == STAGE_FINALIZE)
+ {
+ // final stage - results are collected and stored
+ result = done();
+ eventHandler.fireTaskCompleted(result);
+ stage = STAGE_DONE;
+ }
+ retryCount = 0;
+ } catch (IOException e)
+ {
+ eventHandler.fireTaskException(e);
+ if (++retryCount > MAX_RETRY)
+ {
+ stage = STAGE_DONE;
+ cancelJobs();
+ setStatus(JobStatus.SERVER_ERROR);
+ throw new CompletionException(e);
+ }
+ }
+ if (stage == STAGE_DONE)
+ {
+ // finalization - terminating the future task
+ throw new CancellationException("task terminated");
+ }
+ }
+ };
+ if (taskStatus != JobStatus.CANCELLED)
+ future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public JobStatus getStatus()
+ {
+ return taskStatus;
+ }
+
+ /**
+ * Set the status of the task and notify the event handler.
+ *
+ * @param status
+ * new task status
+ */
+ protected void setStatus(JobStatus status)
+ {
+ if (this.taskStatus != status)
+ {
+ this.taskStatus = status;
+ eventHandler.fireTaskStatusChanged(status);
+ }
+ }
+
+ /**
+ * Update task status according to the overall status of its jobs. The rules
+ * of setting the status are following:
+ * <ul>
+ * <li>task is invalid if all jobs are invalid</li>
+ * <li>task is completed if all but invalid jobs are completed</li>
+ * <li>task is ready, submitted or queued if at least one job is ready,
+ * submitted or queued an none proceeded to the next stage excluding
+ * completed.</li>
+ * <li>task is running if at least one job is running and none are failed or
+ * cancelled</li>
+ * <li>task is cancelled if at least one job is cancelled and none failed</li>
+ * <li>task is failed or server error if at least one job is failed or server
+ * error</li>
+ * </ul>
+ */
+ private void updateGlobalStatus()
+ {
+ JobStatus newStatus = taskStatus;
+ int currentPrecedence = getPrecedenceOf(newStatus);
+ for (BaseJob job : jobs)
+ {
+ JobStatus status = job.getStatus();
+ int jobPrecedence = getPrecedenceOf(status);
+ if (currentPrecedence < jobPrecedence)
+ {
+ currentPrecedence = jobPrecedence;
+ newStatus = status;
+ }
+ }
+ setStatus(newStatus);
+ }
+
+ /**
+ * A precedence order of job statuses used to compute the overall task status.
+ */
+ private static JobStatus[] statusPrecedence = {
+ JobStatus.INVALID, // all must be invalid for task to be invalid
+ JobStatus.COMPLETED, // all but invalid must be completed for task to be
+ // completed
+ JobStatus.UNKNOWN, // unknown prevents successful completion but not
+ // running or failure
+ JobStatus.READY,
+ JobStatus.SUBMITTED,
+ JobStatus.QUEUED,
+ JobStatus.RUNNING,
+ JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
+ // failed
+ JobStatus.FAILED,
+ JobStatus.SERVER_ERROR
+ };
+
+ /**
+ * @param status
+ * status to find the precedence of
+ * @return precedence of the status
+ */
+ private static int getPrecedenceOf(JobStatus status)
+ {
+ final int len = statusPrecedence.length;
+ for (int i = 0; i < len; i++)
+ {
+ if (statusPrecedence[i] == status)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ public void cancel()
+ {
+ setStatus(JobStatus.CANCELLED);
+ if (future != null)
+ future.cancel(false);
+ cancelJobs();
+ }
+
+ @Override
+ public List<? extends BaseJob> getSubJobs()
+ {
+ return jobs;
+ }
+
+ /**
+ * Collect and process input sequences for submission and return the list of
+ * jobs to be submitted.
+ *
+ * @return list of jobs to be submitted
+ * @throws ServiceInputInvalidException
+ * input is invalid and the task should not be started
+ */
+ protected abstract List<T> prepare() throws ServiceInputInvalidException;
+
+ /**
+ * Submit all valid jobs to the server and store their job handles.
+ *
+ * @throws IOException
+ * if server error occurred
+ */
+ protected void startJobs() throws IOException
+ {
+ for (BaseJob job : jobs)
+ {
+ if (job.isInputValid() && job.getStatus() == JobStatus.READY)
+ {
+ WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
+ args, credentials);
+ job.setServerJob(serverJob);
+ job.setStatus(JobStatus.SUBMITTED);
+ }
+ }
+ }
+
+ /**
+ * Poll all running jobs and update their status and logs. Polling is repeated
+ * periodically until this method return true when all jobs are done.
+ *
+ * @return {@code true] if all jobs are done @throws IOException if server
+ * error occurred
+ */
+ protected boolean pollJobs() throws IOException
+ {
+ boolean allDone = true;
+ for (BaseJob job : jobs)
+ {
+ if (job.isInputValid() && !job.getStatus().isDone())
+ {
+ WebServiceJobHandle serverJob = job.getServerJob();
+ job.setStatus(client.getStatus(serverJob));
+ job.setLog(client.getLog(serverJob));
+ job.setErrorLog(client.getErrorLog(serverJob));
+ }
+ allDone &= job.isCompleted();
+ }
+ return allDone;
+ }
+
+ /**
+ * Fetch and process the outputs produced by jobs and return the final result
+ * of the task. The method is called once all jobs have finished execution. If
+ * this method raises {@link IOException} it will be called again after a
+ * delay. All IO operations should happen before data processing, so
+ * potentially expensive computation is avoided in case of an error.
+ *
+ * @return final result of the computation
+ * @throws IOException
+ * if server error occurred
+ */
+ protected abstract R done() throws IOException;
+
+ /**
+ * Cancel all running jobs. Used in case of task failure to cleanup the
+ * resources or when the task has been cancelled.
+ */
+ protected void cancelJobs()
+ {
+ for (BaseJob job : jobs)
+ {
+ if (!job.isCompleted())
+ {
+ try
+ {
+ if (job.getServerJob() != null)
+ {
+ client.cancel(job.getServerJob());
+ }
+ job.setStatus(JobStatus.CANCELLED);
+ } catch (IOException e)
+ {
+ Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public R getResult()
+ {
+ return result;
+ }
+}
--- /dev/null
+package jalview.ws2.actions;
+
+import java.beans.PropertyChangeListener;
+import java.beans.PropertyChangeSupport;
+import java.util.List;
+
+import jalview.datamodel.SequenceI;
+import jalview.util.MathUtils;
+import jalview.ws2.actions.api.JobI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.api.WebServiceJobHandle;
+
+/**
+ * Basic implementation of the {@link JobI} interface which stores internal job
+ * id, status, log and error log and provides getters to those fields.
+ * Additionally, it stores sequences that will be submitted as job input and the
+ * handle to the job on the server. Extending classes can add extra fields in
+ * order to associate additional data with the job.
+ *
+ * Observers can be registered with this bean-like object to listen to changes
+ * to {@code status}, {@code log}, and {@code errorLog} properties. Typically,
+ * the events are delegated to the {@link TaskEventListener} objects observing
+ * the task that created this job.
+ *
+ * @author mmwarowny
+ */
+public abstract class BaseJob implements JobI
+{
+ protected final long internalId = MathUtils.getUID();
+
+ protected final List<SequenceI> inputSeqs;
+
+ protected JobStatus status = null;
+
+ protected String log = "";
+
+ protected String errorLog = "";
+
+ /* FIXME: server job is not specific to the BaseJob and should preferably
+ * be managed by classes using clients (tasks). */
+ protected WebServiceJobHandle serverJob;
+
+ public BaseJob(List<SequenceI> inputSeqs)
+ {
+ this.inputSeqs = inputSeqs;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final long getInternalId()
+ {
+ return internalId;
+ }
+
+ /**
+ * Return the list of input sequences associated with this job.
+ *
+ * @return input sequences
+ */
+ public List<SequenceI> getInputSequences()
+ {
+ return inputSeqs;
+ }
+
+ /**
+ * Check if inputs make a valid job.
+ *
+ * @return {@code true} if the input is valid.
+ */
+ public abstract boolean isInputValid();
+
+ /**
+ * Check if the job is completed, This includes jobs with invalid input,
+ * successful and unsuccessful termination.
+ *
+ * @return {@code true} if job is completed successfully or not
+ */
+ public boolean isCompleted()
+ {
+ return !isInputValid() || getStatus().isDone();
+ }
+
+ @Override
+ public final JobStatus getStatus()
+ {
+ return status;
+ }
+
+ /**
+ * Set new status of the job and notify listeners of the change. Job status is
+ * managed internally by tasks and should not be modified outside the task
+ * which created this job.
+ *
+ * @param status
+ * new job status
+ */
+ public final void setStatus(JobStatus status)
+ {
+ JobStatus oldStatus = this.status;
+ this.status = status;
+ pcs.firePropertyChange("status", oldStatus, status);
+ }
+
+ @Override
+ public final String getLog()
+ {
+ return log;
+ }
+
+ /**
+ * Set log text and notify listeners of the change. Log is managed by tasks
+ * which created the job and should not be modified by other classes.
+ *
+ * @param log
+ * new log
+ */
+ final void setLog(String log)
+ {
+ String oldLog = this.log;
+ this.log = log;
+ pcs.firePropertyChange("log", oldLog, log);
+ }
+
+ @Override
+ public final String getErrorLog()
+ {
+ return errorLog;
+ }
+
+ /**
+ * Set error log text and notify listeners of the change. Error log is managed
+ * by tasks which created the job and should not be modified by other classes.
+ *
+ * @param errorLog
+ */
+ final void setErrorLog(String errorLog)
+ {
+ String oldLog = this.errorLog;
+ this.errorLog = errorLog;
+ pcs.firePropertyChange("errorLog", oldLog, errorLog);
+ }
+
+ /**
+ * Return the job handle that identifies this job running on the server or
+ * {@code null} if the job was not submitted.
+ *
+ * @return server job handle
+ */
+ public final WebServiceJobHandle getServerJob()
+ {
+ return serverJob;
+ }
+
+ /**
+ * Set the server job handle once the job was submitted to the server. The
+ * handler is managed by the task which created this job and should not be
+ * modified by other classes.
+ *
+ * @param job
+ */
+ final void setServerJob(WebServiceJobHandle job)
+ {
+ this.serverJob = job;
+ }
+
+ private final PropertyChangeSupport pcs = new PropertyChangeSupport(this);
+
+ /**
+ * Register an observer that will be notified of changes to status, log and
+ * error log.
+ *
+ * @param listener
+ * property change listener
+ */
+ public final void addPropertyChagneListener(PropertyChangeListener listener)
+ {
+ pcs.addPropertyChangeListener(listener);
+ }
+
+ /**
+ * Remove the property listener from this object.
+ *
+ * @param listener
+ * listener to remove
+ */
+ public final void removePropertyChangeListener(PropertyChangeListener listener)
+ {
+ pcs.removePropertyChangeListener(listener);
+ }
+}
--- /dev/null
+package jalview.ws2.helpers;
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+
+import jalview.ws2.actions.BaseJob;
+import jalview.ws2.actions.api.JobI;
+import jalview.ws2.api.JobStatus;
+
+/**
+ * A property change listener to be used by web service tasks that delegates all
+ * sub-job related events from {@link BaseJob} subclasses to
+ * {@link TaskEventSupport}. Tasks can create one instance of this class with
+ * their event handler as a delegate and add it as a property change listener to
+ * each sub-job supporting property change listeners. It ensures that an
+ * appropriate {@code fireSubJob*Changed} method of the delegate object will be
+ * called whenever a {@link PropertyChagneEvent} is emitted by the sub-job.
+ *
+ * @author mmwarowny
+ *
+ * @param <T>
+ * result type of the task
+ */
+public class DelegateJobEventListener<T> implements PropertyChangeListener
+{
+ private final TaskEventSupport<T> delegate;
+
+ public DelegateJobEventListener(TaskEventSupport<T> delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void propertyChange(PropertyChangeEvent evt)
+ {
+ switch (evt.getPropertyName())
+ {
+ case "status":
+ statusChanged(evt);
+ break;
+ case "log":
+ logChanged(evt);
+ break;
+ case "errorLog":
+ errorLogChanged(evt);
+ break;
+ default:
+ throw new AssertionError(String.format(
+ "illegal property name \"%s\"", evt.getPropertyName()));
+ }
+ }
+
+ private void statusChanged(PropertyChangeEvent evt)
+ {
+ JobI job = (JobI) evt.getSource();
+ JobStatus status = (JobStatus) evt.getNewValue();
+ delegate.fireSubJobStatusChanged(job, status);
+ }
+
+ private void logChanged(PropertyChangeEvent evt)
+ {
+ JobI job = (JobI) evt.getSource();
+ String log = (String) evt.getNewValue();
+ delegate.fireSubJobLogChanged(job, log);
+ }
+
+ private void errorLogChanged(PropertyChangeEvent evt)
+ {
+ JobI job = (JobI) evt.getSource();
+ String errorLog = (String) evt.getNewValue();
+ delegate.fireSubJobErrorLogChanged(job, errorLog);
+ }
+}