package jalview.workers; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static java.lang.String.format; import static java.util.Collections.synchronizedMap; import static java.util.Collections.unmodifiableList; import java.util.ArrayList; import jalview.api.AlignCalcListener; import jalview.api.AlignCalcManagerI2; import jalview.api.AlignCalcWorkerI; import jalview.api.PollableAlignCalcWorkerI; import jalview.bin.Cache; import jalview.datamodel.AlignmentAnnotation; public class AlignCalcManager2 implements AlignCalcManagerI2 { private abstract class WorkerManager { protected volatile boolean enabled = true; protected final AlignCalcWorkerI worker; WorkerManager(AlignCalcWorkerI worker) { this.worker = worker; } AlignCalcWorkerI getWorker() { return worker; } boolean isEnabled() { return enabled; } void setEnabled(boolean enabled) { this.enabled = enabled; } synchronized void restart() { if (!isEnabled()) { return; } if (isWorking()) { cancel(); } submit(); } abstract boolean isWorking(); protected abstract void submit(); abstract void cancel(); } private class SimpleWorkerManager extends WorkerManager { private Future task = null; SimpleWorkerManager(AlignCalcWorkerI worker) { super(worker); } @Override boolean isWorking() { return task != null && !task.isDone(); } @Override protected void submit() { if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( "Cannot submit new task if the prevoius one is still running"); } Cache.log.debug(format("Worker %s queued", worker.getClass().getName())); task = executor.submit(() -> { try { Cache.log.debug(format("Worker %s started", worker.getClass().getName())); worker.run(); Cache.log.debug(format("Worker %s finished", worker.getClass().getName())); } catch (InterruptedException e) { Cache.log.debug(format("Worker %s interrupted", worker.getClass().getName())); } catch (Throwable th) { Cache.log.debug(format("Worker %s failed", worker.getClass().getName()), th); } finally { } }); } @Override synchronized void cancel() { if (!isWorking()) { return; } Cache.log.debug(format("Cancelling worker %s", worker.getClass().getName())); task.cancel(true); } } private class PollableWorkerManager extends WorkerManager { private final PollableAlignCalcWorkerI worker; private Future task = null; PollableWorkerManager(PollableAlignCalcWorkerI worker) { super(worker); this.worker = worker; } @Override boolean isWorking() { return task != null && !task.isDone(); } protected void submit() { if (task != null && !(task.isDone() || task.isCancelled())) { throw new IllegalStateException( "Cannot submit new task if the prevoius one is still running"); } Cache.log.debug(format("Worker %s queued", worker.getClass().getName())); final var runnable = new Runnable() { private boolean started = false; private boolean completed = false; Future future = null; @Override public void run() { try { if (!started) { Cache.log.debug(format("Worker %s started", worker.getClass().getName())); worker.startUp(); started = true; } else if (!completed) { Cache.log.debug(format("Polling worker %s", worker.getClass().getName())); if (worker.poll()) { Cache.log.debug(format("Worker %s finished", worker.getClass().getName())); completed = true; } } } catch (Throwable th) { Cache.log.debug(format("Worker %s failed", worker.getClass().getName()), th); completed = true; } if (completed) { Cache.log.debug(format("Finalizing completed worker %s", worker.getClass().getName())); worker.done(); // almost impossible, but the future may be null at this point // let it throw NPE to cancel forcefully future.cancel(false); } } }; runnable.future = task = executor.scheduleWithFixedDelay( runnable, 10, 1000, TimeUnit.MILLISECONDS); } synchronized protected void cancel() { if (!isWorking()) { return; } Cache.log.debug(format("Cancelling worker %s", worker.getClass().getName())); task.cancel(false); executor.submit(() -> { worker.cancel(); Cache.log.debug(format("Finalizing cancelled worker %s", worker.getClass().getName())); worker.done(); }); } } private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private final Map registered = synchronizedMap(new HashMap<>()); private final List listeners = new CopyOnWriteArrayList<>(); @Override public void registerWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ? new PollableWorkerManager((PollableAlignCalcWorkerI) worker) : new SimpleWorkerManager(worker); registered.putIfAbsent(worker, manager); startWorker(worker); } @Override public List getWorkers() { return List.copyOf(registered.keySet()); } @Override public List getWorkersOfClass( Class cls) { List collected = new ArrayList<>(); for (var worker : getWorkers()) { if (worker.getClass().equals(cls)) { collected.add(worker); } } return unmodifiableList(collected); } @Override public void removeWorker(AlignCalcWorkerI worker) { registered.remove(worker); } @Override public void removeWorkerForAnnotation(AlignmentAnnotation annot) { synchronized (registered) { for (var worker : getWorkers()) { if (worker.involves(annot) && worker.isDeletable()) { removeWorker(worker); } } } } @Override public void removeWorkersOfClass(Class cls) { synchronized (registered) { for (var worker : getWorkers()) { if (worker.getClass().equals(cls)) { removeWorker(worker); } } } } @Override public void disableWorker(AlignCalcWorkerI worker) { // Null pointer check might be needed registered.get(worker).setEnabled(false); } @Override public void enableWorker(AlignCalcWorkerI worker) { // Null pointer check might be needed registered.get(worker).setEnabled(true); } @Override public boolean isDisabled(AlignCalcWorkerI worker) { if (registered.containsKey(worker)) { return !registered.get(worker).isEnabled(); } else { return false; } } @Override public boolean isWorking(AlignCalcWorkerI worker) { if (!registered.containsKey(worker)) { return false; } else { return registered.get(worker).isWorking(); } } @Override public boolean isWorkingWithAnnotation(AlignmentAnnotation annot) { synchronized (registered) { for (var entry : registered.entrySet()) { if (entry.getKey().involves(annot) && entry.getValue().isWorking()) { return true; } } } return false; } @Override public boolean isWorking() { synchronized (registered) { for (var manager : registered.values()) { if (manager.isWorking()) { return true; } } } return false; } @Override public void startWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); var manager = registered.get(worker); if (manager == null) { throw new NoSuchElementException(); } manager.restart(); } @Override public void restartWorkers() { synchronized (registered) { for (var manager : registered.values()) { manager.restart(); } } } @Override public void cancelWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); var manager = registered.get(worker); if (manager == null) { throw new NoSuchElementException(); } manager.cancel(); } private void notifyQueued(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { listener.workerQueued(worker); } } private void notifyStarted(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { listener.workerStarted(worker); } } private void notifyCompleted(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { try { listener.workerCompleted(worker); } catch (RuntimeException e) { e.printStackTrace(); } } } private void notifyCancelled(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { try { listener.workerCancelled(worker); } catch (RuntimeException e) { e.printStackTrace(); } } } private void notifyExceptional(AlignCalcWorkerI worker, Throwable throwable) { for (AlignCalcListener listener : listeners) { try { listener.workerExceptional(worker, throwable); } catch (RuntimeException e) { e.printStackTrace(); } } } @Override public void addAlignCalcListener(AlignCalcListener listener) { listeners.add(listener); } @Override public void removeAlignCalcListener(AlignCalcListener listener) { listeners.remove(listener); } @Override public void shutdown() { executor.shutdownNow(); listeners.clear(); registered.clear(); } }