package jalview.workers; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.stream.Collectors; import jalview.api.AlignCalcListener; import jalview.api.AlignCalcManagerI2; import jalview.api.AlignCalcWorkerI; import jalview.bin.Cache; import jalview.datamodel.AlignmentAnnotation; import static java.util.Collections.synchronizedList; import static java.util.Collections.synchronizedSet; import static java.util.Collections.unmodifiableList; import java.util.ArrayList; import java.util.HashSet; import static java.lang.String.format; public class AlignCalcManager2 implements AlignCalcManagerI2 { class AlignCalcTask extends FutureTask { final AlignCalcWorkerI worker; public AlignCalcTask(AlignCalcWorkerI worker) { super(new Callable() { public Void call() throws Exception { Cache.log.debug(format("Worker %s started%n", worker.getClass().getName())); notifyStarted(worker); worker.run(); return null; } }); this.worker = worker; } public AlignCalcWorkerI getWorker() { return worker; } @Override protected void done() { boolean success = false; Throwable exception = null; try { get(); success = true; } catch (ExecutionException e) { exception = e.getCause(); if (exception instanceof OutOfMemoryError) { disableWorker(getWorker()); } } catch (Throwable e) { exception = e; } finally { inProgress.remove(getWorker()); tasks.remove(this); } if (success) { Cache.log.debug(format("Worker %s finished%n", getWorker().getClass().getName())); notifyCompleted(worker); } else if (exception != null){ Cache.log.warn(format("Worker %s failed%n", getWorker().getClass().getName())); exception.printStackTrace(); notifyExceptional(worker, exception); } } } // main executor for running workers one-by-one private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final List listeners = new CopyOnWriteArrayList(); // list of all registered workers (other collections are subsets of this) private final List registered = synchronizedList(new ArrayList<>()); // list of tasks holding queued and running workers private final List tasks = synchronizedList(new ArrayList<>()); // the collection of currently running workers private final Set inProgress = synchronizedSet(new HashSet<>()); // the collection of workers that will not be started private final Set disabled = synchronizedSet(new HashSet<>()); /* * Register the worker with this manager and scheduler for execution. */ @Override public void registerWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); synchronized (registered) { if (!registered.contains(worker)) registered.add(worker); } startWorker(worker); } @Override public List getWorkers() { return unmodifiableList(new ArrayList<>(registered)); } @Override public List getWorkersOfClass( Class cls) { synchronized (registered) { return registered.stream() .filter(worker -> worker.getClass().equals(cls)) .collect(Collectors.toUnmodifiableList()); } } @Override public void removeWorker(AlignCalcWorkerI worker) { registered.remove(worker); disabled.remove(worker); } @Override public void removeWorkersOfClass(Class cls) { synchronized (registered) { for (var worker : registered) { if (worker.getClass().equals(cls)) { removeWorker(worker); } } } } @Override public void removeWorkerForAnnotation(AlignmentAnnotation annot) { synchronized (registered) { for (var worker : registered) { if (worker.involves(annot) && worker.isDeletable()) { removeWorker(worker); } } } } @Override public void disableWorker(AlignCalcWorkerI worker) { disabled.add(worker); } @Override public void enableWorker(AlignCalcWorkerI worker) { disabled.remove(worker); } @Override public void restartWorkers() { synchronized (registered) { for (AlignCalcWorkerI worker : registered) { if (!isDisabled(worker)) startWorker(worker); } } } @Override public void startWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); AlignCalcTask newTask = new AlignCalcTask(worker); synchronized (inProgress) { cancelWorker(worker); inProgress.add(worker); tasks.add(newTask); } notifyQueued(worker); executor.execute(newTask); } @Override public void cancelWorker(AlignCalcWorkerI worker) { if (isWorking(worker)) { synchronized (tasks) { Optional oldTask = tasks.stream() .filter(task -> task.getWorker().equals(worker)) .findFirst(); if (oldTask.isPresent()) { oldTask.get().cancel(true); } } } } @Override public boolean isDisabled(AlignCalcWorkerI worker) { return disabled.contains(worker); } @Override public boolean isWorking(AlignCalcWorkerI worker) { return inProgress.contains(worker); } @Override public boolean isWorking() { return !inProgress.isEmpty(); } @Override public boolean isWorkingWithAnnotation(AlignmentAnnotation annot) { synchronized (inProgress) { for (AlignCalcWorkerI worker : inProgress) { if (worker.involves(annot)) { return true; } } } return false; } private void notifyQueued(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { listener.workerQueued(worker); } } private void notifyStarted(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { listener.workerStarted(worker); } } private void notifyCompleted(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) { try { listener.workerCompleted(worker); } catch (RuntimeException e) { e.printStackTrace(); } } } private void notifyExceptional(AlignCalcWorkerI worker, Throwable throwable) { for (AlignCalcListener listener : listeners) { try { listener.workerExceptional(worker, throwable); } catch (RuntimeException e) { e.printStackTrace(); } } } @Override public void addAlignCalcListener(AlignCalcListener listener) { listeners.add(listener); } @Override public void removeAlignCalcListener(AlignCalcListener listener) { listeners.remove(listener); } }