JAL-3878 Create abstract workers and their utility classes.
authorMateusz Warowny <mmzwarowny@dundee.ac.uk>
Fri, 19 Nov 2021 16:51:40 +0000 (17:51 +0100)
committerMateusz Warowny <mmzwarowny@dundee.ac.uk>
Mon, 22 Nov 2021 13:59:22 +0000 (14:59 +0100)
src/jalview/ws2/operations/AbstractOperation.java
src/jalview/ws2/operations/AbstractPollableWorker.java [new file with mode: 0644]
src/jalview/ws2/operations/AbstractWorker.java [new file with mode: 0644]
src/jalview/ws2/operations/Operation.java
src/jalview/ws2/operations/WSJobList.java [new file with mode: 0644]
src/jalview/ws2/operations/WebServiceWorkerI.java [new file with mode: 0644]
src/jalview/ws2/operations/WebServiceWorkerListener.java [new file with mode: 0644]
src/jalview/ws2/operations/WebServiceWorkerListenersList.java [new file with mode: 0644]

index 36a86e3..f3a6e16 100644 (file)
@@ -32,6 +32,12 @@ public abstract class AbstractOperation implements Operation
   }
 
   @Override
+  public WebServiceI getWebService()
+  {
+    return service;
+  }
+
+  @Override
   public String getName()
   {
     return service.getName();
diff --git a/src/jalview/ws2/operations/AbstractPollableWorker.java b/src/jalview/ws2/operations/AbstractPollableWorker.java
new file mode 100644 (file)
index 0000000..973199e
--- /dev/null
@@ -0,0 +1,75 @@
+package jalview.ws2.operations;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import jalview.bin.Cache;
+import jalview.ws2.PollableTaskI;
+import jalview.ws2.WSJob;
+import jalview.ws2.WSJobStatus;
+
+public abstract class AbstractPollableWorker
+    extends AbstractWorker implements PollableTaskI
+{
+
+  private Map<Long, Integer> exceptionCount = new HashMap<>();
+
+  private static final int MAX_RETRY = 5;
+
+  @Override
+  public boolean poll()
+  {
+    boolean done = true;
+    for (WSJob job : getJobs())
+    {
+      if (!job.getStatus().isDone() && !job.getStatus().isFailed())
+      {
+        Cache.log.debug(format("Polling job %s.", job));
+        try
+        {
+          getOperation().getWebService().updateProgress(job);
+          exceptionCount.remove(job.getUid());
+        } catch (IOException e)
+        {
+          Cache.log.error(format("Polling job %s failed.", job), e);
+          listeners.firePollException(job, e);
+          int count = exceptionCount.getOrDefault(job.getUid(),
+              MAX_RETRY);
+          if (--count <= 0)
+          {
+            job.setStatus(WSJobStatus.SERVER_ERROR);
+            Cache.log.warn(format(
+                "Attempts limit exceeded. Droping job %s.", job));
+          }
+          exceptionCount.put(job.getUid(), count);
+        } catch (OutOfMemoryError e)
+        {
+          job.setStatus(WSJobStatus.BROKEN);
+          Cache.log.error(
+              format("Out of memory when retrieving job %s", job), e);
+        }
+        Cache.log.debug(
+            format("Job %s status is %s", job, job.getStatus()));
+      }
+      done &= job.getStatus().isDone() || job.getStatus().isFailed();
+    }
+    return done;
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (getJobs().size() == 0)
+      return false;
+    for (WSJob job : getJobs())
+    {
+      if (!job.getStatus().isDone() && !job.getStatus().isFailed())
+        return false;
+    }
+    return true;
+  }
+
+}
diff --git a/src/jalview/ws2/operations/AbstractWorker.java b/src/jalview/ws2/operations/AbstractWorker.java
new file mode 100644 (file)
index 0000000..926b7ce
--- /dev/null
@@ -0,0 +1,20 @@
+package jalview.ws2.operations;
+
+import jalview.util.MathUtils;
+import jalview.ws2.WSJob;
+
+public abstract class AbstractWorker implements WebServiceWorkerI
+{
+  protected long uid = MathUtils.getUID();
+  public long getUID()
+  {
+    return uid;
+  }
+  
+  protected WebServiceWorkerListenersList listeners = new WebServiceWorkerListenersList(this);
+
+  public void addListener(WebServiceWorkerListener listener)
+  {
+    listeners.addListener(listener);
+  }
+}
index 8b432ba..5cf2cb5 100644 (file)
@@ -1,6 +1,7 @@
 package jalview.ws2.operations;
 
 import jalview.ws.params.ParamDatastoreI;
+import jalview.ws2.WebServiceI;
 import jalview.ws2.gui.MenuEntryProviderI;
 
 /**
@@ -18,6 +19,13 @@ import jalview.ws2.gui.MenuEntryProviderI;
 public interface Operation
 {
   /**
+   * Get the web service instance used to communicate with the web client.
+   * 
+   * @return web service client instance
+   */
+  public WebServiceI getWebService();
+
+  /**
    * Get the name of the operation. Typically fetched from the server.
    * 
    * @return operation name
diff --git a/src/jalview/ws2/operations/WSJobList.java b/src/jalview/ws2/operations/WSJobList.java
new file mode 100644 (file)
index 0000000..76bc015
--- /dev/null
@@ -0,0 +1,64 @@
+package jalview.ws2.operations;
+
+import java.util.ArrayList;
+
+import jalview.ws2.WSJob;
+
+public class WSJobList<T extends WSJob> extends ArrayList<T>
+{
+  private static final long serialVersionUID = -1684855135603987602L;
+
+  public int countSubmitted() {
+    int count = 0;
+    for (WSJob job : this) {
+      if (job.getStatus().isSubmitted()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public int countQueuing() {
+    int count = 0;
+    for (WSJob job : this) {
+      if (job.getStatus().isQueuing()) count++;
+    }
+    return count;
+  }
+
+  public int countRunning() {
+    int count = 0;
+    for (WSJob job : this) {
+      if (job.getStatus().isRunning()) count++;
+    }
+    return count;
+  }
+
+  public int countDone() {
+    int count = 0;
+    for (WSJob job : this) {
+      if (job.getStatus().isDone()) count++;
+    }
+    return count;
+  }
+
+  public int countFailed() {
+    int count = 0;
+    for (WSJob job : this) {
+      if (job.getStatus().isFailed()) count++;
+    }
+    return count;
+  }
+
+  public int countSuccessful() {
+    return countDone() - countFailed() - countCancelled();
+  }
+
+  public int countCancelled() {
+    int count = 0;
+    for (WSJob job : this) {
+      if (job.getStatus().isCancelled()) count++;
+    }
+    return count;
+  }
+}
diff --git a/src/jalview/ws2/operations/WebServiceWorkerI.java b/src/jalview/ws2/operations/WebServiceWorkerI.java
new file mode 100644 (file)
index 0000000..5ec7e97
--- /dev/null
@@ -0,0 +1,10 @@
+package jalview.ws2.operations;
+
+import jalview.ws2.WSJob;
+
+public interface WebServiceWorkerI
+{
+  Operation getOperation();
+
+  WSJobList<? extends WSJob> getJobs();
+}
diff --git a/src/jalview/ws2/operations/WebServiceWorkerListener.java b/src/jalview/ws2/operations/WebServiceWorkerListener.java
new file mode 100644 (file)
index 0000000..1044284
--- /dev/null
@@ -0,0 +1,74 @@
+package jalview.ws2.operations;
+
+import jalview.ws2.WSJob;
+
+/**
+ * The listener interface for receiving signals from the
+ * {@link WebServiceWorkerI} about the state changes or new sub-jobs spawned.
+ * 
+ * The {@link WebServiceWorkerListener} objects created from that interface are
+ * then registered with a worker objects using
+ * {@link WebServiceWorkerI#addListener} method. When an event occurs, a
+ * relevant method in the listener is invoked with the worker that emitted the
+ * signal as a first parameter and, optionally, followed by additional event
+ * details.
+ * 
+ * @author mmwarowny
+ *
+ */
+public interface WebServiceWorkerListener
+{
+  /**
+   * Called when the worker started successfully.
+   * 
+   * @param source
+   *          worker that emitted the signal
+   */
+  void workerStarted(WebServiceWorkerI source);
+
+  /**
+   * Called when the worker failed to start the jobs.
+   * 
+   * @param source
+   *          worker that emitted the signal
+   */
+  void workerNotStarted(WebServiceWorkerI source);
+
+  /**
+   * Called when the worker creates a new job
+   * 
+   * @param source
+   *          worker that emitted the signal
+   * @param job
+   *          newly created job
+   */
+  void jobCreated(WebServiceWorkerI source, WSJob job);
+
+  /**
+   * Called when polling the job results in an exception.
+   * 
+   * @param source
+   *          worker that emitted the signal
+   * @param job
+   *          polled job
+   * @param e
+   *          exception that occurred
+   */
+  void pollException(WebServiceWorkerI source, WSJob job, Exception e);
+
+  /**
+   * Called when the polling has finished and the finalization process begun.
+   * 
+   * @param source
+   *          worker that emitted the signal
+   */
+  void workerCompleting(WebServiceWorkerI source);
+
+  /**
+   * Called when the worker completes its work.
+   * 
+   * @param source
+   *          worker that emitted the signal
+   */
+  void workerCompleted(WebServiceWorkerI source);
+}
diff --git a/src/jalview/ws2/operations/WebServiceWorkerListenersList.java b/src/jalview/ws2/operations/WebServiceWorkerListenersList.java
new file mode 100644 (file)
index 0000000..acfffb3
--- /dev/null
@@ -0,0 +1,144 @@
+package jalview.ws2.operations;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
+
+import jalview.ws2.WSJob;
+
+/**
+ * Utility class that manages a list of {@link WebServiceWorkerListener} and
+ * dispatches signals to them. An instance of this class can be used as a member
+ * field of the {@link WebServiceWorkerI} object to easily store listeners and
+ * delegate signals to them. Firing any signal with this object will invoke a
+ * corresponding method on all registered listeners in order they were
+ * registered.
+ * 
+ * @author mmwarowny
+ *
+ */
+public class WebServiceWorkerListenersList
+{
+  private WebServiceWorkerI owner;
+
+  private List<WebServiceWorkerListener> listeners = new CopyOnWriteArrayList<>();
+
+  /**
+   * Constructs a listeners list object with the worker which will be given
+   * to the listeners as the source of the signals.
+   * 
+   * @param worker the worker to be given as the source for signals
+   */
+  public WebServiceWorkerListenersList(WebServiceWorkerI worker)
+  {
+    this.owner = worker;
+  }
+
+  /**
+   * Add listener to the listeners list. The listener will be notified of
+   * any signals triggered with this object.
+   * 
+   * @param listener listener to add
+   */
+  public void addListener(WebServiceWorkerListener listener)
+  {
+    listeners.add(listener);
+  }
+
+  /**
+   * Remove listener from the lsiteners list. The listener will no longer be
+   * notified of the emitted signals.
+   * 
+   * @param listener listener to remove
+   */
+  public void removeListener(WebServiceWorkerListener listener)
+  {
+    listeners.remove(listener);
+  }
+
+  /**
+   * Emit "worker started" signal to all listeners.
+   */
+  public void fireWorkerStarted()
+  {
+    for (var listener : listeners)
+      listener.workerStarted(owner);
+  }
+
+  /**
+   * Emit "worker not started" signal to all listeners.
+   */
+  public void fireWorkerNotStarted()
+  {
+    for (var listener : listeners)
+      listener.workerNotStarted(owner);
+  }
+
+  /**
+   * Emit "job created" signal to all listeners passing the job that has been
+   * created to them.
+   * 
+   * @param job newly created job
+   */
+  public void fireJobCreated(WSJob job)
+  {
+    for (var listener : listeners)
+      listener.jobCreated(owner, job);
+  }
+
+  /**
+   * Emit "poll exception" signal to all listener when an exception occurred
+   * during job polling. The job which caused an exception as well as the exception
+   * are passed to all listeners.
+   * 
+   * @param job polled job
+   * @param e exception that occurred
+   */
+  public void firePollException(WSJob job, Exception e)
+  {
+    for (var listener : listeners)
+      listener.pollException(owner, job, e);
+  }
+
+  /**
+   * Emit "worker completing" signal to all listeners when the worker starts
+   * collecting and parsing the results.
+   */
+  public void fireWorkerCompleting()
+  {
+    for (var listener : listeners)
+      listener.workerCompleting(owner);
+  }
+
+  /**
+   * Emit "worker completed" signal to all listeners indicating that the worker
+   * finished processing data and finalized the jobs. 
+   */
+  public void fireWorkerCompleted()
+  {
+    for (var listener : listeners)
+      listener.workerCompleted(owner);
+  }
+  
+  /**
+   * Get the list of registered listeners. The returned list should not be
+   * modified externally and its content may change when workers are added
+   * or removed from the list.
+   * @return
+   */
+  public List<WebServiceWorkerListener> getListeners()
+  {
+    return listeners;
+  }
+
+  /**
+   * Execute an operation for each listener in the listeners list.
+   * 
+   * @param consumer listener object consumer 
+   */
+  public void forEach(Consumer<WebServiceWorkerListener> consumer)
+  {
+    for (var listener : listeners)
+      consumer.accept(listener);
+  }
+}