JAL-3690 Allow unregistered workers to be run with CalcManager.
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.TimeUnit;
13
14 import static java.lang.String.format;
15 import static java.util.Collections.synchronizedMap;
16 import static java.util.Collections.unmodifiableList;
17
18 import java.util.ArrayList;
19
20 import jalview.api.AlignCalcListener;
21 import jalview.api.AlignCalcManagerI2;
22 import jalview.api.AlignCalcWorkerI;
23 import jalview.api.PollableAlignCalcWorkerI;
24 import jalview.bin.Cache;
25 import jalview.datamodel.AlignmentAnnotation;
26
27 public class AlignCalcManager2 implements AlignCalcManagerI2
28 {
29   private abstract class WorkerManager
30   {
31     protected volatile boolean enabled = true;
32             
33     protected final AlignCalcWorkerI worker;
34     
35     WorkerManager(AlignCalcWorkerI worker)
36     {
37       this.worker = worker;
38     }
39     
40     AlignCalcWorkerI getWorker()
41     {
42       return worker;
43     }
44     
45     boolean isEnabled()
46     {
47       return enabled;
48     }
49     
50     void setEnabled(boolean enabled)
51     {
52       this.enabled = enabled;
53     }
54     
55     synchronized void restart()
56     {
57       if (!isEnabled())
58       {
59         return;
60       }
61       if (isWorking())
62       {
63         cancel();
64       }
65       submit();
66     }
67     
68     abstract boolean isWorking();
69     
70     protected abstract void submit();
71     
72     abstract void cancel();
73   }
74   
75   
76   private class SimpleWorkerManager extends WorkerManager
77   {
78     private Future<?> task = null;
79     
80     SimpleWorkerManager(AlignCalcWorkerI worker)
81     {
82       super(worker);
83     }
84     
85     @Override
86     boolean isWorking()
87     {
88       return task != null && !task.isDone();
89     }
90
91     @Override
92     protected void submit()
93     {
94       if (task != null && !(task.isDone() || task.isCancelled()))
95       {
96         throw new IllegalStateException(
97             "Cannot submit new task if the prevoius one is still running");
98       }
99       Cache.log.debug(format("Worker %s queued",
100               worker.getClass().getName()));
101       task = executor.submit(() -> {
102         try
103         {
104           Cache.log.debug(format("Worker %s started",
105                   worker.getClass().getName()));
106           worker.run();
107           Cache.log.debug(format("Worker %s finished",
108                   worker.getClass().getName()));
109         }
110         catch (InterruptedException e)
111         {
112           Cache.log.debug(format("Worker %s interrupted",
113                   worker.getClass().getName()));
114         }
115         catch (Throwable th)
116         {
117           Cache.log.debug(format("Worker %s failed",
118                   worker.getClass().getName()), th);
119         }
120         finally
121         {
122         }
123       });
124     }
125
126     @Override
127     synchronized void cancel()
128     {
129       if (!isWorking())
130       {
131         return;
132       }
133       Cache.log.debug(format("Cancelling worker %s",
134               worker.getClass().getName()));
135       task.cancel(true);
136     }
137   }
138   
139   
140   private class PollableWorkerManager extends WorkerManager
141   {
142     private final PollableAlignCalcWorkerI worker;
143     private Future<?> task = null;
144     
145     PollableWorkerManager(PollableAlignCalcWorkerI worker)
146     {
147       super(worker);
148       this.worker = worker;
149     }
150     
151     @Override
152     boolean isWorking()
153     {
154       return task != null && !task.isDone();
155     }
156     
157     protected void submit()
158     {
159       if (task != null && !(task.isDone() || task.isCancelled()))
160       {
161         throw new IllegalStateException(
162             "Cannot submit new task if the prevoius one is still running");
163       }
164       Cache.log.debug(format("Worker %s queued",
165               worker.getClass().getName()));
166       final var runnable = new Runnable()
167       {
168         private boolean started = false;
169         private boolean completed = false;
170         Future<?> future = null;
171         
172         @Override
173         public void run()
174         {
175           try
176           {
177             if (!started)
178             {
179               Cache.log.debug(format("Worker %s started",
180                       worker.getClass().getName()));
181               worker.startUp();
182               started = true;
183             }
184             else if (!completed)
185             {
186               Cache.log.debug(format("Polling worker %s",
187                       worker.getClass().getName()));
188               if (worker.poll())
189               {
190                 Cache.log.debug(format("Worker %s finished",
191                         worker.getClass().getName()));
192                 completed = true;
193               }
194             }
195           } catch (Throwable th)
196           {
197             Cache.log.debug(format("Worker %s failed",
198                     worker.getClass().getName()), th);
199             completed = true;
200           }
201           if (completed)
202           {
203             Cache.log.debug(format("Finalizing completed worker %s",
204                     worker.getClass().getName()));
205             worker.done();
206             // almost impossible, but the future may be null at this point
207             // let it throw NPE to cancel forcefully
208             future.cancel(false);
209           }
210         }
211       };
212       runnable.future = task = executor.scheduleWithFixedDelay(
213               runnable, 10, 1000, TimeUnit.MILLISECONDS);
214     }
215     
216     synchronized protected void cancel()
217     {
218       if (!isWorking())
219       {
220         return;
221       }
222       Cache.log.debug(format("Cancelling worker %s",
223               worker.getClass().getName()));
224       task.cancel(false);
225       executor.submit(() -> {
226         worker.cancel();
227         Cache.log.debug(format("Finalizing cancelled worker %s",
228                 worker.getClass().getName()));
229         worker.done();
230       });
231     }
232   }
233   
234   
235   private final ScheduledExecutorService executor =
236           Executors.newSingleThreadScheduledExecutor();
237   private final Map<AlignCalcWorkerI, WorkerManager> registered =
238           synchronizedMap(new HashMap<>());
239   
240   private final List<AlignCalcListener> listeners =
241           new CopyOnWriteArrayList<>();
242   
243   private WorkerManager createManager(AlignCalcWorkerI worker) {
244     if (worker instanceof PollableAlignCalcWorkerI)
245       return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
246     else
247       return new SimpleWorkerManager(worker);
248   }
249   
250   @Override
251   public void registerWorker(AlignCalcWorkerI worker)
252   {
253     Objects.requireNonNull(worker);
254     WorkerManager manager = createManager(worker);
255     registered.putIfAbsent(worker, manager);
256     startWorker(worker);
257   }
258
259   @Override
260   public List<AlignCalcWorkerI> getWorkers()
261   {
262     List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
263     result.addAll(registered.keySet());
264     return result;
265   }
266
267   @Override
268   public List<AlignCalcWorkerI> getWorkersOfClass(
269           Class<? extends AlignCalcWorkerI> cls)
270   {
271     List<AlignCalcWorkerI> collected = new ArrayList<>();
272     for (var worker : getWorkers())
273     {
274       if (worker.getClass().equals(cls))
275       {
276         collected.add(worker);
277       }
278     }
279     return unmodifiableList(collected);
280   }
281
282   @Override
283   public void removeWorker(AlignCalcWorkerI worker)
284   {
285     registered.remove(worker);
286   }
287
288   @Override
289   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
290   {
291     synchronized (registered)
292     {
293       for (var worker : getWorkers())
294       {
295         if (worker.involves(annot) && worker.isDeletable())
296         {
297           removeWorker(worker);
298         }
299       }
300     }
301   }
302
303   @Override
304   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
305   {
306     synchronized (registered)
307     {
308       for (var worker : getWorkers())
309       {
310         if (worker.getClass().equals(cls))
311         {
312           removeWorker(worker);
313         }
314       }
315     }
316   }
317
318   @Override
319   public void disableWorker(AlignCalcWorkerI worker)
320   {
321     // Null pointer check might be needed
322     registered.get(worker).setEnabled(false);
323   }
324
325   @Override
326   public void enableWorker(AlignCalcWorkerI worker)
327   {
328     // Null pointer check might be needed
329     registered.get(worker).setEnabled(true);
330   }
331
332   @Override
333   public boolean isDisabled(AlignCalcWorkerI worker)
334   {
335     if (registered.containsKey(worker))
336     {
337       return !registered.get(worker).isEnabled();
338     }
339     else
340     {
341       return false;
342     }
343   }
344
345   @Override
346   public boolean isWorking(AlignCalcWorkerI worker)
347   {
348     if (!registered.containsKey(worker))
349     {
350       return false;
351     }
352     else
353     {
354       return registered.get(worker).isWorking();
355     }
356   }
357
358   @Override
359   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
360   {
361     synchronized (registered)
362     {
363       for (var entry : registered.entrySet())
364       {
365         if (entry.getKey().involves(annot) &&
366                 entry.getValue().isWorking())
367         {
368           return true;
369         }
370       }
371     }
372     return false;
373   }
374
375   @Override
376   public boolean isWorking()
377   {
378     synchronized (registered)
379     {
380       for (var manager : registered.values())
381       {
382         if (manager.isWorking())
383         {
384           return true;
385         }
386       }
387     }
388     return false;
389   }
390
391   @Override
392   public void startWorker(AlignCalcWorkerI worker)
393   {
394     Objects.requireNonNull(worker);
395     var manager = registered.get(worker);
396     if (manager == null) 
397     {
398       Cache.log.warn("Starting unregistered worker " + worker);
399       manager = createManager(worker);
400     }
401     manager.restart();
402   }
403
404   @Override
405   public void restartWorkers()
406   {
407     synchronized (registered)
408     {
409       for (var manager : registered.values())
410       {
411         manager.restart();
412       }
413     }
414   }
415
416   @Override
417   public void cancelWorker(AlignCalcWorkerI worker)
418   {    
419     Objects.requireNonNull(worker);
420     var manager = registered.get(worker);
421     if (manager == null) 
422     {
423       throw new NoSuchElementException();
424     }
425     manager.cancel();
426   }
427   
428   private void notifyQueued(AlignCalcWorkerI worker)
429   {
430     for (AlignCalcListener listener : listeners)
431     {
432       listener.workerQueued(worker);
433     }
434   }
435
436   private void notifyStarted(AlignCalcWorkerI worker)
437   {
438     for (AlignCalcListener listener : listeners)
439     {
440       listener.workerStarted(worker);
441     }
442   }
443
444   private void notifyCompleted(AlignCalcWorkerI worker)
445   {
446     for (AlignCalcListener listener : listeners)
447     {
448       try
449       {
450         listener.workerCompleted(worker);
451       } catch (RuntimeException e)
452       {
453         e.printStackTrace();
454       }
455     }
456   }
457
458   private void notifyCancelled(AlignCalcWorkerI worker)
459   {
460     for (AlignCalcListener listener : listeners)
461     {
462       try
463       {
464         listener.workerCancelled(worker);
465       } catch (RuntimeException e)
466       {
467         e.printStackTrace();
468       }
469     }
470   }
471
472   private void notifyExceptional(AlignCalcWorkerI worker,
473           Throwable throwable)
474   {
475     for (AlignCalcListener listener : listeners)
476     {
477       try
478       {
479         listener.workerExceptional(worker, throwable);
480       } catch (RuntimeException e)
481       {
482         e.printStackTrace();
483       }
484     }
485   }
486
487   @Override
488   public void addAlignCalcListener(AlignCalcListener listener)
489   {
490     listeners.add(listener);
491   }
492
493   @Override
494   public void removeAlignCalcListener(AlignCalcListener listener)
495   {
496     listeners.remove(listener);
497   }
498
499   @Override
500   public void shutdown()
501   {
502     executor.shutdownNow();
503     listeners.clear();
504     registered.clear();
505   }
506
507 }