package jalview.workers;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import jalview.api.AlignCalcListener;
-import jalview.api.AlignCalcManagerI;
import jalview.api.AlignCalcManagerI2;
import jalview.api.AlignCalcWorkerI;
+import jalview.bin.Cache;
import jalview.datamodel.AlignmentAnnotation;
import static java.util.Collections.synchronizedList;
import java.util.ArrayList;
import java.util.HashSet;
+import static java.lang.String.format;
+
public class AlignCalcManager2 implements AlignCalcManagerI2
{
class AlignCalcTask extends FutureTask<Void>
public AlignCalcTask(AlignCalcWorkerI worker)
{
- super(new Callable<Void>() {
- public Void call() throws Exception {
- notifyStarted(worker);
- worker.run();
- return null;
+ super(new Callable<Void>()
+ {
+ public Void call() throws Exception
+ {
+ Cache.log.debug(format("Worker %s started%n",
+ worker.getClass().getName()));
+ notifyStarted(worker);
+ worker.run();
+ return null;
}
});
this.worker = worker;
{
get();
success = true;
+ } catch (CancellationException e)
+ {
+ Cache.log.debug(format("Worker %s cancelled%n",
+ getWorker().getClass().getName()));
+ notifyCancelled(worker);
} catch (ExecutionException e)
{
exception = e.getCause();
- if (exception instanceof OutOfMemoryError) {
+ if (exception instanceof OutOfMemoryError)
+ {
disableWorker(getWorker());
}
} catch (Throwable e)
{
exception = e;
- }
- finally {
+ } finally
+ {
inProgress.remove(getWorker());
tasks.remove(this);
}
if (success)
+ {
+ Cache.log.debug(format("Worker %s finished%n",
+ getWorker().getClass().getName()));
notifyCompleted(worker);
- else
+ }
+ else if (exception != null)
+ {
+ Cache.log.warn(format("Worker %s failed%n",
+ getWorker().getClass().getName()));
+ exception.printStackTrace();
notifyExceptional(worker, exception);
+ }
+ }
+ }
+
+ private static class CalcManagerThreadFactory implements ThreadFactory
+ {
+ private static final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ private final ThreadGroup group;
+
+ private static final String namePrefix = "AlignCalcManager-pool-thread-";
+
+ CalcManagerThreadFactory()
+ {
+ var securityManager = System.getSecurityManager();
+ if (securityManager != null)
+ {
+ group = securityManager.getThreadGroup();
+ }
+ else
+ {
+ group = Thread.currentThread().getThreadGroup();
+ }
+ }
+
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(group, r,
+ namePrefix + threadNumber.getAndIncrement(), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
}
}
// main executor for running workers one-by-one
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
+ private final ExecutorService executor = Executors
+ .newSingleThreadExecutor(new CalcManagerThreadFactory());
+
private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
// list of all registered workers (other collections are subsets of this)
- private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
+ private final List<AlignCalcWorkerI> registered = synchronizedList(
+ new ArrayList<>());
// list of tasks holding queued and running workers
- private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
-
+ private final List<AlignCalcTask> tasks = synchronizedList(
+ new ArrayList<>());
+
// the collection of currently running workers
- private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
+ private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(
+ new HashSet<>());
// the collection of workers that will not be started
- private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
+ private final Set<AlignCalcWorkerI> disabled = synchronizedSet(
+ new HashSet<>());
/*
* Register the worker with this manager and scheduler for execution.
@Override
public void registerWorker(AlignCalcWorkerI worker)
{
+ Objects.requireNonNull(worker);
synchronized (registered)
{
if (!registered.contains(worker))
{
return unmodifiableList(new ArrayList<>(registered));
}
-
+
@Override
public List<AlignCalcWorkerI> getWorkersOfClass(
Class<? extends AlignCalcWorkerI> cls)
{
synchronized (registered)
{
- return registered.stream()
- .filter(worker -> worker.getClass().equals(cls))
- .collect(Collectors.toUnmodifiableList());
+ List<AlignCalcWorkerI> collected = new ArrayList<>();
+ for (var worker : registered)
+ {
+ if (worker.getClass().equals(cls))
+ {
+ collected.add(worker);
+ }
+ }
+ return unmodifiableList(collected);
}
}
-
+
@Override
public void removeWorker(AlignCalcWorkerI worker)
{
{
synchronized (registered)
{
- for (var it = registered.iterator(); it.hasNext();)
+ for (var worker : registered)
{
- var worker = it.next();
if (worker.getClass().equals(cls))
{
- it.remove();
- disabled.remove(worker);
+ removeWorker(worker);
}
}
}
{
synchronized (registered)
{
- for (var it = registered.iterator(); it.hasNext();)
+ for (var worker : registered)
{
- var worker = it.next();
if (worker.involves(annot) && worker.isDeletable())
{
- it.remove();
- disabled.remove(worker);
+ removeWorker(worker);
}
}
}
@Override
public void disableWorker(AlignCalcWorkerI worker)
{
- assert registered.contains(worker);
disabled.add(worker);
}
@Override
public void startWorker(AlignCalcWorkerI worker)
{
- assert registered.contains(worker);
- synchronized (tasks) {
- for (var task : tasks)
- {
- if (task.getWorker().equals(worker))
- task.cancel(true);
- }
- }
+ Objects.requireNonNull(worker);
AlignCalcTask newTask = new AlignCalcTask(worker);
- tasks.add(newTask);
+ synchronized (inProgress)
+ {
+ cancelWorker(worker);
+ inProgress.add(worker);
+ tasks.add(newTask);
+ }
notifyQueued(worker);
executor.execute(newTask);
}
-
+
@Override
public void cancelWorker(AlignCalcWorkerI worker)
{
- if (isWorking(worker))
+ if (isWorking(worker))
{
- synchronized (tasks)
+ synchronized (tasks)
{
- for (var task : tasks)
+ Optional<AlignCalcTask> oldTask = tasks.stream()
+ .filter(task -> task.getWorker().equals(worker))
+ .findFirst();
+ if (oldTask.isPresent())
{
- if (task.getWorker().equals(worker))
- task.cancel(true);
+ oldTask.get().cancel(true);
}
}
}
{
return !inProgress.isEmpty();
}
-
+
@Override
public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
{
{
for (AlignCalcListener listener : listeners)
{
- try {
+ try
+ {
listener.workerCompleted(worker);
} catch (RuntimeException e)
{
}
}
+ private void notifyCancelled(AlignCalcWorkerI worker)
+ {
+ for (AlignCalcListener listener : listeners)
+ {
+ try
+ {
+ listener.workerCancelled(worker);
+ } catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
private void notifyExceptional(AlignCalcWorkerI worker,
Throwable throwable)
{
for (AlignCalcListener listener : listeners)
{
- try {
+ try
+ {
listener.workerExceptional(worker, throwable);
} catch (RuntimeException e)
{
{
listeners.add(listener);
}
-
+
@Override
public void removeAlignCalcListener(AlignCalcListener listener)
{
listeners.remove(listener);
}
+ @Override
+ public void shutdown()
+ {
+ executor.shutdownNow();
+ listeners.clear();
+ registered.clear();
+ tasks.clear();
+ inProgress.clear();
+ disabled.clear();
+ }
+
}