1 package jalview.workers;
3 import java.util.HashMap;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.WeakHashMap;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
15 import static java.lang.String.format;
16 import static java.util.Collections.synchronizedMap;
17 import static java.util.Collections.unmodifiableList;
19 import java.util.ArrayList;
21 import jalview.api.AlignCalcListener;
22 import jalview.api.AlignCalcManagerI2;
23 import jalview.api.AlignCalcWorkerI;
24 import jalview.api.PollableAlignCalcWorkerI;
25 import jalview.bin.Cache;
26 import jalview.datamodel.AlignmentAnnotation;
28 public class AlignCalcManager2 implements AlignCalcManagerI2
30 private abstract class WorkerManager
32 protected volatile boolean enabled = true;
34 protected AlignCalcWorkerI worker;
36 WorkerManager(AlignCalcWorkerI worker)
41 protected AlignCalcWorkerI getWorker()
51 void setEnabled(boolean enabled)
53 this.enabled = enabled;
56 synchronized void restart()
73 protected boolean isRegistered()
75 return registered.containsKey(getWorker());
78 abstract boolean isWorking();
80 protected abstract void submit();
82 abstract void cancel();
85 private class SimpleWorkerManager extends WorkerManager
87 private Future<?> task = null;
89 SimpleWorkerManager(AlignCalcWorkerI worker)
97 return task != null && !task.isDone();
101 protected void submit()
103 if (task != null && !(task.isDone() || task.isCancelled()))
105 throw new IllegalStateException(
106 "Cannot submit new task if the prevoius one is still running");
109 format("Worker %s queued", getWorker().getClass().getName()));
110 task = executor.submit(() -> {
113 Cache.log.debug(format("Worker %s started",
114 getWorker().getClass().getName()));
116 Cache.log.debug(format("Worker %s finished",
117 getWorker().getClass().getName()));
118 } catch (InterruptedException e)
120 Cache.log.debug(format("Worker %s interrupted",
121 getWorker().getClass().getName()));
122 } catch (Throwable th)
124 Cache.log.debug(format("Worker %s failed",
125 getWorker().getClass().getName()), th);
130 // delete worker reference so garbage collector can remove it
138 synchronized void cancel()
144 Cache.log.debug(format("Cancelling worker %s",
145 getWorker().getClass().getName()));
150 private class PollableWorkerManager extends WorkerManager
152 private Future<?> task = null;
154 PollableWorkerManager(PollableAlignCalcWorkerI worker)
160 protected PollableAlignCalcWorkerI getWorker()
162 return (PollableAlignCalcWorkerI) super.getWorker();
168 return task != null && !task.isDone();
171 protected void submit()
173 if (task != null && !(task.isDone() || task.isCancelled()))
175 throw new IllegalStateException(
176 "Cannot submit new task if the prevoius one is still running");
179 format("Worker %s queued", getWorker().getClass().getName()));
180 final var runnable = new Runnable()
182 private boolean started = false;
184 private boolean completed = false;
186 Future<?> future = null;
195 Cache.log.debug(format("Worker %s started",
196 getWorker().getClass().getName()));
197 getWorker().startUp();
202 Cache.log.debug(format("Polling worker %s",
203 getWorker().getClass().getName()));
204 if (getWorker().poll())
206 Cache.log.debug(format("Worker %s finished",
207 getWorker().getClass().getName()));
211 } catch (Throwable th)
213 Cache.log.debug(format("Worker %s failed",
214 getWorker().getClass().getName()), th);
219 final var worker = getWorker();
221 PollableWorkerManager.super.worker = null;
222 Cache.log.debug(format("Finalizing completed worker %s",
223 worker.getClass().getName()));
225 // almost impossible, but the future may be null at this point
226 // let it throw NPE to cancel forcefully
227 future.cancel(false);
231 runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
232 1000, TimeUnit.MILLISECONDS);
235 synchronized protected void cancel()
241 Cache.log.debug(format("Cancelling worker %s",
242 getWorker().getClass().getName()));
244 executor.submit(() -> {
245 final var worker = getWorker();
247 PollableWorkerManager.super.worker = null;
251 Cache.log.debug(format("Finalizing cancelled worker %s",
252 worker.getClass().getName()));
259 private final ScheduledExecutorService executor = Executors
260 .newSingleThreadScheduledExecutor();
262 private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
265 private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
266 new WeakHashMap<>());
268 private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
270 private WorkerManager createManager(AlignCalcWorkerI worker)
272 if (worker instanceof PollableAlignCalcWorkerI)
274 return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
278 return new SimpleWorkerManager(worker);
283 public void registerWorker(AlignCalcWorkerI worker)
285 Objects.requireNonNull(worker);
286 WorkerManager manager = createManager(worker);
287 registered.putIfAbsent(worker, manager);
292 public List<AlignCalcWorkerI> getWorkers()
294 List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
295 result.addAll(registered.keySet());
300 public List<AlignCalcWorkerI> getWorkersOfClass(
301 Class<? extends AlignCalcWorkerI> cls)
303 List<AlignCalcWorkerI> collected = new ArrayList<>();
304 for (var worker : getWorkers())
306 if (worker.getClass().equals(cls))
308 collected.add(worker);
311 return unmodifiableList(collected);
315 public List<AlignCalcWorkerI> getWorkersForName(String name)
317 List<AlignCalcWorkerI> collected = new ArrayList<>();
318 for (var worker : getWorkers())
320 if (worker.getCalcName().equals(name))
322 collected.add(worker);
329 public void removeWorker(AlignCalcWorkerI worker)
331 if (worker.isDeletable())
333 registered.remove(worker);
338 public void removeWorkerForAnnotation(AlignmentAnnotation annot)
340 synchronized (registered)
342 for (var worker : getWorkers())
344 if (worker.involves(annot))
346 removeWorker(worker);
353 public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
355 synchronized (registered)
357 for (var worker : getWorkers())
359 if (worker.getClass().equals(cls))
361 removeWorker(worker);
368 public void removeWorkersForName(String name)
370 synchronized (registered)
372 for (var worker : getWorkers())
374 if (worker.getCalcName().equals(name))
376 removeWorker(worker);
383 public void disableWorker(AlignCalcWorkerI worker)
385 // Null pointer check might be needed
386 registered.get(worker).setEnabled(false);
390 public void enableWorker(AlignCalcWorkerI worker)
392 // Null pointer check might be needed
393 registered.get(worker).setEnabled(true);
397 public boolean isDisabled(AlignCalcWorkerI worker)
399 if (registered.containsKey(worker))
401 return !registered.get(worker).isEnabled();
410 public boolean isWorking(AlignCalcWorkerI worker)
412 var manager = registered.get(worker);
414 manager = oneshot.get(worker);
418 return manager.isWorking();
422 public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
424 synchronized (registered)
426 for (var entry : registered.entrySet())
427 if (entry.getKey().involves(annot) && entry.getValue().isWorking())
430 synchronized (oneshot)
432 for (var entry : registered.entrySet())
433 if (entry.getKey().involves(annot) && entry.getValue().isWorking())
440 public boolean isWorking()
442 synchronized (registered)
444 for (var manager : registered.values())
445 if (manager.isWorking())
448 synchronized (oneshot)
450 for (var manager : oneshot.values())
451 if (manager.isWorking())
458 public void startWorker(AlignCalcWorkerI worker)
460 Objects.requireNonNull(worker);
461 var manager = registered.get(worker);
464 Cache.log.warn("Starting unregistered worker " + worker);
465 manager = createManager(worker);
466 oneshot.put(worker, manager);
472 public void restartWorkers()
474 synchronized (registered)
476 for (var manager : registered.values())
484 public void cancelWorker(AlignCalcWorkerI worker)
486 Objects.requireNonNull(worker);
487 var manager = registered.get(worker);
489 manager = oneshot.get(worker);
492 throw new NoSuchElementException();
497 private void notifyQueued(AlignCalcWorkerI worker)
499 for (AlignCalcListener listener : listeners)
501 listener.workerQueued(worker);
505 private void notifyStarted(AlignCalcWorkerI worker)
507 for (AlignCalcListener listener : listeners)
509 listener.workerStarted(worker);
513 private void notifyCompleted(AlignCalcWorkerI worker)
515 for (AlignCalcListener listener : listeners)
519 listener.workerCompleted(worker);
520 } catch (RuntimeException e)
527 private void notifyCancelled(AlignCalcWorkerI worker)
529 for (AlignCalcListener listener : listeners)
533 listener.workerCancelled(worker);
534 } catch (RuntimeException e)
541 private void notifyExceptional(AlignCalcWorkerI worker,
544 for (AlignCalcListener listener : listeners)
548 listener.workerExceptional(worker, throwable);
549 } catch (RuntimeException e)
557 public void addAlignCalcListener(AlignCalcListener listener)
559 listeners.add(listener);
563 public void removeAlignCalcListener(AlignCalcListener listener)
565 listeners.remove(listener);
569 public void shutdown()
571 executor.shutdownNow();