JAL-3878 Create abstract base classes for non-interactive tasks
authorMateusz Warowny <mmzwarowny@dundee.ac.uk>
Thu, 3 Mar 2022 13:05:37 +0000 (14:05 +0100)
committerMateusz Warowny <mmzwarowny@dundee.ac.uk>
Thu, 3 Mar 2022 13:59:57 +0000 (14:59 +0100)
src/jalview/ws2/actions/AbstractPollableTask.java [new file with mode: 0644]
src/jalview/ws2/actions/BaseJob.java [new file with mode: 0644]
src/jalview/ws2/actions/ServiceInputInvalidException.java [new file with mode: 0644]
src/jalview/ws2/helpers/DelegateJobEventListener.java [new file with mode: 0644]
src/jalview/ws2/helpers/TaskEventSupport.java [new file with mode: 0644]

diff --git a/src/jalview/ws2/actions/AbstractPollableTask.java b/src/jalview/ws2/actions/AbstractPollableTask.java
new file mode 100644 (file)
index 0000000..d1db921
--- /dev/null
@@ -0,0 +1,373 @@
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import jalview.bin.Cache;
+import jalview.ws.params.ArgumentI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.actions.api.TaskI;
+import jalview.ws2.api.Credentials;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.api.WebServiceJobHandle;
+import jalview.ws2.client.api.WebServiceClientI;
+import jalview.ws2.helpers.DelegateJobEventListener;
+import jalview.ws2.helpers.TaskEventSupport;
+import static java.lang.String.format;
+
+/**
+ * An abstract base class for non-interactive tasks which implements common
+ * tasks methods. Additionally, it manages task execution in a polling loop.
+ * Subclasses are only required to implement {@link #prepare()} and
+ * {@link #done()} methods.
+ * 
+ * @author mmwarowny
+ *
+ * @param <T>
+ *          the type of jobs managed by the task
+ * @param <R>
+ *          the type of result provided by the task
+ */
+public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
+{
+  protected final WebServiceClientI client;
+
+  protected final List<ArgumentI> args;
+
+  protected final Credentials credentials;
+
+  private final TaskEventSupport<R> eventHandler;
+
+  protected JobStatus taskStatus = null;
+
+  private Future<?> future = null;
+
+  protected List<T> jobs = Collections.emptyList();
+
+  protected R result;
+
+  protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
+      Credentials credentials, TaskEventListener<R> eventListener)
+  {
+    this.client = client;
+    this.args = args;
+    this.credentials = credentials;
+    this.eventHandler = new TaskEventSupport<R>(this, eventListener);
+  }
+
+  /**
+   * Start the task using provided scheduled executor service. It creates a
+   * polling loop running at set intervals.
+   * 
+   * @param executor
+   *          executor to run the polling loop with
+   */
+  public void start(ScheduledExecutorService executor)
+  {
+    if (future != null)
+      throw new IllegalStateException("task already started");
+    var runnable = new Runnable()
+    {
+      private int stage = STAGE_PREPARE;
+
+      private static final int STAGE_PREPARE = 0;
+
+      private static final int STAGE_START = 1;
+
+      private static final int STAGE_POLL = 2;
+
+      private static final int STAGE_FINALIZE = 3;
+
+      private static final int STAGE_DONE = 4;
+
+      private int retryCount = 0;
+
+      private static final int MAX_RETRY = 5;
+
+      /**
+       * A polling loop run periodically which carries the task through its
+       * consecutive execution stages.
+       */
+      @Override
+      public void run()
+      {
+        if (stage == STAGE_PREPARE)
+        {
+          // first stage - the input data is collected and the jobs are created
+          try
+          {
+            jobs = prepare();
+          } catch (ServiceInputInvalidException e)
+          {
+            stage = STAGE_DONE;
+            setStatus(JobStatus.INVALID);
+            eventHandler.fireTaskException(e);
+            throw new CompletionException(e);
+          }
+          stage = STAGE_START;
+          setStatus(JobStatus.READY);
+          eventHandler.fireTaskStarted(jobs);
+          var jobListener = new DelegateJobEventListener<>(eventHandler);
+          for (var job : jobs)
+          {
+            job.addPropertyChagneListener(jobListener);
+          }
+        }
+        try
+        {
+          if (stage == STAGE_START)
+          {
+            // second stage - jobs are submitted to the server
+            startJobs();
+            stage = STAGE_POLL;
+            setStatus(JobStatus.SUBMITTED);
+          }
+          if (stage == STAGE_POLL)
+          {
+            // third stage - jobs are poolled until all of them are completed
+            if (pollJobs())
+            {
+              stage = STAGE_FINALIZE;
+            }
+            updateGlobalStatus();
+          }
+          if (stage == STAGE_FINALIZE)
+          {
+            // final stage - results are collected and stored
+            result = done();
+            eventHandler.fireTaskCompleted(result);
+            stage = STAGE_DONE;
+          }
+          retryCount = 0;
+        } catch (IOException e)
+        {
+          eventHandler.fireTaskException(e);
+          if (++retryCount > MAX_RETRY)
+          {
+            stage = STAGE_DONE;
+            cancelJobs();
+            setStatus(JobStatus.SERVER_ERROR);
+            throw new CompletionException(e);
+          }
+        }
+        if (stage == STAGE_DONE)
+        {
+          // finalization - terminating the future task
+          throw new CancellationException("task terminated");
+        }
+      }
+    };
+    if (taskStatus != JobStatus.CANCELLED)
+      future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public JobStatus getStatus()
+  {
+    return taskStatus;
+  }
+
+  /**
+   * Set the status of the task and notify the event handler.
+   * 
+   * @param status
+   *          new task status
+   */
+  protected void setStatus(JobStatus status)
+  {
+    if (this.taskStatus != status)
+    {
+      this.taskStatus = status;
+      eventHandler.fireTaskStatusChanged(status);
+    }
+  }
+
+  /**
+   * Update task status according to the overall status of its jobs. The rules
+   * of setting the status are following:
+   * <ul>
+   * <li>task is invalid if all jobs are invalid</li>
+   * <li>task is completed if all but invalid jobs are completed</li>
+   * <li>task is ready, submitted or queued if at least one job is ready,
+   * submitted or queued an none proceeded to the next stage excluding
+   * completed.</li>
+   * <li>task is running if at least one job is running and none are failed or
+   * cancelled</li>
+   * <li>task is cancelled if at least one job is cancelled and none failed</li>
+   * <li>task is failed or server error if at least one job is failed or server
+   * error</li>
+   * </ul>
+   */
+  private void updateGlobalStatus()
+  {
+    JobStatus newStatus = taskStatus;
+    int currentPrecedence = getPrecedenceOf(newStatus);
+    for (BaseJob job : jobs)
+    {
+      JobStatus status = job.getStatus();
+      int jobPrecedence = getPrecedenceOf(status);
+      if (currentPrecedence < jobPrecedence)
+      {
+        currentPrecedence = jobPrecedence;
+        newStatus = status;
+      }
+    }
+    setStatus(newStatus);
+  }
+
+  /**
+   * A precedence order of job statuses used to compute the overall task status.
+   */
+  private static JobStatus[] statusPrecedence = {
+      JobStatus.INVALID, // all must be invalid for task to be invalid
+      JobStatus.COMPLETED, // all but invalid must be completed for task to be
+                           // completed
+      JobStatus.UNKNOWN, // unknown prevents successful completion but not
+                         // running or failure
+      JobStatus.READY,
+      JobStatus.SUBMITTED,
+      JobStatus.QUEUED,
+      JobStatus.RUNNING,
+      JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is
+                           // failed
+      JobStatus.FAILED,
+      JobStatus.SERVER_ERROR
+  };
+
+  /**
+   * @param status
+   *          status to find the precedence of
+   * @return precedence of the status
+   */
+  private static int getPrecedenceOf(JobStatus status)
+  {
+    final int len = statusPrecedence.length;
+    for (int i = 0; i < len; i++)
+    {
+      if (statusPrecedence[i] == status)
+      {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  public void cancel()
+  {
+    setStatus(JobStatus.CANCELLED);
+    if (future != null)
+      future.cancel(false);
+    cancelJobs();
+  }
+
+  @Override
+  public List<? extends BaseJob> getSubJobs()
+  {
+    return jobs;
+  }
+
+  /**
+   * Collect and process input sequences for submission and return the list of
+   * jobs to be submitted.
+   * 
+   * @return list of jobs to be submitted
+   * @throws ServiceInputInvalidException
+   *           input is invalid and the task should not be started
+   */
+  protected abstract List<T> prepare() throws ServiceInputInvalidException;
+
+  /**
+   * Submit all valid jobs to the server and store their job handles.
+   * 
+   * @throws IOException
+   *           if server error occurred
+   */
+  protected void startJobs() throws IOException
+  {
+    for (BaseJob job : jobs)
+    {
+      if (job.isInputValid() && job.getStatus() == JobStatus.READY)
+      {
+        WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
+            args, credentials);
+        job.setServerJob(serverJob);
+        job.setStatus(JobStatus.SUBMITTED);
+      }
+    }
+  }
+
+  /**
+   * Poll all running jobs and update their status and logs. Polling is repeated
+   * periodically until this method return true when all jobs are done.
+   * 
+   * @return {@code true] if all jobs are done @throws IOException if server
+   *         error occurred
+   */
+  protected boolean pollJobs() throws IOException
+  {
+    boolean allDone = true;
+    for (BaseJob job : jobs)
+    {
+      if (job.isInputValid() && !job.getStatus().isDone())
+      {
+        WebServiceJobHandle serverJob = job.getServerJob();
+        job.setStatus(client.getStatus(serverJob));
+        job.setLog(client.getLog(serverJob));
+        job.setErrorLog(client.getErrorLog(serverJob));
+      }
+      allDone &= job.isCompleted();
+    }
+    return allDone;
+  }
+
+  /**
+   * Fetch and process the outputs produced by jobs and return the final result
+   * of the task. The method is called once all jobs have finished execution. If
+   * this method raises {@link IOException} it will be called again after a
+   * delay. All IO operations should happen before data processing, so
+   * potentially expensive computation is avoided in case of an error.
+   * 
+   * @return final result of the computation
+   * @throws IOException
+   *           if server error occurred
+   */
+  protected abstract R done() throws IOException;
+
+  /**
+   * Cancel all running jobs. Used in case of task failure to cleanup the
+   * resources or when the task has been cancelled.
+   */
+  protected void cancelJobs()
+  {
+    for (BaseJob job : jobs)
+    {
+      if (!job.isCompleted())
+      {
+        try
+        {
+          if (job.getServerJob() != null)
+          {
+            client.cancel(job.getServerJob());
+          }
+          job.setStatus(JobStatus.CANCELLED);
+        } catch (IOException e)
+        {
+          Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public R getResult()
+  {
+    return result;
+  }
+}
diff --git a/src/jalview/ws2/actions/BaseJob.java b/src/jalview/ws2/actions/BaseJob.java
new file mode 100644 (file)
index 0000000..906b625
--- /dev/null
@@ -0,0 +1,193 @@
+package jalview.ws2.actions;
+
+import java.beans.PropertyChangeListener;
+import java.beans.PropertyChangeSupport;
+import java.util.List;
+
+import jalview.datamodel.SequenceI;
+import jalview.util.MathUtils;
+import jalview.ws2.actions.api.JobI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.api.WebServiceJobHandle;
+
+/**
+ * Basic implementation of the {@link JobI} interface which stores internal job
+ * id, status, log and error log and provides getters to those fields.
+ * Additionally, it stores sequences that will be submitted as job input and the
+ * handle to the job on the server. Extending classes can add extra fields in
+ * order to associate additional data with the job.
+ * 
+ * Observers can be registered with this bean-like object to listen to changes
+ * to {@code status}, {@code log}, and {@code errorLog} properties. Typically,
+ * the events are delegated to the {@link TaskEventListener} objects observing
+ * the task that created this job.
+ * 
+ * @author mmwarowny
+ */
+public abstract class BaseJob implements JobI
+{
+  protected final long internalId = MathUtils.getUID();
+
+  protected final List<SequenceI> inputSeqs;
+
+  protected JobStatus status = null;
+
+  protected String log = "";
+
+  protected String errorLog = "";
+
+  /* FIXME: server job is not specific to the BaseJob and should preferably
+   * be managed by classes using clients (tasks). */
+  protected WebServiceJobHandle serverJob;
+
+  public BaseJob(List<SequenceI> inputSeqs)
+  {
+    this.inputSeqs = inputSeqs;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final long getInternalId()
+  {
+    return internalId;
+  }
+
+  /**
+   * Return the list of input sequences associated with this job.
+   * 
+   * @return input sequences
+   */
+  public List<SequenceI> getInputSequences()
+  {
+    return inputSeqs;
+  }
+
+  /**
+   * Check if inputs make a valid job.
+   * 
+   * @return {@code true} if the input is valid.
+   */
+  public abstract boolean isInputValid();
+
+  /**
+   * Check if the job is completed, This includes jobs with invalid input,
+   * successful and unsuccessful termination.
+   * 
+   * @return {@code true} if job is completed successfully or not
+   */
+  public boolean isCompleted()
+  {
+    return !isInputValid() || getStatus().isDone();
+  }
+
+  @Override
+  public final JobStatus getStatus()
+  {
+    return status;
+  }
+
+  /**
+   * Set new status of the job and notify listeners of the change. Job status is
+   * managed internally by tasks and should not be modified outside the task
+   * which created this job.
+   * 
+   * @param status
+   *          new job status
+   */
+  public final void setStatus(JobStatus status)
+  {
+    JobStatus oldStatus = this.status;
+    this.status = status;
+    pcs.firePropertyChange("status", oldStatus, status);
+  }
+
+  @Override
+  public final String getLog()
+  {
+    return log;
+  }
+
+  /**
+   * Set log text and notify listeners of the change. Log is managed by tasks
+   * which created the job and should not be modified by other classes.
+   * 
+   * @param log
+   *          new log
+   */
+  final void setLog(String log)
+  {
+    String oldLog = this.log;
+    this.log = log;
+    pcs.firePropertyChange("log", oldLog, log);
+  }
+
+  @Override
+  public final String getErrorLog()
+  {
+    return errorLog;
+  }
+
+  /**
+   * Set error log text and notify listeners of the change. Error log is managed
+   * by tasks which created the job and should not be modified by other classes.
+   * 
+   * @param errorLog
+   */
+  final void setErrorLog(String errorLog)
+  {
+    String oldLog = this.errorLog;
+    this.errorLog = errorLog;
+    pcs.firePropertyChange("errorLog", oldLog, errorLog);
+  }
+
+  /**
+   * Return the job handle that identifies this job running on the server or
+   * {@code null} if the job was not submitted.
+   * 
+   * @return server job handle
+   */
+  public final WebServiceJobHandle getServerJob()
+  {
+    return serverJob;
+  }
+
+  /**
+   * Set the server job handle once the job was submitted to the server. The
+   * handler is managed by the task which created this job and should not be
+   * modified by other classes.
+   * 
+   * @param job
+   */
+  final void setServerJob(WebServiceJobHandle job)
+  {
+    this.serverJob = job;
+  }
+
+  private final PropertyChangeSupport pcs = new PropertyChangeSupport(this);
+
+  /**
+   * Register an observer that will be notified of changes to status, log and
+   * error log.
+   * 
+   * @param listener
+   *          property change listener
+   */
+  public final void addPropertyChagneListener(PropertyChangeListener listener)
+  {
+    pcs.addPropertyChangeListener(listener);
+  }
+
+  /**
+   * Remove the property listener from this object.
+   * 
+   * @param listener
+   *          listener to remove
+   */
+  public final void removePropertyChangeListener(PropertyChangeListener listener)
+  {
+    pcs.removePropertyChangeListener(listener);
+  }
+}
diff --git a/src/jalview/ws2/actions/ServiceInputInvalidException.java b/src/jalview/ws2/actions/ServiceInputInvalidException.java
new file mode 100644 (file)
index 0000000..c816d61
--- /dev/null
@@ -0,0 +1,31 @@
+package jalview.ws2.actions;
+
+/**
+ * An exception thrown to indicate that the input is invalid and the service
+ * cannot be started.
+ *  
+ * @author mmwarowny
+ *
+ */
+public class ServiceInputInvalidException extends Exception
+{
+  /**
+   * 
+   */
+  private static final long serialVersionUID = 174066679057181584L;
+
+  public ServiceInputInvalidException(String message)
+  {
+    super(message);
+  }
+  
+  public ServiceInputInvalidException(Throwable cause)
+  {
+    super(cause);
+  }
+  
+  public ServiceInputInvalidException(String message, Throwable cause)
+  {
+    super(message, cause);
+  }
+}
diff --git a/src/jalview/ws2/helpers/DelegateJobEventListener.java b/src/jalview/ws2/helpers/DelegateJobEventListener.java
new file mode 100644 (file)
index 0000000..45afefc
--- /dev/null
@@ -0,0 +1,73 @@
+package jalview.ws2.helpers;
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+
+import jalview.ws2.actions.BaseJob;
+import jalview.ws2.actions.api.JobI;
+import jalview.ws2.api.JobStatus;
+
+/**
+ * A property change listener to be used by web service tasks that delegates all
+ * sub-job related events from {@link BaseJob} subclasses to
+ * {@link TaskEventSupport}. Tasks can create one instance of this class with
+ * their event handler as a delegate and add it as a property change listener to
+ * each sub-job supporting property change listeners. It ensures that an
+ * appropriate {@code fireSubJob*Changed} method of the delegate object will be
+ * called whenever a {@link PropertyChagneEvent} is emitted by the sub-job.
+ * 
+ * @author mmwarowny
+ *
+ * @param <T>
+ *          result type of the task
+ */
+public class DelegateJobEventListener<T> implements PropertyChangeListener
+{
+  private final TaskEventSupport<T> delegate;
+
+  public DelegateJobEventListener(TaskEventSupport<T> delegate)
+  {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void propertyChange(PropertyChangeEvent evt)
+  {
+    switch (evt.getPropertyName())
+    {
+    case "status":
+      statusChanged(evt);
+      break;
+    case "log":
+      logChanged(evt);
+      break;
+    case "errorLog":
+      errorLogChanged(evt);
+      break;
+    default:
+      throw new AssertionError(String.format(
+          "illegal property name \"%s\"", evt.getPropertyName()));
+    }
+  }
+
+  private void statusChanged(PropertyChangeEvent evt)
+  {
+    JobI job = (JobI) evt.getSource();
+    JobStatus status = (JobStatus) evt.getNewValue();
+    delegate.fireSubJobStatusChanged(job, status);
+  }
+
+  private void logChanged(PropertyChangeEvent evt)
+  {
+    JobI job = (JobI) evt.getSource();
+    String log = (String) evt.getNewValue();
+    delegate.fireSubJobLogChanged(job, log);
+  }
+
+  private void errorLogChanged(PropertyChangeEvent evt)
+  {
+    JobI job = (JobI) evt.getSource();
+    String errorLog = (String) evt.getNewValue();
+    delegate.fireSubJobErrorLogChanged(job, errorLog);
+  }
+}
diff --git a/src/jalview/ws2/helpers/TaskEventSupport.java b/src/jalview/ws2/helpers/TaskEventSupport.java
new file mode 100644 (file)
index 0000000..611eefb
--- /dev/null
@@ -0,0 +1,60 @@
+package jalview.ws2.helpers;
+
+import java.util.List;
+
+import jalview.ws2.actions.api.JobI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.actions.api.TaskI;
+import jalview.ws2.api.JobStatus;
+
+public class TaskEventSupport<T>
+{
+  TaskI<T> source;
+  TaskEventListener<T> handler;
+  
+  public TaskEventSupport(TaskI<T> source, TaskEventListener<T> handler)
+  {
+    this.source = source;
+    this.handler = handler;
+  }
+  
+  public void fireTaskStarted(List<? extends JobI> subJobs)
+  {
+    handler.taskStarted(source, subJobs);
+  }
+  
+  public void fireTaskStatusChanged(JobStatus status)
+  {
+    handler.taskStatusChanged(source, status);
+  }
+  
+  public void fireTaskCompleted(T result)
+  {
+    handler.taskCompleted(source, result);
+  }
+  
+  public void fireTaskException(Exception e)
+  {
+    handler.taskException(source, e);
+  }
+  
+  public void taskRestarted()
+  {
+    handler.taskRestarted(source);
+  }
+  
+  public void fireSubJobStatusChanged(JobI job, JobStatus status)
+  {
+    handler.subJobStatusChanged(source, job, status);
+  }
+  
+  public void fireSubJobLogChanged(JobI job, String log)
+  {
+    handler.subJobLogChanged(source, job, log);
+  }
+  
+  public void fireSubJobErrorLogChanged(JobI job, String log)
+  {
+    handler.subJobErrorLogChanged(source, job, log);
+  }
+}