From 5150133eed3d978727552434fe6fe7e0ff335370 Mon Sep 17 00:00:00 2001 From: Mateusz Date: Tue, 19 Jan 2021 18:29:42 +0100 Subject: [PATCH] JAL-3690 AlignCalcManager - add full support for oneshot workers --- src/jalview/workers/AlignCalcManager2.java | 211 ++++++++++++++++------------ 1 file changed, 124 insertions(+), 87 deletions(-) diff --git a/src/jalview/workers/AlignCalcManager2.java b/src/jalview/workers/AlignCalcManager2.java index cd190bd..8340900 100644 --- a/src/jalview/workers/AlignCalcManager2.java +++ b/src/jalview/workers/AlignCalcManager2.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.WeakHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -29,59 +30,67 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 private abstract class WorkerManager { protected volatile boolean enabled = true; - - protected final AlignCalcWorkerI worker; - + + protected AlignCalcWorkerI worker; + WorkerManager(AlignCalcWorkerI worker) { this.worker = worker; } - - AlignCalcWorkerI getWorker() + + protected AlignCalcWorkerI getWorker() { return worker; } - + boolean isEnabled() { return enabled; } - + void setEnabled(boolean enabled) { this.enabled = enabled; } - + synchronized void restart() { if (!isEnabled()) { return; } + if (!isRestartable()) + { + setEnabled(false); + } if (isWorking()) { cancel(); } submit(); } - + + protected boolean isRestartable() + { + return registered.containsKey(getWorker()); + } + abstract boolean isWorking(); - + protected abstract void submit(); - + abstract void cancel(); } - - + private class SimpleWorkerManager extends WorkerManager { private Future task = null; - + SimpleWorkerManager(AlignCalcWorkerI worker) { super(worker); } - + @Override boolean isWorking() { @@ -94,31 +103,33 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( - "Cannot submit new task if the prevoius one is still running"); + "Cannot submit new task if the prevoius one is still running"); } - Cache.log.debug(format("Worker %s queued", - worker.getClass().getName())); + Cache.log.debug( + format("Worker %s queued", getWorker().getClass().getName())); task = executor.submit(() -> { try { Cache.log.debug(format("Worker %s started", - worker.getClass().getName())); - worker.run(); + getWorker().getClass().getName())); + getWorker().run(); Cache.log.debug(format("Worker %s finished", - worker.getClass().getName())); - } - catch (InterruptedException e) + getWorker().getClass().getName())); + } catch (InterruptedException e) { Cache.log.debug(format("Worker %s interrupted", - worker.getClass().getName())); - } - catch (Throwable th) + getWorker().getClass().getName())); + } catch (Throwable th) { Cache.log.debug(format("Worker %s failed", - worker.getClass().getName()), th); - } - finally + getWorker().getClass().getName()), th); + } finally { + if (!isRestartable()) + { + // delete worker reference so garbage collector can remove it + worker = null; + } } }); } @@ -131,44 +142,49 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 return; } Cache.log.debug(format("Cancelling worker %s", - worker.getClass().getName())); + getWorker().getClass().getName())); task.cancel(true); } } - - + private class PollableWorkerManager extends WorkerManager { - private final PollableAlignCalcWorkerI worker; private Future task = null; - + PollableWorkerManager(PollableAlignCalcWorkerI worker) { super(worker); - this.worker = worker; } - + + @Override + protected PollableAlignCalcWorkerI getWorker() + { + return (PollableAlignCalcWorkerI) super.getWorker(); + } + @Override boolean isWorking() { return task != null && !task.isDone(); } - + protected void submit() { if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( - "Cannot submit new task if the prevoius one is still running"); + "Cannot submit new task if the prevoius one is still running"); } - Cache.log.debug(format("Worker %s queued", - worker.getClass().getName())); + Cache.log.debug( + format("Worker %s queued", getWorker().getClass().getName())); final var runnable = new Runnable() { private boolean started = false; + private boolean completed = false; + Future future = null; - + @Override public void run() { @@ -177,29 +193,32 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 if (!started) { Cache.log.debug(format("Worker %s started", - worker.getClass().getName())); - worker.startUp(); + getWorker().getClass().getName())); + getWorker().startUp(); started = true; } else if (!completed) { Cache.log.debug(format("Polling worker %s", - worker.getClass().getName())); - if (worker.poll()) + getWorker().getClass().getName())); + if (getWorker().poll()) { Cache.log.debug(format("Worker %s finished", - worker.getClass().getName())); + getWorker().getClass().getName())); completed = true; } } } catch (Throwable th) { Cache.log.debug(format("Worker %s failed", - worker.getClass().getName()), th); + getWorker().getClass().getName()), th); completed = true; } if (completed) { + final var worker = getWorker(); + if (!isRestartable()) + PollableWorkerManager.super.worker = null; Cache.log.debug(format("Finalizing completed worker %s", worker.getClass().getName())); worker.done(); @@ -209,10 +228,10 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 } } }; - runnable.future = task = executor.scheduleWithFixedDelay( - runnable, 10, 1000, TimeUnit.MILLISECONDS); + runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10, + 1000, TimeUnit.MILLISECONDS); } - + synchronized protected void cancel() { if (!isWorking()) @@ -220,33 +239,46 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 return; } Cache.log.debug(format("Cancelling worker %s", - worker.getClass().getName())); + getWorker().getClass().getName())); task.cancel(false); executor.submit(() -> { - worker.cancel(); - Cache.log.debug(format("Finalizing cancelled worker %s", - worker.getClass().getName())); - worker.done(); + final var worker = getWorker(); + if (!isRestartable()) + PollableWorkerManager.super.worker = null; + if (worker != null) + { + worker.cancel(); + Cache.log.debug(format("Finalizing cancelled worker %s", + worker.getClass().getName())); + worker.done(); + } }); } } - - - private final ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor(); - private final Map registered = - synchronizedMap(new HashMap<>()); - - private final List listeners = - new CopyOnWriteArrayList<>(); - - private WorkerManager createManager(AlignCalcWorkerI worker) { + + private final ScheduledExecutorService executor = Executors + .newSingleThreadScheduledExecutor(); + + private final Map registered = synchronizedMap( + new HashMap<>()); + + private final Map oneshot = synchronizedMap( + new WeakHashMap<>()); + + private final List listeners = new CopyOnWriteArrayList<>(); + + private WorkerManager createManager(AlignCalcWorkerI worker) + { if (worker instanceof PollableAlignCalcWorkerI) + { return new PollableWorkerManager((PollableAlignCalcWorkerI) worker); + } else + { return new SimpleWorkerManager(worker); + } } - + @Override public void registerWorker(AlignCalcWorkerI worker) { @@ -345,14 +377,13 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public boolean isWorking(AlignCalcWorkerI worker) { - if (!registered.containsKey(worker)) - { + var manager = registered.get(worker); + if (manager == null) + manager = oneshot.get(worker); + if (manager == null) return false; - } else - { - return registered.get(worker).isWorking(); - } + return manager.isWorking(); } @Override @@ -361,13 +392,14 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 synchronized (registered) { for (var entry : registered.entrySet()) - { - if (entry.getKey().involves(annot) && - entry.getValue().isWorking()) - { + if (entry.getKey().involves(annot) && entry.getValue().isWorking()) + return true; + } + synchronized (oneshot) + { + for (var entry : registered.entrySet()) + if (entry.getKey().involves(annot) && entry.getValue().isWorking()) return true; - } - } } return false; } @@ -378,12 +410,14 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 synchronized (registered) { for (var manager : registered.values()) - { if (manager.isWorking()) - { return true; - } - } + } + synchronized (oneshot) + { + for (var manager : oneshot.values()) + if (manager.isWorking()) + return true; } return false; } @@ -393,10 +427,11 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 { Objects.requireNonNull(worker); var manager = registered.get(worker); - if (manager == null) + if (manager == null) { Cache.log.warn("Starting unregistered worker " + worker); manager = createManager(worker); + oneshot.put(worker, manager); } manager.restart(); } @@ -415,16 +450,18 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public void cancelWorker(AlignCalcWorkerI worker) - { + { Objects.requireNonNull(worker); var manager = registered.get(worker); - if (manager == null) + if (manager == null) + manager = oneshot.get(worker); + if (manager == null) { throw new NoSuchElementException(); } manager.cancel(); } - + private void notifyQueued(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) -- 1.7.10.2