JAL-3690 AlignCalcManager - add full support for oneshot workers
authorMateusz <mmzwarowny@dundee.ac.uk>
Tue, 19 Jan 2021 17:29:42 +0000 (18:29 +0100)
committerMateusz <mmzwarowny@dundee.ac.uk>
Fri, 22 Jan 2021 14:11:53 +0000 (15:11 +0100)
src/jalview/workers/AlignCalcManager2.java

index cd190bd..8340900 100644 (file)
@@ -5,6 +5,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.WeakHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -29,59 +30,67 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   private abstract class WorkerManager
   {
     protected volatile boolean enabled = true;
-            
-    protected final AlignCalcWorkerI worker;
-    
+
+    protected AlignCalcWorkerI worker;
+
     WorkerManager(AlignCalcWorkerI worker)
     {
       this.worker = worker;
     }
-    
-    AlignCalcWorkerI getWorker()
+
+    protected AlignCalcWorkerI getWorker()
     {
       return worker;
     }
-    
+
     boolean isEnabled()
     {
       return enabled;
     }
-    
+
     void setEnabled(boolean enabled)
     {
       this.enabled = enabled;
     }
-    
+
     synchronized void restart()
     {
       if (!isEnabled())
       {
         return;
       }
+      if (!isRestartable())
+      {
+        setEnabled(false);
+      }
       if (isWorking())
       {
         cancel();
       }
       submit();
     }
-    
+
+    protected boolean isRestartable()
+    {
+      return registered.containsKey(getWorker());
+    }
+
     abstract boolean isWorking();
-    
+
     protected abstract void submit();
-    
+
     abstract void cancel();
   }
-  
-  
+
   private class SimpleWorkerManager extends WorkerManager
   {
     private Future<?> task = null;
-    
+
     SimpleWorkerManager(AlignCalcWorkerI worker)
     {
       super(worker);
     }
-    
+
     @Override
     boolean isWorking()
     {
@@ -94,31 +103,33 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
       if (task != null && !(task.isDone() || task.isCancelled()))
       {
         throw new IllegalStateException(
-            "Cannot submit new task if the prevoius one is still running");
+                "Cannot submit new task if the prevoius one is still running");
       }
-      Cache.log.debug(format("Worker %s queued",
-              worker.getClass().getName()));
+      Cache.log.debug(
+              format("Worker %s queued", getWorker().getClass().getName()));
       task = executor.submit(() -> {
         try
         {
           Cache.log.debug(format("Worker %s started",
-                  worker.getClass().getName()));
-          worker.run();
+                  getWorker().getClass().getName()));
+          getWorker().run();
           Cache.log.debug(format("Worker %s finished",
-                  worker.getClass().getName()));
-        }
-        catch (InterruptedException e)
+                  getWorker().getClass().getName()));
+        } catch (InterruptedException e)
         {
           Cache.log.debug(format("Worker %s interrupted",
-                  worker.getClass().getName()));
-        }
-        catch (Throwable th)
+                  getWorker().getClass().getName()));
+        } catch (Throwable th)
         {
           Cache.log.debug(format("Worker %s failed",
-                  worker.getClass().getName()), th);
-        }
-        finally
+                  getWorker().getClass().getName()), th);
+        } finally
         {
+          if (!isRestartable())
+          {
+            // delete worker reference so garbage collector can remove it
+            worker = null;
+          }
         }
       });
     }
@@ -131,44 +142,49 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
         return;
       }
       Cache.log.debug(format("Cancelling worker %s",
-              worker.getClass().getName()));
+              getWorker().getClass().getName()));
       task.cancel(true);
     }
   }
-  
-  
+
   private class PollableWorkerManager extends WorkerManager
   {
-    private final PollableAlignCalcWorkerI worker;
     private Future<?> task = null;
-    
+
     PollableWorkerManager(PollableAlignCalcWorkerI worker)
     {
       super(worker);
-      this.worker = worker;
     }
-    
+
+    @Override
+    protected PollableAlignCalcWorkerI getWorker()
+    {
+      return (PollableAlignCalcWorkerI) super.getWorker();
+    }
+
     @Override
     boolean isWorking()
     {
       return task != null && !task.isDone();
     }
-    
+
     protected void submit()
     {
       if (task != null && !(task.isDone() || task.isCancelled()))
       {
         throw new IllegalStateException(
-            "Cannot submit new task if the prevoius one is still running");
+                "Cannot submit new task if the prevoius one is still running");
       }
-      Cache.log.debug(format("Worker %s queued",
-              worker.getClass().getName()));
+      Cache.log.debug(
+              format("Worker %s queued", getWorker().getClass().getName()));
       final var runnable = new Runnable()
       {
         private boolean started = false;
+
         private boolean completed = false;
+
         Future<?> future = null;
-        
+
         @Override
         public void run()
         {
@@ -177,29 +193,32 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
             if (!started)
             {
               Cache.log.debug(format("Worker %s started",
-                      worker.getClass().getName()));
-              worker.startUp();
+                      getWorker().getClass().getName()));
+              getWorker().startUp();
               started = true;
             }
             else if (!completed)
             {
               Cache.log.debug(format("Polling worker %s",
-                      worker.getClass().getName()));
-              if (worker.poll())
+                      getWorker().getClass().getName()));
+              if (getWorker().poll())
               {
                 Cache.log.debug(format("Worker %s finished",
-                        worker.getClass().getName()));
+                        getWorker().getClass().getName()));
                 completed = true;
               }
             }
           } catch (Throwable th)
           {
             Cache.log.debug(format("Worker %s failed",
-                    worker.getClass().getName()), th);
+                    getWorker().getClass().getName()), th);
             completed = true;
           }
           if (completed)
           {
+            final var worker = getWorker();
+            if (!isRestartable())
+              PollableWorkerManager.super.worker = null;
             Cache.log.debug(format("Finalizing completed worker %s",
                     worker.getClass().getName()));
             worker.done();
@@ -209,10 +228,10 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
           }
         }
       };
-      runnable.future = task = executor.scheduleWithFixedDelay(
-              runnable, 10, 1000, TimeUnit.MILLISECONDS);
+      runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
+              1000, TimeUnit.MILLISECONDS);
     }
-    
+
     synchronized protected void cancel()
     {
       if (!isWorking())
@@ -220,33 +239,46 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
         return;
       }
       Cache.log.debug(format("Cancelling worker %s",
-              worker.getClass().getName()));
+              getWorker().getClass().getName()));
       task.cancel(false);
       executor.submit(() -> {
-        worker.cancel();
-        Cache.log.debug(format("Finalizing cancelled worker %s",
-                worker.getClass().getName()));
-        worker.done();
+        final var worker = getWorker();
+        if (!isRestartable())
+          PollableWorkerManager.super.worker = null;
+        if (worker != null)
+        {
+          worker.cancel();
+          Cache.log.debug(format("Finalizing cancelled worker %s",
+                  worker.getClass().getName()));
+          worker.done();
+        }
       });
     }
   }
-  
-  
-  private final ScheduledExecutorService executor =
-          Executors.newSingleThreadScheduledExecutor();
-  private final Map<AlignCalcWorkerI, WorkerManager> registered =
-          synchronizedMap(new HashMap<>());
-  
-  private final List<AlignCalcListener> listeners =
-          new CopyOnWriteArrayList<>();
-  
-  private WorkerManager createManager(AlignCalcWorkerI worker) {
+
+  private final ScheduledExecutorService executor = Executors
+          .newSingleThreadScheduledExecutor();
+
+  private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
+          new HashMap<>());
+
+  private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
+          new WeakHashMap<>());
+
+  private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
+
+  private WorkerManager createManager(AlignCalcWorkerI worker)
+  {
     if (worker instanceof PollableAlignCalcWorkerI)
+    {
       return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
+    }
     else
+    {
       return new SimpleWorkerManager(worker);
+    }
   }
-  
+
   @Override
   public void registerWorker(AlignCalcWorkerI worker)
   {
@@ -345,14 +377,13 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   @Override
   public boolean isWorking(AlignCalcWorkerI worker)
   {
-    if (!registered.containsKey(worker))
-    {
+    var manager = registered.get(worker);
+    if (manager == null)
+      manager = oneshot.get(worker);
+    if (manager == null)
       return false;
-    }
     else
-    {
-      return registered.get(worker).isWorking();
-    }
+      return manager.isWorking();
   }
 
   @Override
@@ -361,13 +392,14 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
     synchronized (registered)
     {
       for (var entry : registered.entrySet())
-      {
-        if (entry.getKey().involves(annot) &&
-                entry.getValue().isWorking())
-        {
+        if (entry.getKey().involves(annot) && entry.getValue().isWorking())
+          return true;
+    }
+    synchronized (oneshot)
+    {
+      for (var entry : registered.entrySet())
+        if (entry.getKey().involves(annot) && entry.getValue().isWorking())
           return true;
-        }
-      }
     }
     return false;
   }
@@ -378,12 +410,14 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
     synchronized (registered)
     {
       for (var manager : registered.values())
-      {
         if (manager.isWorking())
-        {
           return true;
-        }
-      }
+    }
+    synchronized (oneshot)
+    {
+      for (var manager : oneshot.values())
+        if (manager.isWorking())
+          return true;
     }
     return false;
   }
@@ -393,10 +427,11 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
   {
     Objects.requireNonNull(worker);
     var manager = registered.get(worker);
-    if (manager == null) 
+    if (manager == null)
     {
       Cache.log.warn("Starting unregistered worker " + worker);
       manager = createManager(worker);
+      oneshot.put(worker, manager);
     }
     manager.restart();
   }
@@ -415,16 +450,18 @@ public class AlignCalcManager2 implements AlignCalcManagerI2
 
   @Override
   public void cancelWorker(AlignCalcWorkerI worker)
-  {    
+  {
     Objects.requireNonNull(worker);
     var manager = registered.get(worker);
-    if (manager == null) 
+    if (manager == null)
+      manager = oneshot.get(worker);
+    if (manager == null)
     {
       throw new NoSuchElementException();
     }
     manager.cancel();
   }
-  
+
   private void notifyQueued(AlignCalcWorkerI worker)
   {
     for (AlignCalcListener listener : listeners)