JAL-3690 separate startup and poll code in SeqAnnotationCalcWorker
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.TimeUnit;
13
14 import static java.lang.String.format;
15 import static java.util.Collections.synchronizedMap;
16 import static java.util.Collections.unmodifiableList;
17
18 import java.util.ArrayList;
19
20 import jalview.api.AlignCalcListener;
21 import jalview.api.AlignCalcManagerI2;
22 import jalview.api.AlignCalcWorkerI;
23 import jalview.api.PollableAlignCalcWorkerI;
24 import jalview.bin.Cache;
25 import jalview.datamodel.AlignmentAnnotation;
26
27 public class AlignCalcManager2 implements AlignCalcManagerI2
28 {
29   private abstract class WorkerManager
30   {
31     protected volatile boolean enabled = true;
32             
33     protected final AlignCalcWorkerI worker;
34     
35     WorkerManager(AlignCalcWorkerI worker)
36     {
37       this.worker = worker;
38     }
39     
40     AlignCalcWorkerI getWorker()
41     {
42       return worker;
43     }
44     
45     boolean isEnabled()
46     {
47       return enabled;
48     }
49     
50     void setEnabled(boolean enabled)
51     {
52       this.enabled = enabled;
53     }
54     
55     synchronized void restart()
56     {
57       if (!isEnabled())
58       {
59         return;
60       }
61       if (isWorking())
62       {
63         cancel();
64       }
65       submit();
66     }
67     
68     abstract boolean isWorking();
69     
70     protected abstract void submit();
71     
72     abstract void cancel();  
73   }
74   
75   
76   private class SimpleWorkerManager extends WorkerManager
77   {
78     private Future<?> task = null;
79     
80     SimpleWorkerManager(AlignCalcWorkerI worker)
81     {
82       super(worker);
83     }
84     
85     @Override
86     boolean isWorking()
87     {
88       return task != null && !task.isDone();
89     }
90
91     @Override
92     protected void submit()
93     {
94       if (task != null && !(task.isDone() || task.isCancelled()))
95       {
96         throw new IllegalStateException(
97             "Cannot submit new task if the prevoius one is still running");
98       }
99       Cache.log.debug(format("Worker %s queued",
100               worker.getClass().getName()));
101       task = executor.submit(() -> {
102         try
103         {
104           Cache.log.debug(format("Worker %s started",
105                   worker.getClass().getName()));
106           worker.run();
107           Cache.log.debug(format("Worker %s finished",
108                   worker.getClass().getName()));
109         }
110         catch (InterruptedException e)
111         {
112           Cache.log.debug(format("Worker %s interrupted",
113                   worker.getClass().getName()));
114         }
115         catch (Throwable th)
116         {
117           Cache.log.debug(format("Worker %s failed",
118                   worker.getClass().getName()), th);
119         }
120         finally
121         {
122         }
123       });
124     }
125
126     @Override
127     synchronized void cancel()
128     {
129       if (!isWorking())
130       {
131         return;
132       }
133       Cache.log.debug(format("Cancelling worker %s",
134               worker.getClass().getName()));
135       task.cancel(true);
136     }
137   }
138   
139   
140   private class PollableWorkerManager extends WorkerManager
141   {
142     private final PollableAlignCalcWorkerI worker;
143     private Future<?> task = null;
144     
145     PollableWorkerManager(PollableAlignCalcWorkerI worker)
146     {
147       super(worker);
148       this.worker = worker;
149     }
150     
151     @Override
152     boolean isWorking()
153     {
154       return task != null && !task.isDone();
155     }
156     
157     protected void submit()
158     {
159       if (task != null && !(task.isDone() || task.isCancelled()))
160       {
161         throw new IllegalStateException(
162             "Cannot submit new task if the prevoius one is still running");
163       }
164       Cache.log.debug(format("Worker %s queued",
165               worker.getClass().getName()));
166       final var runnable = new Runnable()
167       {
168         private boolean started = false;
169         private boolean completed = false;
170         Future<?> future = null;
171         
172         @Override
173         public void run()
174         {
175           try
176           {
177             if (!started)
178             {
179               Cache.log.debug(format("Worker %s started",
180                       worker.getClass().getName()));
181               worker.startUp();
182               started = true;
183             }
184             else if (!completed)
185             {
186               Cache.log.debug(format("Polling worker %s",
187                       worker.getClass().getName()));
188               if (worker.poll())
189               {
190                 Cache.log.debug(format("Worker %s finished",
191                         worker.getClass().getName()));
192                 completed = true;
193               }
194             }
195           } catch (Throwable th)
196           {
197             Cache.log.debug(format("Worker %s failed",
198                     worker.getClass().getName()), th);
199             completed = true;
200           }
201           if (completed)
202           {
203             Cache.log.debug(format("Finalizing completed worker %s",
204                     worker.getClass().getName()));
205             worker.done();
206             // almost impossible, but the future may be null at this point
207             // let it throw NPE to cancel forcefully
208             future.cancel(false);
209           }
210         }
211       };
212       runnable.future = task = executor.scheduleWithFixedDelay(
213               runnable, 10, 1000, TimeUnit.MILLISECONDS);
214     }
215     
216     synchronized protected void cancel()
217     {
218       if (!isWorking())
219       {
220         return;
221       }
222       Cache.log.debug(format("Cancelling worker %s",
223               worker.getClass().getName()));
224       task.cancel(false);
225       executor.submit(() -> {
226         worker.cancel();
227         Cache.log.debug(format("Finalizing cancelled worker %s",
228                 worker.getClass().getName()));
229         worker.done();
230       });
231     }
232   }
233   
234   
  /*
   * Scheduled executor created by Executors.newSingleThreadScheduledExecutor();
   * the worker managers submit and schedule their tasks on it.
   */
  private final ScheduledExecutorService executor =
          Executors.newSingleThreadScheduledExecutor();
  /*
   * Registered workers mapped to their managers. The map is wrapped by
   * synchronizedMap; code that iterates over it synchronizes on the map.
   */
  private final Map<AlignCalcWorkerI, WorkerManager> registered =
          synchronizedMap(new HashMap<>());
  
  /* Listeners added through addAlignCalcListener. */
  private final List<AlignCalcListener> listeners =
          new CopyOnWriteArrayList<>();
242   
243   
244   @Override
245   public void registerWorker(AlignCalcWorkerI worker)
246   {
247     Objects.requireNonNull(worker);
248     WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ?
249             new PollableWorkerManager((PollableAlignCalcWorkerI) worker) : 
250               new SimpleWorkerManager(worker);
251     registered.putIfAbsent(worker, manager);
252     startWorker(worker);
253   }
254
255   @Override
256   public List<AlignCalcWorkerI> getWorkers()
257   {
258     return List.copyOf(registered.keySet());
259   }
260
261   @Override
262   public List<AlignCalcWorkerI> getWorkersOfClass(
263           Class<? extends AlignCalcWorkerI> cls)
264   {
265     List<AlignCalcWorkerI> collected = new ArrayList<>();
266     for (var worker : getWorkers())
267     {
268       if (worker.getClass().equals(cls))
269       {
270         collected.add(worker);
271       }
272     }
273     return unmodifiableList(collected);
274   }
275
  /**
   * Removes the worker's entry from the registered-worker map. The manager
   * stored for the worker is dropped without calling its cancel method.
   *
   * @param worker
   *          worker to remove
   */
  @Override
  public void removeWorker(AlignCalcWorkerI worker)
  {
    registered.remove(worker);
  }
281
282   @Override
283   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
284   {
285     synchronized (registered)
286     {
287       for (var worker : getWorkers())
288       {
289         if (worker.involves(annot) && worker.isDeletable())
290         {
291           removeWorker(worker);
292         }
293       }
294     }
295   }
296
297   @Override
298   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
299   {
300     synchronized (registered)
301     {
302       for (var worker : getWorkers())
303       {
304         if (worker.getClass().equals(cls))
305         {
306           removeWorker(worker);
307         }
308       }
309     }
310   }
311
312   @Override
313   public void disableWorker(AlignCalcWorkerI worker)
314   {
315     // Null pointer check might be needed
316     registered.get(worker).setEnabled(false);
317   }
318
319   @Override
320   public void enableWorker(AlignCalcWorkerI worker)
321   {
322     // Null pointer check might be needed
323     registered.get(worker).setEnabled(true);
324   }
325
326   @Override
327   public boolean isDisabled(AlignCalcWorkerI worker)
328   {
329     if (registered.containsKey(worker))
330     {
331       return !registered.get(worker).isEnabled();
332     }
333     else
334     {
335       return false;
336     }
337   }
338
339   @Override
340   public boolean isWorking(AlignCalcWorkerI worker)
341   {
342     if (!registered.containsKey(worker))
343     {
344       return false;
345     }
346     else
347     {
348       return registered.get(worker).isWorking();
349     }
350   }
351
352   @Override
353   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
354   {
355     synchronized (registered)
356     {
357       for (var entry : registered.entrySet())
358       {
359         if (entry.getKey().involves(annot) &&
360                 entry.getValue().isWorking())
361         {
362           return true;
363         }
364       }
365     }
366     return false;
367   }
368
369   @Override
370   public boolean isWorking()
371   {
372     synchronized (registered)
373     {
374       for (var manager : registered.values())
375       {
376         if (manager.isWorking())
377         {
378           return true;
379         }
380       }
381     }
382     return false;
383   }
384
385   @Override
386   public void startWorker(AlignCalcWorkerI worker)
387   {
388     Objects.requireNonNull(worker);
389     var manager = registered.get(worker);
390     if (manager == null) 
391     {
392       throw new NoSuchElementException();
393     }
394     manager.restart();
395   }
396
397   @Override
398   public void restartWorkers()
399   {
400     synchronized (registered)
401     {
402       for (var manager : registered.values())
403       {
404         manager.restart();
405       }
406     }
407   }
408
409   @Override
410   public void cancelWorker(AlignCalcWorkerI worker)
411   {    
412     Objects.requireNonNull(worker);
413     var manager = registered.get(worker);
414     if (manager == null) 
415     {
416       throw new NoSuchElementException();
417     }
418     manager.cancel();
419   }
420   
421   private void notifyQueued(AlignCalcWorkerI worker)
422   {
423     for (AlignCalcListener listener : listeners)
424     {
425       listener.workerQueued(worker);
426     }
427   }
428
429   private void notifyStarted(AlignCalcWorkerI worker)
430   {
431     for (AlignCalcListener listener : listeners)
432     {
433       listener.workerStarted(worker);
434     }
435   }
436
437   private void notifyCompleted(AlignCalcWorkerI worker)
438   {
439     for (AlignCalcListener listener : listeners)
440     {
441       try
442       {
443         listener.workerCompleted(worker);
444       } catch (RuntimeException e)
445       {
446         e.printStackTrace();
447       }
448     }
449   }
450
451   private void notifyCancelled(AlignCalcWorkerI worker)
452   {
453     for (AlignCalcListener listener : listeners)
454     {
455       try
456       {
457         listener.workerCancelled(worker);
458       } catch (RuntimeException e)
459       {
460         e.printStackTrace();
461       }
462     }
463   }
464
465   private void notifyExceptional(AlignCalcWorkerI worker,
466           Throwable throwable)
467   {
468     for (AlignCalcListener listener : listeners)
469     {
470       try
471       {
472         listener.workerExceptional(worker, throwable);
473       } catch (RuntimeException e)
474       {
475         e.printStackTrace();
476       }
477     }
478   }
479
  /**
   * Adds the listener to the listener list.
   *
   * @param listener
   *          listener to add
   */
  @Override
  public void addAlignCalcListener(AlignCalcListener listener)
  {
    listeners.add(listener);
  }
485
  /**
   * Removes the listener from the listener list.
   *
   * @param listener
   *          listener to remove
   */
  @Override
  public void removeAlignCalcListener(AlignCalcListener listener)
  {
    listeners.remove(listener);
  }
491
  /**
   * Calls shutdownNow() on the executor, then clears the listener list and
   * the registered-worker map.
   */
  @Override
  public void shutdown()
  {
    executor.shutdownNow();
    listeners.clear();
    registered.clear();
  }
499
500 }