JAL-4199 Restore deletable test when removing workers
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.WeakHashMap;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
14
15 import static java.lang.String.format;
16 import static java.util.Collections.synchronizedMap;
17 import static java.util.Collections.unmodifiableList;
18
19 import java.util.ArrayList;
20
21 import jalview.api.AlignCalcListener;
22 import jalview.api.AlignCalcManagerI2;
23 import jalview.api.AlignCalcWorkerI;
24 import jalview.api.PollableAlignCalcWorkerI;
25 import jalview.bin.Cache;
26 import jalview.bin.Console;
27 import jalview.datamodel.AlignmentAnnotation;
28
29 public class AlignCalcManager2 implements AlignCalcManagerI2
30 {
31   private abstract class WorkerManager
32   {
33     protected volatile boolean enabled = true;
34
35     protected AlignCalcWorkerI worker;
36
37     WorkerManager(AlignCalcWorkerI worker)
38     {
39       this.worker = worker;
40     }
41
42     protected AlignCalcWorkerI getWorker()
43     {
44       return worker;
45     }
46
47     boolean isEnabled()
48     {
49       return enabled;
50     }
51
52     void setEnabled(boolean enabled)
53     {
54       this.enabled = enabled;
55     }
56
57     synchronized void restart()
58     {
59       if (!isEnabled())
60       {
61         return;
62       }
63       if (!isRegistered())
64       {
65         setEnabled(false);
66       }
67       if (isWorking())
68       {
69         cancel();
70       }
71       submit();
72     }
73
74     protected boolean isRegistered()
75     {
76       return registered.containsKey(getWorker());
77     }
78
79     abstract boolean isWorking();
80
81     protected abstract void submit();
82
83     abstract void cancel();
84   }
85
86   private class SimpleWorkerManager extends WorkerManager
87   {
88     private Future<?> task = null;
89
90     SimpleWorkerManager(AlignCalcWorkerI worker)
91     {
92       super(worker);
93     }
94
95     @Override
96     boolean isWorking()
97     {
98       return task != null && !task.isDone();
99     }
100
101     @Override
102     protected void submit()
103     {
104       if (task != null && !(task.isDone() || task.isCancelled()))
105       {
106         throw new IllegalStateException(
107                 "Cannot submit new task if the prevoius one is still running");
108       }
109       Console.debug(
110               format("Worker %s queued", getWorker()));
111       task = executor.submit(() -> {
112         try
113         {
114           Console.debug(format("Worker %s started", getWorker()));
115           getWorker().run();
116           Console.debug(format("Worker %s finished", getWorker()));
117         } catch (InterruptedException e)
118         {
119           Console.debug(format("Worker %s interrupted", getWorker()));
120         } catch (Throwable th)
121         {
122           Console.debug(format("Worker %s failed", getWorker()), th);
123         } finally
124         {
125           if (!isRegistered())
126           {
127             // delete worker reference so garbage collector can remove it
128             worker = null;
129           }
130         }
131       });
132     }
133
134     @Override
135     synchronized void cancel()
136     {
137       if (!isWorking())
138       {
139         return;
140       }
141       Console.debug(format("Cancelling worker %s", getWorker()));
142       task.cancel(true);
143     }
144   }
145
146   private class PollableWorkerManager extends WorkerManager
147   {
148     private Future<?> task = null;
149
150     PollableWorkerManager(PollableAlignCalcWorkerI worker)
151     {
152       super(worker);
153     }
154
155     @Override
156     protected PollableAlignCalcWorkerI getWorker()
157     {
158       return (PollableAlignCalcWorkerI) super.getWorker();
159     }
160
161     @Override
162     boolean isWorking()
163     {
164       return task != null && !task.isDone();
165     }
166
167     protected void submit()
168     {
169       if (task != null && !(task.isDone() || task.isCancelled()))
170       {
171         throw new IllegalStateException(
172                 "Cannot submit new task if the previous one is still running");
173       }
174       Console.debug( format("Worker %s queued", getWorker()));
175       final var runnable = new Runnable()
176       {
177         private boolean started = false;
178
179         private boolean completed = false;
180
181         Future<?> future = null;
182
183         @Override
184         public void run()
185         {
186           try
187           {
188             if (!started)
189             {
190               Console.debug(format("Worker %s started", getWorker()));
191               getWorker().startUp();
192               started = true;
193             }
194             else if (!completed)
195             {
196               Console.debug(format("Polling worker %s", getWorker()));
197               if (getWorker().poll())
198               {
199                 Console.debug(format("Worker %s finished", getWorker()));
200                 completed = true;
201               }
202             }
203           } catch (Throwable th)
204           {
205             Console.debug(format("Worker %s failed", getWorker()), th);
206             completed = true;
207           }
208           if (completed)
209           {
210             final var worker = getWorker();
211             if (!isRegistered())
212               PollableWorkerManager.super.worker = null;
213             Console.debug(format("Finalizing completed worker %s", worker));
214             worker.done();
215             // almost impossible, but the future may be null at this point
216             // let it throw NPE to cancel forcefully
217             future.cancel(false);
218           }
219         }
220       };
221       runnable.future = task = executor.scheduleWithFixedDelay(runnable, 10,
222               1000, TimeUnit.MILLISECONDS);
223     }
224
225     synchronized protected void cancel()
226     {
227       if (!isWorking())
228       {
229         return;
230       }
231       Console.debug(format("Cancelling worker %s", getWorker()));
232       task.cancel(false);
233       executor.submit(() -> {
234         final var worker = getWorker();
235         if (!isRegistered())
236           PollableWorkerManager.super.worker = null;
237         if (worker != null)
238         {
239           worker.cancel();
240           Console.debug(format("Finalizing cancelled worker %s", worker));
241           worker.done();
242         }
243       });
244     }
245   }
246
  // Single-threaded scheduler on which the worker managers submit their
  // run, polling and cancellation clean-up tasks.
  private final ScheduledExecutorService executor = Executors
          .newSingleThreadScheduledExecutor();

  // Managers of the workers added through registerWorker, keyed by worker.
  private final Map<AlignCalcWorkerI, WorkerManager> registered = synchronizedMap(
          new HashMap<>());

  // Managers of workers started through startWorker without being
  // registered; the backing map holds its keys weakly.
  private final Map<AlignCalcWorkerI, WorkerManager> oneshot = synchronizedMap(
          new WeakHashMap<>());

  // Listeners added through addAlignCalcListener.
  private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<>();
257
258   private WorkerManager createManager(AlignCalcWorkerI worker)
259   {
260     if (worker instanceof PollableAlignCalcWorkerI)
261     {
262       return new PollableWorkerManager((PollableAlignCalcWorkerI) worker);
263     }
264     else
265     {
266       return new SimpleWorkerManager(worker);
267     }
268   }
269
270   @Override
271   public void registerWorker(AlignCalcWorkerI worker)
272   {
273     Objects.requireNonNull(worker);
274     WorkerManager manager = createManager(worker);
275     registered.putIfAbsent(worker, manager);
276     startWorker(worker);
277   }
278
279   @Override
280   public List<AlignCalcWorkerI> getWorkers()
281   {
282     List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
283     result.addAll(registered.keySet());
284     return result;
285   }
286
287   @Override
288   public List<AlignCalcWorkerI> getWorkersOfClass(
289           Class<? extends AlignCalcWorkerI> cls)
290   {
291     List<AlignCalcWorkerI> collected = new ArrayList<>();
292     for (var worker : getWorkers())
293     {
294       if (worker.getClass().equals(cls))
295       {
296         collected.add(worker);
297       }
298     }
299     return unmodifiableList(collected);
300   }
301
302   @Override
303   public void removeWorker(AlignCalcWorkerI worker)
304   {
305     if (worker.isDeletable())
306     {
307       registered.remove(worker);
308     }
309   }
310
311   @Override
312   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
313   {
314     synchronized (registered)
315     {
316       for (var worker : getWorkers())
317       {
318         if (worker.involves(annot))
319         {
320           removeWorker(worker);
321         }
322       }
323     }
324   }
325
326   @Override
327   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
328   {
329     synchronized (registered)
330     {
331       for (var worker : getWorkers())
332       {
333         if (worker.getClass().equals(cls))
334         {
335           removeWorker(worker);
336         }
337       }
338     }
339   }
340
341   @Override
342   public void disableWorker(AlignCalcWorkerI worker)
343   {
344     // Null pointer check might be needed
345     registered.get(worker).setEnabled(false);
346   }
347
348   @Override
349   public void enableWorker(AlignCalcWorkerI worker)
350   {
351     // Null pointer check might be needed
352     registered.get(worker).setEnabled(true);
353   }
354
355   @Override
356   public boolean isDisabled(AlignCalcWorkerI worker)
357   {
358     if (registered.containsKey(worker))
359     {
360       return !registered.get(worker).isEnabled();
361     }
362     else
363     {
364       return false;
365     }
366   }
367
368   @Override
369   public boolean isWorking(AlignCalcWorkerI worker)
370   {
371     var manager = registered.get(worker);
372     if (manager == null)
373       manager = oneshot.get(worker);
374     if (manager == null)
375       return false;
376     else
377       return manager.isWorking();
378   }
379
380   @Override
381   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
382   {
383     synchronized (registered)
384     {
385       for (var entry : registered.entrySet())
386         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
387           return true;
388     }
389     synchronized (oneshot)
390     {
391       for (var entry : registered.entrySet())
392         if (entry.getKey().involves(annot) && entry.getValue().isWorking())
393           return true;
394     }
395     return false;
396   }
397
398   @Override
399   public boolean isWorking()
400   {
401     synchronized (registered)
402     {
403       for (var manager : registered.values())
404         if (manager.isWorking())
405           return true;
406     }
407     synchronized (oneshot)
408     {
409       for (var manager : oneshot.values())
410         if (manager.isWorking())
411           return true;
412     }
413     return false;
414   }
415
416   @Override
417   public void startWorker(AlignCalcWorkerI worker)
418   {
419     Objects.requireNonNull(worker);
420     var manager = registered.get(worker);
421     if (manager == null)
422     {
423       Console.warn("Starting unregistered worker " + worker);
424       manager = createManager(worker);
425       oneshot.put(worker, manager);
426     }
427     manager.restart();
428   }
429
430   @Override
431   public void restartWorkers()
432   {
433     synchronized (registered)
434     {
435       for (var manager : registered.values())
436       {
437         manager.restart();
438       }
439     }
440   }
441
442   @Override
443   public void cancelWorker(AlignCalcWorkerI worker)
444   {
445     Objects.requireNonNull(worker);
446     var manager = registered.get(worker);
447     if (manager == null)
448       manager = oneshot.get(worker);
449     if (manager == null)
450     {
451       throw new NoSuchElementException();
452     }
453     manager.cancel();
454   }
455
456   private void notifyQueued(AlignCalcWorkerI worker)
457   {
458     for (AlignCalcListener listener : listeners)
459     {
460       listener.workerQueued(worker);
461     }
462   }
463
464   private void notifyStarted(AlignCalcWorkerI worker)
465   {
466     for (AlignCalcListener listener : listeners)
467     {
468       listener.workerStarted(worker);
469     }
470   }
471
472   private void notifyCompleted(AlignCalcWorkerI worker)
473   {
474     for (AlignCalcListener listener : listeners)
475     {
476       try
477       {
478         listener.workerCompleted(worker);
479       } catch (RuntimeException e)
480       {
481         e.printStackTrace();
482       }
483     }
484   }
485
486   private void notifyCancelled(AlignCalcWorkerI worker)
487   {
488     for (AlignCalcListener listener : listeners)
489     {
490       try
491       {
492         listener.workerCancelled(worker);
493       } catch (RuntimeException e)
494       {
495         e.printStackTrace();
496       }
497     }
498   }
499
500   private void notifyExceptional(AlignCalcWorkerI worker,
501           Throwable throwable)
502   {
503     for (AlignCalcListener listener : listeners)
504     {
505       try
506       {
507         listener.workerExceptional(worker, throwable);
508       } catch (RuntimeException e)
509       {
510         e.printStackTrace();
511       }
512     }
513   }
514
515   @Override
516   public void addAlignCalcListener(AlignCalcListener listener)
517   {
518     listeners.add(listener);
519   }
520
521   @Override
522   public void removeAlignCalcListener(AlignCalcListener listener)
523   {
524     listeners.remove(listener);
525   }
526
527   @Override
528   public void shutdown()
529   {
530     executor.shutdownNow();
531     listeners.clear();
532     registered.clear();
533   }
534
535 }