package jalview.workers;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static java.util.Collections.synchronizedMap;
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
import jalview.api.AlignCalcListener;
import jalview.api.AlignCalcManagerI2;
import jalview.api.AlignCalcWorkerI;
+import jalview.api.PollableAlignCalcWorkerI;
import jalview.bin.Cache;
import jalview.datamodel.AlignmentAnnotation;
-import static java.util.Collections.synchronizedList;
-import static java.util.Collections.synchronizedSet;
-import static java.util.Collections.unmodifiableList;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import static java.lang.String.format;
-
public class AlignCalcManager2 implements AlignCalcManagerI2
{
- class AlignCalcTask extends FutureTask<Void>
+ private abstract class WorkerManager
{
- final AlignCalcWorkerI worker;
-
- public AlignCalcTask(AlignCalcWorkerI worker)
+ static final int IDLE = 0;
+ static final int QUEUED = 1;
+ static final int RUNNING = 2;
+ static final int CANCELLING = 3;
+
+ protected volatile int state = IDLE;
+ protected volatile boolean enabled = true;
+
+ protected final AlignCalcWorkerI worker;
+
+ WorkerManager(AlignCalcWorkerI worker)
{
- super(new Callable<Void>()
- {
- public Void call() throws Exception
- {
- Cache.log.debug(format("Worker %s started%n",
- worker.getClass().getName()));
- notifyStarted(worker);
- worker.run();
- return null;
- }
- });
this.worker = worker;
}
-
- public AlignCalcWorkerI getWorker()
+
+ AlignCalcWorkerI getWorker()
{
return worker;
}
-
- @Override
- protected void done()
+
+ boolean isEnabled()
{
- boolean success = false;
- Throwable exception = null;
- try
- {
- get();
- success = true;
- } catch (CancellationException e)
- {
- Cache.log.debug(format("Worker %s cancelled%n",
- getWorker().getClass().getName()));
- notifyCancelled(worker);
- } catch (ExecutionException e)
+ return enabled;
+ }
+
+ void setEnabled(boolean enabled)
+ {
+ this.enabled = enabled;
+ }
+
+ synchronized protected void setState(int state)
+ {
+ this.state = state;
+ }
+
+ int getState()
+ {
+ return state;
+ }
+
+ void restart()
+ {
+ if (!isEnabled())
{
- exception = e.getCause();
- if (exception instanceof OutOfMemoryError)
- {
- disableWorker(getWorker());
- }
- } catch (Throwable e)
+ return;
+ }
+ if (state == IDLE)
{
- exception = e;
- } finally
+ submit();
+ }
+ else if (state == QUEUED)
{
- inProgress.remove(getWorker());
- tasks.remove(this);
+ // job already queued, do nothing
}
- if (success)
+ else if (state == RUNNING)
{
- Cache.log.debug(format("Worker %s finished%n",
- getWorker().getClass().getName()));
- notifyCompleted(worker);
+ cancel();
+ submit();
}
- else if (exception != null)
+ else if (state == CANCELLING)
{
- Cache.log.warn(format("Worker %s failed%n",
- getWorker().getClass().getName()));
- exception.printStackTrace();
- notifyExceptional(worker, exception);
+ submit();
}
}
+
+ protected abstract void submit();
+
+ abstract void cancel();
}
-
- private static class CalcManagerThreadFactory implements ThreadFactory
+
+
+ private class SimpleWorkerManager extends WorkerManager
{
- private static final AtomicInteger threadNumber = new AtomicInteger(1);
-
- private final ThreadGroup group;
+ private Future<?> task = null;
+
+ SimpleWorkerManager(AlignCalcWorkerI worker)
+ {
+ super(worker);
+ }
- private static final String namePrefix = "AlignCalcManager-pool-thread-";
+ @Override
+ protected void submit()
+ {
+ if (task != null && !(task.isDone() || task.isCancelled()))
+ {
+ throw new IllegalStateException(
+ "Cannot submit new task if the prevoius one is still running");
+ }
+ Cache.log.debug(format("Worker %s queued",
+ worker.getClass().getName()));
+ setState(QUEUED);
+ task = executor.submit(() -> {
+ setState(RUNNING);
+ try
+ {
+ Cache.log.debug(format("Worker %s started",
+ worker.getClass().getName()));
+ worker.run();
+ Cache.log.debug(format("Worker %s finished",
+ worker.getClass().getName()));
+ }
+ catch (InterruptedException e)
+ {
+ Cache.log.debug(format("Worker %s interrupted",
+ worker.getClass().getName()));
+ }
+ catch (Throwable th)
+ {
+ Cache.log.debug(format("Worker %s failed",
+ worker.getClass().getName()), th);
+ }
+ finally
+ {
+ // fixme: should not set to idle if another task is already queued for execution
+ setState(IDLE);
+ }
+ });
+ }
- CalcManagerThreadFactory()
+ @Override
+ synchronized void cancel()
{
- var securityManager = System.getSecurityManager();
- if (securityManager != null)
+ if (task == null || state == IDLE || state == CANCELLING)
{
- group = securityManager.getThreadGroup();
+ return;
}
- else
+ Cache.log.debug(format("Cancelling worker %s",
+ worker.getClass().getName()));
+ setState(CANCELLING);
+ task.cancel(true);
+ if (task.isCancelled())
{
- group = Thread.currentThread().getThreadGroup();
+ setState(IDLE);
}
}
-
- @Override
- public Thread newThread(Runnable r)
+ }
+
+
+ private class PollableWorkerManager extends WorkerManager
+ {
+ private final PollableAlignCalcWorkerI worker;
+ private Future<?> task = null;
+
+ PollableWorkerManager(PollableAlignCalcWorkerI worker)
{
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(), 0);
- t.setDaemon(false);
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
+ super(worker);
+ this.worker = worker;
+ }
+
+ protected void submit()
+ {
+ if (task != null && !(task.isDone() || task.isCancelled()))
+ {
+ throw new IllegalStateException(
+ "Cannot submit new task if the prevoius one is still running");
+ }
+ Cache.log.debug(format("Worker %s queued",
+ worker.getClass().getName()));
+ setState(QUEUED);
+ final var runnable = new Runnable()
+ {
+ private boolean started = false;
+ private boolean completed = false;
+ Future<?> future = null;
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (!started)
+ {
+ Cache.log.debug(format("Worker %s started",
+ worker.getClass().getName()));
+ setState(RUNNING);
+ worker.startUp();
+ started = true;
+ }
+ else if (!completed)
+ {
+ if (worker.poll())
+ {
+ Cache.log.debug(format("Worker %s finished",
+ worker.getClass().getName()));
+ completed = true;
+ setState(IDLE);
+ }
+ }
+ } catch (Throwable th)
+ {
+ Cache.log.debug(format("Worker %s failed",
+ worker.getClass().getName()), th);
+ completed = true;
+ setState(IDLE);
+ }
+ if (completed)
+ {
+ try
+ {
+ future.cancel(false);
+ }
+ catch (NullPointerException ignored)
+ {
+ // extremely unlikely to happen
+ }
+ }
+ }
+ };
+ runnable.future = task = executor.scheduleWithFixedDelay(
+ runnable, 10, 1000, TimeUnit.MILLISECONDS);
+ }
+
+ synchronized protected void cancel()
+ {
+ if (task == null || state == IDLE || state == CANCELLING)
+ {
+ return;
+ }
+ Cache.log.debug(format("Cancelling worker %s",
+ worker.getClass().getName()));
+ setState(CANCELLING);
+ task.cancel(false);
+ if (task.isCancelled())
+ {
+ setState(IDLE);
+ }
+ executor.submit(() -> {
+ worker.cancel();
+ });
}
}
-
- // main executor for running workers one-by-one
- private final ExecutorService executor = Executors
- .newSingleThreadExecutor(new CalcManagerThreadFactory());
-
- private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
-
- // list of all registered workers (other collections are subsets of this)
- private final List<AlignCalcWorkerI> registered = synchronizedList(
- new ArrayList<>());
-
- // list of tasks holding queued and running workers
- private final List<AlignCalcTask> tasks = synchronizedList(
- new ArrayList<>());
-
- // the collection of currently running workers
- private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(
- new HashSet<>());
-
- // the collection of workers that will not be started
- private final Set<AlignCalcWorkerI> disabled = synchronizedSet(
- new HashSet<>());
-
- /*
- * Register the worker with this manager and scheduler for execution.
- */
+
+
+ private final ScheduledExecutorService executor =
+ Executors.newSingleThreadScheduledExecutor();
+ private final Map<AlignCalcWorkerI, WorkerManager> registered =
+ synchronizedMap(new HashMap<>());
+
+ private final List<AlignCalcListener> listeners =
+ new CopyOnWriteArrayList<>();
+
+
@Override
public void registerWorker(AlignCalcWorkerI worker)
{
Objects.requireNonNull(worker);
- synchronized (registered)
- {
- if (!registered.contains(worker))
- registered.add(worker);
- }
+ WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ?
+ new PollableWorkerManager((PollableAlignCalcWorkerI) worker) :
+ new SimpleWorkerManager(worker);
+ registered.putIfAbsent(worker, manager);
startWorker(worker);
}
@Override
public List<AlignCalcWorkerI> getWorkers()
{
- return unmodifiableList(new ArrayList<>(registered));
+ return List.copyOf(registered.keySet());
}
@Override
public List<AlignCalcWorkerI> getWorkersOfClass(
Class<? extends AlignCalcWorkerI> cls)
{
- synchronized (registered)
+ List<AlignCalcWorkerI> collected = new ArrayList<>();
+ for (var worker : getWorkers())
{
- List<AlignCalcWorkerI> collected = new ArrayList<>();
- for (var worker : registered)
+ if (worker.getClass().equals(cls))
{
- if (worker.getClass().equals(cls))
- {
- collected.add(worker);
- }
+ collected.add(worker);
}
- return unmodifiableList(collected);
}
+ return unmodifiableList(collected);
}
@Override
public void removeWorker(AlignCalcWorkerI worker)
{
registered.remove(worker);
- disabled.remove(worker);
}
@Override
- public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
+ public void removeWorkerForAnnotation(AlignmentAnnotation annot)
{
synchronized (registered)
{
- for (var worker : registered)
+ for (var worker : getWorkers())
{
- if (worker.getClass().equals(cls))
+ if (worker.involves(annot) && worker.isDeletable())
{
removeWorker(worker);
}
}
@Override
- public void removeWorkerForAnnotation(AlignmentAnnotation annot)
+ public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
{
synchronized (registered)
{
- for (var worker : registered)
+ for (var worker : getWorkers())
{
- if (worker.involves(annot) && worker.isDeletable())
+ if (worker.getClass().equals(cls))
{
removeWorker(worker);
}
@Override
public void disableWorker(AlignCalcWorkerI worker)
{
- disabled.add(worker);
+ // Null pointer check might be needed
+ registered.get(worker).setEnabled(false);
}
@Override
public void enableWorker(AlignCalcWorkerI worker)
{
- disabled.remove(worker);
+ // Null pointer check might be needed
+ registered.get(worker).setEnabled(true);
}
@Override
- public void restartWorkers()
+ public boolean isDisabled(AlignCalcWorkerI worker)
{
- synchronized (registered)
+ if (registered.containsKey(worker))
{
- for (AlignCalcWorkerI worker : registered)
- {
- if (!isDisabled(worker))
- startWorker(worker);
- }
+ return !registered.get(worker).isEnabled();
+ }
+ else
+ {
+ return false;
}
}
@Override
- public void startWorker(AlignCalcWorkerI worker)
+ public boolean isWorking(AlignCalcWorkerI worker)
{
- Objects.requireNonNull(worker);
- AlignCalcTask newTask = new AlignCalcTask(worker);
- synchronized (inProgress)
+ if (!registered.containsKey(worker))
{
- cancelWorker(worker);
- inProgress.add(worker);
- tasks.add(newTask);
+ return false;
+ }
+ else
+ {
+ return registered.get(worker).getState() == WorkerManager.RUNNING;
}
- notifyQueued(worker);
- executor.execute(newTask);
}
@Override
- public void cancelWorker(AlignCalcWorkerI worker)
+ public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
{
- if (isWorking(worker))
+ synchronized (registered)
{
- synchronized (tasks)
+ for (var entry : registered.entrySet())
{
- Optional<AlignCalcTask> oldTask = tasks.stream()
- .filter(task -> task.getWorker().equals(worker))
- .findFirst();
- if (oldTask.isPresent())
+ if (entry.getKey().involves(annot) &&
+ entry.getValue().getState() == WorkerManager.RUNNING)
{
- oldTask.get().cancel(true);
+ return true;
}
}
}
+ return false;
}
@Override
- public boolean isDisabled(AlignCalcWorkerI worker)
- {
- return disabled.contains(worker);
- }
-
- @Override
- public boolean isWorking(AlignCalcWorkerI worker)
+ public boolean isWorking()
{
- return inProgress.contains(worker);
+ synchronized (registered)
+ {
+ for (var manager : registered.values())
+ {
+ if (manager.getState() == WorkerManager.RUNNING)
+ {
+ return true;
+ }
+ }
+ }
+ return false;
}
@Override
- public boolean isWorking()
+ public void startWorker(AlignCalcWorkerI worker)
{
- return !inProgress.isEmpty();
+ Objects.requireNonNull(worker);
+ var manager = registered.get(worker);
+ if (manager == null)
+ {
+ throw new NoSuchElementException();
+ }
+ manager.restart();
}
@Override
- public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
+ public void restartWorkers()
{
- synchronized (inProgress)
+ synchronized (registered)
{
- for (AlignCalcWorkerI worker : inProgress)
+ for (var manager : registered.values())
{
- if (worker.involves(annot))
- {
- return true;
- }
+ manager.restart();
}
}
- return false;
}
+ @Override
+ public void cancelWorker(AlignCalcWorkerI worker)
+ {
+ Objects.requireNonNull(worker);
+ var manager = registered.get(worker);
+ if (manager == null)
+ {
+ throw new NoSuchElementException();
+ }
+ manager.cancel();
+ }
+
private void notifyQueued(AlignCalcWorkerI worker)
{
for (AlignCalcListener listener : listeners)
executor.shutdownNow();
listeners.clear();
registered.clear();
- tasks.clear();
- inProgress.clear();
- disabled.clear();
}
}