JAL-3878 Create pollable task and executor it can be run with.
authorMateusz Warowny <mmzwarowny@dundee.ac.uk>
Fri, 19 Nov 2021 15:36:02 +0000 (16:36 +0100)
committerMateusz Warowny <mmzwarowny@dundee.ac.uk>
Mon, 22 Nov 2021 13:59:12 +0000 (14:59 +0100)
src/jalview/viewmodel/AlignmentViewport.java
src/jalview/ws2/PollableTaskI.java [new file with mode: 0644]
src/jalview/ws2/PollableTaskListenerI.java [new file with mode: 0644]
src/jalview/ws2/PollingTaskExecutor.java [new file with mode: 0644]

index 1a08b1b..0cae401 100644 (file)
@@ -62,6 +62,7 @@ import jalview.workers.ComplementConsensusThread;
 import jalview.workers.ConsensusThread;
 import jalview.workers.InformationThread;
 import jalview.workers.StrucConsensusThread;
+import jalview.ws2.PollingTaskExecutor;
 
 import java.awt.Color;
 import java.beans.PropertyChangeSupport;
@@ -994,6 +995,13 @@ public abstract class AlignmentViewport
     }
     return false;
   }
+  
+  private PollingTaskExecutor wsExecutor = new PollingTaskExecutor();
+  
+  public PollingTaskExecutor getWSExecutor()
+  {
+    return wsExecutor;
+  }
 
   public void setAlignment(AlignmentI align)
   {
@@ -1024,6 +1032,8 @@ public abstract class AlignmentViewport
     gapcounts = null;
     calculator.shutdown();
     calculator = null;
+    wsExecutor.shutdown();
+    wsExecutor = null;
     residueShading = null; // may hold a reference to Consensus
     changeSupport = null;
     ranges = null;
diff --git a/src/jalview/ws2/PollableTaskI.java b/src/jalview/ws2/PollableTaskI.java
new file mode 100644 (file)
index 0000000..d097974
--- /dev/null
@@ -0,0 +1,51 @@
+package jalview.ws2;
+
+/**
+ * The {@code PollableTaskI} interface should be implemented by classes
+ * representing a background task that must be polled repeatedly to check for
+ * completion. Those are typically jobs that run on a remote host and need to be
+ * periodically checked for status updates.
+ * 
+ * The life-cycle of a task consist of calling {@link #start} method once to
+ * start the process, followed by repeated calls to {@link #poll} that should
+ * check for execution status and finally {@link #done} method that finalizes
+ * the process.
+ * 
+ * The instances can be started with {@link PollingTaskExecutor} which manages
+ * start up, polling and finalization of the task using a thread executor.
+ * 
+ * @author mmwarowny
+ *
+ */
+public interface PollableTaskI
+{
+  /**
+   * Called by the executor once and the beginning to start the task. May throw
+   * any exception, in such case the task will be interrupted.
+   * 
+   * @throws Exception
+   */
+  void start() throws Exception;
+
+  /**
+   * Called repeatedly by the executor to check for task completion. The
+   * implementation should check the remote host for job status updates and
+   * return true when the task is finished. If any exception is thrown, the task
+   * is interrupted.
+   * 
+   * @return whether the task is done
+   * @throws Exception
+   */
+  boolean poll() throws Exception;
+
+  /**
+   * @return whether the task is done
+   */
+  boolean isDone();
+
+  /**
+   * Called once the task is done running ({@link #poll} returned true) to
+   * finalize the task and collect the results.
+   */
+  void done();
+}
diff --git a/src/jalview/ws2/PollableTaskListenerI.java b/src/jalview/ws2/PollableTaskListenerI.java
new file mode 100644 (file)
index 0000000..d69b2d9
--- /dev/null
@@ -0,0 +1,57 @@
+package jalview.ws2;
+
+/**
+ * Classes listening to the pollable task events must implement
+ * {@link PollableTaskListenerI}. They can be added to the
+ * {@link PollingTaskExecutor} to respond to the task execution events.
+ * 
+ * @author mmwarowny
+ *
+ */
+public interface PollableTaskListenerI
+{
+  /**
+   * Called when a new task is submitted for execution after its
+   * {@link PollableTask#start} method was called successfully.
+   * 
+   * @param task
+   *          submitted task
+   */
+  public void submitted(PollableTaskI task);
+
+  /**
+   * Called when a new task failed to start and raised an uncaught exception.
+   * 
+   * @param task
+   *          task that failed
+   * @param e
+   *          raised exception
+   */
+  public void submissionFailed(PollableTaskI task, Exception e);
+
+  /**
+   * Called when polling resulted in an uncaught exception.
+   * 
+   * @param task
+   *          task that failed
+   * @param e
+   *          raised exception
+   */
+  public void pollFailed(PollableTaskI task, Exception e);
+
+  /**
+   * Called when a task is cancelled.
+   * 
+   * @param task
+   *          cancelled task
+   */
+  public void cancelled(PollableTaskI task);
+
+  /**
+   * Called when the task finished execution either successfully or not.
+   * 
+   * @param task
+   *          finished task
+   */
+  public void done(PollableTaskI task);
+}
diff --git a/src/jalview/ws2/PollingTaskExecutor.java b/src/jalview/ws2/PollingTaskExecutor.java
new file mode 100644 (file)
index 0000000..c003e49
--- /dev/null
@@ -0,0 +1,164 @@
+package jalview.ws2;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import jalview.bin.Cache;
+
+/**
+ * An object that executes submitted {@link PollableTaskI} tasks using
+ * {@link SchedulekExecutorservice}. The task is first started using its
+ * {@link PollableTaskI#start} method and then repeatedly polled every second
+ * with {@link PollableTaskI#poll}.
+ * 
+ * The {@link PollingTaskExecutor} automates the process of running tasks and
+ * provides convenient interface to listen to events associated with task
+ * execution.
+ * 
+ * @author mmwarowny
+ *
+ */
+public class PollingTaskExecutor
+{
+  private ScheduledExecutorService executor = Executors
+      .newSingleThreadScheduledExecutor();
+
+  /**
+   * Submit the task for execution. Calls task's {@code start} method and, if
+   * started successfully, schedules next poll after one second.
+   * 
+   * @param task
+   *          task to submit
+   */
+  public void submit(final PollableTaskI task)
+  {
+    executor.submit(() -> {
+      try
+      {
+        task.start();
+        wsThreadSupport.submitted(task);
+      } catch (Exception e)
+      {
+        Cache.log.error("Failed to submit web service jobs.", e);
+        wsThreadSupport.submissionFailed(task, e);
+        return;
+      }
+      executor.schedule(() -> poll(task), 1, TimeUnit.SECONDS);
+    });
+  }
+
+  /**
+   * Poll the task by calling it's {@code poll} method. If not finished, the
+   * next poll is scheduled to happen after one second, otherwise task's
+   * {@code done} method is called immediately.
+   * 
+   * @param task
+   *          task to poll
+   */
+  private void poll(PollableTaskI task)
+  {
+    boolean done;
+    try
+    {
+      done = task.poll();
+    } catch (Exception e)
+    {
+      Cache.log.error("Failed to poll task.", e);
+      wsThreadSupport.pollFailed(task, e);
+      return;
+    }
+    if (!done)
+    {
+      executor.schedule(() -> poll(task), 1, TimeUnit.SECONDS);
+    }
+    else
+    {
+      task.done();
+      wsThreadSupport.done(task);
+    }
+  }
+
+  private WebServiceThreadSupport wsThreadSupport = new WebServiceThreadSupport();
+
+  /**
+   * Add listener of the task related events.
+   * 
+   * @param listener
+   *          listener to add
+   */
+  public void addThreadListener(PollableTaskListenerI listener)
+  {
+    wsThreadSupport.addListener(listener);
+  }
+
+  /**
+   * @param listener
+   *          listener to be removed
+   */
+  public void removeThreadListener(PollableTaskListenerI listener)
+  {
+    wsThreadSupport.removeListener(listener);
+  }
+
+  public void shutdown()
+  {
+    executor.shutdownNow();
+  }
+
+}
+
+class WebServiceThreadSupport implements PollableTaskListenerI
+{
+  List<PollableTaskListenerI> listeners = new CopyOnWriteArrayList<>();
+
+  @Override
+  public void submitted(PollableTaskI task)
+  {
+    for (var listener : listeners)
+      listener.submitted(task);
+  }
+
+  @Override
+  public void submissionFailed(PollableTaskI task, Exception e)
+  {
+    for (var listener : listeners)
+      listener.submissionFailed(task, e);
+  }
+
+  @Override
+  public void pollFailed(PollableTaskI task, Exception e)
+  {
+    for (var listener : listeners)
+      listener.pollFailed(task, e);
+  }
+
+  @Override
+  public void cancelled(PollableTaskI task)
+  {
+    for (var listener : listeners)
+      listener.cancelled(task);
+  }
+
+  @Override
+  public void done(PollableTaskI task)
+  {
+    for (var listener : listeners)
+      listener.done(task);
+  }
+
+  public void addListener(PollableTaskListenerI listener)
+  {
+    if (!listeners.contains(listener))
+    {
+      listeners.add(listener);
+    }
+  }
+
+  public void removeListener(PollableTaskListenerI listener)
+  {
+    listeners.remove(listener);
+  }
+}