JAL-3690 - introduce shutdown method that cleans up manager's resources.
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
index 7d3cca6..b914d65 100644 (file)
@@ -1,6 +1,8 @@
 package jalview.workers;
 
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -9,13 +11,13 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import jalview.api.AlignCalcListener;
-import jalview.api.AlignCalcManagerI;
 import jalview.api.AlignCalcManagerI2;
 import jalview.api.AlignCalcWorkerI;
+import jalview.bin.Cache;
 import jalview.datamodel.AlignmentAnnotation;
 
 import static java.util.Collections.synchronizedList;
@@ -25,6 +27,8 @@ import static java.util.Collections.unmodifiableList;
 import java.util.ArrayList;
 import java.util.HashSet;
 
+import static java.lang.String.format;
+
 public class AlignCalcManager2 implements AlignCalcManagerI2
 {
   class AlignCalcTask extends FutureTask<Void>
@@ -33,11 +37,15 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
 
     public AlignCalcTask(AlignCalcWorkerI worker)
     {
-      super(new Callable<Void>() {
-        public Void call() throws Exception {
-            notifyStarted(worker);
-            worker.run();
-            return null;
+      super(new Callable<Void>()
+      {
+        public Void call() throws Exception
+        {
+          Cache.log.debug(format("Worker %s started%n",
+                  worker.getClass().getName()));
+          notifyStarted(worker);
+          worker.run();
+          return null;
         }
       });
       this.worker = worker;
@@ -57,43 +65,95 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
       {
         get();
         success = true;
+      } catch (CancellationException e)
+      {
+        Cache.log.debug(format("Worker %s cancelled%n",
+                getWorker().getClass().getName()));
+        notifyCancelled(worker);
       } catch (ExecutionException e)
       {
         exception = e.getCause();
-        if (exception instanceof OutOfMemoryError) {
+        if (exception instanceof OutOfMemoryError)
+        {
           disableWorker(getWorker());
         }
       } catch (Throwable e)
       {
         exception = e;
-      }
-      finally {
+      } finally
+      {
         inProgress.remove(getWorker());
         tasks.remove(this);
       }
       if (success)
+      {
+        Cache.log.debug(format("Worker %s finished%n",
+                getWorker().getClass().getName()));
         notifyCompleted(worker);
-      else
+      }
+      else if (exception != null)
+      {
+        Cache.log.warn(format("Worker %s failed%n",
+                getWorker().getClass().getName()));
+        exception.printStackTrace();
         notifyExceptional(worker, exception);
+      }
+    }
+  }
+
+  private static class CalcManagerThreadFactory implements ThreadFactory
+  {
+    private static final AtomicInteger threadNumber = new AtomicInteger(1);
+
+    private final ThreadGroup group;
+
+    private static final String namePrefix = "AlignCalcManager-pool-thread-";
+
+    CalcManagerThreadFactory()
+    {
+      var securityManager = System.getSecurityManager();
+      if (securityManager != null)
+      {
+        group = securityManager.getThreadGroup();
+      }
+      else
+      {
+        group = Thread.currentThread().getThreadGroup();
+      }
+    }
+
+    @Override
+    public Thread newThread(Runnable r)
+    {
+      Thread t = new Thread(group, r,
+              namePrefix + threadNumber.getAndIncrement(), 0);
+      t.setDaemon(false);
+      t.setPriority(Thread.NORM_PRIORITY);
+      return t;
     }
   }
 
   // main executor for running workers one-by-one
-  private final ExecutorService executor = Executors.newSingleThreadExecutor();
-  
+  private final ExecutorService executor = Executors
+          .newSingleThreadExecutor(new CalcManagerThreadFactory());
+
   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
 
   // list of all registered workers (other collections are subsets of this)
-  private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
+  private final List<AlignCalcWorkerI> registered = synchronizedList(
+          new ArrayList<>());
 
   // list of tasks holding queued and running workers
-  private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
-  
+  private final List<AlignCalcTask> tasks = synchronizedList(
+          new ArrayList<>());
+
   // the collection of currently running workers
-  private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
+  private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(
+          new HashSet<>());
 
   // the collection of workers that will not be started
-  private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
+  private final Set<AlignCalcWorkerI> disabled = synchronizedSet(
+          new HashSet<>());
 
   /*
    * Register the worker with this manager and scheduler for execution.
@@ -101,6 +161,7 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   @Override
   public void registerWorker(AlignCalcWorkerI worker)
   {
+    Objects.requireNonNull(worker);
     synchronized (registered)
     {
       if (!registered.contains(worker))
@@ -114,19 +175,25 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     return unmodifiableList(new ArrayList<>(registered));
   }
-  
+
   @Override
   public List<AlignCalcWorkerI> getWorkersOfClass(
           Class<? extends AlignCalcWorkerI> cls)
   {
     synchronized (registered)
     {
-      return registered.stream()
-              .filter(worker -> worker.getClass().equals(cls))
-              .collect(Collectors.toUnmodifiableList());
+      List<AlignCalcWorkerI> collected = new ArrayList<>();
+      for (var worker : registered)
+      {
+        if (worker.getClass().equals(cls))
+        {
+          collected.add(worker);
+        }
+      }
+      return unmodifiableList(collected);
     }
   }
-  
+
   @Override
   public void removeWorker(AlignCalcWorkerI worker)
   {
@@ -139,13 +206,11 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     synchronized (registered)
     {
-      for (var it = registered.iterator(); it.hasNext();)
+      for (var worker : registered)
       {
-        var worker = it.next();
         if (worker.getClass().equals(cls))
         {
-          it.remove();
-          disabled.remove(worker);
+          removeWorker(worker);
         }
       }
     }
@@ -156,13 +221,11 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     synchronized (registered)
     {
-      for (var it = registered.iterator(); it.hasNext();)
+      for (var worker : registered)
       {
-        var worker = it.next();
         if (worker.involves(annot) && worker.isDeletable())
         {
-          it.remove();
-          disabled.remove(worker);
+          removeWorker(worker);
         }
       }
     }
@@ -171,7 +234,6 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   @Override
   public void disableWorker(AlignCalcWorkerI worker)
   {
-    assert registered.contains(worker);
     disabled.add(worker);
   }
 
@@ -197,31 +259,31 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   @Override
   public void startWorker(AlignCalcWorkerI worker)
   {
-    assert registered.contains(worker);
-    synchronized (tasks) {
-      for (var task : tasks)
-      {
-        if (task.getWorker().equals(worker))
-          task.cancel(true);
-      }
-    }
+    Objects.requireNonNull(worker);
     AlignCalcTask newTask = new AlignCalcTask(worker);
-    tasks.add(newTask);
+    synchronized (inProgress)
+    {
+      cancelWorker(worker);
+      inProgress.add(worker);
+      tasks.add(newTask);
+    }
     notifyQueued(worker);
     executor.execute(newTask);
   }
-  
+
   @Override
   public void cancelWorker(AlignCalcWorkerI worker)
   {
-    if (isWorking(worker)) 
+    if (isWorking(worker))
     {
-      synchronized (tasks) 
+      synchronized (tasks)
       {
-        for (var task : tasks)
+        Optional<AlignCalcTask> oldTask = tasks.stream()
+                .filter(task -> task.getWorker().equals(worker))
+                .findFirst();
+        if (oldTask.isPresent())
         {
-          if (task.getWorker().equals(worker))
-            task.cancel(true);
+          oldTask.get().cancel(true);
         }
       }
     }
@@ -244,7 +306,7 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     return !inProgress.isEmpty();
   }
-  
+
   @Override
   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
   {
@@ -281,7 +343,8 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     for (AlignCalcListener listener : listeners)
     {
-      try {
+      try
+      {
         listener.workerCompleted(worker);
       } catch (RuntimeException e)
       {
@@ -290,12 +353,27 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
     }
   }
 
+  private void notifyCancelled(AlignCalcWorkerI worker)
+  {
+    for (AlignCalcListener listener : listeners)
+    {
+      try
+      {
+        listener.workerCancelled(worker);
+      } catch (RuntimeException e)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
   private void notifyExceptional(AlignCalcWorkerI worker,
           Throwable throwable)
   {
     for (AlignCalcListener listener : listeners)
     {
-      try {
+      try
+      {
         listener.workerExceptional(worker, throwable);
       } catch (RuntimeException e)
       {
@@ -309,11 +387,22 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     listeners.add(listener);
   }
-  
+
   @Override
   public void removeAlignCalcListener(AlignCalcListener listener)
   {
     listeners.remove(listener);
   }
 
+  @Override
+  public void shutdown()
+  {
+    executor.shutdownNow();
+    listeners.clear();
+    registered.clear();
+    tasks.clear();
+    inProgress.clear();
+    disabled.clear();
+  }
+
 }