From 29288682745147c9da172e5a6f2c21f1f3dbb669 Mon Sep 17 00:00:00 2001 From: Mateusz Waronwy Date: Wed, 11 Nov 2020 19:42:48 +0100 Subject: [PATCH] JAL-3690 partially reworked AlignCalcManager todo: - modify SeqAnnotationServiceCalcWorker to implement PollableAlignCalcWorkerI - fix race condition where worker state gets overriden by the old job (QUEUED to IDLE) --- src/jalview/api/AlignCalcWorkerI.java | 17 +- src/jalview/api/PollableAlignCalcWorkerI.java | 18 + src/jalview/workers/AlignCalcManager2.java | 487 +++++++++++++++---------- 3 files changed, 325 insertions(+), 197 deletions(-) create mode 100644 src/jalview/api/PollableAlignCalcWorkerI.java diff --git a/src/jalview/api/AlignCalcWorkerI.java b/src/jalview/api/AlignCalcWorkerI.java index 1184853..b51b94e 100644 --- a/src/jalview/api/AlignCalcWorkerI.java +++ b/src/jalview/api/AlignCalcWorkerI.java @@ -28,7 +28,7 @@ import jalview.datamodel.AlignmentAnnotation; * Interface describing a worker that calculates alignment annotation(s). The * main (re-)calculation should be performed by the inherited run() method. */ -public interface AlignCalcWorkerI extends Callable +public interface AlignCalcWorkerI { /** * Answers true if this worker updates the given annotation (regardless of its @@ -51,22 +51,11 @@ public interface AlignCalcWorkerI extends Callable */ void removeAnnotation(); - - /** - * Default implementation of call which calls run and propagates the - * exception. - */ - @Override - public default Void call() throws Exception - { - run(); - return null; - } - /** * The main calculation happens here + * @throws Throwable */ - public void run() throws Exception; + public void run() throws Throwable; /** * Answers true if the worker should be deleted entirely when its annotation diff --git a/src/jalview/api/PollableAlignCalcWorkerI.java b/src/jalview/api/PollableAlignCalcWorkerI.java new file mode 100644 index 0000000..9697187 --- /dev/null +++ b/src/jalview/api/PollableAlignCalcWorkerI.java @@ -0,0 +1,18 @@ +package jalview.api; + +public interface PollableAlignCalcWorkerI extends AlignCalcWorkerI +{ + @Override + public default void run() throws Throwable + { + startUp(); + } + + public void startUp() throws Throwable; + + public boolean poll() throws Throwable; + + public void cancel(); + + public void cleanUp(); +} diff --git a/src/jalview/workers/AlignCalcManager2.java b/src/jalview/workers/AlignCalcManager2.java index b914d65..f7d263a 100644 --- a/src/jalview/workers/AlignCalcManager2.java +++ b/src/jalview/workers/AlignCalcManager2.java @@ -1,214 +1,321 @@ package jalview.workers; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static java.util.Collections.synchronizedMap; +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; import jalview.api.AlignCalcListener; import jalview.api.AlignCalcManagerI2; import jalview.api.AlignCalcWorkerI; +import jalview.api.PollableAlignCalcWorkerI; import jalview.bin.Cache; import jalview.datamodel.AlignmentAnnotation; -import static java.util.Collections.synchronizedList; -import static java.util.Collections.synchronizedSet; -import static java.util.Collections.unmodifiableList; - -import java.util.ArrayList; -import java.util.HashSet; - -import static java.lang.String.format; - public class AlignCalcManager2 implements AlignCalcManagerI2 { - class AlignCalcTask extends FutureTask + private abstract class WorkerManager { - final AlignCalcWorkerI worker; - - public AlignCalcTask(AlignCalcWorkerI worker) + static final int IDLE = 0; + static final int QUEUED = 1; + static final int RUNNING = 2; + static final int CANCELLING = 3; + + protected volatile int state = IDLE; + protected volatile boolean enabled = true; + + protected final AlignCalcWorkerI worker; + + WorkerManager(AlignCalcWorkerI worker) { - super(new Callable() - { - public Void call() throws Exception - { - Cache.log.debug(format("Worker %s started%n", - worker.getClass().getName())); - notifyStarted(worker); - worker.run(); - return null; - } - }); this.worker = worker; } - - public AlignCalcWorkerI getWorker() + + AlignCalcWorkerI getWorker() { return worker; } - - @Override - protected void done() + + boolean isEnabled() { - boolean success = false; - Throwable exception = null; - try - { - get(); - success = true; - } catch (CancellationException e) - { - Cache.log.debug(format("Worker %s cancelled%n", - getWorker().getClass().getName())); - notifyCancelled(worker); - } catch (ExecutionException e) + return enabled; + } + + void setEnabled(boolean enabled) + { + this.enabled = enabled; + } + + synchronized protected void setState(int state) + { + this.state = state; + } + + int getState() + { + return state; + } + + void restart() + { + if (!isEnabled()) { - exception = e.getCause(); - if (exception instanceof OutOfMemoryError) - { - disableWorker(getWorker()); - } - } catch (Throwable e) + return; + } + if (state == IDLE) { - exception = e; - } finally + submit(); + } + else if (state == QUEUED) { - inProgress.remove(getWorker()); - tasks.remove(this); + // job already queued, do nothing } - if (success) + else if (state == RUNNING) { - Cache.log.debug(format("Worker %s finished%n", - getWorker().getClass().getName())); - notifyCompleted(worker); + cancel(); + submit(); } - else if (exception != null) + else if (state == CANCELLING) { - Cache.log.warn(format("Worker %s failed%n", - getWorker().getClass().getName())); - exception.printStackTrace(); - notifyExceptional(worker, exception); + submit(); } } + + protected abstract void submit(); + + abstract void cancel(); } - - private static class CalcManagerThreadFactory implements ThreadFactory + + + private class SimpleWorkerManager extends WorkerManager { - private static final AtomicInteger threadNumber = new AtomicInteger(1); - - private final ThreadGroup group; + private Future task = null; + + SimpleWorkerManager(AlignCalcWorkerI worker) + { + super(worker); + } - private static final String namePrefix = "AlignCalcManager-pool-thread-"; + @Override + protected void submit() + { + if (task != null && !(task.isDone() || task.isCancelled())) + { + throw new IllegalStateException( + "Cannot submit new task if the prevoius one is still running"); + } + Cache.log.debug(format("Worker %s queued", + worker.getClass().getName())); + setState(QUEUED); + task = executor.submit(() -> { + setState(RUNNING); + try + { + Cache.log.debug(format("Worker %s started", + worker.getClass().getName())); + worker.run(); + Cache.log.debug(format("Worker %s finished", + worker.getClass().getName())); + } + catch (InterruptedException e) + { + Cache.log.debug(format("Worker %s interrupted", + worker.getClass().getName())); + } + catch (Throwable th) + { + Cache.log.debug(format("Worker %s failed", + worker.getClass().getName()), th); + } + finally + { + // fixme: should not set to idle if another task is already queued for execution + setState(IDLE); + } + }); + } - CalcManagerThreadFactory() + @Override + synchronized void cancel() { - var securityManager = System.getSecurityManager(); - if (securityManager != null) + if (task == null || state == IDLE || state == CANCELLING) { - group = securityManager.getThreadGroup(); + return; } - else + Cache.log.debug(format("Cancelling worker %s", + worker.getClass().getName())); + setState(CANCELLING); + task.cancel(true); + if (task.isCancelled()) { - group = Thread.currentThread().getThreadGroup(); + setState(IDLE); } } - - @Override - public Thread newThread(Runnable r) + } + + + private class PollableWorkerManager extends WorkerManager + { + private final PollableAlignCalcWorkerI worker; + private Future task = null; + + PollableWorkerManager(PollableAlignCalcWorkerI worker) { - Thread t = new Thread(group, r, - namePrefix + threadNumber.getAndIncrement(), 0); - t.setDaemon(false); - t.setPriority(Thread.NORM_PRIORITY); - return t; + super(worker); + this.worker = worker; + } + + protected void submit() + { + if (task != null && !(task.isDone() || task.isCancelled())) + { + throw new IllegalStateException( + "Cannot submit new task if the prevoius one is still running"); + } + Cache.log.debug(format("Worker %s queued", + worker.getClass().getName())); + setState(QUEUED); + final var runnable = new Runnable() + { + private boolean started = false; + private boolean completed = false; + Future future = null; + + @Override + public void run() + { + try + { + if (!started) + { + Cache.log.debug(format("Worker %s started", + worker.getClass().getName())); + setState(RUNNING); + worker.startUp(); + started = true; + } + else if (!completed) + { + if (worker.poll()) + { + Cache.log.debug(format("Worker %s finished", + worker.getClass().getName())); + completed = true; + setState(IDLE); + } + } + } catch (Throwable th) + { + Cache.log.debug(format("Worker %s failed", + worker.getClass().getName()), th); + completed = true; + setState(IDLE); + } + if (completed) + { + try + { + future.cancel(false); + } + catch (NullPointerException ignored) + { + // extremely unlikely to happen + } + } + } + }; + runnable.future = task = executor.scheduleWithFixedDelay( + runnable, 10, 1000, TimeUnit.MILLISECONDS); + } + + synchronized protected void cancel() + { + if (task == null || state == IDLE || state == CANCELLING) + { + return; + } + Cache.log.debug(format("Cancelling worker %s", + worker.getClass().getName())); + setState(CANCELLING); + task.cancel(false); + if (task.isCancelled()) + { + setState(IDLE); + } + executor.submit(() -> { + worker.cancel(); + }); } } - - // main executor for running workers one-by-one - private final ExecutorService executor = Executors - .newSingleThreadExecutor(new CalcManagerThreadFactory()); - - private final List listeners = new CopyOnWriteArrayList(); - - // list of all registered workers (other collections are subsets of this) - private final List registered = synchronizedList( - new ArrayList<>()); - - // list of tasks holding queued and running workers - private final List tasks = synchronizedList( - new ArrayList<>()); - - // the collection of currently running workers - private final Set inProgress = synchronizedSet( - new HashSet<>()); - - // the collection of workers that will not be started - private final Set disabled = synchronizedSet( - new HashSet<>()); - - /* - * Register the worker with this manager and scheduler for execution. - */ + + + private final ScheduledExecutorService executor = + Executors.newSingleThreadScheduledExecutor(); + private final Map registered = + synchronizedMap(new HashMap<>()); + + private final List listeners = + new CopyOnWriteArrayList<>(); + + @Override public void registerWorker(AlignCalcWorkerI worker) { Objects.requireNonNull(worker); - synchronized (registered) - { - if (!registered.contains(worker)) - registered.add(worker); - } + WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ? + new PollableWorkerManager((PollableAlignCalcWorkerI) worker) : + new SimpleWorkerManager(worker); + registered.putIfAbsent(worker, manager); startWorker(worker); } @Override public List getWorkers() { - return unmodifiableList(new ArrayList<>(registered)); + return List.copyOf(registered.keySet()); } @Override public List getWorkersOfClass( Class cls) { - synchronized (registered) + List collected = new ArrayList<>(); + for (var worker : getWorkers()) { - List collected = new ArrayList<>(); - for (var worker : registered) + if (worker.getClass().equals(cls)) { - if (worker.getClass().equals(cls)) - { - collected.add(worker); - } + collected.add(worker); } - return unmodifiableList(collected); } + return unmodifiableList(collected); } @Override public void removeWorker(AlignCalcWorkerI worker) { registered.remove(worker); - disabled.remove(worker); } @Override - public void removeWorkersOfClass(Class cls) + public void removeWorkerForAnnotation(AlignmentAnnotation annot) { synchronized (registered) { - for (var worker : registered) + for (var worker : getWorkers()) { - if (worker.getClass().equals(cls)) + if (worker.involves(annot) && worker.isDeletable()) { removeWorker(worker); } @@ -217,13 +324,13 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 } @Override - public void removeWorkerForAnnotation(AlignmentAnnotation annot) + public void removeWorkersOfClass(Class cls) { synchronized (registered) { - for (var worker : registered) + for (var worker : getWorkers()) { - if (worker.involves(annot) && worker.isDeletable()) + if (worker.getClass().equals(cls)) { removeWorker(worker); } @@ -234,95 +341,112 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 @Override public void disableWorker(AlignCalcWorkerI worker) { - disabled.add(worker); + // Null pointer check might be needed + registered.get(worker).setEnabled(false); } @Override public void enableWorker(AlignCalcWorkerI worker) { - disabled.remove(worker); + // Null pointer check might be needed + registered.get(worker).setEnabled(true); } @Override - public void restartWorkers() + public boolean isDisabled(AlignCalcWorkerI worker) { - synchronized (registered) + if (registered.containsKey(worker)) { - for (AlignCalcWorkerI worker : registered) - { - if (!isDisabled(worker)) - startWorker(worker); - } + return !registered.get(worker).isEnabled(); + } + else + { + return false; } } @Override - public void startWorker(AlignCalcWorkerI worker) + public boolean isWorking(AlignCalcWorkerI worker) { - Objects.requireNonNull(worker); - AlignCalcTask newTask = new AlignCalcTask(worker); - synchronized (inProgress) + if (!registered.containsKey(worker)) { - cancelWorker(worker); - inProgress.add(worker); - tasks.add(newTask); + return false; + } + else + { + return registered.get(worker).getState() == WorkerManager.RUNNING; } - notifyQueued(worker); - executor.execute(newTask); } @Override - public void cancelWorker(AlignCalcWorkerI worker) + public boolean isWorkingWithAnnotation(AlignmentAnnotation annot) { - if (isWorking(worker)) + synchronized (registered) { - synchronized (tasks) + for (var entry : registered.entrySet()) { - Optional oldTask = tasks.stream() - .filter(task -> task.getWorker().equals(worker)) - .findFirst(); - if (oldTask.isPresent()) + if (entry.getKey().involves(annot) && + entry.getValue().getState() == WorkerManager.RUNNING) { - oldTask.get().cancel(true); + return true; } } } + return false; } @Override - public boolean isDisabled(AlignCalcWorkerI worker) - { - return disabled.contains(worker); - } - - @Override - public boolean isWorking(AlignCalcWorkerI worker) + public boolean isWorking() { - return inProgress.contains(worker); + synchronized (registered) + { + for (var manager : registered.values()) + { + if (manager.getState() == WorkerManager.RUNNING) + { + return true; + } + } + } + return false; } @Override - public boolean isWorking() + public void startWorker(AlignCalcWorkerI worker) { - return !inProgress.isEmpty(); + Objects.requireNonNull(worker); + var manager = registered.get(worker); + if (manager == null) + { + throw new NoSuchElementException(); + } + manager.restart(); } @Override - public boolean isWorkingWithAnnotation(AlignmentAnnotation annot) + public void restartWorkers() { - synchronized (inProgress) + synchronized (registered) { - for (AlignCalcWorkerI worker : inProgress) + for (var manager : registered.values()) { - if (worker.involves(annot)) - { - return true; - } + manager.restart(); } } - return false; } + @Override + public void cancelWorker(AlignCalcWorkerI worker) + { + Objects.requireNonNull(worker); + var manager = registered.get(worker); + if (manager == null) + { + throw new NoSuchElementException(); + } + manager.cancel(); + } + private void notifyQueued(AlignCalcWorkerI worker) { for (AlignCalcListener listener : listeners) @@ -400,9 +524,6 @@ public class AlignCalcManager2 implements AlignCalcManagerI2 executor.shutdownNow(); listeners.clear(); registered.clear(); - tasks.clear(); - inProgress.clear(); - disabled.clear(); } } -- 1.7.10.2