JAL-3690 - fixed state inconsistency when restarting the calcworker
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.TimeUnit;
13
14 import static java.lang.String.format;
15 import static java.util.Collections.synchronizedMap;
16 import static java.util.Collections.unmodifiableList;
17
18 import java.util.ArrayList;
19
20 import jalview.api.AlignCalcListener;
21 import jalview.api.AlignCalcManagerI2;
22 import jalview.api.AlignCalcWorkerI;
23 import jalview.api.PollableAlignCalcWorkerI;
24 import jalview.bin.Cache;
25 import jalview.datamodel.AlignmentAnnotation;
26
27 public class AlignCalcManager2 implements AlignCalcManagerI2
28 {
29   private abstract class WorkerManager
30   {
31     protected volatile boolean enabled = true;
32             
33     protected final AlignCalcWorkerI worker;
34     
35     WorkerManager(AlignCalcWorkerI worker)
36     {
37       this.worker = worker;
38     }
39     
40     AlignCalcWorkerI getWorker()
41     {
42       return worker;
43     }
44     
45     boolean isEnabled()
46     {
47       return enabled;
48     }
49     
50     void setEnabled(boolean enabled)
51     {
52       this.enabled = enabled;
53     }
54     
55     synchronized void restart()
56     {
57       if (!isEnabled())
58       {
59         return;
60       }
61       if (isWorking())
62       {
63         cancel();
64       }
65       submit();
66     }
67     
68     abstract boolean isWorking();
69     
70     protected abstract void submit();
71     
72     abstract void cancel();  
73   }
74   
75   
76   private class SimpleWorkerManager extends WorkerManager
77   {
78     private Future<?> task = null;
79     
80     SimpleWorkerManager(AlignCalcWorkerI worker)
81     {
82       super(worker);
83     }
84     
85     @Override
86     boolean isWorking()
87     {
88       return task != null && !task.isDone();
89     }
90
91     @Override
92     protected void submit()
93     {
94       if (task != null && !(task.isDone() || task.isCancelled()))
95       {
96         throw new IllegalStateException(
97             "Cannot submit new task if the prevoius one is still running");
98       }
99       Cache.log.debug(format("Worker %s queued",
100               worker.getClass().getName()));
101       task = executor.submit(() -> {
102         try
103         {
104           Cache.log.debug(format("Worker %s started",
105                   worker.getClass().getName()));
106           worker.run();
107           Cache.log.debug(format("Worker %s finished",
108                   worker.getClass().getName()));
109         }
110         catch (InterruptedException e)
111         {
112           Cache.log.debug(format("Worker %s interrupted",
113                   worker.getClass().getName()));
114         }
115         catch (Throwable th)
116         {
117           Cache.log.debug(format("Worker %s failed",
118                   worker.getClass().getName()), th);
119         }
120         finally
121         {
122         }
123       });
124     }
125
126     @Override
127     synchronized void cancel()
128     {
129       if (!isWorking())
130       {
131         return;
132       }
133       Cache.log.debug(format("Cancelling worker %s",
134               worker.getClass().getName()));
135       task.cancel(true);
136     }
137   }
138   
139   
140   private class PollableWorkerManager extends WorkerManager
141   {
142     private final PollableAlignCalcWorkerI worker;
143     private Future<?> task = null;
144     
145     PollableWorkerManager(PollableAlignCalcWorkerI worker)
146     {
147       super(worker);
148       this.worker = worker;
149     }
150     
151     @Override
152     boolean isWorking()
153     {
154       return task != null && !task.isDone();
155     }
156     
157     protected void submit()
158     {
159       if (task != null && !(task.isDone() || task.isCancelled()))
160       {
161         throw new IllegalStateException(
162             "Cannot submit new task if the prevoius one is still running");
163       }
164       Cache.log.debug(format("Worker %s queued",
165               worker.getClass().getName()));
166       final var runnable = new Runnable()
167       {
168         private boolean started = false;
169         private boolean completed = false;
170         Future<?> future = null;
171         
172         @Override
173         public void run()
174         {
175           try
176           {
177             if (!started)
178             {
179               Cache.log.debug(format("Worker %s started",
180                       worker.getClass().getName()));
181               worker.startUp();
182               started = true;
183             }
184             else if (!completed)
185             {
186               if (worker.poll())
187               {
188                 Cache.log.debug(format("Worker %s finished",
189                         worker.getClass().getName()));
190                 completed = true;
191               }
192             }
193           } catch (Throwable th)
194           {
195             Cache.log.debug(format("Worker %s failed",
196                     worker.getClass().getName()), th);
197             completed = true;
198           }
199           if (completed)
200           {
201             try
202             {
203               future.cancel(false);
204             }
205             catch (NullPointerException ignored)
206             {
207               // extremely unlikely to happen
208             }
209           }
210         }
211       };
212       runnable.future = task = executor.scheduleWithFixedDelay(
213               runnable, 10, 1000, TimeUnit.MILLISECONDS);
214     }
215     
216     synchronized protected void cancel()
217     {
218       if (!isWorking())
219       {
220         return;
221       }
222       Cache.log.debug(format("Cancelling worker %s",
223               worker.getClass().getName()));
224       task.cancel(false);
225       executor.submit(() -> {
226         worker.cancel();
227       });
228     }
229   }
230   
231   
232   private final ScheduledExecutorService executor =
233           Executors.newSingleThreadScheduledExecutor();
234   private final Map<AlignCalcWorkerI, WorkerManager> registered =
235           synchronizedMap(new HashMap<>());
236   
237   private final List<AlignCalcListener> listeners =
238           new CopyOnWriteArrayList<>();
239   
240   
241   @Override
242   public void registerWorker(AlignCalcWorkerI worker)
243   {
244     Objects.requireNonNull(worker);
245     WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ?
246             new PollableWorkerManager((PollableAlignCalcWorkerI) worker) : 
247               new SimpleWorkerManager(worker);
248     registered.putIfAbsent(worker, manager);
249     startWorker(worker);
250   }
251
252   @Override
253   public List<AlignCalcWorkerI> getWorkers()
254   {
255     return List.copyOf(registered.keySet());
256   }
257
258   @Override
259   public List<AlignCalcWorkerI> getWorkersOfClass(
260           Class<? extends AlignCalcWorkerI> cls)
261   {
262     List<AlignCalcWorkerI> collected = new ArrayList<>();
263     for (var worker : getWorkers())
264     {
265       if (worker.getClass().equals(cls))
266       {
267         collected.add(worker);
268       }
269     }
270     return unmodifiableList(collected);
271   }
272
273   @Override
274   public void removeWorker(AlignCalcWorkerI worker)
275   {
276     registered.remove(worker);
277   }
278
279   @Override
280   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
281   {
282     synchronized (registered)
283     {
284       for (var worker : getWorkers())
285       {
286         if (worker.involves(annot) && worker.isDeletable())
287         {
288           removeWorker(worker);
289         }
290       }
291     }
292   }
293
294   @Override
295   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
296   {
297     synchronized (registered)
298     {
299       for (var worker : getWorkers())
300       {
301         if (worker.getClass().equals(cls))
302         {
303           removeWorker(worker);
304         }
305       }
306     }
307   }
308
309   @Override
310   public void disableWorker(AlignCalcWorkerI worker)
311   {
312     // Null pointer check might be needed
313     registered.get(worker).setEnabled(false);
314   }
315
316   @Override
317   public void enableWorker(AlignCalcWorkerI worker)
318   {
319     // Null pointer check might be needed
320     registered.get(worker).setEnabled(true);
321   }
322
323   @Override
324   public boolean isDisabled(AlignCalcWorkerI worker)
325   {
326     if (registered.containsKey(worker))
327     {
328       return !registered.get(worker).isEnabled();
329     }
330     else
331     {
332       return false;
333     }
334   }
335
336   @Override
337   public boolean isWorking(AlignCalcWorkerI worker)
338   {
339     if (!registered.containsKey(worker))
340     {
341       return false;
342     }
343     else
344     {
345       return registered.get(worker).isWorking();
346     }
347   }
348
349   @Override
350   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
351   {
352     synchronized (registered)
353     {
354       for (var entry : registered.entrySet())
355       {
356         if (entry.getKey().involves(annot) &&
357                 entry.getValue().isWorking())
358         {
359           return true;
360         }
361       }
362     }
363     return false;
364   }
365
366   @Override
367   public boolean isWorking()
368   {
369     synchronized (registered)
370     {
371       for (var manager : registered.values())
372       {
373         if (manager.isWorking())
374         {
375           return true;
376         }
377       }
378     }
379     return false;
380   }
381
382   @Override
383   public void startWorker(AlignCalcWorkerI worker)
384   {
385     Objects.requireNonNull(worker);
386     var manager = registered.get(worker);
387     if (manager == null) 
388     {
389       throw new NoSuchElementException();
390     }
391     manager.restart();
392   }
393
394   @Override
395   public void restartWorkers()
396   {
397     synchronized (registered)
398     {
399       for (var manager : registered.values())
400       {
401         manager.restart();
402       }
403     }
404   }
405
406   @Override
407   public void cancelWorker(AlignCalcWorkerI worker)
408   {    
409     Objects.requireNonNull(worker);
410     var manager = registered.get(worker);
411     if (manager == null) 
412     {
413       throw new NoSuchElementException();
414     }
415     manager.cancel();
416   }
417   
418   private void notifyQueued(AlignCalcWorkerI worker)
419   {
420     for (AlignCalcListener listener : listeners)
421     {
422       listener.workerQueued(worker);
423     }
424   }
425
426   private void notifyStarted(AlignCalcWorkerI worker)
427   {
428     for (AlignCalcListener listener : listeners)
429     {
430       listener.workerStarted(worker);
431     }
432   }
433
434   private void notifyCompleted(AlignCalcWorkerI worker)
435   {
436     for (AlignCalcListener listener : listeners)
437     {
438       try
439       {
440         listener.workerCompleted(worker);
441       } catch (RuntimeException e)
442       {
443         e.printStackTrace();
444       }
445     }
446   }
447
448   private void notifyCancelled(AlignCalcWorkerI worker)
449   {
450     for (AlignCalcListener listener : listeners)
451     {
452       try
453       {
454         listener.workerCancelled(worker);
455       } catch (RuntimeException e)
456       {
457         e.printStackTrace();
458       }
459     }
460   }
461
462   private void notifyExceptional(AlignCalcWorkerI worker,
463           Throwable throwable)
464   {
465     for (AlignCalcListener listener : listeners)
466     {
467       try
468       {
469         listener.workerExceptional(worker, throwable);
470       } catch (RuntimeException e)
471       {
472         e.printStackTrace();
473       }
474     }
475   }
476
477   @Override
478   public void addAlignCalcListener(AlignCalcListener listener)
479   {
480     listeners.add(listener);
481   }
482
483   @Override
484   public void removeAlignCalcListener(AlignCalcListener listener)
485   {
486     listeners.remove(listener);
487   }
488
489   @Override
490   public void shutdown()
491   {
492     executor.shutdownNow();
493     listeners.clear();
494     registered.clear();
495   }
496
497 }