JAL-4199 Replace AbstractPollingTask with BaseTask
authorMateusz Warowny <mmzwarowny@dundee.ac.uk>
Tue, 4 Jul 2023 08:00:15 +0000 (10:00 +0200)
committerMateusz Warowny <mmzwarowny@dundee.ac.uk>
Wed, 12 Jul 2023 14:25:05 +0000 (16:25 +0200)
BaseTask no longer implements execution system and its use
is more generic. Execution of polling tasks is realised
by PollingTaskExecutor.

src/jalview/datamodel/ContiguousI.java
src/jalview/ws2/actions/AbstractPollableTask.java [deleted file]
src/jalview/ws2/actions/BaseTask.java [new file with mode: 0644]
src/jalview/ws2/actions/PollingTaskExecutor.java [new file with mode: 0644]
src/jalview/ws2/actions/alignment/AlignmentTask.java
src/jalview/ws2/actions/annotation/AnnotationTask.java
src/jalview/ws2/actions/api/TaskI.java

index a9b1372..bc72984 100644 (file)
  */
 package jalview.datamodel;
 
+import java.util.Collection;
+
 public interface ContiguousI
 {
   int getBegin(); // todo want long for genomic positions?
 
   int getEnd();
+  
+  public static int[] toStartEndArray(Collection<? extends ContiguousI> ranges)
+  {
+    int[] startend = new int[ranges.size() * 2];
+    int i = 0;
+    for (var range : ranges)
+    {
+      startend[i++] = range.getBegin();
+      startend[i++] = range.getEnd();
+    }
+    return startend;
+  }
 }
diff --git a/src/jalview/ws2/actions/AbstractPollableTask.java b/src/jalview/ws2/actions/AbstractPollableTask.java
deleted file mode 100644 (file)
index b61711c..0000000
+++ /dev/null
@@ -1,352 +0,0 @@
-package jalview.ws2.actions;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import jalview.bin.Cache;
-import jalview.bin.Console;
-import jalview.util.ArrayUtils;
-import jalview.util.MathUtils;
-import jalview.ws.params.ArgumentI;
-import jalview.ws2.actions.api.TaskEventListener;
-import jalview.ws2.actions.api.TaskI;
-import jalview.ws2.api.Credentials;
-import jalview.ws2.api.JobStatus;
-import jalview.ws2.api.WebServiceJobHandle;
-import jalview.ws2.client.api.WebServiceClientI;
-import jalview.ws2.helpers.DelegateJobEventListener;
-import jalview.ws2.helpers.TaskEventSupport;
-import static java.lang.String.format;
-
-/**
- * An abstract base class for non-interactive tasks which implements common
- * tasks methods. Additionally, it manages task execution in a polling loop.
- * Subclasses are only required to implement {@link #prepare()} and
- * {@link #done()} methods.
- * 
- * @author mmwarowny
- *
- * @param <T>
- *          the type of jobs managed by the task
- * @param <R>
- *          the type of result provided by the task
- */
-public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
-{
-  private final long uid = MathUtils.getUID();
-
-  protected final WebServiceClientI client;
-
-  protected final List<ArgumentI> args;
-
-  protected final Credentials credentials;
-
-  private final TaskEventSupport<R> eventHandler;
-
-  protected JobStatus taskStatus = JobStatus.CREATED;
-
-  private Future<?> future = null;
-
-  protected List<T> jobs = Collections.emptyList();
-
-  protected R result;
-
-  protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
-      Credentials credentials, TaskEventListener<R> eventListener)
-  {
-    this.client = client;
-    this.args = args;
-    this.credentials = credentials;
-    this.eventHandler = new TaskEventSupport<R>(this, eventListener);
-  }
-
-  public long getUid()
-  {
-    return uid;
-  }
-
-  /**
-   * Start the task using provided scheduled executor service. It creates a
-   * polling loop running at set intervals.
-   * 
-   * @param executor
-   *          executor to run the polling loop with
-   */
-  public void start(ScheduledExecutorService executor)
-  {
-    if (future != null)
-      throw new IllegalStateException("task already started");
-    var runnable = new Runnable()
-    {
-      private int stage = STAGE_PREPARE;
-
-      private static final int STAGE_PREPARE = 0;
-
-      private static final int STAGE_START = 1;
-
-      private static final int STAGE_POLL = 2;
-
-      private static final int STAGE_FINALIZE = 3;
-
-      private static final int STAGE_DONE = 4;
-
-      private int retryCount = 0;
-
-      private static final int MAX_RETRY = 5;
-
-      /**
-       * A polling loop run periodically which carries the task through its
-       * consecutive execution stages.
-       */
-      @Override
-      public void run()
-      {
-        if (stage == STAGE_PREPARE)
-        {
-          // first stage - the input data is collected and the jobs are created
-          try
-          {
-            jobs = prepare();
-          } catch (ServiceInputInvalidException e)
-          {
-            stage = STAGE_DONE;
-            setStatus(JobStatus.INVALID);
-            eventHandler.fireTaskException(e);
-            throw new CompletionException(e);
-          }
-          stage = STAGE_START;
-          setStatus(JobStatus.READY);
-          eventHandler.fireTaskStarted(jobs);
-          var jobListener = new DelegateJobEventListener<>(eventHandler);
-          for (var job : jobs)
-          {
-            job.addPropertyChagneListener(jobListener);
-          }
-        }
-        try
-        {
-          if (stage == STAGE_START)
-          {
-            // second stage - jobs are submitted to the server
-            startJobs();
-            stage = STAGE_POLL;
-            setStatus(JobStatus.SUBMITTED);
-          }
-          if (stage == STAGE_POLL)
-          {
-            // third stage - jobs are poolled until all of them are completed
-            if (pollJobs())
-            {
-              stage = STAGE_FINALIZE;
-            }
-            updateGlobalStatus();
-          }
-          if (stage == STAGE_FINALIZE)
-          {
-            // final stage - results are collected and stored
-            result = done();
-            eventHandler.fireTaskCompleted(result);
-            stage = STAGE_DONE;
-          }
-          retryCount = 0;
-        } catch (IOException e)
-        {
-          eventHandler.fireTaskException(e);
-          if (++retryCount > MAX_RETRY)
-          {
-            stage = STAGE_DONE;
-            cancelJobs();
-            setStatus(JobStatus.SERVER_ERROR);
-            throw new CompletionException(e);
-          }
-        }
-        if (stage == STAGE_DONE)
-        {
-          // finalization - terminating the future task
-          throw new CancellationException("task terminated");
-        }
-      }
-    };
-    if (taskStatus != JobStatus.CANCELLED)
-      future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public JobStatus getStatus()
-  {
-    return taskStatus;
-  }
-
-  /**
-   * Set the status of the task and notify the event handler.
-   * 
-   * @param status
-   *          new task status
-   */
-  protected void setStatus(JobStatus status)
-  {
-    if (this.taskStatus != status)
-    {
-      this.taskStatus = status;
-      eventHandler.fireTaskStatusChanged(status);
-    }
-  }
-
-  /**
-   * Update task status according to the overall status of its jobs. The rules
-   * of setting the status are following:
-   * <ul>
-   * <li>task is invalid if all jobs are invalid</li>
-   * <li>task is completed if all but invalid jobs are completed</li>
-   * <li>task is ready, submitted or queued if at least one job is ready,
-   * submitted or queued an none proceeded to the next stage excluding
-   * completed.</li>
-   * <li>task is running if at least one job is running and none are failed or
-   * cancelled</li>
-   * <li>task is cancelled if at least one job is cancelled and none failed</li>
-   * <li>task is failed or server error if at least one job is failed or server
-   * error</li>
-   * </ul>
-   */
-  private void updateGlobalStatus()
-  {
-    int precedence = -1;
-    for (BaseJob job : jobs)
-    {
-      JobStatus status = job.getStatus();
-      int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
-      if (precedence < jobPrecedence)
-        precedence = jobPrecedence;
-    }
-    if (precedence >= 0)
-    {
-      setStatus(JobStatus.statusPrecedence[precedence]);
-    }
-  }
-
-  @Override
-  public void cancel()
-  {
-    setStatus(JobStatus.CANCELLED);
-    if (future != null)
-      future.cancel(false);
-    cancelJobs();
-  }
-
-  @Override
-  public List<? extends BaseJob> getSubJobs()
-  {
-    return jobs;
-  }
-
-  /**
-   * Collect and process input sequences for submission and return the list of
-   * jobs to be submitted.
-   * 
-   * @return list of jobs to be submitted
-   * @throws ServiceInputInvalidException
-   *           input is invalid and the task should not be started
-   */
-  protected abstract List<T> prepare() throws ServiceInputInvalidException;
-
-  /**
-   * Submit all valid jobs to the server and store their job handles.
-   * 
-   * @throws IOException
-   *           if server error occurred
-   */
-  protected void startJobs() throws IOException
-  {
-    for (BaseJob job : jobs)
-    {
-      if (job.isInputValid() && job.getStatus() == JobStatus.READY)
-      {
-        WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
-            args, credentials);
-        job.setServerJob(serverJob);
-        job.setStatus(JobStatus.SUBMITTED);
-      }
-    }
-  }
-
-  /**
-   * Poll all running jobs and update their status and logs. Polling is repeated
-   * periodically until this method return true when all jobs are done.
-   * 
-   * @return {@code true] if all jobs are done @throws IOException if server
-   *         error occurred
-   */
-  protected boolean pollJobs() throws IOException
-  {
-    boolean allDone = true;
-    for (BaseJob job : jobs)
-    {
-      if (job.isInputValid() && !job.getStatus().isDone())
-      {
-        WebServiceJobHandle serverJob = job.getServerJob();
-        job.setStatus(client.getStatus(serverJob));
-        job.setLog(client.getLog(serverJob));
-        job.setErrorLog(client.getErrorLog(serverJob));
-      }
-      allDone &= job.isCompleted();
-    }
-    return allDone;
-  }
-
-  /**
-   * Fetch and process the outputs produced by jobs and return the final result
-   * of the task. The method is called once all jobs have finished execution. If
-   * this method raises {@link IOException} it will be called again after a
-   * delay. All IO operations should happen before data processing, so
-   * potentially expensive computation is avoided in case of an error.
-   * 
-   * @return final result of the computation
-   * @throws IOException
-   *           if server error occurred
-   */
-  protected abstract R done() throws IOException;
-
-  /**
-   * Cancel all running jobs. Used in case of task failure to cleanup the
-   * resources or when the task has been cancelled.
-   */
-  protected void cancelJobs()
-  {
-    for (BaseJob job : jobs)
-    {
-      if (!job.isCompleted())
-      {
-        try
-        {
-          if (job.getServerJob() != null)
-          {
-            client.cancel(job.getServerJob());
-          }
-          job.setStatus(JobStatus.CANCELLED);
-        } catch (IOException e)
-        {
-          Console.error(format("failed to cancel job %s", job.getServerJob()), e);
-        }
-      }
-    }
-  }
-
-  @Override
-  public R getResult()
-  {
-    return result;
-  }
-
-  @Override
-  public String toString()
-  {
-    var status = taskStatus != null ? taskStatus.name() : "UNSET";
-    return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status);
-  }
-}
diff --git a/src/jalview/ws2/actions/BaseTask.java b/src/jalview/ws2/actions/BaseTask.java
new file mode 100644 (file)
index 0000000..40f96b5
--- /dev/null
@@ -0,0 +1,303 @@
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import jalview.bin.Console;
+import jalview.util.ArrayUtils;
+import jalview.util.MathUtils;
+import jalview.ws.params.ArgumentI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.actions.api.TaskI;
+import jalview.ws2.api.Credentials;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.client.api.WebServiceClientI;
+import jalview.ws2.helpers.DelegateJobEventListener;
+import jalview.ws2.helpers.TaskEventSupport;
+
+import static java.lang.String.format;
+
+public abstract class BaseTask<T extends BaseJob, R> implements TaskI<R>
+{
+  protected final long uid = MathUtils.getUID();
+
+  protected final WebServiceClientI webClient;
+
+  protected final List<ArgumentI> args;
+
+  protected final Credentials credentials;
+
+  private final TaskEventSupport<R> eventHandler;
+
+  protected JobStatus status = JobStatus.CREATED;
+
+  protected List<T> jobs = Collections.emptyList();
+
+  protected R result = null;
+
+  protected Runnable cancelAction = () -> {
+  };
+
+  protected BaseTask(WebServiceClientI webClient, List<ArgumentI> args,
+      Credentials credentials)
+  {
+    this.webClient = webClient;
+    this.args = args;
+    this.credentials = credentials;
+    this.eventHandler = new TaskEventSupport<>(this);
+  }
+
+  @Override
+  public final long getUid()
+  {
+    return uid;
+  }
+
+  @Override
+  public final JobStatus getStatus()
+  {
+    return status;
+  }
+
+  @Override
+  public final List<? extends BaseJob> getSubJobs()
+  {
+    return jobs;
+  }
+
+  @Override
+  public final void addTaskEventListener(TaskEventListener<R> listener)
+  {
+    eventHandler.addListener(listener);
+  }
+
+  @Override
+  public final void removeTaskEventListener(TaskEventListener<R> listener)
+  {
+    eventHandler.addListener(listener);
+  }
+
+  @Override
+  public final R getResult()
+  {
+    return result;
+  }
+
+  @Override
+  public final void init() throws Exception
+  {
+    try
+    {
+      jobs = prepareJobs();
+    } catch (ServiceInputInvalidException e)
+    {
+      setStatus(JobStatus.INVALID);
+      eventHandler.fireTaskException(e);
+      throw e;
+    }
+    setStatus(JobStatus.READY);
+    eventHandler.fireTaskStarted(jobs);
+    var jobListener = new DelegateJobEventListener<>(eventHandler);
+    for (var job : jobs)
+      job.addPropertyChangeListener(jobListener);
+    submitJobs(jobs);
+  }
+
+  static final int MAX_SUBMIT_RETRY = 5;
+
+  protected final void submitJobs(List<T> jobs) throws IOException
+  {
+    var retryCounter = 0;
+    while (true)
+    {
+      try
+      {
+        submitJobs0(jobs);
+        setStatus(JobStatus.SUBMITTED);
+        break;
+      } catch (IOException e)
+      {
+        eventHandler.fireTaskException(e);
+        if (++retryCounter > MAX_SUBMIT_RETRY)
+        {
+          cancel();
+          setStatus(JobStatus.SERVER_ERROR);
+          throw e;
+        }
+      }
+    }
+  }
+
+  private final void submitJobs0(List<T> jobs) throws IOException
+  {
+    IOException exception = null;
+    for (BaseJob job : jobs)
+    {
+      if (job.getStatus() != JobStatus.READY || !job.isInputValid())
+        continue;
+      try
+      {
+        var jobRef = webClient.submit(job.getInputSequences(), args, credentials);
+        job.setServerJob(jobRef);
+        job.setStatus(JobStatus.SUBMITTED);
+      } catch (IOException e)
+      {
+        exception = e;
+      }
+    }
+    if (exception != null)
+      throw exception;
+  }
+
+  /**
+   * Poll all running jobs and update their status and logs. Polling is repeated
+   * periodically until this method return true when all jobs are done.
+   * 
+   * @return {@code true] if all jobs are done @throws IOException if server
+   *         error occurred
+   */
+  @Override
+  public final boolean poll() throws IOException
+  {
+    boolean allDone = true;
+    IOException exception = null;
+    for (BaseJob job : jobs)
+    {
+      if (job.isInputValid() && !job.getStatus().isDone())
+      {
+        var serverJob = job.getServerJob();
+        try
+        {
+          job.setStatus(webClient.getStatus(serverJob));
+          job.setLog(webClient.getLog(serverJob));
+          job.setErrorLog(webClient.getErrorLog(serverJob));
+        } catch (IOException e)
+        {
+          exception = e;
+        }
+      }
+      allDone &= job.isCompleted();
+    }
+    updateGlobalStatus();
+    if (exception != null)
+      throw exception;
+    return allDone;
+  }
+
+  @Override
+  public final void complete() throws IOException
+  {
+    for (var job : jobs)
+    {
+      if (!job.isCompleted())
+      {
+        // a fallback in case the executor decides to finish prematurely
+        cancelJob(job);
+        job.setStatus(JobStatus.SERVER_ERROR);
+      }
+    }
+    updateGlobalStatus();
+    try {
+      result = collectResult(jobs);
+      eventHandler.fireTaskCompleted(result);
+    }
+    catch (Exception e)
+    {
+      eventHandler.fireTaskException(e);
+      throw e;
+    }
+  }
+
+  /**
+   * Cancel all running jobs. Used in case of task failure to cleanup the
+   * resources or when the task has been cancelled.
+   */
+  @Override
+  public final void cancel()
+  {
+    cancelAction.run();
+    for (T job : jobs)
+    {
+      cancelJob(job);
+    }
+  }
+
+  private final void cancelJob(T job)
+  {
+    if (!job.isCompleted())
+    {
+      try
+      {
+        if (job.getServerJob() != null)
+          webClient.cancel(job.getServerJob());
+        job.setStatus(JobStatus.CANCELLED);
+      } catch (IOException e)
+      {
+        Console.error(format("failed to cancel job %s", job.getServerJob()), e);
+      }
+    }
+  }
+
+  protected final void setStatus(JobStatus status)
+  {
+    Objects.requireNonNull(status);
+    if (this.status != status)
+    {
+      this.status = status;
+      eventHandler.fireTaskStatusChanged(status);
+    }
+  }
+
+  protected abstract List<T> prepareJobs() throws ServiceInputInvalidException;
+
+  protected abstract R collectResult(List<T> jobs) throws IOException;
+
+  /**
+   * Update task status according to the overall status of its jobs. The rules
+   * of setting the status are following:
+   * <ul>
+   * <li>task is invalid if all jobs are invalid</li>
+   * <li>task is completed if all but invalid jobs are completed</li>
+   * <li>task is ready, submitted or queued if at least one job is ready,
+   * submitted or queued an none proceeded to the next stage excluding
+   * completed.</li>
+   * <li>task is running if at least one job is running and none are failed or
+   * cancelled</li>
+   * <li>task is cancelled if at least one job is cancelled and none failed</li>
+   * <li>task is failed or server error if at least one job is failed or server
+   * error</li>
+   * </ul>
+   */
+  protected final void updateGlobalStatus()
+  {
+    int precedence = -1;
+    for (BaseJob job : jobs)
+    {
+      JobStatus status = job.getStatus();
+      int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
+      if (precedence < jobPrecedence)
+        precedence = jobPrecedence;
+    }
+    if (precedence >= 0)
+    {
+      setStatus(JobStatus.statusPrecedence[precedence]);
+    }
+  }
+
+  /**
+   * Set the action that will be run when the {@link #cancel()} method is
+   * invoked. The action should typically stop the executor polling the task and
+   * release resources and threads running the task.
+   * 
+   * @param action
+   *          runnable to be executed when the task is cancelled
+   */
+  public void setCancelAction(Runnable action)
+  {
+    Objects.requireNonNull(action);
+    this.cancelAction = action;
+  }
+}
diff --git a/src/jalview/ws2/actions/PollingTaskExecutor.java b/src/jalview/ws2/actions/PollingTaskExecutor.java
new file mode 100644 (file)
index 0000000..e1775d4
--- /dev/null
@@ -0,0 +1,108 @@
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.WeakHashMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import jalview.ws2.actions.api.TaskI;
+
+public class PollingTaskExecutor
+{
+  private static final Map<ScheduledExecutorService, PollingTaskExecutor> executorPool =
+      Collections.synchronizedMap(new WeakHashMap<>());
+  
+  public static PollingTaskExecutor fromPool(ScheduledExecutorService executor)
+  {
+    return executorPool.computeIfAbsent(executor, PollingTaskExecutor::new);
+  }
+  
+  private final ScheduledExecutorService executor;
+
+  public PollingTaskExecutor(ScheduledExecutorService executor)
+  {
+    this.executor = executor;
+  }
+
+  public Future<?> submit(TaskI<?> task)
+  {
+    Objects.requireNonNull(task);
+    return executor.scheduleWithFixedDelay(
+        new TaskRunnable(task), 2, 2, TimeUnit.SECONDS);
+  }
+
+  private static class TaskRunnable implements Runnable
+  {
+    private final TaskI<?> task;
+
+    private static final int STAGE_INIT = 0;
+
+    private static final int STAGE_POLLING = 2;
+
+    private static final int STAGE_FINISHED = 3;
+
+    private static final int STAGE_STOPPED = 4;
+
+    private int stage = STAGE_INIT;
+
+    private static final int MAX_POLL_RETRY = 5;
+
+    private int pollRetryCount = 0;
+
+    private TaskRunnable(TaskI<?> task)
+    {
+      this.task = task;
+    }
+
+    @Override
+    public void run()
+    {
+      if (task.getStatus().isDone())
+      {
+        stage = STAGE_STOPPED;
+      }
+      if (stage == STAGE_INIT)
+      {
+        try
+        {
+          task.init();
+          stage = STAGE_POLLING;
+        } catch (Exception e)
+        {
+          stage = STAGE_STOPPED;
+          throw new CompletionException(e);
+        }
+      }
+      try
+      {
+        if (stage == STAGE_POLLING && task.poll())
+        {
+          stage = STAGE_FINISHED;
+        }
+        if (stage == STAGE_FINISHED)
+        {
+          task.complete();
+          stage = STAGE_STOPPED;
+        }
+      } catch (Exception e)
+      {
+        if (++pollRetryCount > MAX_POLL_RETRY || e instanceof RuntimeException)
+        {
+          task.cancel();
+          stage = STAGE_STOPPED;
+          throw new CompletionException(e);
+        }
+      }
+      if (stage == STAGE_STOPPED)
+      {
+        throw new CancellationException();
+      }
+    }
+  }
+}
index 6a0c4dd..60ca384 100644 (file)
@@ -25,7 +25,7 @@ import jalview.datamodel.HiddenColumns;
 import jalview.datamodel.Sequence;
 import jalview.datamodel.SequenceI;
 import jalview.ws.params.ArgumentI;
-import jalview.ws2.actions.AbstractPollableTask;
+import jalview.ws2.actions.BaseTask;
 import jalview.ws2.actions.ServiceInputInvalidException;
 import jalview.ws2.actions.api.TaskEventListener;
 import jalview.ws2.api.Credentials;
@@ -39,7 +39,7 @@ import jalview.ws2.client.api.AlignmentWebServiceClientI;
  * @author mmwarowny
  *
  */
-class AlignmentTask extends AbstractPollableTask<AlignmentJob, AlignmentResult>
+class AlignmentTask extends BaseTask<AlignmentJob, AlignmentResult>
 {
   /* task parameters set in the constructor */
   private final AlignmentWebServiceClientI client;
@@ -62,13 +62,12 @@ class AlignmentTask extends AbstractPollableTask<AlignmentJob, AlignmentResult>
 
   AlignmentTask(AlignmentWebServiceClientI client, AlignmentAction action,
       List<ArgumentI> args, Credentials credentials,
-      AlignmentView msa, AlignViewportI viewport, boolean submitGaps,
-      TaskEventListener<AlignmentResult> eventListener)
+      AlignViewportI viewport, boolean submitGaps)
   {
-    super(client, args, credentials, eventListener);
+    super(client, args, credentials);
     this.client = client;
     this.action = action;
-    this.msa = msa;
+    this.msa = viewport.getAlignmentView(true);
     this.viewport = viewport;
     this.submitGaps = submitGaps;
     this.currentView = viewport.getAlignment();
@@ -80,7 +79,7 @@ class AlignmentTask extends AbstractPollableTask<AlignmentJob, AlignmentResult>
   }
   
   @Override
-  protected List<AlignmentJob> prepare() throws ServiceInputInvalidException
+  protected List<AlignmentJob> prepareJobs() throws ServiceInputInvalidException
   { 
     Console.info(format("starting alignment service %s:%s",
         client.getClientName(), action.getName()));
@@ -107,7 +106,7 @@ class AlignmentTask extends AbstractPollableTask<AlignmentJob, AlignmentResult>
   }
 
   @Override
-  protected AlignmentResult done() throws IOException
+  protected AlignmentResult collectResult(List<AlignmentJob> jobs) throws IOException
   {
     IOException lastIOE = null;
     for (AlignmentJob job : jobs)
index 271d9ed..238439a 100644 (file)
@@ -30,8 +30,8 @@ import jalview.util.MathUtils;
 import jalview.util.Pair;
 import jalview.workers.AlignCalcWorker;
 import jalview.ws.params.ArgumentI;
-import jalview.ws2.actions.AbstractPollableTask;
 import jalview.ws2.actions.BaseJob;
+import jalview.ws2.actions.BaseTask;
 import jalview.ws2.actions.ServiceInputInvalidException;
 import jalview.ws2.actions.api.JobI;
 import jalview.ws2.actions.api.TaskEventListener;
@@ -43,287 +43,55 @@ import jalview.ws2.client.api.AnnotationWebServiceClientI;
 import jalview.ws2.helpers.DelegateJobEventListener;
 import jalview.ws2.helpers.TaskEventSupport;
 
-public class AnnotationTask implements TaskI<AnnotationResult>
+public class AnnotationTask extends BaseTask<AnnotationJob, AnnotationResult>
 {
-  private final long uid = MathUtils.getUID();
-
   private AnnotationWebServiceClientI client;
 
   private final AnnotationAction action;
 
-  private final List<ArgumentI> args;
-
-  private final Credentials credentials;
-
   private final AlignViewportI viewport;
 
-  private final TaskEventSupport<AnnotationResult> eventHandler;
-
   private JobStatus taskStatus = null;
 
   private AlignCalcWorkerAdapter worker = null;
 
-  private List<AnnotationJob> jobs = Collections.emptyList();
-
-  private AnnotationResult result = null;
-
   private DelegateJobEventListener<AnnotationResult> jobEventHandler;
 
-  private class AlignCalcWorkerAdapter extends AlignCalcWorker
-      implements PollableAlignCalcWorkerI
-  {
-    private boolean restarting = false;
-
-    AlignCalcWorkerAdapter(AlignCalcManagerI2 calcMan)
-    {
-      super(viewport, null);
-      this.calcMan = calcMan;
-    }
-
-    String getServiceName()
-    {
-      return action.getWebService().getName();
-    }
-
-    @Override
-    public void startUp() throws Throwable
-    {
-      if (alignViewport.isClosed())
-      {
-        stop();
-        throw new IllegalStateException("Starting annotation for closed viewport");
-      }
-      if (restarting)
-        eventHandler.fireTaskRestarted();
-      else
-        restarting = true;
-      jobs = Collections.emptyList();
-      try
-      {
-        jobs = prepare();
-      } catch (ServiceInputInvalidException e)
-      {
-        setStatus(JobStatus.INVALID);
-        eventHandler.fireTaskException(e);
-        throw e;
-      }
-      setStatus(JobStatus.READY);
-      eventHandler.fireTaskStarted(jobs);
-      for (var job : jobs)
-      {
-        job.addPropertyChagneListener(jobEventHandler);
-      }
-      try
-      {
-        startJobs();
-      } catch (IOException e)
-      {
-        eventHandler.fireTaskException(e);
-        cancelJobs();
-        setStatus(JobStatus.SERVER_ERROR);
-        throw e;
-      }
-      setStatus(JobStatus.SUBMITTED);
-    }
-
-    @Override
-    public boolean poll() throws Throwable
-    {
-      boolean done = AnnotationTask.this.poll();
-      updateGlobalStatus();
-      if (done)
-      {
-        retrieveAndProcessResult();
-        eventHandler.fireTaskCompleted(result);
-      }
-      return done;
-    }
-
-    private void retrieveAndProcessResult() throws IOException
-    {
-      result = retrieveResult();
-      updateOurAnnots(result.annotations);
-      if (result.transferFeatures)
-      {
-        final var featureColours = result.featureColours;
-        final var featureFilters = result.featureFilters;
-        viewport.applyFeaturesStyle(new FeatureSettingsAdapter()
-        {
-          @Override
-          public FeatureColourI getFeatureColour(String type)
-          {
-            return featureColours.get(type);
-          }
-
-          @Override
-          public FeatureMatcherSetI getFeatureFilters(String type)
-          {
-            return featureFilters.get(type);
-          }
-
-          @Override
-          public boolean isFeatureDisplayed(String type)
-          {
-            return featureColours.containsKey(type);
-          }
-        });
-      }
-    }
-
-    @Override
-    public void updateAnnotation()
-    {
-      var job = jobs.size() > 0 ? jobs.get(0) : null;
-      if (!calcMan.isWorking(this) && job != null)
-      {
-        var ret = updateResultAnnotation(job, job.returnedAnnotations);
-        updateOurAnnots(ret.get0());
-      }
-    }
-
-    private void updateOurAnnots(List<AlignmentAnnotation> newAnnots)
-    {
-      List<AlignmentAnnotation> oldAnnots = ourAnnots != null ? ourAnnots : Collections.emptyList();
-      ourAnnots = newAnnots;
-      AlignmentI alignment = viewport.getAlignment();
-      for (AlignmentAnnotation an : oldAnnots)
-      {
-        if (!newAnnots.contains(an))
-        {
-          alignment.deleteAnnotation(an);
-        }
-      }
-      oldAnnots.clear();
-      for (AlignmentAnnotation an : ourAnnots)
-      {
-        viewport.getAlignment().validateAnnotation(an);
-      }
-    }
-
-    @Override
-    public void cancel()
-    {
-      cancelJobs();
-    }
-
-    void stop()
-    {
-      calcMan.disableWorker(this);
-      super.abortAndDestroy();
-    }
-
-    @Override
-    public void done()
-    {
-      for (var job : jobs)
-      {
-        if (job.isInputValid() && !job.isCompleted())
-        {
-          /* if done was called but job is not completed then it
-           * must have been stopped by an exception */
-          job.setStatus(JobStatus.SERVER_ERROR);
-        }
-      }
-      updateGlobalStatus();
-      // dispose of unfinished jobs just in case
-      cancelJobs();
-    }
-
-    @Override
-    public String toString()
-    {
-      return AnnotationTask.this.toString() + "$AlignCalcWorker@"
-          + Integer.toHexString(hashCode());
-    }
-  }
-
   public AnnotationTask(AnnotationWebServiceClientI client,
       AnnotationAction action, List<ArgumentI> args, Credentials credentials,
-      AlignViewportI viewport,
-      TaskEventListener<AnnotationResult> eventListener)
+      AlignViewportI viewport)
   {
+    super(client, args, credentials);
     this.client = client;
     this.action = action;
-    this.args = args;
-    this.credentials = credentials;
     this.viewport = viewport;
-    this.eventHandler = new TaskEventSupport<>(this, eventListener);
-    this.jobEventHandler = new DelegateJobEventListener<>(this.eventHandler);
-  }
-
-  @Override
-  public long getUid()
-  {
-    return uid;
   }
 
-  public void start(AlignCalcManagerI2 calcManager)
-  {
-    if (this.worker != null)
-      throw new IllegalStateException("task already started");
-    this.worker = new AlignCalcWorkerAdapter(calcManager);
-    if (taskStatus != JobStatus.CANCELLED)
-    {
-      List<AlignCalcWorkerI> oldWorkers = calcManager.getWorkersOfClass(
-          AlignCalcWorkerAdapter.class);
-      for (var worker : oldWorkers)
-      {
-        if (action.getWebService().getName().equalsIgnoreCase(
-            ((AlignCalcWorkerAdapter) worker).getServiceName()))
-        {
-          // remove interactive workers for the same service.
-          calcManager.removeWorker(worker);
-          calcManager.cancelWorker(worker);
-        }
-      }
-      if (action.getWebService().isInteractive())
-        calcManager.registerWorker(worker);
-      else
-        calcManager.startWorker(worker);
-    }
-  }
-
-  /*
-   * The following methods are mostly copied from the {@link AbstractPollableTask}
-   * TODO: move common functionality to a base class
-   */
-  @Override
-  public JobStatus getStatus()
-  {
-    return taskStatus;
-  }
-
-  private void setStatus(JobStatus status)
-  {
-    if (this.taskStatus != status)
-    {
-      Console.debug(String.format("%s status change to %s", this, status.name()));
-      this.taskStatus = status;
-      eventHandler.fireTaskStatusChanged(status);
-    }
-  }
-
-  private void updateGlobalStatus()
-  {
-    int precedence = -1;
-    for (BaseJob job : jobs)
-    {
-      JobStatus status = job.getStatus();
-      int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
-      if (precedence < jobPrecedence)
-        precedence = jobPrecedence;
-    }
-    if (precedence >= 0)
-    {
-      setStatus(JobStatus.statusPrecedence[precedence]);
-    }
-  }
-
-  @Override
-  public List<? extends JobI> getSubJobs()
-  {
-    return jobs;
-  }
+  // public void start(AlignCalcManagerI2 calcManager)
+  // {
+  // if (this.worker != null)
+  // throw new IllegalStateException("task already started");
+  // this.worker = new AlignCalcWorkerAdapter(calcManager);
+  // if (taskStatus != JobStatus.CANCELLED)
+  // {
+  // List<AlignCalcWorkerI> oldWorkers = calcManager.getWorkersOfClass(
+  // AlignCalcWorkerAdapter.class);
+  // for (var worker : oldWorkers)
+  // {
+  // if (action.getWebService().getName().equalsIgnoreCase(
+  // ((AlignCalcWorkerAdapter) worker).getServiceName()))
+  // {
+  // // remove interactive workers for the same service.
+  // calcManager.removeWorker(worker);
+  // calcManager.cancelWorker(worker);
+  // }
+  // }
+  // if (action.getWebService().isInteractive())
+  // calcManager.registerWorker(worker);
+  // else
+  // calcManager.startWorker(worker);
+  // }
+  // }
 
   /**
    * Create and return a list of annotation jobs from the current state of the
@@ -334,7 +102,8 @@ public class AnnotationTask implements TaskI<AnnotationResult>
    * @throws ServiceInputInvalidException
    *           input data is not valid
    */
-  private List<AnnotationJob> prepare() throws ServiceInputInvalidException
+  @Override
+  public List<AnnotationJob> prepareJobs() throws ServiceInputInvalidException
   {
     AlignmentI alignment = viewport.getAlignment();
     if (alignment == null || alignment.getWidth() <= 0 ||
@@ -366,38 +135,8 @@ public class AnnotationTask implements TaskI<AnnotationResult>
     return List.of(job);
   }
 
-  private void startJobs() throws IOException
-  {
-    for (BaseJob job : jobs)
-    {
-      if (job.isInputValid() && job.getStatus() == JobStatus.READY)
-      {
-        var serverJob = client.submit(job.getInputSequences(),
-            args, credentials);
-        job.setServerJob(serverJob);
-        job.setStatus(JobStatus.SUBMITTED);
-      }
-    }
-  }
-
-  private boolean poll() throws IOException
-  {
-    boolean allDone = true;
-    for (BaseJob job : jobs)
-    {
-      if (job.isInputValid() && !job.getStatus().isDone())
-      {
-        WebServiceJobHandle serverJob = job.getServerJob();
-        job.setStatus(client.getStatus(serverJob));
-        job.setLog(client.getLog(serverJob));
-        job.setErrorLog(client.getErrorLog(serverJob));
-      }
-      allDone &= job.isCompleted();
-    }
-    return allDone;
-  }
-
-  private AnnotationResult retrieveResult() throws IOException
+  @Override
+  protected AnnotationResult collectResult(List<AnnotationJob> jobs) throws IOException
   {
     final Map<String, FeatureColourI> featureColours = new HashMap<>();
     final Map<String, FeatureMatcherSetI> featureFilters = new HashMap<>();
@@ -410,173 +149,129 @@ public class AnnotationTask implements TaskI<AnnotationResult>
      * sequence, excluding regions not annotated due to gapMap/column
      * visibility */
 
-    // update calcId if it is not already set on returned annotation
-    for (AlignmentAnnotation annot : returnedAnnot)
+    udpateCalcId(returnedAnnot);
+    int graphGroup = getNextGraphGroup(viewport.getAlignment());
+    shiftGraphGroup(returnedAnnot, graphGroup);
+    List<AlignmentAnnotation> annotations = new ArrayList<>();
+    for (AlignmentAnnotation ala : returnedAnnot)
     {
-      if (annot.getCalcId() == null || annot.getCalcId().isEmpty())
-      {
-        annot.setCalcId(action.getFullName());
-      }
-      annot.autoCalculated = action.isAlignmentAnalysis() &&
-          action.getWebService().isInteractive();
-    }
-    job.returnedAnnotations = returnedAnnot;
-    job.featureColours = featureColours;
-    job.featureFilters = featureFilters;
-    var ret = updateResultAnnotation(job, returnedAnnot);
-    var annotations = ret.get0();
-    var transferFeatures = ret.get1();
-    return new AnnotationResult(annotations, transferFeatures, featureColours,
-        featureFilters);
-  }
-
-  private Pair<List<AlignmentAnnotation>, Boolean> updateResultAnnotation(
-      AnnotationJob job, List<AlignmentAnnotation> annotations)
-  {
-    List<AlignmentAnnotation> newAnnots = new ArrayList<>();
-    // update graphGroup for all annotation
-    /* find a graphGroup greater than any existing one, could be moved
-     * to Alignment#getNewGraphGroup() - returns next unused graph group */
-    int graphGroup = 1;
-    if (viewport.getAlignment().getAlignmentAnnotation() != null)
-    {
-      for (var ala : viewport.getAlignment().getAlignmentAnnotation())
-      {
-        graphGroup = Math.max(graphGroup, ala.graphGroup);
-      }
-    }
-    // update graphGroup in the annotation rows returned form service'
-    /* TODO: look at sequence annotation rows and update graph groups in the
-     * case of reference annotation */
-    for (AlignmentAnnotation ala : annotations)
-    {
-      if (ala.graphGroup > 0)
-        ala.graphGroup += graphGroup;
-      SequenceI aseq = null;
-      // transfer sequence refs and adjust gapMap
-      if (ala.sequenceRef != null)
-      {
-        aseq = job.seqNames.get(ala.sequenceRef.getName());
-      }
+      SequenceI seq = job.seqNames.get(ala.sequenceRef.getName());
+      SequenceI aseq = getRootDatasetSequence(seq);
+      Annotation[] gappedAnnots = createGappedAnnotations(ala.annotations, job.start, job.gapMap);
       ala.sequenceRef = aseq;
-
-      Annotation[] resAnnot = ala.annotations;
-      boolean[] gapMap = job.gapMap;
-      Annotation[] gappedAnnot = new Annotation[Math.max(
-          viewport.getAlignment().getWidth(), gapMap.length)];
-      for (int p = 0, ap = job.start; ap < gappedAnnot.length; ap++)
-      {
-        if (gapMap.length > ap && !gapMap[ap])
-          gappedAnnot[ap] = new Annotation("", "", ' ', Float.NaN);
-        else if (p < resAnnot.length)
-          gappedAnnot[ap] = resAnnot[p++];
-        // is this loop exhaustive of resAnnot?
-      }
-      ala.annotations = gappedAnnot;
+      ala.annotations = gappedAnnots;
 
       AlignmentAnnotation newAnnot = viewport.getAlignment()
           .updateFromOrCopyAnnotation(ala);
-      if (aseq != null)
+      if (aseq != null) // I suspect it's always true
       {
         aseq.addAlignmentAnnotation(newAnnot);
         newAnnot.adjustForAlignment();
         AlignmentAnnotationUtils.replaceAnnotationOnAlignmentWith(
             newAnnot, newAnnot.label, newAnnot.getCalcId());
       }
-      newAnnots.add(newAnnot);
+      annotations.add(newAnnot);
     }
 
-    boolean transferFeatures = false;
+    boolean hasFeatures = false;
     for (SequenceI sq : job.getInputSequences())
     {
-      if (!sq.getFeatures().hasFeatures() &&
-          (sq.getDBRefs() == null || sq.getDBRefs().size() == 0))
+      if (!sq.getFeatures().hasFeatures() && (sq.getDBRefs() == null || sq.getDBRefs().isEmpty()))
         continue;
-      transferFeatures = true;
+      hasFeatures = true;
       SequenceI seq = job.seqNames.get(sq.getName());
-      SequenceI dseq;
-      int start = job.start, end = job.end;
-      boolean[] gapMap = job.gapMap;
-      ContiguousI seqRange = seq.findPositions(start, end);
-      while ((dseq = seq).getDatasetSequence() != null)
-      {
-        seq = seq.getDatasetSequence();
-      }
-      List<ContiguousI> sourceRange = new ArrayList<>();
-      if (gapMap.length >= end)
-      {
-        int lastcol = start, col = start;
-        do
-        {
-          if (col == end || !gapMap[col])
-          {
-            if (lastcol <= (col - 1))
-            {
-              seqRange = seq.findPositions(lastcol, col);
-              sourceRange.add(seqRange);
-            }
-            lastcol = col + 1;
-          }
-        } while (col++ < end);
-      }
-      else
-      {
-        sourceRange.add(seq.findPositions(start, end));
-      }
+      SequenceI datasetSeq = getRootDatasetSequence(seq);
+      List<ContiguousI> sourceRange = findContiguousRanges(datasetSeq, job.gapMap, job.start, job.end);
+      int[] sourceStartEnd = ContiguousI.toStartEndArray(sourceRange);
+      Mapping mp = new Mapping(new MapList(
+          sourceStartEnd, new int[]
+          { datasetSeq.getStart(), datasetSeq.getEnd() }, 1, 1));
+      datasetSeq.transferAnnotation(sq, mp);
+    }
+
+    return new AnnotationResult(annotations, hasFeatures, featureColours, featureFilters);
+  }
 
-      int i = 0;
-      int sourceStartEnd[] = new int[sourceRange.size() * 2];
-      for (ContiguousI range : sourceRange)
+  /**
+   * Updates calcId on provided annotations if not already set.
+   */
+  public void udpateCalcId(Iterable<AlignmentAnnotation> annotations)
+  {
+    for (var annotation : annotations)
+    {
+      if (annotation.getCalcId() == null || annotation.getCalcId().isEmpty())
       {
-        sourceStartEnd[i++] = range.getBegin();
-        sourceStartEnd[i++] = range.getEnd();
+        annotation.setCalcId(action.getFullName());
       }
-      Mapping mp = new Mapping(new MapList(
-          sourceStartEnd, new int[]
-          { seq.getStart(), seq.getEnd() }, 1, 1));
-      dseq.transferAnnotation(sq, mp);
+      annotation.autoCalculated = action.isAlignmentAnalysis() &&
+          action.getWebService().isInteractive();
     }
+  }
 
-    return new Pair<>(newAnnots, transferFeatures);
+  private static int getNextGraphGroup(AlignmentI alignment)
+  {
+    if (alignment == null || alignment.getAlignmentAnnotation() == null)
+      return 1;
+    int graphGroup = 1;
+    for (AlignmentAnnotation ala : alignment.getAlignmentAnnotation())
+      graphGroup = Math.max(graphGroup, ala.graphGroup);
+    return graphGroup;
   }
 
-  @Override
-  public AnnotationResult getResult()
+  private static void shiftGraphGroup(Iterable<AlignmentAnnotation> annotations, int shift)
   {
-    return result;
+    for (AlignmentAnnotation ala : annotations)
+    {
+      if (ala.graphGroup > 0)
+      {
+        ala.graphGroup += shift;
+      }
+    }
   }
 
-  @Override
-  public void cancel()
+  private static SequenceI getRootDatasetSequence(SequenceI sequence)
   {
-    setStatus(JobStatus.CANCELLED);
-    if (worker != null)
+    while (sequence.getDatasetSequence() != null)
     {
-      worker.stop();
+      sequence = sequence.getDatasetSequence();
     }
-    cancelJobs();
+    return sequence;
   }
 
-  public void cancelJobs()
+  private Annotation[] createGappedAnnotations(Annotation[] annotations, int start, boolean[] gapMap)
   {
-    for (BaseJob job : jobs)
+    var size = Math.max(viewport.getAlignment().getWidth(), gapMap.length);
+    Annotation[] gappedAnnotations = new Annotation[size];
+    for (int p = 0, ap = start; ap < size; ap++)
     {
-      if (!job.isCompleted())
+      if (gapMap != null && gapMap.length > ap && !gapMap[ap])
       {
-        try
-        {
-          if (job.getServerJob() != null)
-          {
-            client.cancel(job.getServerJob());
-          }
-          job.setStatus(JobStatus.CANCELLED);
-        } catch (IOException e)
-        {
-          Console.error(String.format(
-              "failed to cancel job %s", job.getServerJob()), e);
-        }
+        gappedAnnotations[ap] = new Annotation("", "", ' ', Float.NaN);
+      }
+      else if (p < annotations.length)
+      {
+        gappedAnnotations[ap] = annotations[p++];
       }
     }
+    return gappedAnnotations;
+  }
+
+  private List<ContiguousI> findContiguousRanges(SequenceI seq, boolean[] gapMap, int start, int end)
+  {
+    if (gapMap == null || gapMap.length < end)
+      return List.of(seq.findPositions(start, end));
+    List<ContiguousI> ranges = new ArrayList<>();
+    int lastcol = start, col = start;
+    do
+    {
+      if (col == end || !gapMap[col])
+      {
+        if (lastcol < col)
+          ranges.add(seq.findPositions(lastcol, col));
+        lastcol = col + 1;
+      }
+    } while (++col <= end);
+    return ranges;
   }
 
   @Override
index cb84944..5f4d575 100644 (file)
@@ -38,6 +38,10 @@ public interface TaskI<T>
    */
   List<? extends JobI> getSubJobs();
 
+  void addTaskEventListener(TaskEventListener<T> listener);
+
+  void removeTaskEventListener(TaskEventListener<T> listener);
+
   /**
    * Get the last result of the task or {@code null} if not present. Note that
    * the result is subject to change for restartable tasks.
@@ -46,6 +50,12 @@ public interface TaskI<T>
    */
   T getResult();
 
+  public void init() throws Exception;
+
+  public boolean poll() throws Exception;
+
+  public void complete() throws Exception;
+
   /**
    * Cancel the task, stop all sub-jobs running on a server and stop all threads
    * managing this task.