JAL-3878 Add methods to get and remove workers by their calc name.
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.WeakHashMap;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
14
15 import static java.lang.String.format;
16 import static java.util.Collections.synchronizedMap;
17 import static java.util.Collections.unmodifiableList;
18
19 import java.util.ArrayList;
20
21 import jalview.api.AlignCalcListener;
22 import jalview.api.AlignCalcManagerI2;
23 import jalview.api.AlignCalcWorkerI;
24 import jalview.api.PollableAlignCalcWorkerI;
25 import jalview.bin.Cache;
26 import jalview.datamodel.AlignmentAnnotation;
27
28 public class AlignCalcManager2 implements AlignCalcManagerI2
29 {
30   private abstract class WorkerManager
31   {
32     protected volatile boolean enabled = true;
33
34     protected AlignCalcWorkerI worker;
35
36     WorkerManager(AlignCalcWorkerI worker)
37     {
38       this.worker = worker;
39     }
40
41     protected AlignCalcWorkerI getWorker()
42     {
43       return worker;
44     }
45
46     boolean isEnabled()
47     {
48       return enabled;
49     }
50
51     void setEnabled(boolean enabled)
52     {
53       this.enabled = enabled;
54     }
55
56     synchronized void restart()
57     {
58       if (!isEnabled())
59       {
60         return;
61       }
62       if (!isRegistered())
63       {
64         setEnabled(false);
65       }
66       if (isWorking())
67       {
68         cancel();
69       }
70       submit();
71     }
72
73     protected boolean isRegistered()
74     {
75       return registered.containsKey(getWorker());
76     }
77
78     abstract boolean isWorking();
79
80     protected abstract void submit();
81
82     abstract void cancel();
83   }
84
85   private class SimpleWorkerManager extends WorkerManager
86   {
87     private Future<?> task = null;
88
89     SimpleWorkerManager(AlignCalcWorkerI worker)
90     {
91       super(worker);
92     }
93
94     @Override
95     boolean isWorking()
96     {
97       return task != null && !task.isDone();
98     }
99
100     @Override
101     protected void submit()
102     {
103       if (task != null && !(task.isDone() || task.isCancelled()))
104       {
105         throw new IllegalStateException(
106                 "Cannot submit new task if the prevoius one is still running");
107       }
108       Cache.log.debug(
109               format("Worker %s queued", getWorker().getClass().getName()));
110       task = executor.submit(() -> {
111         try
112         {
113           Cache.log.debug(format("Worker %s started",
114                   getWorker().getClass().getName()));
115           getWorker().run();
116           Cache.log.debug(format("Worker %s finished",
117                   getWorker().getClass().getName()));
118         } catch (InterruptedException e)
119         {
120           Cache.log.debug(format("Worker %s interrupted",
121                   getWorker().getClass().getName()));
122         } catch (Throwable th)
123         {
124           Cache.log.debug(format("Worker %s failed",
125                   getWorker().getClass().getName()), th);
126         } finally
127         {
128           if (!isRegistered())
129           {
130             // delete worker reference so garbage collector can remove it
131             worker = null;
132           }
133         }
134       });
135     }
136
137     @Override
138     synchronized void cancel()
139     {
140       if (!isWorking())
141       {
142         return;
143       }
144       Cache.log.debug(format("Cancelling worker %s",
145               getWorker().getClass().getName()));
146       task.cancel(true);
147     }
148   }
149
150   private class PollableWorkerManager extends WorkerManager
151   {
152     private Future<?> task = null;
153
154     PollableWorkerManager(PollableAlignCalcWorkerI worker)
155     {
156       super(worker);
157     }
158
159     @Override
160     protected PollableAlignCalcWorkerI getWorker()
161     {
162       return (PollableAlignCalcWorkerI) super.getWorker();
163     }
164
165     @Override
166     boolean isWorking()
167     {
168       return task != null && !task.isDone();
169     }
170
171     protected void submit()
172     {
173       if (task != null && !(task.isDone() || task.isCancelled()))
174       {
175         throw new IllegalStateException(
176                 "Cannot submit new task if the prevoius one is still running");
177       }
178       Cache.log.debug(
179               format("Worker %s queued", getWorker().getClass().getName()));
180       final var runnable = new Runnable()
181       {
182         private boolean started = false;
183
184         private boolean completed = false;
185
186         Future<?> future = null;
187
188         @Override
189         public void run()
190         {
191           try
192           {
193             if (!started)
194             {
195               Cache.log.debug(format("Worker %s started",
196                       getWorker().getClass().getName()));
197               getWorker().startUp();
198               started = true;
199             }
200             else if (!completed)
201             {
202               Cache.log.debug(format("Polling worker %s",
203                       getWorker().getClass().getName()));
204               if (getWorker().poll())
205               {
206                 Cache.log.debug(format("Worker %s finished",
207                         getWorker().getClass().getName()));
208                 completed = true;
209               }
210             }
211           } catch (Throwable th)
212           {
213             Cache.log.debug(format("Worker %s failed",
214                     getWorker().getClass().getName()), th);
215             completed = true;
216           }
217           if (completed)
218           {
219             final var worker = getWorker();
220             if (!isRegistered())
221               PollableWorkerManager.super.worker = null;
222             Cache.log.debug(format("Finalizing completed worker %s",
223                     worker.getClass().getName()));
224             worker.done();
225             // almost impossible, but the future may be null at this point
226             // let it throw NPE to cancel forcefully
227             future.cancel(false);
228           }
229         }
230       };
231       runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
232               1000, TimeUnit.MILLISECONDS);
233     }
234
235     synchronized protected void cancel()
236     {
237       if (!isWorking())
238       {
239         return;
240       }
241       Cache.log.debug(format("Cancelling worker %s",
242               getWorker().getClass().getName()));
243       task.cancel(false);
244       executor.submit(() -> {
245         final var worker = getWorker();
246         if (!isRegistered())
247           PollableWorkerManager.super.worker = null;
248         if (worker != null)
249         {
250           worker.cancel();
251           Cache.log.debug(format("Finalizing cancelled worker %s",
252                   worker.getClass().getName()));
253           worker.done();
254         }
255       });
256     }
257   }
258
259   private final ScheduledExecutorService executor = Executors
260           .newSingleThreadScheduledExecutor();
261
262   private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
263           new HashMap<>());
264
265   private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
266           new WeakHashMap<>());
267
268   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
269
270   private WorkerManager createManager(AlignCalcWorkerI worker)
271   {
272     if (worker instanceof PollableAlignCalcWorkerI)
273     {
274       return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
275     }
276     else
277     {
278       return new SimpleWorkerManager(worker);
279     }
280   }
281
282   @Override
283   public void registerWorker(AlignCalcWorkerI worker)
284   {
285     Objects.requireNonNull(worker);
286     WorkerManager manager = createManager(worker);
287     registered.putIfAbsent(worker, manager);
288     startWorker(worker);
289   }
290
291   @Override
292   public List<AlignCalcWorkerI> getWorkers()
293   {
294     List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
295     result.addAll(registered.keySet());
296     return result;
297   }
298
299   @Override
300   public List<AlignCalcWorkerI> getWorkersOfClass(
301           Class<? extends AlignCalcWorkerI> cls)
302   {
303     List<AlignCalcWorkerI> collected = new ArrayList<>();
304     for (var worker : getWorkers())
305     {
306       if (worker.getClass().equals(cls))
307       {
308         collected.add(worker);
309       }
310     }
311     return unmodifiableList(collected);
312   }
313
314   @Override
315   public List<AlignCalcWorkerI> getWorkersForName(String name)
316   {
317     List<AlignCalcWorkerI> collected = new ArrayList<>();
318     for (var worker : getWorkers())
319     {
320       if (worker.getCalcName().equals(name))
321       {
322         collected.add(worker);
323       }
324     }
325     return collected;
326   }
327
328   @Override
329   public void removeWorker(AlignCalcWorkerI worker)
330   {
331     if (worker.isDeletable())
332     {
333       registered.remove(worker);
334     }
335   }
336
337   @Override
338   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
339   {
340     synchronized (registered)
341     {
342       for (var worker : getWorkers())
343       {
344         if (worker.involves(annot))
345         {
346           removeWorker(worker);
347         }
348       }
349     }
350   }
351
352   @Override
353   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
354   {
355     synchronized (registered)
356     {
357       for (var worker : getWorkers())
358       {
359         if (worker.getClass().equals(cls))
360         {
361           removeWorker(worker);
362         }
363       }
364     }
365   }
366
367   @Override
368   public void removeWorkersForName(String name)
369   {
370     synchronized (registered)
371     {
372       for (var worker : getWorkers())
373       {
374         if (worker.getCalcName().equals(name))
375         {
376           removeWorker(worker);
377         }
378       }
379     }
380   }
381
382   @Override
383   public void disableWorker(AlignCalcWorkerI worker)
384   {
385     // Null pointer check might be needed
386     registered.get(worker).setEnabled(false);
387   }
388
389   @Override
390   public void enableWorker(AlignCalcWorkerI worker)
391   {
392     // Null pointer check might be needed
393     registered.get(worker).setEnabled(true);
394   }
395
396   @Override
397   public boolean isDisabled(AlignCalcWorkerI worker)
398   {
399     if (registered.containsKey(worker))
400     {
401       return !registered.get(worker).isEnabled();
402     }
403     else
404     {
405       return false;
406     }
407   }
408
409   @Override
410   public boolean isWorking(AlignCalcWorkerI worker)
411   {
412     var manager = registered.get(worker);
413     if (manager == null)
414       manager = oneshot.get(worker);
415     if (manager == null)
416       return false;
417     else
418       return manager.isWorking();
419   }
420
421   @Override
422   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
423   {
424     synchronized (registered)
425     {
426       for (var entry : registered.entrySet())
427         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
428           return true;
429     }
430     synchronized (oneshot)
431     {
432       for (var entry : registered.entrySet())
433         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
434           return true;
435     }
436     return false;
437   }
438
439   @Override
440   public boolean isWorking()
441   {
442     synchronized (registered)
443     {
444       for (var manager : registered.values())
445         if (manager.isWorking())
446           return true;
447     }
448     synchronized (oneshot)
449     {
450       for (var manager : oneshot.values())
451         if (manager.isWorking())
452           return true;
453     }
454     return false;
455   }
456
457   @Override
458   public void startWorker(AlignCalcWorkerI worker)
459   {
460     Objects.requireNonNull(worker);
461     var manager = registered.get(worker);
462     if (manager == null)
463     {
464       Cache.log.warn("Starting unregistered worker " + worker);
465       manager = createManager(worker);
466       oneshot.put(worker, manager);
467     }
468     manager.restart();
469   }
470
471   @Override
472   public void restartWorkers()
473   {
474     synchronized (registered)
475     {
476       for (var manager : registered.values())
477       {
478         manager.restart();
479       }
480     }
481   }
482
483   @Override
484   public void cancelWorker(AlignCalcWorkerI worker)
485   {
486     Objects.requireNonNull(worker);
487     var manager = registered.get(worker);
488     if (manager == null)
489       manager = oneshot.get(worker);
490     if (manager == null)
491     {
492       throw new NoSuchElementException();
493     }
494     manager.cancel();
495   }
496
497   private void notifyQueued(AlignCalcWorkerI worker)
498   {
499     for (AlignCalcListener listener : listeners)
500     {
501       listener.workerQueued(worker);
502     }
503   }
504
505   private void notifyStarted(AlignCalcWorkerI worker)
506   {
507     for (AlignCalcListener listener : listeners)
508     {
509       listener.workerStarted(worker);
510     }
511   }
512
513   private void notifyCompleted(AlignCalcWorkerI worker)
514   {
515     for (AlignCalcListener listener : listeners)
516     {
517       try
518       {
519         listener.workerCompleted(worker);
520       } catch (RuntimeException e)
521       {
522         e.printStackTrace();
523       }
524     }
525   }
526
527   private void notifyCancelled(AlignCalcWorkerI worker)
528   {
529     for (AlignCalcListener listener : listeners)
530     {
531       try
532       {
533         listener.workerCancelled(worker);
534       } catch (RuntimeException e)
535       {
536         e.printStackTrace();
537       }
538     }
539   }
540
541   private void notifyExceptional(AlignCalcWorkerI worker,
542           Throwable throwable)
543   {
544     for (AlignCalcListener listener : listeners)
545     {
546       try
547       {
548         listener.workerExceptional(worker, throwable);
549       } catch (RuntimeException e)
550       {
551         e.printStackTrace();
552       }
553     }
554   }
555
556   @Override
557   public void addAlignCalcListener(AlignCalcListener listener)
558   {
559     listeners.add(listener);
560   }
561
562   @Override
563   public void removeAlignCalcListener(AlignCalcListener listener)
564   {
565     listeners.remove(listener);
566   }
567
568   @Override
569   public void shutdown()
570   {
571     executor.shutdownNow();
572     listeners.clear();
573     registered.clear();
574   }
575
576 }