8340900f1e1a40a925a78c7844d6eb3e42ee1077
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.WeakHashMap;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
14
15 import static java.lang.String.format;
16 import static java.util.Collections.synchronizedMap;
17 import static java.util.Collections.unmodifiableList;
18
19 import java.util.ArrayList;
20
21 import jalview.api.AlignCalcListener;
22 import jalview.api.AlignCalcManagerI2;
23 import jalview.api.AlignCalcWorkerI;
24 import jalview.api.PollableAlignCalcWorkerI;
25 import jalview.bin.Cache;
26 import jalview.datamodel.AlignmentAnnotation;
27
28 public class AlignCalcManager2 implements AlignCalcManagerI2
29 {
30   private abstract class WorkerManager
31   {
32     protected volatile boolean enabled = true;
33
34     protected AlignCalcWorkerI worker;
35
36     WorkerManager(AlignCalcWorkerI worker)
37     {
38       this.worker = worker;
39     }
40
41     protected AlignCalcWorkerI getWorker()
42     {
43       return worker;
44     }
45
46     boolean isEnabled()
47     {
48       return enabled;
49     }
50
51     void setEnabled(boolean enabled)
52     {
53       this.enabled = enabled;
54     }
55
56     synchronized void restart()
57     {
58       if (!isEnabled())
59       {
60         return;
61       }
62       if (!isRestartable())
63       {
64         setEnabled(false);
65       }
66       if (isWorking())
67       {
68         cancel();
69       }
70       submit();
71     }
72
73     protected boolean isRestartable()
74     {
75       return registered.containsKey(getWorker());
76     }
77
78     abstract boolean isWorking();
79
80     protected abstract void submit();
81
82     abstract void cancel();
83   }
84
85   private class SimpleWorkerManager extends WorkerManager
86   {
87     private Future<?> task = null;
88
89     SimpleWorkerManager(AlignCalcWorkerI worker)
90     {
91       super(worker);
92     }
93
94     @Override
95     boolean isWorking()
96     {
97       return task != null && !task.isDone();
98     }
99
100     @Override
101     protected void submit()
102     {
103       if (task != null && !(task.isDone() || task.isCancelled()))
104       {
105         throw new IllegalStateException(
106                 "Cannot submit new task if the prevoius one is still running");
107       }
108       Cache.log.debug(
109               format("Worker %s queued", getWorker().getClass().getName()));
110       task = executor.submit(() -> {
111         try
112         {
113           Cache.log.debug(format("Worker %s started",
114                   getWorker().getClass().getName()));
115           getWorker().run();
116           Cache.log.debug(format("Worker %s finished",
117                   getWorker().getClass().getName()));
118         } catch (InterruptedException e)
119         {
120           Cache.log.debug(format("Worker %s interrupted",
121                   getWorker().getClass().getName()));
122         } catch (Throwable th)
123         {
124           Cache.log.debug(format("Worker %s failed",
125                   getWorker().getClass().getName()), th);
126         } finally
127         {
128           if (!isRestartable())
129           {
130             // delete worker reference so garbage collector can remove it
131             worker = null;
132           }
133         }
134       });
135     }
136
137     @Override
138     synchronized void cancel()
139     {
140       if (!isWorking())
141       {
142         return;
143       }
144       Cache.log.debug(format("Cancelling worker %s",
145               getWorker().getClass().getName()));
146       task.cancel(true);
147     }
148   }
149
150   private class PollableWorkerManager extends WorkerManager
151   {
152     private Future<?> task = null;
153
154     PollableWorkerManager(PollableAlignCalcWorkerI worker)
155     {
156       super(worker);
157     }
158
159     @Override
160     protected PollableAlignCalcWorkerI getWorker()
161     {
162       return (PollableAlignCalcWorkerI) super.getWorker();
163     }
164
165     @Override
166     boolean isWorking()
167     {
168       return task != null && !task.isDone();
169     }
170
171     protected void submit()
172     {
173       if (task != null && !(task.isDone() || task.isCancelled()))
174       {
175         throw new IllegalStateException(
176                 "Cannot submit new task if the prevoius one is still running");
177       }
178       Cache.log.debug(
179               format("Worker %s queued", getWorker().getClass().getName()));
180       final var runnable = new Runnable()
181       {
182         private boolean started = false;
183
184         private boolean completed = false;
185
186         Future<?> future = null;
187
188         @Override
189         public void run()
190         {
191           try
192           {
193             if (!started)
194             {
195               Cache.log.debug(format("Worker %s started",
196                       getWorker().getClass().getName()));
197               getWorker().startUp();
198               started = true;
199             }
200             else if (!completed)
201             {
202               Cache.log.debug(format("Polling worker %s",
203                       getWorker().getClass().getName()));
204               if (getWorker().poll())
205               {
206                 Cache.log.debug(format("Worker %s finished",
207                         getWorker().getClass().getName()));
208                 completed = true;
209               }
210             }
211           } catch (Throwable th)
212           {
213             Cache.log.debug(format("Worker %s failed",
214                     getWorker().getClass().getName()), th);
215             completed = true;
216           }
217           if (completed)
218           {
219             final var worker = getWorker();
220             if (!isRestartable())
221               PollableWorkerManager.super.worker = null;
222             Cache.log.debug(format("Finalizing completed worker %s",
223                     worker.getClass().getName()));
224             worker.done();
225             // almost impossible, but the future may be null at this point
226             // let it throw NPE to cancel forcefully
227             future.cancel(false);
228           }
229         }
230       };
231       runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
232               1000, TimeUnit.MILLISECONDS);
233     }
234
235     synchronized protected void cancel()
236     {
237       if (!isWorking())
238       {
239         return;
240       }
241       Cache.log.debug(format("Cancelling worker %s",
242               getWorker().getClass().getName()));
243       task.cancel(false);
244       executor.submit(() -> {
245         final var worker = getWorker();
246         if (!isRestartable())
247           PollableWorkerManager.super.worker = null;
248         if (worker != null)
249         {
250           worker.cancel();
251           Cache.log.debug(format("Finalizing cancelled worker %s",
252                   worker.getClass().getName()));
253           worker.done();
254         }
255       });
256     }
257   }
258
259   private final ScheduledExecutorService executor = Executors
260           .newSingleThreadScheduledExecutor();
261
262   private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
263           new HashMap<>());
264
265   private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
266           new WeakHashMap<>());
267
268   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
269
270   private WorkerManager createManager(AlignCalcWorkerI worker)
271   {
272     if (worker instanceof PollableAlignCalcWorkerI)
273     {
274       return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
275     }
276     else
277     {
278       return new SimpleWorkerManager(worker);
279     }
280   }
281
282   @Override
283   public void registerWorker(AlignCalcWorkerI worker)
284   {
285     Objects.requireNonNull(worker);
286     WorkerManager manager = createManager(worker);
287     registered.putIfAbsent(worker, manager);
288     startWorker(worker);
289   }
290
291   @Override
292   public List<AlignCalcWorkerI> getWorkers()
293   {
294     List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
295     result.addAll(registered.keySet());
296     return result;
297   }
298
299   @Override
300   public List<AlignCalcWorkerI> getWorkersOfClass(
301           Class<? extends AlignCalcWorkerI> cls)
302   {
303     List<AlignCalcWorkerI> collected = new ArrayList<>();
304     for (var worker : getWorkers())
305     {
306       if (worker.getClass().equals(cls))
307       {
308         collected.add(worker);
309       }
310     }
311     return unmodifiableList(collected);
312   }
313
314   @Override
315   public void removeWorker(AlignCalcWorkerI worker)
316   {
317     registered.remove(worker);
318   }
319
320   @Override
321   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
322   {
323     synchronized (registered)
324     {
325       for (var worker : getWorkers())
326       {
327         if (worker.involves(annot) && worker.isDeletable())
328         {
329           removeWorker(worker);
330         }
331       }
332     }
333   }
334
335   @Override
336   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
337   {
338     synchronized (registered)
339     {
340       for (var worker : getWorkers())
341       {
342         if (worker.getClass().equals(cls))
343         {
344           removeWorker(worker);
345         }
346       }
347     }
348   }
349
350   @Override
351   public void disableWorker(AlignCalcWorkerI worker)
352   {
353     // Null pointer check might be needed
354     registered.get(worker).setEnabled(false);
355   }
356
357   @Override
358   public void enableWorker(AlignCalcWorkerI worker)
359   {
360     // Null pointer check might be needed
361     registered.get(worker).setEnabled(true);
362   }
363
364   @Override
365   public boolean isDisabled(AlignCalcWorkerI worker)
366   {
367     if (registered.containsKey(worker))
368     {
369       return !registered.get(worker).isEnabled();
370     }
371     else
372     {
373       return false;
374     }
375   }
376
377   @Override
378   public boolean isWorking(AlignCalcWorkerI worker)
379   {
380     var manager = registered.get(worker);
381     if (manager == null)
382       manager = oneshot.get(worker);
383     if (manager == null)
384       return false;
385     else
386       return manager.isWorking();
387   }
388
389   @Override
390   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
391   {
392     synchronized (registered)
393     {
394       for (var entry : registered.entrySet())
395         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
396           return true;
397     }
398     synchronized (oneshot)
399     {
400       for (var entry : registered.entrySet())
401         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
402           return true;
403     }
404     return false;
405   }
406
407   @Override
408   public boolean isWorking()
409   {
410     synchronized (registered)
411     {
412       for (var manager : registered.values())
413         if (manager.isWorking())
414           return true;
415     }
416     synchronized (oneshot)
417     {
418       for (var manager : oneshot.values())
419         if (manager.isWorking())
420           return true;
421     }
422     return false;
423   }
424
425   @Override
426   public void startWorker(AlignCalcWorkerI worker)
427   {
428     Objects.requireNonNull(worker);
429     var manager = registered.get(worker);
430     if (manager == null)
431     {
432       Cache.log.warn("Starting unregistered worker " + worker);
433       manager = createManager(worker);
434       oneshot.put(worker, manager);
435     }
436     manager.restart();
437   }
438
439   @Override
440   public void restartWorkers()
441   {
442     synchronized (registered)
443     {
444       for (var manager : registered.values())
445       {
446         manager.restart();
447       }
448     }
449   }
450
451   @Override
452   public void cancelWorker(AlignCalcWorkerI worker)
453   {
454     Objects.requireNonNull(worker);
455     var manager = registered.get(worker);
456     if (manager == null)
457       manager = oneshot.get(worker);
458     if (manager == null)
459     {
460       throw new NoSuchElementException();
461     }
462     manager.cancel();
463   }
464
465   private void notifyQueued(AlignCalcWorkerI worker)
466   {
467     for (AlignCalcListener listener : listeners)
468     {
469       listener.workerQueued(worker);
470     }
471   }
472
473   private void notifyStarted(AlignCalcWorkerI worker)
474   {
475     for (AlignCalcListener listener : listeners)
476     {
477       listener.workerStarted(worker);
478     }
479   }
480
481   private void notifyCompleted(AlignCalcWorkerI worker)
482   {
483     for (AlignCalcListener listener : listeners)
484     {
485       try
486       {
487         listener.workerCompleted(worker);
488       } catch (RuntimeException e)
489       {
490         e.printStackTrace();
491       }
492     }
493   }
494
495   private void notifyCancelled(AlignCalcWorkerI worker)
496   {
497     for (AlignCalcListener listener : listeners)
498     {
499       try
500       {
501         listener.workerCancelled(worker);
502       } catch (RuntimeException e)
503       {
504         e.printStackTrace();
505       }
506     }
507   }
508
509   private void notifyExceptional(AlignCalcWorkerI worker,
510           Throwable throwable)
511   {
512     for (AlignCalcListener listener : listeners)
513     {
514       try
515       {
516         listener.workerExceptional(worker, throwable);
517       } catch (RuntimeException e)
518       {
519         e.printStackTrace();
520       }
521     }
522   }
523
524   @Override
525   public void addAlignCalcListener(AlignCalcListener listener)
526   {
527     listeners.add(listener);
528   }
529
530   @Override
531   public void removeAlignCalcListener(AlignCalcListener listener)
532   {
533     listeners.remove(listener);
534   }
535
536   @Override
537   public void shutdown()
538   {
539     executor.shutdownNow();
540     listeners.clear();
541     registered.clear();
542   }
543
544 }