JAL - 3690 AlignCalc rebuilt - FutureTask-based manager
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
diff --git a/src/jalview/workers/AlignCalcManager2.java b/src/jalview/workers/AlignCalcManager2.java
new file mode 100644 (file)
index 0000000..7d3cca6
--- /dev/null
@@ -0,0 +1,319 @@
+package jalview.workers;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import jalview.api.AlignCalcListener;
+import jalview.api.AlignCalcManagerI;
+import jalview.api.AlignCalcManagerI2;
+import jalview.api.AlignCalcWorkerI;
+import jalview.datamodel.AlignmentAnnotation;
+
+import static java.util.Collections.synchronizedList;
+import static java.util.Collections.synchronizedSet;
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+
+public class AlignCalcManager2 implements AlignCalcManagerI2
+{
+  class AlignCalcTask extends FutureTask<Void>
+  {
+    final AlignCalcWorkerI worker;
+
+    public AlignCalcTask(AlignCalcWorkerI worker)
+    {
+      super(new Callable<Void>() {
+        public Void call() throws Exception {
+            notifyStarted(worker);
+            worker.run();
+            return null;
+        }
+      });
+      this.worker = worker;
+    }
+
+    public AlignCalcWorkerI getWorker()
+    {
+      return worker;
+    }
+
+    @Override
+    protected void done()
+    {
+      boolean success = false;
+      Throwable exception = null;
+      try
+      {
+        get();
+        success = true;
+      } catch (ExecutionException e)
+      {
+        exception = e.getCause();
+        if (exception instanceof OutOfMemoryError) {
+          disableWorker(getWorker());
+        }
+      } catch (Throwable e)
+      {
+        exception = e;
+      }
+      finally {
+        inProgress.remove(getWorker());
+        tasks.remove(this);
+      }
+      if (success)
+        notifyCompleted(worker);
+      else
+        notifyExceptional(worker, exception);
+    }
+  }
+
+  // main executor for running workers one-by-one
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+  
+  private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
+
+  // list of all registered workers (other collections are subsets of this)
+  private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
+
+  // list of tasks holding queued and running workers
+  private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
+  
+  // the collection of currently running workers
+  private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
+
+  // the collection of workers that will not be started
+  private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
+
+  /*
+   * Register the worker with this manager and scheduler for execution.
+   */
+  @Override
+  public void registerWorker(AlignCalcWorkerI worker)
+  {
+    synchronized (registered)
+    {
+      if (!registered.contains(worker))
+        registered.add(worker);
+    }
+    startWorker(worker);
+  }
+
+  @Override
+  public List<AlignCalcWorkerI> getWorkers()
+  {
+    return unmodifiableList(new ArrayList<>(registered));
+  }
+  
+  @Override
+  public List<AlignCalcWorkerI> getWorkersOfClass(
+          Class<? extends AlignCalcWorkerI> cls)
+  {
+    synchronized (registered)
+    {
+      return registered.stream()
+              .filter(worker -> worker.getClass().equals(cls))
+              .collect(Collectors.toUnmodifiableList());
+    }
+  }
+  
+  @Override
+  public void removeWorker(AlignCalcWorkerI worker)
+  {
+    registered.remove(worker);
+    disabled.remove(worker);
+  }
+
+  @Override
+  public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
+  {
+    synchronized (registered)
+    {
+      for (var it = registered.iterator(); it.hasNext();)
+      {
+        var worker = it.next();
+        if (worker.getClass().equals(cls))
+        {
+          it.remove();
+          disabled.remove(worker);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void removeWorkerForAnnotation(AlignmentAnnotation annot)
+  {
+    synchronized (registered)
+    {
+      for (var it = registered.iterator(); it.hasNext();)
+      {
+        var worker = it.next();
+        if (worker.involves(annot) && worker.isDeletable())
+        {
+          it.remove();
+          disabled.remove(worker);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void disableWorker(AlignCalcWorkerI worker)
+  {
+    assert registered.contains(worker);
+    disabled.add(worker);
+  }
+
+  @Override
+  public void enableWorker(AlignCalcWorkerI worker)
+  {
+    disabled.remove(worker);
+  }
+
+  @Override
+  public void restartWorkers()
+  {
+    synchronized (registered)
+    {
+      for (AlignCalcWorkerI worker : registered)
+      {
+        if (!isDisabled(worker))
+          startWorker(worker);
+      }
+    }
+  }
+
+  @Override
+  public void startWorker(AlignCalcWorkerI worker)
+  {
+    assert registered.contains(worker);
+    synchronized (tasks) {
+      for (var task : tasks)
+      {
+        if (task.getWorker().equals(worker))
+          task.cancel(true);
+      }
+    }
+    AlignCalcTask newTask = new AlignCalcTask(worker);
+    tasks.add(newTask);
+    notifyQueued(worker);
+    executor.execute(newTask);
+  }
+  
+  @Override
+  public void cancelWorker(AlignCalcWorkerI worker)
+  {
+    if (isWorking(worker)) 
+    {
+      synchronized (tasks) 
+      {
+        for (var task : tasks)
+        {
+          if (task.getWorker().equals(worker))
+            task.cancel(true);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean isDisabled(AlignCalcWorkerI worker)
+  {
+    return disabled.contains(worker);
+  }
+
+  @Override
+  public boolean isWorking(AlignCalcWorkerI worker)
+  {
+    return inProgress.contains(worker);
+  }
+
+  @Override
+  public boolean isWorking()
+  {
+    return !inProgress.isEmpty();
+  }
+  
+  @Override
+  public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
+  {
+    synchronized (inProgress)
+    {
+      for (AlignCalcWorkerI worker : inProgress)
+      {
+        if (worker.involves(annot))
+        {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private void notifyQueued(AlignCalcWorkerI worker)
+  {
+    for (AlignCalcListener listener : listeners)
+    {
+      listener.workerQueued(worker);
+    }
+  }
+
+  private void notifyStarted(AlignCalcWorkerI worker)
+  {
+    for (AlignCalcListener listener : listeners)
+    {
+      listener.workerStarted(worker);
+    }
+  }
+
+  private void notifyCompleted(AlignCalcWorkerI worker)
+  {
+    for (AlignCalcListener listener : listeners)
+    {
+      try {
+        listener.workerCompleted(worker);
+      } catch (RuntimeException e)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private void notifyExceptional(AlignCalcWorkerI worker,
+          Throwable throwable)
+  {
+    for (AlignCalcListener listener : listeners)
+    {
+      try {
+        listener.workerExceptional(worker, throwable);
+      } catch (RuntimeException e)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  public void addAlignCalcListener(AlignCalcListener listener)
+  {
+    listeners.add(listener);
+  }
+  
+  @Override
+  public void removeAlignCalcListener(AlignCalcListener listener)
+  {
+    listeners.remove(listener);
+  }
+
+}