1 package jalview.workers;
4 import java.util.Objects;
5 import java.util.Optional;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.CancellationException;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.FutureTask;
14 import java.util.stream.Collectors;
16 import jalview.api.AlignCalcListener;
17 import jalview.api.AlignCalcManagerI2;
18 import jalview.api.AlignCalcWorkerI;
19 import jalview.bin.Cache;
20 import jalview.datamodel.AlignmentAnnotation;
22 import static java.util.Collections.synchronizedList;
23 import static java.util.Collections.synchronizedSet;
24 import static java.util.Collections.unmodifiableList;
26 import java.util.ArrayList;
27 import java.util.HashSet;
29 import static java.lang.String.format;
31 public class AlignCalcManager2 implements AlignCalcManagerI2
// Task wrapper that binds one worker to a FutureTask, so that the manager can
// hand it to the executor, look it up by worker and cancel it.
// NOTE(review): the embedded numbering jumps several times inside this class
// (e.g. 42 -> 50, 50 -> 58, 59 -> 65), so parts of it are not shown in this
// listing; the comments below describe the visible statements only.
33 class AlignCalcTask extends FutureTask<Void>
// the worker this task was created for
35 final AlignCalcWorkerI worker;
37 public AlignCalcTask(AlignCalcWorkerI worker)
// The callable given to FutureTask first logs the start at debug level and
// informs the listeners; the remainder of call() is not visible here.
39 super(new Callable<Void>() {
40 public Void call() throws Exception {
41 Cache.log.debug(format("Worker %s started%n", worker.getClass().getName()));
42 notifyStarted(worker);
// accessor for the wrapped worker (body not visible in this listing)
50 public AlignCalcWorkerI getWorker()
// NOTE(review): the header of the method holding the following statements is
// not shown; presumably this is the FutureTask completion hook - confirm.
// Outcome bookkeeping: nothing is known about the result yet.
58 boolean success = false;
59 Throwable exception = null;
// keep the underlying cause rather than the ExecutionException wrapper
65 catch (ExecutionException e)
67 exception = e.getCause();
// a worker that failed with an out-of-memory error is put on the disabled set
68 if (exception instanceof OutOfMemoryError) {
69 disableWorker(getWorker());
// the worker no longer counts as in progress
76 inProgress.remove(getWorker());
// NOTE(review): the "else if" further down implies that the next two lines sit
// in a preceding "if" branch whose header is not shown - presumably the
// success path; confirm.
81 Cache.log.debug(format("Worker %s finished%n", getWorker().getClass().getName()));
82 notifyCompleted(worker);
// failure path: log a warning, dump the stack trace and pass the throwable on
// to the listeners
84 else if (exception != null){
85 Cache.log.warn(format("Worker %s failed%n", getWorker().getClass().getName()));
86 exception.printStackTrace();
87 notifyExceptional(worker, exception);
92 // main executor for running workers one-by-one
93 private final ExecutorService executor = Executors.newSingleThreadExecutor();
95 private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
97 // list of all registered workers (other collections are subsets of this)
98 private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
100 // list of tasks holding queued and running workers
101 private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
103 // the collection of currently running workers
104 private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
106 // the collection of workers that will not be started
107 private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
110 * Register the worker with this manager and schedule it for execution.
113 public void registerWorker(AlignCalcWorkerI worker)
// fail fast with a NullPointerException so that null is never stored in
// the list of registered workers
115 Objects.requireNonNull(worker);
// The contains-then-add pair runs while holding the list's monitor, so two
// concurrent registrations of the same worker cannot both pass the check.
116 synchronized (registered)
118 if (!registered.contains(worker))
119 registered.add(worker);
// NOTE(review): the embedded numbering skips ahead after this statement, so
// the tail of the method may not be shown in this listing - confirm against
// the full file.
125 public List<AlignCalcWorkerI> getWorkers()
127 return unmodifiableList(new ArrayList<>(registered));
131 public List<AlignCalcWorkerI> getWorkersOfClass(
132 Class<? extends AlignCalcWorkerI> cls)
134 synchronized (registered)
136 return registered.stream()
137 .filter(worker -> worker.getClass().equals(cls))
138 .collect(Collectors.toUnmodifiableList());
143 public void removeWorker(AlignCalcWorkerI worker)
145 registered.remove(worker);
146 disabled.remove(worker);
150 public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
152 synchronized (registered)
154 for (var worker : registered)
156 if (worker.getClass().equals(cls))
158 removeWorker(worker);
165 public void removeWorkerForAnnotation(AlignmentAnnotation annot)
167 synchronized (registered)
169 for (var worker : registered)
171 if (worker.involves(annot) && worker.isDeletable())
173 removeWorker(worker);
180 public void disableWorker(AlignCalcWorkerI worker)
182 disabled.add(worker);
186 public void enableWorker(AlignCalcWorkerI worker)
188 disabled.remove(worker);
/**
 * Walks all registered workers and tests each one against the disabled set.
 * NOTE(review): the statement guarded by the disabled check is not visible
 * in this listing - presumably it starts the worker again; confirm.
 */
192 public void restartWorkers()
// traversal of the synchronized list is guarded by its monitor
194 synchronized (registered)
196 for (AlignCalcWorkerI worker : registered)
// disabled workers are skipped
198 if (!isDisabled(worker))
/**
 * Wraps the worker in a new task, marks it as in progress, tells the
 * listeners that it was queued and hands the task to the executor.
 *
 * @param worker
 *          worker to run; a null argument raises a NullPointerException
 */
205 public void startWorker(AlignCalcWorkerI worker)
207 Objects.requireNonNull(worker);
208 AlignCalcTask newTask = new AlignCalcTask(worker);
// Cancelling a previous run and marking the new one happen under the
// inProgress monitor, so the two steps cannot interleave with another start
// of the same worker.
209 synchronized (inProgress)
// only has an effect when the worker is already marked as in progress
211 cancelWorker(worker);
// the worker counts as in progress from the moment it is queued, not from
// the moment its task begins to execute
212 inProgress.add(worker);
// NOTE(review): nothing visible here adds newTask to `tasks`, although
// cancelWorker looks tasks up in that list, and the embedded numbering skips
// a line at this point - check the full file.
// listeners are told before the task is passed to the single-thread executor
215 notifyQueued(worker);
216 executor.execute(newTask);
/**
 * Cancels the task that wraps the given worker, provided the worker is
 * currently marked as in progress.
 */
220 public void cancelWorker(AlignCalcWorkerI worker)
// workers that are not in progress have nothing to cancel
222 if (isWorking(worker))
// look for a task whose worker equals the given one
// NOTE(review): the pipeline below has no terminal operation in this listing
// (the embedded numbering skips 228) - presumably findFirst(); confirm.
226 Optional<AlignCalcTask> oldTask = tasks.stream()
227 .filter(task -> task.getWorker().equals(worker))
229 if (oldTask.isPresent()) {
// cancel(true) allows interrupting the task if it is already executing
230 oldTask.get().cancel(true);
237 public boolean isDisabled(AlignCalcWorkerI worker)
239 return disabled.contains(worker);
243 public boolean isWorking(AlignCalcWorkerI worker)
245 return inProgress.contains(worker);
249 public boolean isWorking()
251 return !inProgress.isEmpty();
/**
 * Checks the in-progress workers against the given annotation.
 * NOTE(review): the return statements are not visible in this listing -
 * presumably true as soon as one in-progress worker involves the annotation
 * and false otherwise; confirm.
 */
255 public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
// traversal of the synchronized set is guarded by its monitor
257 synchronized (inProgress)
259 for (AlignCalcWorkerI worker : inProgress)
261 if (worker.involves(annot))
270 private void notifyQueued(AlignCalcWorkerI worker)
272 for (AlignCalcListener listener : listeners)
274 listener.workerQueued(worker);
278 private void notifyStarted(AlignCalcWorkerI worker)
280 for (AlignCalcListener listener : listeners)
282 listener.workerStarted(worker);
/**
 * Tells every listener that the worker has completed.
 */
286 private void notifyCompleted(AlignCalcWorkerI worker)
288 for (AlignCalcListener listener : listeners)
// Each listener is called inside its own try block, so a RuntimeException
// from one listener is caught before the loop moves on.
// NOTE(review): the body of the catch clause is not visible in this listing.
291 listener.workerCompleted(worker);
292 } catch (RuntimeException e)
/**
 * Tells every listener that the worker ended with a throwable.
 * NOTE(review): the second line of the parameter list is not visible in
 * this listing; the body refers to it as "throwable".
 */
299 private void notifyExceptional(AlignCalcWorkerI worker,
302 for (AlignCalcListener listener : listeners)
// Each listener is called inside its own try block, so a RuntimeException
// from one listener is caught before the loop moves on.
// NOTE(review): the body of the catch clause is not visible in this listing.
305 listener.workerExceptional(worker, throwable);
306 } catch (RuntimeException e)
314 public void addAlignCalcListener(AlignCalcListener listener)
316 listeners.add(listener);
320 public void removeAlignCalcListener(AlignCalcListener listener)
322 listeners.remove(listener);