package jalview.workers; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.WeakHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static java.lang.String.format; import static java.util.Collections.synchronizedMap; import static java.util.Collections.unmodifiableList; import java.util.ArrayList; import jalview.api.AlignCalcListener; import jalview.api.AlignCalcManagerI2; import jalview.api.AlignCalcWorkerI; import jalview.api.PollableAlignCalcWorkerI; import jalview.bin.Cache; import jalview.datamodel.AlignmentAnnotation; public class AlignCalcManager2 implements AlignCalcManagerI2 { private abstract class WorkerManager { protected volatile boolean enabled = true; protected AlignCalcWorkerI worker; WorkerManager(AlignCalcWorkerI worker) { this.worker = worker; } protected AlignCalcWorkerI getWorker() { return worker; } boolean isEnabled() { return enabled; } void setEnabled(boolean enabled) { this.enabled = enabled; } synchronized void restart() { if (!isEnabled()) { return; } if (!isRegistered()) { setEnabled(false); } if (isWorking()) { cancel(); } submit(); } protected boolean isRegistered() { return registered.containsKey(getWorker()); } abstract boolean isWorking(); protected abstract void submit(); abstract void cancel(); } private class SimpleWorkerManager extends WorkerManager { private Future task = null; SimpleWorkerManager(AlignCalcWorkerI worker) { super(worker); } @Override boolean isWorking() { return task != null && !task.isDone(); } @Override protected void submit() { if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( "Cannot submit new task if the prevoius one is still running"); } Cache.log.debug( format("Worker %s queued", getWorker().getClass().getName())); task = executor.submit(() -> { try { Cache.log.debug(format("Worker %s started", getWorker().getClass().getName())); getWorker().run(); Cache.log.debug(format("Worker %s finished", getWorker().getClass().getName())); } catch (InterruptedException e) { Cache.log.debug(format("Worker %s interrupted", getWorker().getClass().getName())); } catch (Throwable th) { Cache.log.debug(format("Worker %s failed", getWorker().getClass().getName()), th); } finally { if (!isRegistered()) { // delete worker reference so garbage collector can remove it worker = null; } } }); } @Override synchronized void cancel() { if (!isWorking()) { return; } Cache.log.debug(format("Cancelling worker %s", getWorker().getClass().getName())); task.cancel(true); } } private class PollableWorkerManager extends WorkerManager { private Future task = null; PollableWorkerManager(PollableAlignCalcWorkerI worker) { super(worker); } @Override protected PollableAlignCalcWorkerI getWorker() { return (PollableAlignCalcWorkerI) super.getWorker(); } @Override boolean isWorking() { return task != null && !task.isDone(); } protected void submit() { if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( "Cannot submit new task if the prevoius one is still running"); } Cache.log.debug( format("Worker %s queued", getWorker().getClass().getName())); final var runnable = new Runnable() { private boolean started = false; private boolean completed = false; Future future = null; @Override public void run() { try { if (!started) { Cache.log.debug(format("Worker %s started", getWorker().getClass().getName())); getWorker().startUp(); started = true; } else if (!completed) { Cache.log.debug(format("Polling worker %s", getWorker().getClass().getName())); if (getWorker().poll()) { Cache.log.debug(format("Worker %s finished", getWorker().getClass().getName())); completed = true; } } } catch (Throwable th) { Cache.log.debug(format("Worker %s failed", getWorker().getClass().getName()), th); completed = true; } if (completed) { final var worker = getWorker(); if (!isRegistered()) PollableWorkerManager.super.worker = null; Cache.log.debug(format("Finalizing completed worker %s", worker.getClass().getName())); worker.done(); // almost impossible, but the future may be null at this point // let it throw NPE to cancel forcefully future.cancel(false); } } }; runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10, 1000, TimeUnit.MILLISECONDS); } synchronized protected void cancel() { if (!isWorking()) { return; } Cache.log.debug(format("Cancelling worker %s", getWorker().getClass().getName())); task.cancel(false); executor.submit(() -> { final var worker = getWorker(); if (!isRegistered()) PollableWorkerManager.super.worker = null; if (worker != null) { worker.cancel(); Cache.log.debug(format("Finalizing cancelled worker %s", worker.getClass().getName())); worker.done(); } }); } } private final ScheduledExecutorService executor = Executors .newSingleThreadScheduledExecutor(); private final Map registered = synchronizedMap( new HashMap<>()); private final Map oneshot = synchronizedMap( new WeakHashMap<>()); private final List listeners = new CopyOnWriteArrayList<>(); private WorkerManager createManager(AlignCalcWorkerI worker) { if (worker instanceof PollableAlignCalcWorkerI) { return new PollableWorkerManager((PollableAlignCalcWorkerI) worker); } else { return new SimpleWorkerManager(worker); } } @Override public void registerWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); WorkerManager manager = createManager(worker); registered.putIfAbsent(worker, manager); startWorker(worker); } @Override public List getWorkers() { List result = new ArrayList<>(registered.size()); result.addAll(registered.keySet()); return result; } @Override public List getWorkersOfClass( Class cls) { List collected = new ArrayList<>(); for (var worker : getWorkers()) { if (worker.getClass().equals(cls)) { collected.add(worker); } } return unmodifiableList(collected); } @Override public List getWorkersForName(String name) { List collected = new ArrayList<>(); for (var worker : getWorkers()) { if (worker.getCalcName().equals(name)) { collected.add(worker); } } return collected; } @Override public void removeWorker(AlignCalcWorkerI worker) { if (worker.isDeletable()) { registered.remove(worker); } } @Override public void removeWorkerForAnnotation(AlignmentAnnotation annot) { synchronized (registered) { for (var worker : getWorkers()) { if (worker.involves(annot)) { removeWorker(worker); } } } } @Override public void removeWorkersOfClass(Class cls) { synchronized (registered) { for (var worker : getWorkers()) { if (worker.getClass().equals(cls)) { removeWorker(worker); } } } } @Override public void removeWorkersForName(String name) { synchronized (registered) { for (var worker : getWorkers()) { if (worker.getCalcName().equals(name)) { removeWorker(worker); } } } } @Override public void disableWorker(AlignCalcWorkerI worker) { // Null pointer check might be needed registered.get(worker).setEnabled(false); } @Override public void enableWorker(AlignCalcWorkerI worker) { // Null pointer check might be needed registered.get(worker).setEnabled(true); } @Override public boolean isDisabled(AlignCalcWorkerI worker) { if (registered.containsKey(worker)) { return !registered.get(worker).isEnabled(); } else { return false; } } @Override public boolean isWorking(AlignCalcWorkerI worker) { var manager = registered.get(worker); if (manager == null) manager = oneshot.get(worker); if (manager == null) return false; else return manager.isWorking(); } @Override public boolean isWorkingWithAnnotation(AlignmentAnnotation annot) { synchronized (registered) { for (var entry : registered.entrySet()) if (entry.getKey().involves(annot) && entry.getValue().isWorking()) return true; } synchronized (oneshot) { for (var entry : registered.entrySet()) if (entry.getKey().involves(annot) && entry.getValue().isWorking()) return true; } return false; } @Override public boolean isWorking() { synchronized (registered) { for (var manager : registered.values()) if (manager.isWorking()) return true; } synchronized (oneshot) { for (var manager : oneshot.values()) if (manager.isWorking()) return true; } return false; } @Override public void startWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); var manager = registered.get(worker); if (manager == null) { Cache.log.warn("Starting unregistered worker " + worker); manager = createManager(worker); oneshot.put(worker, manager); } manager.restart(); } @Override public void restartWorkers() { synchronized (registered) { for (var manager : registered.values()) { manager.restart(); } } } @Override public void cancelWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); var manager = registered.get(worker); if (manager == null) manager = oneshot.get(worker); if (manager == null) { throw new NoSuchElementException(); } manager.cancel(); } private void notifyQueued(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { listener.workerQueued(worker); } } private void notifyStarted(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { listener.workerStarted(worker); } } private void notifyCompleted(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { try { listener.workerCompleted(worker); } catch (RuntimeException e) { e.printStackTrace(); } } } private void notifyCancelled(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { try { listener.workerCancelled(worker); } catch (RuntimeException e) { e.printStackTrace(); } } } private void notifyExceptional(AlignCalcWorkerI worker, Throwable throwable) { for (AlignCalcListener listener : listeners) { try { listener.workerExceptional(worker, throwable); } catch (RuntimeException e) { e.printStackTrace(); } } } @Override public void addAlignCalcListener(AlignCalcListener listener) { listeners.add(listener); } @Override public void removeAlignCalcListener(AlignCalcListener listener) { listeners.remove(listener); } @Override public void shutdown() { executor.shutdownNow(); listeners.clear(); registered.clear(); } }