package jalview.workers;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
+import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.stream.Collectors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static java.util.Collections.synchronizedMap;
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
import jalview.api.AlignCalcListener;
import jalview.api.AlignCalcManagerI2;
import jalview.api.AlignCalcWorkerI;
+import jalview.api.PollableAlignCalcWorkerI;
import jalview.bin.Cache;
+import jalview.bin.Console;
import jalview.datamodel.AlignmentAnnotation;
-import static java.util.Collections.synchronizedList;
-import static java.util.Collections.synchronizedSet;
-import static java.util.Collections.unmodifiableList;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import static java.lang.String.format;
-
public class AlignCalcManager2 implements AlignCalcManagerI2
{
- class AlignCalcTask extends FutureTask<Void>
+ private abstract class WorkerManager
{
- final AlignCalcWorkerI worker;
+ protected volatile boolean enabled = true;
+
+ protected AlignCalcWorkerI worker;
- public AlignCalcTask(AlignCalcWorkerI worker)
+ WorkerManager(AlignCalcWorkerI worker)
{
- super(new Callable<Void>() {
- public Void call() throws Exception {
- Cache.log.debug(format("Worker %s started%n", worker.getClass().getName()));
- notifyStarted(worker);
- worker.run();
- return null;
- }
- });
this.worker = worker;
}
- public AlignCalcWorkerI getWorker()
+ protected AlignCalcWorkerI getWorker()
{
return worker;
}
- @Override
- protected void done()
+ boolean isEnabled()
{
- boolean success = false;
- Throwable exception = null;
- try
+ return enabled;
+ }
+
+ void setEnabled(boolean enabled)
+ {
+ this.enabled = enabled;
+ }
+
+ synchronized void restart()
+ {
+ if (!isEnabled())
{
- get();
- success = true;
+ return;
}
- catch (ExecutionException e)
+ if (!isRegistered())
{
- exception = e.getCause();
- if (exception instanceof OutOfMemoryError) {
- disableWorker(getWorker());
- }
- } catch (Throwable e)
+ setEnabled(false);
+ }
+ if (isWorking())
{
- exception = e;
+ cancel();
}
- finally {
- inProgress.remove(getWorker());
- tasks.remove(this);
+ submit();
+ }
+
+ protected boolean isRegistered()
+ {
+ return registered.containsKey(getWorker());
+ }
+
+ abstract boolean isWorking();
+
+ protected abstract void submit();
+
+ abstract void cancel();
+ }
+
+ private class SimpleWorkerManager extends WorkerManager
+ {
+ private Future<?> task = null;
+
+ SimpleWorkerManager(AlignCalcWorkerI worker)
+ {
+ super(worker);
+ }
+
+ @Override
+ boolean isWorking()
+ {
+ return task != null && !task.isDone();
+ }
+
+ @Override
+ protected void submit()
+ {
+ if (task != null && !(task.isDone() || task.isCancelled()))
+ {
+ throw new IllegalStateException(
+ "Cannot submit new task if the prevoius one is still running");
}
- if (success)
+ Console.debug(
+ format("Worker %s queued", getWorker().getClass().getName()));
+ task = executor.submit(() -> {
+ try
+ {
+ Console.debug(format("Worker %s started", getWorker()));
+ getWorker().run();
+ Console.debug(format("Worker %s finished", getWorker()));
+ } catch (InterruptedException e)
+ {
+ Console.debug(format("Worker %s interrupted", getWorker()));
+ } catch (Throwable th)
+ {
+ Console.debug(format("Worker %s failed", getWorker()), th);
+ } finally
+ {
+ if (!isRegistered())
+ {
+ // delete worker reference so garbage collector can remove it
+ worker = null;
+ }
+ }
+ });
+ }
+
+ @Override
+ synchronized void cancel()
+ {
+ if (!isWorking())
{
- Cache.log.debug(format("Worker %s finished%n", getWorker().getClass().getName()));
- notifyCompleted(worker);
+ return;
}
- else if (exception != null){
- Cache.log.warn(format("Worker %s failed%n", getWorker().getClass().getName()));
- exception.printStackTrace();
- notifyExceptional(worker, exception);
+ Console.debug(format("Cancelling worker %s", getWorker()));
+ task.cancel(true);
+ }
+ }
+
+ private class PollableWorkerManager extends WorkerManager
+ {
+ private Future<?> task = null;
+
+ PollableWorkerManager(PollableAlignCalcWorkerI worker)
+ {
+ super(worker);
+ }
+
+ @Override
+ protected PollableAlignCalcWorkerI getWorker()
+ {
+ return (PollableAlignCalcWorkerI) super.getWorker();
+ }
+
+ @Override
+ boolean isWorking()
+ {
+ return task != null && !task.isDone();
+ }
+
+ protected void submit()
+ {
+ if (task != null && !(task.isDone() || task.isCancelled()))
+ {
+ throw new IllegalStateException(
+ "Cannot submit new task if the prevoius one is still running");
}
+ Console.debug(
+ format("Worker %s queued", getWorker()));
+ final var runnable = new Runnable()
+ {
+ private boolean started = false;
+
+ private boolean completed = false;
+
+ Future<?> future = null;
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (!started)
+ {
+ Console.debug(format("Worker %s started", getWorker()));
+ getWorker().startUp();
+ started = true;
+ }
+ else if (!completed)
+ {
+ Console.debug(format("Polling worker %s", getWorker()));
+ if (getWorker().poll())
+ {
+ Console.debug(format("Worker %s finished", getWorker()));
+ completed = true;
+ }
+ }
+ } catch (Throwable th)
+ {
+ Console.debug(format("Worker %s failed", getWorker()), th);
+ completed = true;
+ }
+ if (completed)
+ {
+ final var worker = getWorker();
+ if (!isRegistered())
+ PollableWorkerManager.super.worker = null;
+ Console.debug(format("Finalizing completed worker %s", worker));
+ worker.done();
+ // almost impossible, but the future may be null at this point
+ // let it throw NPE to cancel forcefully
+ future.cancel(false);
+ }
+ }
+ };
+ runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
+ 1000, TimeUnit.MILLISECONDS);
+ }
+
+ synchronized protected void cancel()
+ {
+ if (!isWorking())
+ {
+ return;
+ }
+ Console.debug(format("Cancelling worker %s", getWorker()));
+ task.cancel(false);
+ executor.submit(() -> {
+ final var worker = getWorker();
+ if (!isRegistered())
+ PollableWorkerManager.super.worker = null;
+ if (worker != null)
+ {
+ worker.cancel();
+ Console.debug(format("Finalizing cancelled worker %s", worker));
+ worker.done();
+ }
+ });
}
}
- // main executor for running workers one-by-one
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
- private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
+ private final ScheduledExecutorService executor = Executors
+ .newSingleThreadScheduledExecutor();
- // list of all registered workers (other collections are subsets of this)
- private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
+ private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
+ new HashMap<>());
- // list of tasks holding queued and running workers
- private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
-
- // the collection of currently running workers
- private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
+ private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
+ new WeakHashMap<>());
- // the collection of workers that will not be started
- private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
+ private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
+
+ private WorkerManager createManager(AlignCalcWorkerI worker)
+ {
+ if (worker instanceof PollableAlignCalcWorkerI)
+ {
+ return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
+ }
+ else
+ {
+ return new SimpleWorkerManager(worker);
+ }
+ }
- /*
- * Register the worker with this manager and scheduler for execution.
- */
@Override
public void registerWorker(AlignCalcWorkerI worker)
{
Objects.requireNonNull(worker);
- synchronized (registered)
- {
- if (!registered.contains(worker))
- registered.add(worker);
- }
+ WorkerManager manager = createManager(worker);
+ registered.putIfAbsent(worker, manager);
startWorker(worker);
}
@Override
public List<AlignCalcWorkerI> getWorkers()
{
- return unmodifiableList(new ArrayList<>(registered));
+ List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
+ result.addAll(registered.keySet());
+ return result;
}
-
+
@Override
public List<AlignCalcWorkerI> getWorkersOfClass(
Class<? extends AlignCalcWorkerI> cls)
{
- synchronized (registered)
+ List<AlignCalcWorkerI> collected = new ArrayList<>();
+ for (var worker : getWorkers())
{
- return registered.stream()
- .filter(worker -> worker.getClass().equals(cls))
- .collect(Collectors.toUnmodifiableList());
+ if (worker.getClass().equals(cls))
+ {
+ collected.add(worker);
+ }
}
+ return unmodifiableList(collected);
}
-
+
@Override
public void removeWorker(AlignCalcWorkerI worker)
{
- registered.remove(worker);
- disabled.remove(worker);
+ if (worker.isDeletable())
+ {
+ registered.remove(worker);
+ }
}
@Override
- public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
+ public void removeWorkerForAnnotation(AlignmentAnnotation annot)
{
synchronized (registered)
{
- for (var worker : registered)
+ for (var worker : getWorkers())
{
- if (worker.getClass().equals(cls))
+ if (worker.involves(annot))
{
removeWorker(worker);
}
}
@Override
- public void removeWorkerForAnnotation(AlignmentAnnotation annot)
+ public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
{
synchronized (registered)
{
- for (var worker : registered)
+ for (var worker : getWorkers())
{
- if (worker.involves(annot) && worker.isDeletable())
+ if (worker.getClass().equals(cls))
{
removeWorker(worker);
}
@Override
public void disableWorker(AlignCalcWorkerI worker)
{
- disabled.add(worker);
+ // Null pointer check might be needed
+ registered.get(worker).setEnabled(false);
}
@Override
public void enableWorker(AlignCalcWorkerI worker)
{
- disabled.remove(worker);
+ // Null pointer check might be needed
+ registered.get(worker).setEnabled(true);
}
@Override
- public void restartWorkers()
+ public boolean isDisabled(AlignCalcWorkerI worker)
{
- synchronized (registered)
+ if (registered.containsKey(worker))
{
- for (AlignCalcWorkerI worker : registered)
- {
- if (!isDisabled(worker))
- startWorker(worker);
- }
+ return !registered.get(worker).isEnabled();
+ }
+ else
+ {
+ return false;
}
}
@Override
- public void startWorker(AlignCalcWorkerI worker)
+ public boolean isWorking(AlignCalcWorkerI worker)
{
- Objects.requireNonNull(worker);
- AlignCalcTask newTask = new AlignCalcTask(worker);
- synchronized (inProgress)
- {
- cancelWorker(worker);
- inProgress.add(worker);
- tasks.add(newTask);
- }
- notifyQueued(worker);
- executor.execute(newTask);
+ var manager = registered.get(worker);
+ if (manager == null)
+ manager = oneshot.get(worker);
+ if (manager == null)
+ return false;
+ else
+ return manager.isWorking();
}
-
+
@Override
- public void cancelWorker(AlignCalcWorkerI worker)
+ public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
{
- if (isWorking(worker))
+ synchronized (registered)
{
- synchronized (tasks)
- {
- Optional<AlignCalcTask> oldTask = tasks.stream()
- .filter(task -> task.getWorker().equals(worker))
- .findFirst();
- if (oldTask.isPresent()) {
- oldTask.get().cancel(true);
- }
- }
+ for (var entry : registered.entrySet())
+ if (entry.getKey().involves(annot) && entry.getValue().isWorking())
+ return true;
}
+ synchronized (oneshot)
+ {
+ for (var entry : registered.entrySet())
+ if (entry.getKey().involves(annot) && entry.getValue().isWorking())
+ return true;
+ }
+ return false;
}
@Override
- public boolean isDisabled(AlignCalcWorkerI worker)
+ public boolean isWorking()
{
- return disabled.contains(worker);
+ synchronized (registered)
+ {
+ for (var manager : registered.values())
+ if (manager.isWorking())
+ return true;
+ }
+ synchronized (oneshot)
+ {
+ for (var manager : oneshot.values())
+ if (manager.isWorking())
+ return true;
+ }
+ return false;
}
@Override
- public boolean isWorking(AlignCalcWorkerI worker)
+ public void startWorker(AlignCalcWorkerI worker)
{
- return inProgress.contains(worker);
+ Objects.requireNonNull(worker);
+ var manager = registered.get(worker);
+ if (manager == null)
+ {
+ Console.warn("Starting unregistered worker " + worker);
+ manager = createManager(worker);
+ oneshot.put(worker, manager);
+ }
+ manager.restart();
}
@Override
- public boolean isWorking()
+ public void restartWorkers()
{
- return !inProgress.isEmpty();
+ synchronized (registered)
+ {
+ for (var manager : registered.values())
+ {
+ manager.restart();
+ }
+ }
}
-
+
@Override
- public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
+ public void cancelWorker(AlignCalcWorkerI worker)
{
- synchronized (inProgress)
+ Objects.requireNonNull(worker);
+ var manager = registered.get(worker);
+ if (manager == null)
+ manager = oneshot.get(worker);
+ if (manager == null)
{
- for (AlignCalcWorkerI worker : inProgress)
- {
- if (worker.involves(annot))
- {
- return true;
- }
- }
+ throw new NoSuchElementException();
}
- return false;
+ manager.cancel();
}
private void notifyQueued(AlignCalcWorkerI worker)
{
for (AlignCalcListener listener : listeners)
{
- try {
+ try
+ {
listener.workerCompleted(worker);
} catch (RuntimeException e)
{
}
}
+ private void notifyCancelled(AlignCalcWorkerI worker)
+ {
+ for (AlignCalcListener listener : listeners)
+ {
+ try
+ {
+ listener.workerCancelled(worker);
+ } catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
private void notifyExceptional(AlignCalcWorkerI worker,
Throwable throwable)
{
for (AlignCalcListener listener : listeners)
{
- try {
+ try
+ {
listener.workerExceptional(worker, throwable);
} catch (RuntimeException e)
{
{
listeners.add(listener);
}
-
+
@Override
public void removeAlignCalcListener(AlignCalcListener listener)
{
listeners.remove(listener);
}
+ @Override
+ public void shutdown()
+ {
+ executor.shutdownNow();
+ listeners.clear();
+ registered.clear();
+ }
+
}