JAL-3690 Introduce AlignCalcManager2 tests.
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
index dcaff10..8d4796d 100644 (file)
 package jalview.workers;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
+import java.util.WeakHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.stream.Collectors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static java.util.Collections.synchronizedMap;
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
 
 import jalview.api.AlignCalcListener;
 import jalview.api.AlignCalcManagerI2;
 import jalview.api.AlignCalcWorkerI;
+import jalview.api.PollableAlignCalcWorkerI;
 import jalview.bin.Cache;
 import jalview.datamodel.AlignmentAnnotation;
 
-import static java.util.Collections.synchronizedList;
-import static java.util.Collections.synchronizedSet;
-import static java.util.Collections.unmodifiableList;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import static java.lang.String.format;
-
 public class AlignCalcManager2 implements AlignCalcManagerI2
 {
-  class AlignCalcTask extends FutureTask<Void>
+  private abstract class WorkerManager
   {
-    final AlignCalcWorkerI worker;
+    protected volatile boolean enabled = true;
+
+    protected AlignCalcWorkerI worker;
 
-    public AlignCalcTask(AlignCalcWorkerI worker)
+    WorkerManager(AlignCalcWorkerI worker)
     {
-      super(new Callable<Void>() {
-        public Void call() throws Exception {
-            Cache.log.debug(format("Worker %s started%n", worker.getClass().getName()));
-            notifyStarted(worker);
-            worker.run();
-            return null;
-        }
-      });
       this.worker = worker;
     }
 
-    public AlignCalcWorkerI getWorker()
+    protected AlignCalcWorkerI getWorker()
     {
       return worker;
     }
 
-    @Override
-    protected void done()
+    boolean isEnabled()
     {
-      boolean success = false;
-      Throwable exception = null;
-      try
+      return enabled;
+    }
+
+    void setEnabled(boolean enabled)
+    {
+      this.enabled = enabled;
+    }
+
+    synchronized void restart()
+    {
+      if (!isEnabled())
       {
-        get();
-        success = true;
+        return;
       }
-      catch (ExecutionException e)
+      if (!isRegistered())
       {
-        exception = e.getCause();
-        if (exception instanceof OutOfMemoryError) {
-          disableWorker(getWorker());
-        }
-      } catch (Throwable e)
+        setEnabled(false);
+      }
+      if (isWorking())
       {
-        exception = e;
+        cancel();
       }
-      finally {
-        inProgress.remove(getWorker());
-        tasks.remove(this);
+      submit();
+    }
+
+    protected boolean isRegistered()
+    {
+      return registered.containsKey(getWorker());
+    }
+
+    abstract boolean isWorking();
+
+    protected abstract void submit();
+
+    abstract void cancel();
+  }
+
+  private class SimpleWorkerManager extends WorkerManager
+  {
+    private Future<?> task = null;
+
+    SimpleWorkerManager(AlignCalcWorkerI worker)
+    {
+      super(worker);
+    }
+
+    @Override
+    boolean isWorking()
+    {
+      return task != null && !task.isDone();
+    }
+
+    @Override
+    protected void submit()
+    {
+      if (task != null && !(task.isDone() || task.isCancelled()))
+      {
+        throw new IllegalStateException(
+                "Cannot submit new task if the prevoius one is still running");
       }
-      if (success)
+      Cache.log.debug(
+              format("Worker %s queued", getWorker().getClass().getName()));
+      task = executor.submit(() -> {
+        try
+        {
+          Cache.log.debug(format("Worker %s started",
+                  getWorker().getClass().getName()));
+          getWorker().run();
+          Cache.log.debug(format("Worker %s finished",
+                  getWorker().getClass().getName()));
+        } catch (InterruptedException e)
+        {
+          Cache.log.debug(format("Worker %s interrupted",
+                  getWorker().getClass().getName()));
+        } catch (Throwable th)
+        {
+          Cache.log.debug(format("Worker %s failed",
+                  getWorker().getClass().getName()), th);
+        } finally
+        {
+          if (!isRegistered())
+          {
+            // delete worker reference so garbage collector can remove it
+            worker = null;
+          }
+        }
+      });
+    }
+
+    @Override
+    synchronized void cancel()
+    {
+      if (!isWorking())
       {
-        Cache.log.debug(format("Worker %s finished%n", getWorker().getClass().getName()));
-        notifyCompleted(worker);
+        return;
       }
-      else if (exception != null){
-        Cache.log.warn(format("Worker %s failed%n", getWorker().getClass().getName()));
-        exception.printStackTrace();
-        notifyExceptional(worker, exception);
+      Cache.log.debug(format("Cancelling worker %s",
+              getWorker().getClass().getName()));
+      task.cancel(true);
+    }
+  }
+
+  private class PollableWorkerManager extends WorkerManager
+  {
+    private Future<?> task = null;
+
+    PollableWorkerManager(PollableAlignCalcWorkerI worker)
+    {
+      super(worker);
+    }
+
+    @Override
+    protected PollableAlignCalcWorkerI getWorker()
+    {
+      return (PollableAlignCalcWorkerI) super.getWorker();
+    }
+
+    @Override
+    boolean isWorking()
+    {
+      return task != null && !task.isDone();
+    }
+
+    protected void submit()
+    {
+      if (task != null && !(task.isDone() || task.isCancelled()))
+      {
+        throw new IllegalStateException(
+                "Cannot submit new task if the prevoius one is still running");
       }
+      Cache.log.debug(
+              format("Worker %s queued", getWorker().getClass().getName()));
+      final var runnable = new Runnable()
+      {
+        private boolean started = false;
+
+        private boolean completed = false;
+
+        Future<?> future = null;
+
+        @Override
+        public void run()
+        {
+          try
+          {
+            if (!started)
+            {
+              Cache.log.debug(format("Worker %s started",
+                      getWorker().getClass().getName()));
+              getWorker().startUp();
+              started = true;
+            }
+            else if (!completed)
+            {
+              Cache.log.debug(format("Polling worker %s",
+                      getWorker().getClass().getName()));
+              if (getWorker().poll())
+              {
+                Cache.log.debug(format("Worker %s finished",
+                        getWorker().getClass().getName()));
+                completed = true;
+              }
+            }
+          } catch (Throwable th)
+          {
+            Cache.log.debug(format("Worker %s failed",
+                    getWorker().getClass().getName()), th);
+            completed = true;
+          }
+          if (completed)
+          {
+            final var worker = getWorker();
+            if (!isRegistered())
+              PollableWorkerManager.super.worker = null;
+            Cache.log.debug(format("Finalizing completed worker %s",
+                    worker.getClass().getName()));
+            worker.done();
+            // almost impossible, but the future may be null at this point
+            // let it throw NPE to cancel forcefully
+            future.cancel(false);
+          }
+        }
+      };
+      runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
+              1000, TimeUnit.MILLISECONDS);
+    }
+
+    synchronized protected void cancel()
+    {
+      if (!isWorking())
+      {
+        return;
+      }
+      Cache.log.debug(format("Cancelling worker %s",
+              getWorker().getClass().getName()));
+      task.cancel(false);
+      executor.submit(() -> {
+        final var worker = getWorker();
+        if (!isRegistered())
+          PollableWorkerManager.super.worker = null;
+        if (worker != null)
+        {
+          worker.cancel();
+          Cache.log.debug(format("Finalizing cancelled worker %s",
+                  worker.getClass().getName()));
+          worker.done();
+        }
+      });
     }
   }
 
-  // main executor for running workers one-by-one
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  
-  private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
+  private final ScheduledExecutorService executor = Executors
+          .newSingleThreadScheduledExecutor();
 
-  // list of all registered workers (other collections are subsets of this)
-  private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
+  private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
+          new HashMap<>());
 
-  // list of tasks holding queued and running workers
-  private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
-  
-  // the collection of currently running workers
-  private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
+  private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
+          new WeakHashMap<>());
 
-  // the collection of workers that will not be started
-  private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
+  private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
+
+  private WorkerManager createManager(AlignCalcWorkerI worker)
+  {
+    if (worker instanceof PollableAlignCalcWorkerI)
+    {
+      return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
+    }
+    else
+    {
+      return new SimpleWorkerManager(worker);
+    }
+  }
 
-  /*
-   * Register the worker with this manager and scheduler for execution.
-   */
   @Override
   public void registerWorker(AlignCalcWorkerI worker)
   {
     Objects.requireNonNull(worker);
-    synchronized (registered)
-    {
-      if (!registered.contains(worker))
-        registered.add(worker);
-    }
+    WorkerManager manager = createManager(worker);
+    registered.putIfAbsent(worker, manager);
     startWorker(worker);
   }
 
   @Override
   public List<AlignCalcWorkerI> getWorkers()
   {
-    return unmodifiableList(new ArrayList<>(registered));
+    List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
+    result.addAll(registered.keySet());
+    return result;
   }
-  
+
   @Override
   public List<AlignCalcWorkerI> getWorkersOfClass(
           Class<? extends AlignCalcWorkerI> cls)
   {
-    synchronized (registered)
+    List<AlignCalcWorkerI> collected = new ArrayList<>();
+    for (var worker : getWorkers())
     {
-      return registered.stream()
-              .filter(worker -> worker.getClass().equals(cls))
-              .collect(Collectors.toUnmodifiableList());
+      if (worker.getClass().equals(cls))
+      {
+        collected.add(worker);
+      }
     }
+    return unmodifiableList(collected);
   }
-  
+
   @Override
   public void removeWorker(AlignCalcWorkerI worker)
   {
-    registered.remove(worker);
-    disabled.remove(worker);
+    if (worker.isDeletable())
+    {
+      registered.remove(worker);
+    }
   }
 
   @Override
-  public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
+  public void removeWorkerForAnnotation(AlignmentAnnotation annot)
   {
     synchronized (registered)
     {
-      for (var worker : registered)
+      for (var worker : getWorkers())
       {
-        if (worker.getClass().equals(cls))
+        if (worker.involves(annot))
         {
           removeWorker(worker);
         }
@@ -162,13 +336,13 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   }
 
   @Override
-  public void removeWorkerForAnnotation(AlignmentAnnotation annot)
+  public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
   {
     synchronized (registered)
     {
-      for (var worker : registered)
+      for (var worker : getWorkers())
       {
-        if (worker.involves(annot) && worker.isDeletable())
+        if (worker.getClass().equals(cls))
         {
           removeWorker(worker);
         }
@@ -179,92 +353,116 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   @Override
   public void disableWorker(AlignCalcWorkerI worker)
   {
-    disabled.add(worker);
+    // Null pointer check might be needed
+    registered.get(worker).setEnabled(false);
   }
 
   @Override
   public void enableWorker(AlignCalcWorkerI worker)
   {
-    disabled.remove(worker);
+    // Null pointer check might be needed
+    registered.get(worker).setEnabled(true);
   }
 
   @Override
-  public void restartWorkers()
+  public boolean isDisabled(AlignCalcWorkerI worker)
   {
-    synchronized (registered)
+    if (registered.containsKey(worker))
     {
-      for (AlignCalcWorkerI worker : registered)
-      {
-        if (!isDisabled(worker))
-          startWorker(worker);
-      }
+      return !registered.get(worker).isEnabled();
+    }
+    else
+    {
+      return false;
     }
   }
 
   @Override
-  public void startWorker(AlignCalcWorkerI worker)
+  public boolean isWorking(AlignCalcWorkerI worker)
   {
-    Objects.requireNonNull(worker);
-    AlignCalcTask newTask = new AlignCalcTask(worker);
-    synchronized (inProgress)
-    {
-      cancelWorker(worker);
-      inProgress.add(worker);
-      tasks.add(newTask);
-    }
-    notifyQueued(worker);
-    executor.execute(newTask);
+    var manager = registered.get(worker);
+    if (manager == null)
+      manager = oneshot.get(worker);
+    if (manager == null)
+      return false;
+    else
+      return manager.isWorking();
   }
-  
+
   @Override
-  public void cancelWorker(AlignCalcWorkerI worker)
+  public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
   {
-    if (isWorking(worker)) 
+    synchronized (registered)
     {
-      synchronized (tasks) 
-      {
-        Optional<AlignCalcTask> oldTask = tasks.stream()
-            .filter(task -> task.getWorker().equals(worker))
-            .findFirst();
-        if (oldTask.isPresent()) {
-          oldTask.get().cancel(true);
-        }
-      }
+      for (var entry : registered.entrySet())
+        if (entry.getKey().involves(annot) && entry.getValue().isWorking())
+          return true;
     }
+    synchronized (oneshot)
+    {
+      for (var entry : registered.entrySet())
+        if (entry.getKey().involves(annot) && entry.getValue().isWorking())
+          return true;
+    }
+    return false;
   }
 
   @Override
-  public boolean isDisabled(AlignCalcWorkerI worker)
+  public boolean isWorking()
   {
-    return disabled.contains(worker);
+    synchronized (registered)
+    {
+      for (var manager : registered.values())
+        if (manager.isWorking())
+          return true;
+    }
+    synchronized (oneshot)
+    {
+      for (var manager : oneshot.values())
+        if (manager.isWorking())
+          return true;
+    }
+    return false;
   }
 
   @Override
-  public boolean isWorking(AlignCalcWorkerI worker)
+  public void startWorker(AlignCalcWorkerI worker)
   {
-    return inProgress.contains(worker);
+    Objects.requireNonNull(worker);
+    var manager = registered.get(worker);
+    if (manager == null)
+    {
+      Cache.log.warn("Starting unregistered worker " + worker);
+      manager = createManager(worker);
+      oneshot.put(worker, manager);
+    }
+    manager.restart();
   }
 
   @Override
-  public boolean isWorking()
+  public void restartWorkers()
   {
-    return !inProgress.isEmpty();
+    synchronized (registered)
+    {
+      for (var manager : registered.values())
+      {
+        manager.restart();
+      }
+    }
   }
-  
+
   @Override
-  public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
+  public void cancelWorker(AlignCalcWorkerI worker)
   {
-    synchronized (inProgress)
+    Objects.requireNonNull(worker);
+    var manager = registered.get(worker);
+    if (manager == null)
+      manager = oneshot.get(worker);
+    if (manager == null)
     {
-      for (AlignCalcWorkerI worker : inProgress)
-      {
-        if (worker.involves(annot))
-        {
-          return true;
-        }
-      }
+      throw new NoSuchElementException();
     }
-    return false;
+    manager.cancel();
   }
 
   private void notifyQueued(AlignCalcWorkerI worker)
@@ -287,7 +485,8 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     for (AlignCalcListener listener : listeners)
     {
-      try {
+      try
+      {
         listener.workerCompleted(worker);
       } catch (RuntimeException e)
       {
@@ -296,12 +495,27 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
     }
   }
 
+  private void notifyCancelled(AlignCalcWorkerI worker)
+  {
+    for (AlignCalcListener listener : listeners)
+    {
+      try
+      {
+        listener.workerCancelled(worker);
+      } catch (RuntimeException e)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
   private void notifyExceptional(AlignCalcWorkerI worker,
           Throwable throwable)
   {
     for (AlignCalcListener listener : listeners)
     {
-      try {
+      try
+      {
         listener.workerExceptional(worker, throwable);
       } catch (RuntimeException e)
       {
@@ -315,11 +529,19 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     listeners.add(listener);
   }
-  
+
   @Override
   public void removeAlignCalcListener(AlignCalcListener listener)
   {
     listeners.remove(listener);
   }
 
+  @Override
+  public void shutdown()
+  {
+    executor.shutdownNow();
+    listeners.clear();
+    registered.clear();
+  }
+
 }