JAL-3690 partially reworked AlignCalcManager
authorMateusz Waronwy <mmzwarowny@dundee.ac.uk>
Wed, 11 Nov 2020 18:42:48 +0000 (19:42 +0100)
committerMateusz Waronwy <mmzwarowny@dundee.ac.uk>
Wed, 11 Nov 2020 18:45:31 +0000 (19:45 +0100)
todo:
 - modify SeqAnnotationServiceCalcWorker to implement PollableAlignCalcWorkerI
 - fix race condition where worker state gets overriden by the old job (QUEUED to IDLE)

src/jalview/api/AlignCalcWorkerI.java
src/jalview/api/PollableAlignCalcWorkerI.java [new file with mode: 0644]
src/jalview/workers/AlignCalcManager2.java

index 1184853..b51b94e 100644 (file)
@@ -28,7 +28,7 @@ import jalview.datamodel.AlignmentAnnotation;
  * Interface describing a worker that calculates alignment annotation(s). The
  * main (re-)calculation should be performed by the inherited run() method.
  */
-public interface AlignCalcWorkerI extends Callable<Void>
+public interface AlignCalcWorkerI
 {
   /**
    * Answers true if this worker updates the given annotation (regardless of its
@@ -51,22 +51,11 @@ public interface AlignCalcWorkerI extends Callable<Void>
    */
   void removeAnnotation();
   
-  
-  /**
-   * Default implementation of call which calls run and propagates the
-   * exception.
-   */
-  @Override
-  public default Void call() throws Exception
-  {
-    run();
-    return null;
-  }
-  
   /**
    * The main calculation happens here
+   * @throws Throwable 
    */
-  public void run() throws Exception;
+  public void run() throws Throwable;
   
   /**
    * Answers true if the worker should be deleted entirely when its annotation
diff --git a/src/jalview/api/PollableAlignCalcWorkerI.java b/src/jalview/api/PollableAlignCalcWorkerI.java
new file mode 100644 (file)
index 0000000..9697187
--- /dev/null
@@ -0,0 +1,18 @@
+package jalview.api;
+
+public interface PollableAlignCalcWorkerI extends AlignCalcWorkerI
+{
+  @Override
+  public default void run() throws Throwable
+  {
+    startUp();
+  }
+  
+  public void startUp() throws Throwable;
+  
+  public boolean poll() throws Throwable;
+  
+  public void cancel();
+  
+  public void cleanUp();
+}
index b914d65..f7d263a 100644 (file)
 package jalview.workers;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static java.util.Collections.synchronizedMap;
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
 
 import jalview.api.AlignCalcListener;
 import jalview.api.AlignCalcManagerI2;
 import jalview.api.AlignCalcWorkerI;
+import jalview.api.PollableAlignCalcWorkerI;
 import jalview.bin.Cache;
 import jalview.datamodel.AlignmentAnnotation;
 
-import static java.util.Collections.synchronizedList;
-import static java.util.Collections.synchronizedSet;
-import static java.util.Collections.unmodifiableList;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import static java.lang.String.format;
-
 public class AlignCalcManager2 implements AlignCalcManagerI2
 {
-  class AlignCalcTask extends FutureTask<Void>
+  private abstract class WorkerManager
   {
-    final AlignCalcWorkerI worker;
-
-    public AlignCalcTask(AlignCalcWorkerI worker)
+    static final int IDLE = 0;
+    static final int QUEUED = 1;
+    static final int RUNNING = 2;
+    static final int CANCELLING = 3;
+
+    protected volatile int state = IDLE;
+    protected volatile boolean enabled = true;
+            
+    protected final AlignCalcWorkerI worker;
+    
+    WorkerManager(AlignCalcWorkerI worker)
     {
-      super(new Callable<Void>()
-      {
-        public Void call() throws Exception
-        {
-          Cache.log.debug(format("Worker %s started%n",
-                  worker.getClass().getName()));
-          notifyStarted(worker);
-          worker.run();
-          return null;
-        }
-      });
       this.worker = worker;
     }
-
-    public AlignCalcWorkerI getWorker()
+    
+    AlignCalcWorkerI getWorker()
     {
       return worker;
     }
-
-    @Override
-    protected void done()
+    
+    boolean isEnabled()
     {
-      boolean success = false;
-      Throwable exception = null;
-      try
-      {
-        get();
-        success = true;
-      } catch (CancellationException e)
-      {
-        Cache.log.debug(format("Worker %s cancelled%n",
-                getWorker().getClass().getName()));
-        notifyCancelled(worker);
-      } catch (ExecutionException e)
+      return enabled;
+    }
+    
+    void setEnabled(boolean enabled)
+    {
+      this.enabled = enabled;
+    }
+    
+    synchronized protected void setState(int state) 
+    {
+      this.state = state;
+    }
+    
+    int getState()
+    {
+      return state;
+    }
+    
+    void restart()
+    {
+      if (!isEnabled())
       {
-        exception = e.getCause();
-        if (exception instanceof OutOfMemoryError)
-        {
-          disableWorker(getWorker());
-        }
-      } catch (Throwable e)
+        return;
+      }
+      if (state == IDLE)
       {
-        exception = e;
-      } finally
+        submit();
+      }
+      else if (state == QUEUED)
       {
-        inProgress.remove(getWorker());
-        tasks.remove(this);
+        // job already queued, do nothing
       }
-      if (success)
+      else if (state == RUNNING)
       {
-        Cache.log.debug(format("Worker %s finished%n",
-                getWorker().getClass().getName()));
-        notifyCompleted(worker);
+        cancel();
+        submit();
       }
-      else if (exception != null)
+      else if (state == CANCELLING)
       {
-        Cache.log.warn(format("Worker %s failed%n",
-                getWorker().getClass().getName()));
-        exception.printStackTrace();
-        notifyExceptional(worker, exception);
+        submit();
       }
     }
+    
+    protected abstract void submit();
+    
+    abstract void cancel();  
   }
-
-  private static class CalcManagerThreadFactory implements ThreadFactory
+  
+  
+  private class SimpleWorkerManager extends WorkerManager
   {
-    private static final AtomicInteger threadNumber = new AtomicInteger(1);
-
-    private final ThreadGroup group;
+    private Future<?> task = null;
+    
+    SimpleWorkerManager(AlignCalcWorkerI worker)
+    {
+      super(worker);
+    }
 
-    private static final String namePrefix = "AlignCalcManager-pool-thread-";
+    @Override
+    protected void submit()
+    {
+      if (task != null && !(task.isDone() || task.isCancelled()))
+      {
+        throw new IllegalStateException(
+            "Cannot submit new task if the prevoius one is still running");
+      }
+      Cache.log.debug(format("Worker %s queued",
+              worker.getClass().getName()));
+      setState(QUEUED);
+      task = executor.submit(() -> {
+        setState(RUNNING);
+        try
+        {
+          Cache.log.debug(format("Worker %s started",
+                  worker.getClass().getName()));
+          worker.run();
+          Cache.log.debug(format("Worker %s finished",
+                  worker.getClass().getName()));
+        }
+        catch (InterruptedException e)
+        {
+          Cache.log.debug(format("Worker %s interrupted",
+                  worker.getClass().getName()));
+        }
+        catch (Throwable th)
+        {
+          Cache.log.debug(format("Worker %s failed",
+                  worker.getClass().getName()), th);
+        }
+        finally
+        {
+          // fixme: should not set to idle if another task is already queued for execution
+          setState(IDLE);
+        }
+      });
+    }
 
-    CalcManagerThreadFactory()
+    @Override
+    synchronized void cancel()
     {
-      var securityManager = System.getSecurityManager();
-      if (securityManager != null)
+      if (task == null || state == IDLE || state == CANCELLING)
       {
-        group = securityManager.getThreadGroup();
+        return;
       }
-      else
+      Cache.log.debug(format("Cancelling worker %s",
+              worker.getClass().getName()));
+      setState(CANCELLING);
+      task.cancel(true);
+      if (task.isCancelled())
       {
-        group = Thread.currentThread().getThreadGroup();
+        setState(IDLE);
       }
     }
-
-    @Override
-    public Thread newThread(Runnable r)
+  }
+  
+  
+  private class PollableWorkerManager extends WorkerManager
+  {
+    private final PollableAlignCalcWorkerI worker;
+    private Future<?> task = null;
+    
+    PollableWorkerManager(PollableAlignCalcWorkerI worker)
     {
-      Thread t = new Thread(group, r,
-              namePrefix + threadNumber.getAndIncrement(), 0);
-      t.setDaemon(false);
-      t.setPriority(Thread.NORM_PRIORITY);
-      return t;
+      super(worker);
+      this.worker = worker;
+    }
+    
+    protected void submit()
+    {
+      if (task != null && !(task.isDone() || task.isCancelled()))
+      {
+        throw new IllegalStateException(
+            "Cannot submit new task if the prevoius one is still running");
+      }
+      Cache.log.debug(format("Worker %s queued",
+              worker.getClass().getName()));
+      setState(QUEUED);
+      final var runnable = new Runnable()
+      {
+        private boolean started = false;
+        private boolean completed = false;
+        Future<?> future = null;
+        
+        @Override
+        public void run()
+        {
+          try
+          {
+            if (!started)
+            {
+              Cache.log.debug(format("Worker %s started",
+                      worker.getClass().getName()));
+              setState(RUNNING);
+              worker.startUp();
+              started = true;
+            }
+            else if (!completed)
+            {
+              if (worker.poll())
+              {
+                Cache.log.debug(format("Worker %s finished",
+                        worker.getClass().getName()));
+                completed = true;
+                setState(IDLE);
+              }
+            }
+          } catch (Throwable th)
+          {
+            Cache.log.debug(format("Worker %s failed",
+                    worker.getClass().getName()), th);
+            completed = true;
+            setState(IDLE);
+          }
+          if (completed)
+          {
+            try
+            {
+              future.cancel(false);
+            }
+            catch (NullPointerException ignored)
+            {
+              // extremely unlikely to happen
+            }
+          }
+        }
+      };
+      runnable.future = task = executor.scheduleWithFixedDelay(
+              runnable, 10, 1000, TimeUnit.MILLISECONDS);
+    }
+    
+    synchronized protected void cancel()
+    {
+      if (task == null || state == IDLE || state == CANCELLING)
+      {
+        return;
+      }
+      Cache.log.debug(format("Cancelling worker %s",
+              worker.getClass().getName()));
+      setState(CANCELLING);
+      task.cancel(false);
+      if (task.isCancelled())
+      {
+        setState(IDLE);
+      }
+      executor.submit(() -> {
+        worker.cancel();
+      });
     }
   }
-
-  // main executor for running workers one-by-one
-  private final ExecutorService executor = Executors
-          .newSingleThreadExecutor(new CalcManagerThreadFactory());
-
-  private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
-
-  // list of all registered workers (other collections are subsets of this)
-  private final List<AlignCalcWorkerI> registered = synchronizedList(
-          new ArrayList<>());
-
-  // list of tasks holding queued and running workers
-  private final List<AlignCalcTask> tasks = synchronizedList(
-          new ArrayList<>());
-
-  // the collection of currently running workers
-  private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(
-          new HashSet<>());
-
-  // the collection of workers that will not be started
-  private final Set<AlignCalcWorkerI> disabled = synchronizedSet(
-          new HashSet<>());
-
-  /*
-   * Register the worker with this manager and scheduler for execution.
-   */
+  
+  
+  private final ScheduledExecutorService executor =
+          Executors.newSingleThreadScheduledExecutor();
+  private final Map<AlignCalcWorkerI, WorkerManager> registered =
+          synchronizedMap(new HashMap<>());
+  
+  private final List<AlignCalcListener> listeners =
+          new CopyOnWriteArrayList<>();
+  
+  
   @Override
   public void registerWorker(AlignCalcWorkerI worker)
   {
     Objects.requireNonNull(worker);
-    synchronized (registered)
-    {
-      if (!registered.contains(worker))
-        registered.add(worker);
-    }
+    WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ?
+            new PollableWorkerManager((PollableAlignCalcWorkerI) worker) : 
+              new SimpleWorkerManager(worker);
+    registered.putIfAbsent(worker, manager);
     startWorker(worker);
   }
 
   @Override
   public List<AlignCalcWorkerI> getWorkers()
   {
-    return unmodifiableList(new ArrayList<>(registered));
+    return List.copyOf(registered.keySet());
   }
 
   @Override
   public List<AlignCalcWorkerI> getWorkersOfClass(
           Class<? extends AlignCalcWorkerI> cls)
   {
-    synchronized (registered)
+    List<AlignCalcWorkerI> collected = new ArrayList<>();
+    for (var worker : getWorkers())
     {
-      List<AlignCalcWorkerI> collected = new ArrayList<>();
-      for (var worker : registered)
+      if (worker.getClass().equals(cls))
       {
-        if (worker.getClass().equals(cls))
-        {
-          collected.add(worker);
-        }
+        collected.add(worker);
       }
-      return unmodifiableList(collected);
     }
+    return unmodifiableList(collected);
   }
 
   @Override
   public void removeWorker(AlignCalcWorkerI worker)
   {
     registered.remove(worker);
-    disabled.remove(worker);
   }
 
   @Override
-  public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
+  public void removeWorkerForAnnotation(AlignmentAnnotation annot)
   {
     synchronized (registered)
     {
-      for (var worker : registered)
+      for (var worker : getWorkers())
       {
-        if (worker.getClass().equals(cls))
+        if (worker.involves(annot) && worker.isDeletable())
         {
           removeWorker(worker);
         }
@@ -217,13 +324,13 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   }
 
   @Override
-  public void removeWorkerForAnnotation(AlignmentAnnotation annot)
+  public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
   {
     synchronized (registered)
     {
-      for (var worker : registered)
+      for (var worker : getWorkers())
       {
-        if (worker.involves(annot) && worker.isDeletable())
+        if (worker.getClass().equals(cls))
         {
           removeWorker(worker);
         }
@@ -234,95 +341,112 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   @Override
   public void disableWorker(AlignCalcWorkerI worker)
   {
-    disabled.add(worker);
+    // Null pointer check might be needed
+    registered.get(worker).setEnabled(false);
   }
 
   @Override
   public void enableWorker(AlignCalcWorkerI worker)
   {
-    disabled.remove(worker);
+    // Null pointer check might be needed
+    registered.get(worker).setEnabled(true);
   }
 
   @Override
-  public void restartWorkers()
+  public boolean isDisabled(AlignCalcWorkerI worker)
   {
-    synchronized (registered)
+    if (registered.containsKey(worker))
     {
-      for (AlignCalcWorkerI worker : registered)
-      {
-        if (!isDisabled(worker))
-          startWorker(worker);
-      }
+      return !registered.get(worker).isEnabled();
+    }
+    else
+    {
+      return false;
     }
   }
 
   @Override
-  public void startWorker(AlignCalcWorkerI worker)
+  public boolean isWorking(AlignCalcWorkerI worker)
   {
-    Objects.requireNonNull(worker);
-    AlignCalcTask newTask = new AlignCalcTask(worker);
-    synchronized (inProgress)
+    if (!registered.containsKey(worker))
     {
-      cancelWorker(worker);
-      inProgress.add(worker);
-      tasks.add(newTask);
+      return false;
+    }
+    else
+    {
+      return registered.get(worker).getState() == WorkerManager.RUNNING;
     }
-    notifyQueued(worker);
-    executor.execute(newTask);
   }
 
   @Override
-  public void cancelWorker(AlignCalcWorkerI worker)
+  public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
   {
-    if (isWorking(worker))
+    synchronized (registered)
     {
-      synchronized (tasks)
+      for (var entry : registered.entrySet())
       {
-        Optional<AlignCalcTask> oldTask = tasks.stream()
-                .filter(task -> task.getWorker().equals(worker))
-                .findFirst();
-        if (oldTask.isPresent())
+        if (entry.getKey().involves(annot) &&
+                entry.getValue().getState() == WorkerManager.RUNNING)
         {
-          oldTask.get().cancel(true);
+          return true;
         }
       }
     }
+    return false;
   }
 
   @Override
-  public boolean isDisabled(AlignCalcWorkerI worker)
-  {
-    return disabled.contains(worker);
-  }
-
-  @Override
-  public boolean isWorking(AlignCalcWorkerI worker)
+  public boolean isWorking()
   {
-    return inProgress.contains(worker);
+    synchronized (registered)
+    {
+      for (var manager : registered.values())
+      {
+        if (manager.getState() == WorkerManager.RUNNING)
+        {
+          return true;
+        }
+      }
+    }
+    return false;
   }
 
   @Override
-  public boolean isWorking()
+  public void startWorker(AlignCalcWorkerI worker)
   {
-    return !inProgress.isEmpty();
+    Objects.requireNonNull(worker);
+    var manager = registered.get(worker);
+    if (manager == null) 
+    {
+      throw new NoSuchElementException();
+    }
+    manager.restart();
   }
 
   @Override
-  public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
+  public void restartWorkers()
   {
-    synchronized (inProgress)
+    synchronized (registered)
     {
-      for (AlignCalcWorkerI worker : inProgress)
+      for (var manager : registered.values())
       {
-        if (worker.involves(annot))
-        {
-          return true;
-        }
+        manager.restart();
       }
     }
-    return false;
   }
 
+  @Override
+  public void cancelWorker(AlignCalcWorkerI worker)
+  {    
+    Objects.requireNonNull(worker);
+    var manager = registered.get(worker);
+    if (manager == null) 
+    {
+      throw new NoSuchElementException();
+    }
+    manager.cancel();
+  }
+  
   private void notifyQueued(AlignCalcWorkerI worker)
   {
     for (AlignCalcListener listener : listeners)
@@ -400,9 +524,6 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
     executor.shutdownNow();
     listeners.clear();
     registered.clear();
-    tasks.clear();
-    inProgress.clear();
-    disabled.clear();
   }
 
 }