1 package jalview.workers;
3 import java.util.HashMap;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.WeakHashMap;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
15 import static java.lang.String.format;
16 import static java.util.Collections.synchronizedMap;
17 import static java.util.Collections.unmodifiableList;
19 import java.util.ArrayList;
21 import jalview.api.AlignCalcListener;
22 import jalview.api.AlignCalcManagerI2;
23 import jalview.api.AlignCalcWorkerI;
24 import jalview.api.PollableAlignCalcWorkerI;
25 import jalview.bin.Cache;
26 import jalview.bin.Console;
27 import jalview.datamodel.AlignmentAnnotation;
29 public class AlignCalcManager2 implements AlignCalcManagerI2
31 private abstract class WorkerManager
33 protected volatile boolean enabled = true;
35 protected AlignCalcWorkerI worker;
37 WorkerManager(AlignCalcWorkerI worker)
42 protected AlignCalcWorkerI getWorker()
52 void setEnabled(boolean enabled)
54 this.enabled = enabled;
57 synchronized void restart()
74 protected boolean isRegistered()
76 return registered.containsKey(getWorker());
79 abstract boolean isWorking();
81 protected abstract void submit();
83 abstract void cancel();
86 private class SimpleWorkerManager extends WorkerManager
88 private Future<?> task = null;
90 SimpleWorkerManager(AlignCalcWorkerI worker)
98 return task != null && !task.isDone();
102 protected void submit()
104 if (task != null && !(task.isDone() || task.isCancelled()))
106 throw new IllegalStateException(
107 "Cannot submit new task if the prevoius one is still running");
110 format("Worker %s queued", getWorker().getClass().getName()));
111 task = executor.submit(() -> {
114 Console.debug(format("Worker %s started", getWorker()));
116 Console.debug(format("Worker %s finished", getWorker()));
117 } catch (InterruptedException e)
119 Console.debug(format("Worker %s interrupted", getWorker()));
120 } catch (Throwable th)
122 Console.debug(format("Worker %s failed", getWorker()), th);
127 // delete worker reference so garbage collector can remove it
135 synchronized void cancel()
141 Console.debug(format("Cancelling worker %s", getWorker()));
146 private class PollableWorkerManager extends WorkerManager
148 private Future<?> task = null;
150 PollableWorkerManager(PollableAlignCalcWorkerI worker)
156 protected PollableAlignCalcWorkerI getWorker()
158 return (PollableAlignCalcWorkerI) super.getWorker();
164 return task != null && !task.isDone();
167 protected void submit()
169 if (task != null && !(task.isDone() || task.isCancelled()))
171 throw new IllegalStateException(
172 "Cannot submit new task if the prevoius one is still running");
175 format("Worker %s queued", getWorker()));
176 final var runnable = new Runnable()
178 private boolean started = false;
180 private boolean completed = false;
182 Future<?> future = null;
191 Console.debug(format("Worker %s started", getWorker()));
192 getWorker().startUp();
197 Console.debug(format("Polling worker %s", getWorker()));
198 if (getWorker().poll())
200 Console.debug(format("Worker %s finished", getWorker()));
204 } catch (Throwable th)
206 Console.debug(format("Worker %s failed", getWorker()), th);
211 final var worker = getWorker();
213 PollableWorkerManager.super.worker = null;
214 Console.debug(format("Finalizing completed worker %s", worker));
216 // almost impossible, but the future may be null at this point
217 // let it throw NPE to cancel forcefully
218 future.cancel(false);
222 runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
223 1000, TimeUnit.MILLISECONDS);
226 synchronized protected void cancel()
232 Console.debug(format("Cancelling worker %s", getWorker()));
234 executor.submit(() -> {
235 final var worker = getWorker();
237 PollableWorkerManager.super.worker = null;
241 Console.debug(format("Finalizing cancelled worker %s", worker));
248 private final ScheduledExecutorService executor = Executors
249 .newSingleThreadScheduledExecutor();
251 private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
254 private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
255 new WeakHashMap<>());
257 private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
259 private WorkerManager createManager(AlignCalcWorkerI worker)
261 if (worker instanceof PollableAlignCalcWorkerI)
263 return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
267 return new SimpleWorkerManager(worker);
272 public void registerWorker(AlignCalcWorkerI worker)
274 Objects.requireNonNull(worker);
275 WorkerManager manager = createManager(worker);
276 registered.putIfAbsent(worker, manager);
281 public List<AlignCalcWorkerI> getWorkers()
283 List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
284 result.addAll(registered.keySet());
289 public List<AlignCalcWorkerI> getWorkersOfClass(
290 Class<? extends AlignCalcWorkerI> cls)
292 List<AlignCalcWorkerI> collected = new ArrayList<>();
293 for (var worker : getWorkers())
295 if (worker.getClass().equals(cls))
297 collected.add(worker);
300 return unmodifiableList(collected);
304 public void removeWorker(AlignCalcWorkerI worker)
306 if (worker.isDeletable())
308 registered.remove(worker);
313 public void removeWorkerForAnnotation(AlignmentAnnotation annot)
315 synchronized (registered)
317 for (var worker : getWorkers())
319 if (worker.involves(annot))
321 removeWorker(worker);
328 public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
330 synchronized (registered)
332 for (var worker : getWorkers())
334 if (worker.getClass().equals(cls))
336 removeWorker(worker);
343 public void disableWorker(AlignCalcWorkerI worker)
345 // Null pointer check might be needed
346 registered.get(worker).setEnabled(false);
350 public void enableWorker(AlignCalcWorkerI worker)
352 // Null pointer check might be needed
353 registered.get(worker).setEnabled(true);
357 public boolean isDisabled(AlignCalcWorkerI worker)
359 if (registered.containsKey(worker))
361 return !registered.get(worker).isEnabled();
370 public boolean isWorking(AlignCalcWorkerI worker)
372 var manager = registered.get(worker);
374 manager = oneshot.get(worker);
378 return manager.isWorking();
382 public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
384 synchronized (registered)
386 for (var entry : registered.entrySet())
387 if (entry.getKey().involves(annot) && entry.getValue().isWorking())
390 synchronized (oneshot)
392 for (var entry : registered.entrySet())
393 if (entry.getKey().involves(annot) && entry.getValue().isWorking())
400 public boolean isWorking()
402 synchronized (registered)
404 for (var manager : registered.values())
405 if (manager.isWorking())
408 synchronized (oneshot)
410 for (var manager : oneshot.values())
411 if (manager.isWorking())
418 public void startWorker(AlignCalcWorkerI worker)
420 Objects.requireNonNull(worker);
421 var manager = registered.get(worker);
424 Console.warn("Starting unregistered worker " + worker);
425 manager = createManager(worker);
426 oneshot.put(worker, manager);
432 public void restartWorkers()
434 synchronized (registered)
436 for (var manager : registered.values())
444 public void cancelWorker(AlignCalcWorkerI worker)
446 Objects.requireNonNull(worker);
447 var manager = registered.get(worker);
449 manager = oneshot.get(worker);
452 throw new NoSuchElementException();
457 private void notifyQueued(AlignCalcWorkerI worker)
459 for (AlignCalcListener listener : listeners)
461 listener.workerQueued(worker);
465 private void notifyStarted(AlignCalcWorkerI worker)
467 for (AlignCalcListener listener : listeners)
469 listener.workerStarted(worker);
473 private void notifyCompleted(AlignCalcWorkerI worker)
475 for (AlignCalcListener listener : listeners)
479 listener.workerCompleted(worker);
480 } catch (RuntimeException e)
487 private void notifyCancelled(AlignCalcWorkerI worker)
489 for (AlignCalcListener listener : listeners)
493 listener.workerCancelled(worker);
494 } catch (RuntimeException e)
501 private void notifyExceptional(AlignCalcWorkerI worker,
504 for (AlignCalcListener listener : listeners)
508 listener.workerExceptional(worker, throwable);
509 } catch (RuntimeException e)
517 public void addAlignCalcListener(AlignCalcListener listener)
519 listeners.add(listener);
523 public void removeAlignCalcListener(AlignCalcListener listener)
525 listeners.remove(listener);
529 public void shutdown()
531 executor.shutdownNow();