WIP
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.WeakHashMap;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
14
15 import static java.lang.String.format;
16 import static java.util.Collections.synchronizedMap;
17 import static java.util.Collections.unmodifiableList;
18
19 import java.util.ArrayList;
20
21 import jalview.api.AlignCalcListener;
22 import jalview.api.AlignCalcManagerI2;
23 import jalview.api.AlignCalcWorkerI;
24 import jalview.api.PollableAlignCalcWorkerI;
25 import jalview.bin.Cache;
26 import jalview.datamodel.AlignmentAnnotation;
27
28 public class AlignCalcManager2 implements AlignCalcManagerI2
29 {
30   private abstract class WorkerManager
31   {
32     protected volatile boolean enabled = true;
33
34     protected AlignCalcWorkerI worker;
35
36     WorkerManager(AlignCalcWorkerI worker)
37     {
38       this.worker = worker;
39     }
40
41     protected AlignCalcWorkerI getWorker()
42     {
43       return worker;
44     }
45
46     boolean isEnabled()
47     {
48       return enabled;
49     }
50
51     void setEnabled(boolean enabled)
52     {
53       this.enabled = enabled;
54     }
55
56     synchronized void restart()
57     {
58       if (!isEnabled())
59       {
60         return;
61       }
62       if (!isRegistered())
63       {
64         setEnabled(false);
65       }
66       if (isWorking())
67       {
68         cancel();
69       }
70       submit();
71     }
72
73     protected boolean isRegistered()
74     {
75       return registered.containsKey(getWorker());
76     }
77
78     abstract boolean isWorking();
79
80     protected abstract void submit();
81
82     abstract void cancel();
83   }
84
85   private class SimpleWorkerManager extends WorkerManager
86   {
87     private Future<?> task = null;
88
89     SimpleWorkerManager(AlignCalcWorkerI worker)
90     {
91       super(worker);
92     }
93
94     @Override
95     boolean isWorking()
96     {
97       return task != null && !task.isDone();
98     }
99
100     @Override
101     protected void submit()
102     {
103       if (task != null && !(task.isDone() || task.isCancelled()))
104       {
105         throw new IllegalStateException(
106                 "Cannot submit new task if the prevoius one is still running");
107       }
108       Cache.log.debug(
109               format("Worker %s queued", getWorker().getClass().getName()));
110       task = executor.submit(() -> {
111         try
112         {
113           Cache.log.debug(format("Worker %s started", getWorker()));
114           getWorker().run();
115           Cache.log.debug(format("Worker %s finished", getWorker()));
116         } catch (InterruptedException e)
117         {
118           Cache.log.debug(format("Worker %s interrupted", getWorker()));
119         } catch (Throwable th)
120         {
121           Cache.log.debug(format("Worker %s failed", getWorker()), th);
122         } finally
123         {
124           if (!isRegistered())
125           {
126             // delete worker reference so garbage collector can remove it
127             worker = null;
128           }
129         }
130       });
131     }
132
133     @Override
134     synchronized void cancel()
135     {
136       if (!isWorking())
137       {
138         return;
139       }
140       Cache.log.debug(format("Cancelling worker %s", getWorker()));
141       task.cancel(true);
142     }
143   }
144
145   private class PollableWorkerManager extends WorkerManager
146   {
147     private Future<?> task = null;
148
149     PollableWorkerManager(PollableAlignCalcWorkerI worker)
150     {
151       super(worker);
152     }
153
154     @Override
155     protected PollableAlignCalcWorkerI getWorker()
156     {
157       return (PollableAlignCalcWorkerI) super.getWorker();
158     }
159
160     @Override
161     boolean isWorking()
162     {
163       return task != null && !task.isDone();
164     }
165
166     protected void submit()
167     {
168       if (task != null && !(task.isDone() || task.isCancelled()))
169       {
170         throw new IllegalStateException(
171                 "Cannot submit new task if the prevoius one is still running");
172       }
173       Cache.log.debug(
174               format("Worker %s queued", getWorker()));
175       final var runnable = new Runnable()
176       {
177         private boolean started = false;
178
179         private boolean completed = false;
180
181         Future<?> future = null;
182
183         @Override
184         public void run()
185         {
186           try
187           {
188             if (!started)
189             {
190               Cache.log.debug(format("Worker %s started", getWorker()));
191               getWorker().startUp();
192               started = true;
193             }
194             else if (!completed)
195             {
196               Cache.log.debug(format("Polling worker %s", getWorker()));
197               if (getWorker().poll())
198               {
199                 Cache.log.debug(format("Worker %s finished", getWorker()));
200                 completed = true;
201               }
202             }
203           } catch (Throwable th)
204           {
205             Cache.log.debug(format("Worker %s failed", getWorker()), th);
206             completed = true;
207           }
208           if (completed)
209           {
210             final var worker = getWorker();
211             if (!isRegistered())
212               PollableWorkerManager.super.worker = null;
213             Cache.log.debug(format("Finalizing completed worker %s", worker));
214             worker.done();
215             // almost impossible, but the future may be null at this point
216             // let it throw NPE to cancel forcefully
217             future.cancel(false);
218           }
219         }
220       };
221       runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
222               1000, TimeUnit.MILLISECONDS);
223     }
224
225     synchronized protected void cancel()
226     {
227       if (!isWorking())
228       {
229         return;
230       }
231       Cache.log.debug(format("Cancelling worker %s", getWorker()));
232       task.cancel(false);
233       executor.submit(() -> {
234         final var worker = getWorker();
235         if (!isRegistered())
236           PollableWorkerManager.super.worker = null;
237         if (worker != null)
238         {
239           worker.cancel();
240           Cache.log.debug(format("Finalizing cancelled worker %s", worker));
241           worker.done();
242         }
243       });
244     }
245   }
246
247   private final ScheduledExecutorService executor = Executors
248           .newSingleThreadScheduledExecutor();
249
250   private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
251           new HashMap<>());
252
253   private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
254           new WeakHashMap<>());
255
256   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
257
258   private WorkerManager createManager(AlignCalcWorkerI worker)
259   {
260     if (worker instanceof PollableAlignCalcWorkerI)
261     {
262       return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
263     }
264     else
265     {
266       return new SimpleWorkerManager(worker);
267     }
268   }
269
270   @Override
271   public void registerWorker(AlignCalcWorkerI worker)
272   {
273     Objects.requireNonNull(worker);
274     WorkerManager manager = createManager(worker);
275     registered.putIfAbsent(worker, manager);
276     startWorker(worker);
277   }
278
279   @Override
280   public List<AlignCalcWorkerI> getWorkers()
281   {
282     List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
283     result.addAll(registered.keySet());
284     return result;
285   }
286
287   @Override
288   public List<AlignCalcWorkerI> getWorkersOfClass(
289           Class<? extends AlignCalcWorkerI> cls)
290   {
291     List<AlignCalcWorkerI> collected = new ArrayList<>();
292     for (var worker : getWorkers())
293     {
294       if (worker.getClass().equals(cls))
295       {
296         collected.add(worker);
297       }
298     }
299     return unmodifiableList(collected);
300   }
301
302   @Override
303   public void removeWorker(AlignCalcWorkerI worker)
304   {
305     if (worker.isDeletable())
306     {
307       registered.remove(worker);
308     }
309   }
310
311   @Override
312   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
313   {
314     synchronized (registered)
315     {
316       for (var worker : getWorkers())
317       {
318         if (worker.involves(annot))
319         {
320           removeWorker(worker);
321         }
322       }
323     }
324   }
325
326   @Override
327   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
328   {
329     synchronized (registered)
330     {
331       for (var worker : getWorkers())
332       {
333         if (worker.getClass().equals(cls))
334         {
335           removeWorker(worker);
336         }
337       }
338     }
339   }
340
341   @Override
342   public void disableWorker(AlignCalcWorkerI worker)
343   {
344     // Null pointer check might be needed
345     registered.get(worker).setEnabled(false);
346   }
347
348   @Override
349   public void enableWorker(AlignCalcWorkerI worker)
350   {
351     // Null pointer check might be needed
352     registered.get(worker).setEnabled(true);
353   }
354
355   @Override
356   public boolean isDisabled(AlignCalcWorkerI worker)
357   {
358     if (registered.containsKey(worker))
359     {
360       return !registered.get(worker).isEnabled();
361     }
362     else
363     {
364       return false;
365     }
366   }
367
368   @Override
369   public boolean isWorking(AlignCalcWorkerI worker)
370   {
371     var manager = registered.get(worker);
372     if (manager == null)
373       manager = oneshot.get(worker);
374     if (manager == null)
375       return false;
376     else
377       return manager.isWorking();
378   }
379
380   @Override
381   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
382   {
383     synchronized (registered)
384     {
385       for (var entry : registered.entrySet())
386         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
387           return true;
388     }
389     synchronized (oneshot)
390     {
391       for (var entry : registered.entrySet())
392         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
393           return true;
394     }
395     return false;
396   }
397
398   @Override
399   public boolean isWorking()
400   {
401     synchronized (registered)
402     {
403       for (var manager : registered.values())
404         if (manager.isWorking())
405           return true;
406     }
407     synchronized (oneshot)
408     {
409       for (var manager : oneshot.values())
410         if (manager.isWorking())
411           return true;
412     }
413     return false;
414   }
415
416   @Override
417   public void startWorker(AlignCalcWorkerI worker)
418   {
419     Objects.requireNonNull(worker);
420     var manager = registered.get(worker);
421     if (manager == null)
422     {
423       Cache.log.warn("Starting unregistered worker " + worker);
424       manager = createManager(worker);
425       oneshot.put(worker, manager);
426     }
427     manager.restart();
428   }
429
430   @Override
431   public void restartWorkers()
432   {
433     synchronized (registered)
434     {
435       for (var manager : registered.values())
436       {
437         manager.restart();
438       }
439     }
440   }
441
442   @Override
443   public void cancelWorker(AlignCalcWorkerI worker)
444   {
445     Objects.requireNonNull(worker);
446     var manager = registered.get(worker);
447     if (manager == null)
448       manager = oneshot.get(worker);
449     if (manager == null)
450     {
451       throw new NoSuchElementException();
452     }
453     manager.cancel();
454   }
455
456   private void notifyQueued(AlignCalcWorkerI worker)
457   {
458     for (AlignCalcListener listener : listeners)
459     {
460       listener.workerQueued(worker);
461     }
462   }
463
464   private void notifyStarted(AlignCalcWorkerI worker)
465   {
466     for (AlignCalcListener listener : listeners)
467     {
468       listener.workerStarted(worker);
469     }
470   }
471
472   private void notifyCompleted(AlignCalcWorkerI worker)
473   {
474     for (AlignCalcListener listener : listeners)
475     {
476       try
477       {
478         listener.workerCompleted(worker);
479       } catch (RuntimeException e)
480       {
481         e.printStackTrace();
482       }
483     }
484   }
485
486   private void notifyCancelled(AlignCalcWorkerI worker)
487   {
488     for (AlignCalcListener listener : listeners)
489     {
490       try
491       {
492         listener.workerCancelled(worker);
493       } catch (RuntimeException e)
494       {
495         e.printStackTrace();
496       }
497     }
498   }
499
500   private void notifyExceptional(AlignCalcWorkerI worker,
501           Throwable throwable)
502   {
503     for (AlignCalcListener listener : listeners)
504     {
505       try
506       {
507         listener.workerExceptional(worker, throwable);
508       } catch (RuntimeException e)
509       {
510         e.printStackTrace();
511       }
512     }
513   }
514
515   @Override
516   public void addAlignCalcListener(AlignCalcListener listener)
517   {
518     listeners.add(listener);
519   }
520
521   @Override
522   public void removeAlignCalcListener(AlignCalcListener listener)
523   {
524     listeners.remove(listener);
525   }
526
527   @Override
528   public void shutdown()
529   {
530     executor.shutdownNow();
531     listeners.clear();
532     registered.clear();
533   }
534
535 }