1 package jalview.workers;
3 import java.util.HashMap;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.TimeUnit;
14 import static java.lang.String.format;
15 import static java.util.Collections.synchronizedMap;
16 import static java.util.Collections.unmodifiableList;
18 import java.util.ArrayList;
20 import jalview.api.AlignCalcListener;
21 import jalview.api.AlignCalcManagerI2;
22 import jalview.api.AlignCalcWorkerI;
23 import jalview.api.PollableAlignCalcWorkerI;
24 import jalview.bin.Cache;
25 import jalview.datamodel.AlignmentAnnotation;
27 public class AlignCalcManager2 implements AlignCalcManagerI2
29 private abstract class WorkerManager
31 protected volatile boolean enabled = true;
33 protected final AlignCalcWorkerI worker;
35 WorkerManager(AlignCalcWorkerI worker)
40 AlignCalcWorkerI getWorker()
50 void setEnabled(boolean enabled)
52 this.enabled = enabled;
55 synchronized void restart()
68 abstract boolean isWorking();
70 protected abstract void submit();
72 abstract void cancel();
76 private class SimpleWorkerManager extends WorkerManager
78 private Future<?> task = null;
80 SimpleWorkerManager(AlignCalcWorkerI worker)
88 return task != null && !task.isDone();
92 protected void submit()
94 if (task != null && !(task.isDone() || task.isCancelled()))
96 throw new IllegalStateException(
97 "Cannot submit new task if the prevoius one is still running");
99 Cache.log.debug(format("Worker %s queued",
100 worker.getClass().getName()));
101 task = executor.submit(() -> {
104 Cache.log.debug(format("Worker %s started",
105 worker.getClass().getName()));
107 Cache.log.debug(format("Worker %s finished",
108 worker.getClass().getName()));
110 catch (InterruptedException e)
112 Cache.log.debug(format("Worker %s interrupted",
113 worker.getClass().getName()));
117 Cache.log.debug(format("Worker %s failed",
118 worker.getClass().getName()), th);
127 synchronized void cancel()
133 Cache.log.debug(format("Cancelling worker %s",
134 worker.getClass().getName()));
140 private class PollableWorkerManager extends WorkerManager
142 private final PollableAlignCalcWorkerI worker;
143 private Future<?> task = null;
145 PollableWorkerManager(PollableAlignCalcWorkerI worker)
148 this.worker = worker;
154 return task != null && !task.isDone();
157 protected void submit()
159 if (task != null && !(task.isDone() || task.isCancelled()))
161 throw new IllegalStateException(
162 "Cannot submit new task if the prevoius one is still running");
164 Cache.log.debug(format("Worker %s queued",
165 worker.getClass().getName()));
166 final var runnable = new Runnable()
168 private boolean started = false;
169 private boolean completed = false;
170 Future<?> future = null;
179 Cache.log.debug(format("Worker %s started",
180 worker.getClass().getName()));
186 Cache.log.debug(format("Polling worker %s",
187 worker.getClass().getName()));
190 Cache.log.debug(format("Worker %s finished",
191 worker.getClass().getName()));
195 } catch (Throwable th)
197 Cache.log.debug(format("Worker %s failed",
198 worker.getClass().getName()), th);
203 Cache.log.debug(format("Finalizing completed worker %s",
204 worker.getClass().getName()));
206 // Almost impossible, but the future may still be null at this point;
207 // if so, let the resulting NullPointerException cancel the task forcefully.
208 future.cancel(false);
212 runnable.future = task = executor.scheduleWithFixedDelay(
213 runnable, 10, 1000, TimeUnit.MILLISECONDS);
216 synchronized protected void cancel()
222 Cache.log.debug(format("Cancelling worker %s",
223 worker.getClass().getName()));
225 executor.submit(() -> {
227 Cache.log.debug(format("Finalizing cancelled worker %s",
228 worker.getClass().getName()));
235 private final ScheduledExecutorService executor =
236 Executors.newSingleThreadScheduledExecutor();
237 private final Map<AlignCalcWorkerI, WorkerManager> registered =
238 synchronizedMap(new HashMap<>());
240 private final List<AlignCalcListener> listeners =
241 new CopyOnWriteArrayList<>();
243 private WorkerManager createManager(AlignCalcWorkerI worker) {
244 if (worker instanceof PollableAlignCalcWorkerI)
245 return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
247 return new SimpleWorkerManager(worker);
251 public void registerWorker(AlignCalcWorkerI worker)
253 Objects.requireNonNull(worker);
254 WorkerManager manager = createManager(worker);
255 registered.putIfAbsent(worker, manager);
260 public List<AlignCalcWorkerI> getWorkers()
262 List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
263 result.addAll(registered.keySet());
268 public List<AlignCalcWorkerI> getWorkersOfClass(
269 Class<? extends AlignCalcWorkerI> cls)
271 List<AlignCalcWorkerI> collected = new ArrayList<>();
272 for (var worker : getWorkers())
274 if (worker.getClass().equals(cls))
276 collected.add(worker);
279 return unmodifiableList(collected);
283 public void removeWorker(AlignCalcWorkerI worker)
285 registered.remove(worker);
289 public void removeWorkerForAnnotation(AlignmentAnnotation annot)
291 synchronized (registered)
293 for (var worker : getWorkers())
295 if (worker.involves(annot) && worker.isDeletable())
297 removeWorker(worker);
304 public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
306 synchronized (registered)
308 for (var worker : getWorkers())
310 if (worker.getClass().equals(cls))
312 removeWorker(worker);
319 public void disableWorker(AlignCalcWorkerI worker)
321 // TODO: registered.get(worker) returns null for an unregistered worker; a null check might be needed
322 registered.get(worker).setEnabled(false);
326 public void enableWorker(AlignCalcWorkerI worker)
328 // TODO: registered.get(worker) returns null for an unregistered worker; a null check might be needed
329 registered.get(worker).setEnabled(true);
333 public boolean isDisabled(AlignCalcWorkerI worker)
335 if (registered.containsKey(worker))
337 return !registered.get(worker).isEnabled();
346 public boolean isWorking(AlignCalcWorkerI worker)
348 if (!registered.containsKey(worker))
354 return registered.get(worker).isWorking();
359 public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
361 synchronized (registered)
363 for (var entry : registered.entrySet())
365 if (entry.getKey().involves(annot) &&
366 entry.getValue().isWorking())
376 public boolean isWorking()
378 synchronized (registered)
380 for (var manager : registered.values())
382 if (manager.isWorking())
392 public void startWorker(AlignCalcWorkerI worker)
394 Objects.requireNonNull(worker);
395 var manager = registered.get(worker);
398 Cache.log.warn("Starting unregistered worker " + worker);
399 manager = createManager(worker);
405 public void restartWorkers()
407 synchronized (registered)
409 for (var manager : registered.values())
417 public void cancelWorker(AlignCalcWorkerI worker)
419 Objects.requireNonNull(worker);
420 var manager = registered.get(worker);
423 throw new NoSuchElementException();
428 private void notifyQueued(AlignCalcWorkerI worker)
430 for (AlignCalcListener listener : listeners)
432 listener.workerQueued(worker);
436 private void notifyStarted(AlignCalcWorkerI worker)
438 for (AlignCalcListener listener : listeners)
440 listener.workerStarted(worker);
444 private void notifyCompleted(AlignCalcWorkerI worker)
446 for (AlignCalcListener listener : listeners)
450 listener.workerCompleted(worker);
451 } catch (RuntimeException e)
458 private void notifyCancelled(AlignCalcWorkerI worker)
460 for (AlignCalcListener listener : listeners)
464 listener.workerCancelled(worker);
465 } catch (RuntimeException e)
472 private void notifyExceptional(AlignCalcWorkerI worker,
475 for (AlignCalcListener listener : listeners)
479 listener.workerExceptional(worker, throwable);
480 } catch (RuntimeException e)
488 public void addAlignCalcListener(AlignCalcListener listener)
490 listeners.add(listener);
494 public void removeAlignCalcListener(AlignCalcListener listener)
496 listeners.remove(listener);
500 public void shutdown()
502 executor.shutdownNow();