Merge branch 'JAL-3878_ws-overhaul-3' into mmw/Release_2_12_ws_merge
[jalview.git] / src / jalview / ws2 / actions / AbstractPollableTask.java
diff --git a/src/jalview/ws2/actions/AbstractPollableTask.java b/src/jalview/ws2/actions/AbstractPollableTask.java
new file mode 100644 (file)
index 0000000..fc3c554
--- /dev/null
@@ -0,0 +1,351 @@
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import jalview.bin.Cache;
+import jalview.util.ArrayUtils;
+import jalview.util.MathUtils;
+import jalview.ws.params.ArgumentI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.actions.api.TaskI;
+import jalview.ws2.api.Credentials;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.api.WebServiceJobHandle;
+import jalview.ws2.client.api.WebServiceClientI;
+import jalview.ws2.helpers.DelegateJobEventListener;
+import jalview.ws2.helpers.TaskEventSupport;
+import static java.lang.String.format;
+
+/**
+ * An abstract base class for non-interactive tasks which implements common
+ * tasks methods. Additionally, it manages task execution in a polling loop.
+ * Subclasses are only required to implement {@link #prepare()} and
+ * {@link #done()} methods.
+ * 
+ * @author mmwarowny
+ *
+ * @param <T>
+ *          the type of jobs managed by the task
+ * @param <R>
+ *          the type of result provided by the task
+ */
+public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
+{
+  private final long uid = MathUtils.getUID();
+
+  protected final WebServiceClientI client;
+
+  protected final List<ArgumentI> args;
+
+  protected final Credentials credentials;
+
+  private final TaskEventSupport<R> eventHandler;
+
+  protected JobStatus taskStatus = null;
+
+  private Future<?> future = null;
+
+  protected List<T> jobs = Collections.emptyList();
+
+  protected R result;
+
+  protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
+      Credentials credentials, TaskEventListener<R> eventListener)
+  {
+    this.client = client;
+    this.args = args;
+    this.credentials = credentials;
+    this.eventHandler = new TaskEventSupport<R>(this, eventListener);
+  }
+
+  public long getUid()
+  {
+    return uid;
+  }
+
+  /**
+   * Start the task using provided scheduled executor service. It creates a
+   * polling loop running at set intervals.
+   * 
+   * @param executor
+   *          executor to run the polling loop with
+   */
+  public void start(ScheduledExecutorService executor)
+  {
+    if (future != null)
+      throw new IllegalStateException("task already started");
+    var runnable = new Runnable()
+    {
+      private int stage = STAGE_PREPARE;
+
+      private static final int STAGE_PREPARE = 0;
+
+      private static final int STAGE_START = 1;
+
+      private static final int STAGE_POLL = 2;
+
+      private static final int STAGE_FINALIZE = 3;
+
+      private static final int STAGE_DONE = 4;
+
+      private int retryCount = 0;
+
+      private static final int MAX_RETRY = 5;
+
+      /**
+       * A polling loop run periodically which carries the task through its
+       * consecutive execution stages.
+       */
+      @Override
+      public void run()
+      {
+        if (stage == STAGE_PREPARE)
+        {
+          // first stage - the input data is collected and the jobs are created
+          try
+          {
+            jobs = prepare();
+          } catch (ServiceInputInvalidException e)
+          {
+            stage = STAGE_DONE;
+            setStatus(JobStatus.INVALID);
+            eventHandler.fireTaskException(e);
+            throw new CompletionException(e);
+          }
+          stage = STAGE_START;
+          setStatus(JobStatus.READY);
+          eventHandler.fireTaskStarted(jobs);
+          var jobListener = new DelegateJobEventListener<>(eventHandler);
+          for (var job : jobs)
+          {
+            job.addPropertyChagneListener(jobListener);
+          }
+        }
+        try
+        {
+          if (stage == STAGE_START)
+          {
+            // second stage - jobs are submitted to the server
+            startJobs();
+            stage = STAGE_POLL;
+            setStatus(JobStatus.SUBMITTED);
+          }
+          if (stage == STAGE_POLL)
+          {
+            // third stage - jobs are poolled until all of them are completed
+            if (pollJobs())
+            {
+              stage = STAGE_FINALIZE;
+            }
+            updateGlobalStatus();
+          }
+          if (stage == STAGE_FINALIZE)
+          {
+            // final stage - results are collected and stored
+            result = done();
+            eventHandler.fireTaskCompleted(result);
+            stage = STAGE_DONE;
+          }
+          retryCount = 0;
+        } catch (IOException e)
+        {
+          eventHandler.fireTaskException(e);
+          if (++retryCount > MAX_RETRY)
+          {
+            stage = STAGE_DONE;
+            cancelJobs();
+            setStatus(JobStatus.SERVER_ERROR);
+            throw new CompletionException(e);
+          }
+        }
+        if (stage == STAGE_DONE)
+        {
+          // finalization - terminating the future task
+          throw new CancellationException("task terminated");
+        }
+      }
+    };
+    if (taskStatus != JobStatus.CANCELLED)
+      future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public JobStatus getStatus()
+  {
+    return taskStatus;
+  }
+
+  /**
+   * Set the status of the task and notify the event handler.
+   * 
+   * @param status
+   *          new task status
+   */
+  protected void setStatus(JobStatus status)
+  {
+    if (this.taskStatus != status)
+    {
+      this.taskStatus = status;
+      eventHandler.fireTaskStatusChanged(status);
+    }
+  }
+
+  /**
+   * Update task status according to the overall status of its jobs. The rules
+   * of setting the status are following:
+   * <ul>
+   * <li>task is invalid if all jobs are invalid</li>
+   * <li>task is completed if all but invalid jobs are completed</li>
+   * <li>task is ready, submitted or queued if at least one job is ready,
+   * submitted or queued an none proceeded to the next stage excluding
+   * completed.</li>
+   * <li>task is running if at least one job is running and none are failed or
+   * cancelled</li>
+   * <li>task is cancelled if at least one job is cancelled and none failed</li>
+   * <li>task is failed or server error if at least one job is failed or server
+   * error</li>
+   * </ul>
+   */
+  private void updateGlobalStatus()
+  {
+    int precedence = -1;
+    for (BaseJob job : jobs)
+    {
+      JobStatus status = job.getStatus();
+      int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
+      if (precedence < jobPrecedence)
+        precedence = jobPrecedence;
+    }
+    if (precedence >= 0)
+    {
+      setStatus(JobStatus.statusPrecedence[precedence]);
+    }
+  }
+
+  @Override
+  public void cancel()
+  {
+    setStatus(JobStatus.CANCELLED);
+    if (future != null)
+      future.cancel(false);
+    cancelJobs();
+  }
+
+  @Override
+  public List<? extends BaseJob> getSubJobs()
+  {
+    return jobs;
+  }
+
+  /**
+   * Collect and process input sequences for submission and return the list of
+   * jobs to be submitted.
+   * 
+   * @return list of jobs to be submitted
+   * @throws ServiceInputInvalidException
+   *           input is invalid and the task should not be started
+   */
+  protected abstract List<T> prepare() throws ServiceInputInvalidException;
+
+  /**
+   * Submit all valid jobs to the server and store their job handles.
+   * 
+   * @throws IOException
+   *           if server error occurred
+   */
+  protected void startJobs() throws IOException
+  {
+    for (BaseJob job : jobs)
+    {
+      if (job.isInputValid() && job.getStatus() == JobStatus.READY)
+      {
+        WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
+            args, credentials);
+        job.setServerJob(serverJob);
+        job.setStatus(JobStatus.SUBMITTED);
+      }
+    }
+  }
+
+  /**
+   * Poll all running jobs and update their status and logs. Polling is repeated
+   * periodically until this method return true when all jobs are done.
+   * 
+   * @return {@code true] if all jobs are done @throws IOException if server
+   *         error occurred
+   */
+  protected boolean pollJobs() throws IOException
+  {
+    boolean allDone = true;
+    for (BaseJob job : jobs)
+    {
+      if (job.isInputValid() && !job.getStatus().isDone())
+      {
+        WebServiceJobHandle serverJob = job.getServerJob();
+        job.setStatus(client.getStatus(serverJob));
+        job.setLog(client.getLog(serverJob));
+        job.setErrorLog(client.getErrorLog(serverJob));
+      }
+      allDone &= job.isCompleted();
+    }
+    return allDone;
+  }
+
+  /**
+   * Fetch and process the outputs produced by jobs and return the final result
+   * of the task. The method is called once all jobs have finished execution. If
+   * this method raises {@link IOException} it will be called again after a
+   * delay. All IO operations should happen before data processing, so
+   * potentially expensive computation is avoided in case of an error.
+   * 
+   * @return final result of the computation
+   * @throws IOException
+   *           if server error occurred
+   */
+  protected abstract R done() throws IOException;
+
+  /**
+   * Cancel all running jobs. Used in case of task failure to cleanup the
+   * resources or when the task has been cancelled.
+   */
+  protected void cancelJobs()
+  {
+    for (BaseJob job : jobs)
+    {
+      if (!job.isCompleted())
+      {
+        try
+        {
+          if (job.getServerJob() != null)
+          {
+            client.cancel(job.getServerJob());
+          }
+          job.setStatus(JobStatus.CANCELLED);
+        } catch (IOException e)
+        {
+          Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public R getResult()
+  {
+    return result;
+  }
+
+  @Override
+  public String toString()
+  {
+    var status = taskStatus != null ? taskStatus.name() : "UNSET";
+    return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status);
+  }
+}