1 package jalview.workers;
4 import java.util.Objects;
5 import java.util.Optional;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.CancellationException;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.FutureTask;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.atomic.AtomicInteger;
17 import jalview.api.AlignCalcListener;
18 import jalview.api.AlignCalcManagerI2;
19 import jalview.api.AlignCalcWorkerI;
20 import jalview.bin.Cache;
21 import jalview.datamodel.AlignmentAnnotation;
23 import static java.util.Collections.synchronizedList;
24 import static java.util.Collections.synchronizedSet;
25 import static java.util.Collections.unmodifiableList;
27 import java.util.ArrayList;
28 import java.util.HashSet;
30 import static java.lang.String.format;
32 public class AlignCalcManager2 implements AlignCalcManagerI2
34 class AlignCalcTask extends FutureTask<Void>
36 final AlignCalcWorkerI worker;
38 public AlignCalcTask(AlignCalcWorkerI worker)
40 super(new Callable<Void>()
42 public Void call() throws Exception
44 Cache.log.debug(format("Worker %s started%n",
45 worker.getClass().getName()));
46 notifyStarted(worker);
54 public AlignCalcWorkerI getWorker()
62 boolean success = false;
63 Throwable exception = null;
68 } catch (CancellationException e)
70 Cache.log.debug(format("Worker %s cancelled%n",
71 getWorker().getClass().getName()));
72 notifyCancelled(worker);
73 } catch (ExecutionException e)
75 exception = e.getCause();
76 if (exception instanceof OutOfMemoryError)
78 disableWorker(getWorker());
85 inProgress.remove(getWorker());
90 Cache.log.debug(format("Worker %s finished%n",
91 getWorker().getClass().getName()));
92 notifyCompleted(worker);
94 else if (exception != null)
96 Cache.log.warn(format("Worker %s failed%n",
97 getWorker().getClass().getName()));
98 exception.printStackTrace();
99 notifyExceptional(worker, exception);
104 private static class CalcManagerThreadFactory implements ThreadFactory
106 private static final AtomicInteger threadNumber = new AtomicInteger(1);
108 private final ThreadGroup group;
110 private static final String namePrefix = "AlignCalcManager-pool-thread-";
112 CalcManagerThreadFactory()
114 var securityManager = System.getSecurityManager();
115 if (securityManager != null)
117 group = securityManager.getThreadGroup();
121 group = Thread.currentThread().getThreadGroup();
126 public Thread newThread(Runnable r)
128 Thread t = new Thread(group, r,
129 namePrefix + threadNumber.getAndIncrement(), 0);
131 t.setPriority(Thread.NORM_PRIORITY);
136 // main executor for running workers one-by-one
137 private final ExecutorService executor = Executors
138 .newSingleThreadExecutor(new CalcManagerThreadFactory());
140 private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
142 // list of all registered workers (other collections are subsets of this)
143 private final List<AlignCalcWorkerI> registered = synchronizedList(
146 // list of tasks holding queued and running workers
147 private final List<AlignCalcTask> tasks = synchronizedList(
150 // the collection of currently running workers
151 private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(
154 // the collection of workers that will not be started
155 private final Set<AlignCalcWorkerI> disabled = synchronizedSet(
159 * Register the worker with this manager and schedule it for execution.
162 public void registerWorker(AlignCalcWorkerI worker)
164 Objects.requireNonNull(worker);
165 synchronized (registered)
167 if (!registered.contains(worker))
168 registered.add(worker);
174 public List<AlignCalcWorkerI> getWorkers()
176 return unmodifiableList(new ArrayList<>(registered));
180 public List<AlignCalcWorkerI> getWorkersOfClass(
181 Class<? extends AlignCalcWorkerI> cls)
183 synchronized (registered)
185 List<AlignCalcWorkerI> collected = new ArrayList<>();
186 for (var worker : registered)
188 if (worker.getClass().equals(cls))
190 collected.add(worker);
193 return unmodifiableList(collected);
198 public void removeWorker(AlignCalcWorkerI worker)
200 registered.remove(worker);
201 disabled.remove(worker);
205 public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
207 synchronized (registered)
209 for (var worker : registered)
211 if (worker.getClass().equals(cls))
213 removeWorker(worker);
220 public void removeWorkerForAnnotation(AlignmentAnnotation annot)
222 synchronized (registered)
224 for (var worker : registered)
226 if (worker.involves(annot) && worker.isDeletable())
228 removeWorker(worker);
235 public void disableWorker(AlignCalcWorkerI worker)
237 disabled.add(worker);
241 public void enableWorker(AlignCalcWorkerI worker)
243 disabled.remove(worker);
247 public void restartWorkers()
249 synchronized (registered)
251 for (AlignCalcWorkerI worker : registered)
253 if (!isDisabled(worker))
260 public void startWorker(AlignCalcWorkerI worker)
262 Objects.requireNonNull(worker);
263 AlignCalcTask newTask = new AlignCalcTask(worker);
264 synchronized (inProgress)
266 cancelWorker(worker);
267 inProgress.add(worker);
270 notifyQueued(worker);
271 executor.execute(newTask);
275 public void cancelWorker(AlignCalcWorkerI worker)
277 if (isWorking(worker))
281 Optional<AlignCalcTask> oldTask = tasks.stream()
282 .filter(task -> task.getWorker().equals(worker))
284 if (oldTask.isPresent())
286 oldTask.get().cancel(true);
293 public boolean isDisabled(AlignCalcWorkerI worker)
295 return disabled.contains(worker);
299 public boolean isWorking(AlignCalcWorkerI worker)
301 return inProgress.contains(worker);
305 public boolean isWorking()
307 return !inProgress.isEmpty();
311 public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
313 synchronized (inProgress)
315 for (AlignCalcWorkerI worker : inProgress)
317 if (worker.involves(annot))
326 private void notifyQueued(AlignCalcWorkerI worker)
328 for (AlignCalcListener listener : listeners)
330 listener.workerQueued(worker);
334 private void notifyStarted(AlignCalcWorkerI worker)
336 for (AlignCalcListener listener : listeners)
338 listener.workerStarted(worker);
342 private void notifyCompleted(AlignCalcWorkerI worker)
344 for (AlignCalcListener listener : listeners)
348 listener.workerCompleted(worker);
349 } catch (RuntimeException e)
356 private void notifyCancelled(AlignCalcWorkerI worker)
358 for (AlignCalcListener listener : listeners)
362 listener.workerCancelled(worker);
363 } catch (RuntimeException e)
370 private void notifyExceptional(AlignCalcWorkerI worker,
373 for (AlignCalcListener listener : listeners)
377 listener.workerExceptional(worker, throwable);
378 } catch (RuntimeException e)
386 public void addAlignCalcListener(AlignCalcListener listener)
388 listeners.add(listener);
392 public void removeAlignCalcListener(AlignCalcListener listener)
394 listeners.remove(listener);
398 public void shutdown()
400 executor.shutdownNow();