X-Git-Url: http://source.jalview.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fjalview%2Fworkers%2FAlignCalcManager2.java;h=582c84f7523ecc2b4913fbaea2ef558a5da5af7d;hb=8680991d282c52d1524c65f1b8e9053b55634171;hp=7d3cca68bda9f22b87e82c1adf8e8abad8d21a91;hpb=6a6fb5010c2cd06b14a1e055eabaeb4848e65d2d;p=jalview.git diff --git a/src/jalview/workers/AlignCalcManager2.java b/src/jalview/workers/AlignCalcManager2.java index 7d3cca6..582c84f 100644 --- a/src/jalview/workers/AlignCalcManager2.java +++ b/src/jalview/workers/AlignCalcManager2.java @@ -1,168 +1,335 @@ package jalview.workers; +import java.util.HashMap; import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.WeakHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static java.util.Collections.synchronizedMap; +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; import jalview.api.AlignCalcListener; -import jalview.api.AlignCalcManagerI; import jalview.api.AlignCalcManagerI2; import jalview.api.AlignCalcWorkerI; +import jalview.api.PollableAlignCalcWorkerI; +import jalview.bin.Cache; +import jalview.bin.Console; import jalview.datamodel.AlignmentAnnotation; -import static java.util.Collections.synchronizedList; -import static java.util.Collections.synchronizedSet; -import static java.util.Collections.unmodifiableList; - -import java.util.ArrayList; -import java.util.HashSet; - public class AlignCalcManager2 implements AlignCalcManagerI2 { - class AlignCalcTask extends FutureTask + private abstract class WorkerManager { - final AlignCalcWorkerI worker; + protected volatile boolean enabled = true; + + protected AlignCalcWorkerI worker; - public AlignCalcTask(AlignCalcWorkerI worker) + WorkerManager(AlignCalcWorkerI worker) { - super(new Callable() { - public Void call() throws Exception { - notifyStarted(worker); - worker.run(); - return null; - } - }); this.worker = worker; } - public AlignCalcWorkerI getWorker() + protected AlignCalcWorkerI getWorker() { return worker; } - @Override - protected void done() + boolean isEnabled() { - boolean success = false; - Throwable exception = null; - try + return enabled; + } + + void setEnabled(boolean enabled) + { + this.enabled = enabled; + } + + synchronized void restart() + { + if (!isEnabled()) + { + return; + } + if (!isRegistered()) { - get(); - success = true; - } catch (ExecutionException e) + setEnabled(false); + } + if (isWorking()) { - exception = e.getCause(); - if (exception instanceof OutOfMemoryError) { - disableWorker(getWorker()); + cancel(); + } + submit(); + } + + protected boolean isRegistered() + { + return registered.containsKey(getWorker()); + } + + abstract boolean isWorking(); + + protected abstract void submit(); + + abstract void cancel(); + } + + private class SimpleWorkerManager extends WorkerManager + { + private Future task = null; + + SimpleWorkerManager(AlignCalcWorkerI worker) + { + super(worker); + } + + @Override + boolean isWorking() + { + return task != null && !task.isDone(); + } + + @Override + protected void submit() + { + if (task != null && !(task.isDone() || task.isCancelled())) + { + throw new IllegalStateException( + "Cannot submit new task if the prevoius one is still running"); + } + Console.debug( + format("Worker %s queued", getWorker())); + task = executor.submit(() -> { + try + { + Console.debug(format("Worker %s started", getWorker())); + getWorker().run(); + Console.debug(format("Worker %s finished", getWorker())); + } catch (InterruptedException e) + { + Console.debug(format("Worker %s interrupted", getWorker())); + } catch (Throwable th) + { + Console.debug(format("Worker %s failed", getWorker()), th); + } finally + { + if (!isRegistered()) + { + // delete worker reference so garbage collector can remove it + worker = null; + } } - } catch (Throwable e) + }); + } + + @Override + synchronized void cancel() + { + if (!isWorking()) + { + return; + } + Console.debug(format("Cancelling worker %s", getWorker())); + task.cancel(true); + } + } + + private class PollableWorkerManager extends WorkerManager + { + private Future task = null; + + PollableWorkerManager(PollableAlignCalcWorkerI worker) + { + super(worker); + } + + @Override + protected PollableAlignCalcWorkerI getWorker() + { + return (PollableAlignCalcWorkerI) super.getWorker(); + } + + @Override + boolean isWorking() + { + return task != null && !task.isDone(); + } + + protected void submit() + { + if (task != null && !(task.isDone() || task.isCancelled())) { - exception = e; + throw new IllegalStateException( + "Cannot submit new task if the previous one is still running"); } - finally { - inProgress.remove(getWorker()); - tasks.remove(this); + Console.debug( format("Worker %s queued", getWorker())); + final var runnable = new Runnable() + { + private boolean started = false; + + private boolean completed = false; + + Future future = null; + + @Override + public void run() + { + try + { + if (!started) + { + Console.debug(format("Worker %s started", getWorker())); + getWorker().startUp(); + started = true; + } + else if (!completed) + { + Console.debug(format("Polling worker %s", getWorker())); + if (getWorker().poll()) + { + Console.debug(format("Worker %s finished", getWorker())); + completed = true; + } + } + } catch (Throwable th) + { + Console.debug(format("Worker %s failed", getWorker()), th); + completed = true; + } + if (completed) + { + final var worker = getWorker(); + if (!isRegistered()) + PollableWorkerManager.super.worker = null; + Console.debug(format("Finalizing completed worker %s", worker)); + worker.done(); + // almost impossible, but the future may be null at this point + // let it throw NPE to cancel forcefully + future.cancel(false); + } + } + }; + runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10, + 1000, TimeUnit.MILLISECONDS); + } + + synchronized protected void cancel() + { + if (!isWorking()) + { + return; } - if (success) - notifyCompleted(worker); - else - notifyExceptional(worker, exception); + Console.debug(format("Cancelling worker %s", getWorker())); + task.cancel(false); + executor.submit(() -> { + final var worker = getWorker(); + if (!isRegistered()) + PollableWorkerManager.super.worker = null; + if (worker != null) + { + worker.cancel(); + Console.debug(format("Finalizing cancelled worker %s", worker)); + worker.done(); + } + }); } } - // main executor for running workers one-by-one - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - - private final List listeners = new CopyOnWriteArrayList(); + private final ScheduledExecutorService executor = Executors + .newSingleThreadScheduledExecutor(); - // list of all registered workers (other collections are subsets of this) - private final List registered = synchronizedList(new ArrayList<>()); + private final Map registered = synchronizedMap( + new HashMap<>()); - // list of tasks holding queued and running workers - private final List tasks = synchronizedList(new ArrayList<>()); - - // the collection of currently running workers - private final Set inProgress = synchronizedSet(new HashSet<>()); + private final Map oneshot = synchronizedMap( + new WeakHashMap<>()); - // the collection of workers that will not be started - private final Set disabled = synchronizedSet(new HashSet<>()); + private final List listeners = new CopyOnWriteArrayList<>(); - /* - * Register the worker with this manager and scheduler for execution. - */ - @Override - public void registerWorker(AlignCalcWorkerI worker) + private WorkerManager createManager(AlignCalcWorkerI worker) { - synchronized (registered) + if (worker instanceof PollableAlignCalcWorkerI) + { + return new PollableWorkerManager((PollableAlignCalcWorkerI) worker); + } + else { - if (!registered.contains(worker)) - registered.add(worker); + return new SimpleWorkerManager(worker); } + } + + @Override + public void registerWorker(AlignCalcWorkerI worker) + { + Objects.requireNonNull(worker); + WorkerManager manager = createManager(worker); + registered.putIfAbsent(worker, manager); startWorker(worker); } @Override public List getWorkers() { - return unmodifiableList(new ArrayList<>(registered)); + List result = new ArrayList<>(registered.size()); + result.addAll(registered.keySet()); + return result; } - + @Override public List getWorkersOfClass( Class cls) { - synchronized (registered) + List collected = new ArrayList<>(); + for (var worker : getWorkers()) { - return registered.stream() - .filter(worker -> worker.getClass().equals(cls)) - .collect(Collectors.toUnmodifiableList()); + if (worker.getClass().equals(cls)) + { + collected.add(worker); + } } + return unmodifiableList(collected); } - + @Override public void removeWorker(AlignCalcWorkerI worker) { registered.remove(worker); - disabled.remove(worker); } @Override - public void removeWorkersOfClass(Class cls) + public void removeWorkerForAnnotation(AlignmentAnnotation annot) { synchronized (registered) { - for (var it = registered.iterator(); it.hasNext();) + for (var worker : getWorkers()) { - var worker = it.next(); - if (worker.getClass().equals(cls)) + if (worker.involves(annot)) { - it.remove(); - disabled.remove(worker); + removeWorker(worker); } } } } @Override - public void removeWorkerForAnnotation(AlignmentAnnotation annot) + public void removeWorkersOfClass(Class cls) { synchronized (registered) { - for (var it = registered.iterator(); it.hasNext();) + for (var worker : getWorkers()) { - var worker = it.next(); - if (worker.involves(annot) && worker.isDeletable()) + if (worker.getClass().equals(cls)) { - it.remove(); - disabled.remove(worker); + removeWorker(worker); } } } @@ -171,94 +338,116 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public void disableWorker(AlignCalcWorkerI worker) { - assert registered.contains(worker); - disabled.add(worker); + // Null pointer check might be needed + registered.get(worker).setEnabled(false); } @Override public void enableWorker(AlignCalcWorkerI worker) { - disabled.remove(worker); + // Null pointer check might be needed + registered.get(worker).setEnabled(true); } @Override - public void restartWorkers() + public boolean isDisabled(AlignCalcWorkerI worker) { - synchronized (registered) + if (registered.containsKey(worker)) { - for (AlignCalcWorkerI worker : registered) - { - if (!isDisabled(worker)) - startWorker(worker); - } + return !registered.get(worker).isEnabled(); + } + else + { + return false; } } @Override - public void startWorker(AlignCalcWorkerI worker) + public boolean isWorking(AlignCalcWorkerI worker) { - assert registered.contains(worker); - synchronized (tasks) { - for (var task : tasks) - { - if (task.getWorker().equals(worker)) - task.cancel(true); - } - } - AlignCalcTask newTask = new AlignCalcTask(worker); - tasks.add(newTask); - notifyQueued(worker); - executor.execute(newTask); + var manager = registered.get(worker); + if (manager == null) + manager = oneshot.get(worker); + if (manager == null) + return false; + else + return manager.isWorking(); } - + @Override - public void cancelWorker(AlignCalcWorkerI worker) + public boolean isWorkingWithAnnotation(AlignmentAnnotation annot) { - if (isWorking(worker)) + synchronized (registered) { - synchronized (tasks) - { - for (var task : tasks) - { - if (task.getWorker().equals(worker)) - task.cancel(true); - } - } + for (var entry : registered.entrySet()) + if (entry.getKey().involves(annot) && entry.getValue().isWorking()) + return true; + } + synchronized (oneshot) + { + for (var entry : registered.entrySet()) + if (entry.getKey().involves(annot) && entry.getValue().isWorking()) + return true; } + return false; } @Override - public boolean isDisabled(AlignCalcWorkerI worker) + public boolean isWorking() { - return disabled.contains(worker); + synchronized (registered) + { + for (var manager : registered.values()) + if (manager.isWorking()) + return true; + } + synchronized (oneshot) + { + for (var manager : oneshot.values()) + if (manager.isWorking()) + return true; + } + return false; } @Override - public boolean isWorking(AlignCalcWorkerI worker) + public void startWorker(AlignCalcWorkerI worker) { - return inProgress.contains(worker); + Objects.requireNonNull(worker); + var manager = registered.get(worker); + if (manager == null) + { + Console.warn("Starting unregistered worker " + worker); + manager = createManager(worker); + oneshot.put(worker, manager); + } + manager.restart(); } @Override - public boolean isWorking() + public void restartWorkers() { - return !inProgress.isEmpty(); + synchronized (registered) + { + for (var manager : registered.values()) + { + manager.restart(); + } + } } - + @Override - public boolean isWorkingWithAnnotation(AlignmentAnnotation annot) + public void cancelWorker(AlignCalcWorkerI worker) { - synchronized (inProgress) + Objects.requireNonNull(worker); + var manager = registered.get(worker); + if (manager == null) + manager = oneshot.get(worker); + if (manager == null) { - for (AlignCalcWorkerI worker : inProgress) - { - if (worker.involves(annot)) - { - return true; - } - } + throw new NoSuchElementException(); } - return false; + manager.cancel(); } private void notifyQueued(AlignCalcWorkerI worker) @@ -281,7 +470,8 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 { for (AlignCalcListener listener : listeners) { - try { + try + { listener.workerCompleted(worker); } catch (RuntimeException e) { @@ -290,12 +480,27 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 } } + private void notifyCancelled(AlignCalcWorkerI worker) + { + for (AlignCalcListener listener : listeners) + { + try + { + listener.workerCancelled(worker); + } catch (RuntimeException e) + { + e.printStackTrace(); + } + } + } + private void notifyExceptional(AlignCalcWorkerI worker, Throwable throwable) { for (AlignCalcListener listener : listeners) { - try { + try + { listener.workerExceptional(worker, throwable); } catch (RuntimeException e) { @@ -309,11 +514,19 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 { listeners.add(listener); } - + @Override public void removeAlignCalcListener(AlignCalcListener listener) { listeners.remove(listener); } + @Override + public void shutdown() + { + executor.shutdownNow(); + listeners.clear(); + registered.clear(); + } + }