X-Git-Url: http://source.jalview.org/gitweb/?a=blobdiff_plain;f=src%2Fjalview%2Fworkers%2FAlignCalcManager2.java;h=1d5c1edf136eba7d59d0843e517f1ac9dbe1b7c5;hb=747167089ecf8d6afc70d417f5a20352e029bd95;hp=f7d263ac5d11dc0cd541ef2e01964042ae09113a;hpb=29288682745147c9da172e5a6f2c21f1f3dbb669;p=jalview.git diff --git a/src/jalview/workers/AlignCalcManager2.java b/src/jalview/workers/AlignCalcManager2.java index f7d263a..1d5c1ed 100644 --- a/src/jalview/workers/AlignCalcManager2.java +++ b/src/jalview/workers/AlignCalcManager2.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.WeakHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -22,127 +23,110 @@ import jalview.api.AlignCalcManagerI2; import jalview.api.AlignCalcWorkerI; import jalview.api.PollableAlignCalcWorkerI; import jalview.bin.Cache; +import jalview.bin.Console; import jalview.datamodel.AlignmentAnnotation; public class AlignCalcManager2 implements AlignCalcManagerI2 { private abstract class WorkerManager { - static final int IDLE = 0; - static final int QUEUED = 1; - static final int RUNNING = 2; - static final int CANCELLING = 3; - - protected volatile int state = IDLE; protected volatile boolean enabled = true; - - protected final AlignCalcWorkerI worker; - + + protected AlignCalcWorkerI worker; + WorkerManager(AlignCalcWorkerI worker) { this.worker = worker; } - - AlignCalcWorkerI getWorker() + + protected AlignCalcWorkerI getWorker() { return worker; } - + boolean isEnabled() { return enabled; } - + void setEnabled(boolean enabled) { this.enabled = enabled; } - - synchronized protected void setState(int state) - { - this.state = state; - } - - int getState() - { - return state; - } - - void restart() + + synchronized void restart() { if (!isEnabled()) { return; } - if (state == IDLE) - { - submit(); - } - else if (state == QUEUED) + if (!isRegistered()) { - // job already queued, do nothing + setEnabled(false); } - else if (state == RUNNING) + if (isWorking()) { cancel(); - submit(); - } - else if (state == CANCELLING) - { - submit(); } + submit(); + } + + protected boolean isRegistered() + { + return registered.containsKey(getWorker()); } - + + abstract boolean isWorking(); + protected abstract void submit(); - - abstract void cancel(); + + abstract void cancel(); } - - + private class SimpleWorkerManager extends WorkerManager { private Future task = null; - + SimpleWorkerManager(AlignCalcWorkerI worker) { super(worker); } @Override + boolean isWorking() + { + return task != null && !task.isDone(); + } + + @Override protected void submit() { if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( - "Cannot submit new task if the prevoius one is still running"); + "Cannot submit new task if the prevoius one is still running"); } - Cache.log.debug(format("Worker %s queued", - worker.getClass().getName())); - setState(QUEUED); + Console.debug( + format("Worker %s queued", getWorker().getClass().getName())); task = executor.submit(() -> { - setState(RUNNING); try { - Cache.log.debug(format("Worker %s started", - worker.getClass().getName())); - worker.run(); - Cache.log.debug(format("Worker %s finished", - worker.getClass().getName())); - } - catch (InterruptedException e) + Console.debug(format("Worker %s started", getWorker())); + getWorker().run(); + Console.debug(format("Worker %s finished", getWorker())); + } catch (InterruptedException e) { - Cache.log.debug(format("Worker %s interrupted", - worker.getClass().getName())); - } - catch (Throwable th) + Console.debug(format("Worker %s interrupted", getWorker())); + } catch (Throwable th) { - Cache.log.debug(format("Worker %s failed", - worker.getClass().getName()), th); - } - finally + Console.debug(format("Worker %s failed", getWorker()), th); + } finally { - // fixme: should not set to idle if another task is already queued for execution - setState(IDLE); + if (!isRegistered()) + { + // delete worker reference so garbage collector can remove it + worker = null; + } } }); } @@ -150,49 +134,53 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override synchronized void cancel() { - if (task == null || state == IDLE || state == CANCELLING) + if (!isWorking()) { return; } - Cache.log.debug(format("Cancelling worker %s", - worker.getClass().getName())); - setState(CANCELLING); + Console.debug(format("Cancelling worker %s", getWorker())); task.cancel(true); - if (task.isCancelled()) - { - setState(IDLE); - } } } - - + private class PollableWorkerManager extends WorkerManager { - private final PollableAlignCalcWorkerI worker; private Future task = null; - + PollableWorkerManager(PollableAlignCalcWorkerI worker) { super(worker); - this.worker = worker; } - + + @Override + protected PollableAlignCalcWorkerI getWorker() + { + return (PollableAlignCalcWorkerI) super.getWorker(); + } + + @Override + boolean isWorking() + { + return task != null && !task.isDone(); + } + protected void submit() { if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( - "Cannot submit new task if the prevoius one is still running"); + "Cannot submit new task if the prevoius one is still running"); } - Cache.log.debug(format("Worker %s queued", - worker.getClass().getName())); - setState(QUEUED); + Console.debug( + format("Worker %s queued", getWorker())); final var runnable = new Runnable() { private boolean started = false; + private boolean completed = false; + Future future = null; - + @Override public void run() { @@ -200,83 +188,91 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 { if (!started) { - Cache.log.debug(format("Worker %s started", - worker.getClass().getName())); - setState(RUNNING); - worker.startUp(); + Console.debug(format("Worker %s started", getWorker())); + getWorker().startUp(); started = true; } else if (!completed) { - if (worker.poll()) + Console.debug(format("Polling worker %s", getWorker())); + if (getWorker().poll()) { - Cache.log.debug(format("Worker %s finished", - worker.getClass().getName())); + Console.debug(format("Worker %s finished", getWorker())); completed = true; - setState(IDLE); } } } catch (Throwable th) { - Cache.log.debug(format("Worker %s failed", - worker.getClass().getName()), th); + Console.debug(format("Worker %s failed", getWorker()), th); completed = true; - setState(IDLE); } if (completed) { - try - { - future.cancel(false); - } - catch (NullPointerException ignored) - { - // extremely unlikely to happen - } + final var worker = getWorker(); + if (!isRegistered()) + PollableWorkerManager.super.worker = null; + Console.debug(format("Finalizing completed worker %s", worker)); + worker.done(); + // almost impossible, but the future may be null at this point + // let it throw NPE to cancel forcefully + future.cancel(false); } } }; - runnable.future = task = executor.scheduleWithFixedDelay( - runnable, 10, 1000, TimeUnit.MILLISECONDS); + runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10, + 1000, TimeUnit.MILLISECONDS); } - + synchronized protected void cancel() { - if (task == null || state == IDLE || state == CANCELLING) + if (!isWorking()) { return; } - Cache.log.debug(format("Cancelling worker %s", - worker.getClass().getName())); - setState(CANCELLING); + Console.debug(format("Cancelling worker %s", getWorker())); task.cancel(false); - if (task.isCancelled()) - { - setState(IDLE); - } executor.submit(() -> { - worker.cancel(); + final var worker = getWorker(); + if (!isRegistered()) + PollableWorkerManager.super.worker = null; + if (worker != null) + { + worker.cancel(); + Console.debug(format("Finalizing cancelled worker %s", worker)); + worker.done(); + } }); } } - - - private final ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor(); - private final Map registered = - synchronizedMap(new HashMap<>()); - - private final List listeners = - new CopyOnWriteArrayList<>(); - - + + private final ScheduledExecutorService executor = Executors + .newSingleThreadScheduledExecutor(); + + private final Map registered = synchronizedMap( + new HashMap<>()); + + private final Map oneshot = synchronizedMap( + new WeakHashMap<>()); + + private final List listeners = new CopyOnWriteArrayList<>(); + + private WorkerManager createManager(AlignCalcWorkerI worker) + { + if (worker instanceof PollableAlignCalcWorkerI) + { + return new PollableWorkerManager((PollableAlignCalcWorkerI) worker); + } + else + { + return new SimpleWorkerManager(worker); + } + } + @Override public void registerWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); - WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ? - new PollableWorkerManager((PollableAlignCalcWorkerI) worker) : - new SimpleWorkerManager(worker); + WorkerManager manager = createManager(worker); registered.putIfAbsent(worker, manager); startWorker(worker); } @@ -284,7 +280,9 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public List getWorkers() { - return List.copyOf(registered.keySet()); + List result = new ArrayList<>(registered.size()); + result.addAll(registered.keySet()); + return result; } @Override @@ -305,7 +303,10 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public void removeWorker(AlignCalcWorkerI worker) { - registered.remove(worker); + if (worker.isDeletable()) + { + registered.remove(worker); + } } @Override @@ -315,7 +316,7 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 { for (var worker : getWorkers()) { - if (worker.involves(annot) && worker.isDeletable()) + if (worker.involves(annot)) { removeWorker(worker); } @@ -368,14 +369,13 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public boolean isWorking(AlignCalcWorkerI worker) { - if (!registered.containsKey(worker)) - { + var manager = registered.get(worker); + if (manager == null) + manager = oneshot.get(worker); + if (manager == null) return false; - } else - { - return registered.get(worker).getState() == WorkerManager.RUNNING; - } + return manager.isWorking(); } @Override @@ -384,13 +384,14 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 synchronized (registered) { for (var entry : registered.entrySet()) - { - if (entry.getKey().involves(annot) && - entry.getValue().getState() == WorkerManager.RUNNING) - { + if (entry.getKey().involves(annot) && entry.getValue().isWorking()) + return true; + } + synchronized (oneshot) + { + for (var entry : registered.entrySet()) + if (entry.getKey().involves(annot) && entry.getValue().isWorking()) return true; - } - } } return false; } @@ -401,12 +402,14 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 synchronized (registered) { for (var manager : registered.values()) - { - if (manager.getState() == WorkerManager.RUNNING) - { + if (manager.isWorking()) + return true; + } + synchronized (oneshot) + { + for (var manager : oneshot.values()) + if (manager.isWorking()) return true; - } - } } return false; } @@ -416,9 +419,11 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 { Objects.requireNonNull(worker); var manager = registered.get(worker); - if (manager == null) + if (manager == null) { - throw new NoSuchElementException(); + Console.warn("Starting unregistered worker " + worker); + manager = createManager(worker); + oneshot.put(worker, manager); } manager.restart(); } @@ -437,16 +442,18 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public void cancelWorker(AlignCalcWorkerI worker) - { + { Objects.requireNonNull(worker); var manager = registered.get(worker); - if (manager == null) + if (manager == null) + manager = oneshot.get(worker); + if (manager == null) { throw new NoSuchElementException(); } manager.cancel(); } - + private void notifyQueued(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners)