import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
-import java.util.stream.Collectors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import jalview.api.AlignCalcListener;
import jalview.api.AlignCalcManagerI2;
public AlignCalcTask(AlignCalcWorkerI worker)
{
- super(new Callable<Void>() {
- public Void call() throws Exception {
- Cache.log.debug(format("Worker %s started%n", worker.getClass().getName()));
- notifyStarted(worker);
- worker.run();
- return null;
+ super(new Callable<Void>()
+ {
+ public Void call() throws Exception
+ {
+ Cache.log.debug(format("Worker %s started%n",
+ worker.getClass().getName()));
+ notifyStarted(worker);
+ worker.run();
+ return null;
}
});
this.worker = worker;
{
get();
success = true;
- }
- catch (CancellationException e) {
- Cache.log.debug(format("Worker %s cancelled%n", getWorker().getClass().getName()));
+ } catch (CancellationException e)
+ {
+ Cache.log.debug(format("Worker %s cancelled%n",
+ getWorker().getClass().getName()));
notifyCancelled(worker);
- }
- catch (ExecutionException e)
+ } catch (ExecutionException e)
{
exception = e.getCause();
- if (exception instanceof OutOfMemoryError) {
+ if (exception instanceof OutOfMemoryError)
+ {
disableWorker(getWorker());
}
} catch (Throwable e)
{
exception = e;
- }
- finally {
+ } finally
+ {
inProgress.remove(getWorker());
tasks.remove(this);
}
if (success)
{
- Cache.log.debug(format("Worker %s finished%n", getWorker().getClass().getName()));
+ Cache.log.debug(format("Worker %s finished%n",
+ getWorker().getClass().getName()));
notifyCompleted(worker);
}
- else if (exception != null){
- Cache.log.warn(format("Worker %s failed%n", getWorker().getClass().getName()));
+ else if (exception != null)
+ {
+ Cache.log.warn(format("Worker %s failed%n",
+ getWorker().getClass().getName()));
exception.printStackTrace();
notifyExceptional(worker, exception);
}
}
}
+ private static class CalcManagerThreadFactory implements ThreadFactory
+ {
+ private static final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ private final ThreadGroup group;
+
+ private static final String namePrefix = "AlignCalcManager-pool-thread-";
+
+ CalcManagerThreadFactory()
+ {
+ var securityManager = System.getSecurityManager();
+ if (securityManager != null)
+ {
+ group = securityManager.getThreadGroup();
+ }
+ else
+ {
+ group = Thread.currentThread().getThreadGroup();
+ }
+ }
+
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(group, r,
+ namePrefix + threadNumber.getAndIncrement(), 0);
+ t.setDaemon(false);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ }
+
// main executor for running workers one-by-one
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
+ private final ExecutorService executor = Executors
+ .newSingleThreadExecutor(new CalcManagerThreadFactory());
+
private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
// list of all registered workers (other collections are subsets of this)
- private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
+ private final List<AlignCalcWorkerI> registered = synchronizedList(
+ new ArrayList<>());
// list of tasks holding queued and running workers
- private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
-
+ private final List<AlignCalcTask> tasks = synchronizedList(
+ new ArrayList<>());
+
// the collection of currently running workers
- private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
+ private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(
+ new HashSet<>());
// the collection of workers that will not be started
- private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
+ private final Set<AlignCalcWorkerI> disabled = synchronizedSet(
+ new HashSet<>());
/*
* Register the worker with this manager and scheduler for execution.
{
return unmodifiableList(new ArrayList<>(registered));
}
-
+
@Override
public List<AlignCalcWorkerI> getWorkersOfClass(
Class<? extends AlignCalcWorkerI> cls)
{
synchronized (registered)
{
- return registered.stream()
- .filter(worker -> worker.getClass().equals(cls))
- .collect(Collectors.toUnmodifiableList());
+ List<AlignCalcWorkerI> collected = new ArrayList<>();
+ for (var worker : registered)
+ {
+ if (worker.getClass().equals(cls))
+ {
+ collected.add(worker);
+ }
+ }
+ return unmodifiableList(collected);
}
}
-
+
@Override
public void removeWorker(AlignCalcWorkerI worker)
{
notifyQueued(worker);
executor.execute(newTask);
}
-
+
@Override
public void cancelWorker(AlignCalcWorkerI worker)
{
- if (isWorking(worker))
+ if (isWorking(worker))
{
- synchronized (tasks)
+ synchronized (tasks)
{
Optional<AlignCalcTask> oldTask = tasks.stream()
- .filter(task -> task.getWorker().equals(worker))
- .findFirst();
- if (oldTask.isPresent()) {
+ .filter(task -> task.getWorker().equals(worker))
+ .findFirst();
+ if (oldTask.isPresent())
+ {
oldTask.get().cancel(true);
}
}
{
return !inProgress.isEmpty();
}
-
+
@Override
public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
{
{
for (AlignCalcListener listener : listeners)
{
- try {
+ try
+ {
listener.workerCompleted(worker);
} catch (RuntimeException e)
{
}
}
}
-
+
private void notifyCancelled(AlignCalcWorkerI worker)
{
for (AlignCalcListener listener : listeners)
{
- try {
+ try
+ {
listener.workerCancelled(worker);
} catch (RuntimeException e)
{
{
for (AlignCalcListener listener : listeners)
{
- try {
+ try
+ {
listener.workerExceptional(worker, throwable);
} catch (RuntimeException e)
{
{
listeners.add(listener);
}
-
+
@Override
public void removeAlignCalcListener(AlignCalcListener listener)
{
listeners.remove(listener);
}
+ @Override
+ public void shutdown()
+ {
+ executor.shutdownNow();
+ listeners.clear();
+ registered.clear();
+ tasks.clear();
+ inProgress.clear();
+ disabled.clear();
+ }
+
}