0f9b8e5d39781f8e3e3ad9c2363691822f401c4a
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.TimeUnit;
13
14 import static java.lang.String.format;
15 import static java.util.Collections.synchronizedMap;
16 import static java.util.Collections.unmodifiableList;
17
18 import java.util.ArrayList;
19
20 import jalview.api.AlignCalcListener;
21 import jalview.api.AlignCalcManagerI2;
22 import jalview.api.AlignCalcWorkerI;
23 import jalview.api.PollableAlignCalcWorkerI;
24 import jalview.bin.Cache;
25 import jalview.datamodel.AlignmentAnnotation;
26
27 public class AlignCalcManager2 implements AlignCalcManagerI2
28 {
29   private abstract class WorkerManager
30   {
31     protected volatile boolean enabled = true;
32             
33     protected final AlignCalcWorkerI worker;
34     
35     WorkerManager(AlignCalcWorkerI worker)
36     {
37       this.worker = worker;
38     }
39     
40     AlignCalcWorkerI getWorker()
41     {
42       return worker;
43     }
44     
45     boolean isEnabled()
46     {
47       return enabled;
48     }
49     
50     void setEnabled(boolean enabled)
51     {
52       this.enabled = enabled;
53     }
54     
55     synchronized void restart()
56     {
57       if (!isEnabled())
58       {
59         return;
60       }
61       if (isWorking())
62       {
63         cancel();
64       }
65       submit();
66     }
67     
68     abstract boolean isWorking();
69     
70     protected abstract void submit();
71     
72     abstract void cancel();  
73   }
74   
75   
76   private class SimpleWorkerManager extends WorkerManager
77   {
78     private Future<?> task = null;
79     
80     SimpleWorkerManager(AlignCalcWorkerI worker)
81     {
82       super(worker);
83     }
84     
85     @Override
86     boolean isWorking()
87     {
88       return task != null && !task.isDone();
89     }
90
91     @Override
92     protected void submit()
93     {
94       if (task != null && !(task.isDone() || task.isCancelled()))
95       {
96         throw new IllegalStateException(
97             "Cannot submit new task if the prevoius one is still running");
98       }
99       Cache.log.debug(format("Worker %s queued",
100               worker.getClass().getName()));
101       task = executor.submit(() -> {
102         try
103         {
104           Cache.log.debug(format("Worker %s started",
105                   worker.getClass().getName()));
106           worker.run();
107           Cache.log.debug(format("Worker %s finished",
108                   worker.getClass().getName()));
109         }
110         catch (InterruptedException e)
111         {
112           Cache.log.debug(format("Worker %s interrupted",
113                   worker.getClass().getName()));
114         }
115         catch (Throwable th)
116         {
117           Cache.log.debug(format("Worker %s failed",
118                   worker.getClass().getName()), th);
119         }
120         finally
121         {
122         }
123       });
124     }
125
126     @Override
127     synchronized void cancel()
128     {
129       if (!isWorking())
130       {
131         return;
132       }
133       Cache.log.debug(format("Cancelling worker %s",
134               worker.getClass().getName()));
135       task.cancel(true);
136     }
137   }
138   
139   
140   private class PollableWorkerManager extends WorkerManager
141   {
142     private final PollableAlignCalcWorkerI worker;
143     private Future<?> task = null;
144     
145     PollableWorkerManager(PollableAlignCalcWorkerI worker)
146     {
147       super(worker);
148       this.worker = worker;
149     }
150     
151     @Override
152     boolean isWorking()
153     {
154       return task != null && !task.isDone();
155     }
156     
157     protected void submit()
158     {
159       if (task != null && !(task.isDone() || task.isCancelled()))
160       {
161         throw new IllegalStateException(
162             "Cannot submit new task if the prevoius one is still running");
163       }
164       Cache.log.debug(format("Worker %s queued",
165               worker.getClass().getName()));
166       final var runnable = new Runnable()
167       {
168         private boolean started = false;
169         private boolean completed = false;
170         Future<?> future = null;
171         
172         @Override
173         public void run()
174         {
175           try
176           {
177             if (!started)
178             {
179               Cache.log.debug(format("Worker %s started",
180                       worker.getClass().getName()));
181               worker.startUp();
182               started = true;
183             }
184             else if (!completed)
185             {
186               Cache.log.debug(format("Polling worker %s",
187                       worker.getClass().getName()));
188               if (worker.poll())
189               {
190                 Cache.log.debug(format("Worker %s finished",
191                         worker.getClass().getName()));
192                 completed = true;
193               }
194             }
195           } catch (Throwable th)
196           {
197             Cache.log.debug(format("Worker %s failed",
198                     worker.getClass().getName()), th);
199             completed = true;
200           }
201           if (completed)
202           {
203             Cache.log.debug(format("Finalizing completed worker %s",
204                     worker.getClass().getName()));
205             worker.done();
206             // almost impossible, but the future may be null at this point
207             // let it throw NPE to cancel forcefully
208             future.cancel(false);
209           }
210         }
211       };
212       runnable.future = task = executor.scheduleWithFixedDelay(
213               runnable, 10, 1000, TimeUnit.MILLISECONDS);
214     }
215     
216     synchronized protected void cancel()
217     {
218       if (!isWorking())
219       {
220         return;
221       }
222       Cache.log.debug(format("Cancelling worker %s",
223               worker.getClass().getName()));
224       task.cancel(false);
225       executor.submit(() -> {
226         worker.cancel();
227         Cache.log.debug(format("Finalizing cancelled worker %s",
228                 worker.getClass().getName()));
229         worker.done();
230       });
231     }
232   }
233   
234   
235   private final ScheduledExecutorService executor =
236           Executors.newSingleThreadScheduledExecutor();
237   private final Map<AlignCalcWorkerI, WorkerManager> registered =
238           synchronizedMap(new HashMap<>());
239   
240   private final List<AlignCalcListener> listeners =
241           new CopyOnWriteArrayList<>();
242   
243   
244   @Override
245   public void registerWorker(AlignCalcWorkerI worker)
246   {
247     Objects.requireNonNull(worker);
248     WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ?
249             new PollableWorkerManager((PollableAlignCalcWorkerI) worker) : 
250               new SimpleWorkerManager(worker);
251     registered.putIfAbsent(worker, manager);
252     startWorker(worker);
253   }
254
255   @Override
256   public List<AlignCalcWorkerI> getWorkers()
257   {
258     List<AlignCalcWorkerI> result = new ArrayList<>(registered.size());
259     result.addAll(registered.keySet());
260     return result;
261   }
262
263   @Override
264   public List<AlignCalcWorkerI> getWorkersOfClass(
265           Class<? extends AlignCalcWorkerI> cls)
266   {
267     List<AlignCalcWorkerI> collected = new ArrayList<>();
268     for (var worker : getWorkers())
269     {
270       if (worker.getClass().equals(cls))
271       {
272         collected.add(worker);
273       }
274     }
275     return unmodifiableList(collected);
276   }
277
278   @Override
279   public void removeWorker(AlignCalcWorkerI worker)
280   {
281     registered.remove(worker);
282   }
283
284   @Override
285   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
286   {
287     synchronized (registered)
288     {
289       for (var worker : getWorkers())
290       {
291         if (worker.involves(annot) && worker.isDeletable())
292         {
293           removeWorker(worker);
294         }
295       }
296     }
297   }
298
299   @Override
300   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
301   {
302     synchronized (registered)
303     {
304       for (var worker : getWorkers())
305       {
306         if (worker.getClass().equals(cls))
307         {
308           removeWorker(worker);
309         }
310       }
311     }
312   }
313
314   @Override
315   public void disableWorker(AlignCalcWorkerI worker)
316   {
317     // Null pointer check might be needed
318     registered.get(worker).setEnabled(false);
319   }
320
321   @Override
322   public void enableWorker(AlignCalcWorkerI worker)
323   {
324     // Null pointer check might be needed
325     registered.get(worker).setEnabled(true);
326   }
327
328   @Override
329   public boolean isDisabled(AlignCalcWorkerI worker)
330   {
331     if (registered.containsKey(worker))
332     {
333       return !registered.get(worker).isEnabled();
334     }
335     else
336     {
337       return false;
338     }
339   }
340
341   @Override
342   public boolean isWorking(AlignCalcWorkerI worker)
343   {
344     if (!registered.containsKey(worker))
345     {
346       return false;
347     }
348     else
349     {
350       return registered.get(worker).isWorking();
351     }
352   }
353
354   @Override
355   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
356   {
357     synchronized (registered)
358     {
359       for (var entry : registered.entrySet())
360       {
361         if (entry.getKey().involves(annot) &&
362                 entry.getValue().isWorking())
363         {
364           return true;
365         }
366       }
367     }
368     return false;
369   }
370
371   @Override
372   public boolean isWorking()
373   {
374     synchronized (registered)
375     {
376       for (var manager : registered.values())
377       {
378         if (manager.isWorking())
379         {
380           return true;
381         }
382       }
383     }
384     return false;
385   }
386
387   @Override
388   public void startWorker(AlignCalcWorkerI worker)
389   {
390     Objects.requireNonNull(worker);
391     var manager = registered.get(worker);
392     if (manager == null) 
393     {
394       throw new NoSuchElementException();
395     }
396     manager.restart();
397   }
398
399   @Override
400   public void restartWorkers()
401   {
402     synchronized (registered)
403     {
404       for (var manager : registered.values())
405       {
406         manager.restart();
407       }
408     }
409   }
410
411   @Override
412   public void cancelWorker(AlignCalcWorkerI worker)
413   {    
414     Objects.requireNonNull(worker);
415     var manager = registered.get(worker);
416     if (manager == null) 
417     {
418       throw new NoSuchElementException();
419     }
420     manager.cancel();
421   }
422   
423   private void notifyQueued(AlignCalcWorkerI worker)
424   {
425     for (AlignCalcListener listener : listeners)
426     {
427       listener.workerQueued(worker);
428     }
429   }
430
431   private void notifyStarted(AlignCalcWorkerI worker)
432   {
433     for (AlignCalcListener listener : listeners)
434     {
435       listener.workerStarted(worker);
436     }
437   }
438
439   private void notifyCompleted(AlignCalcWorkerI worker)
440   {
441     for (AlignCalcListener listener : listeners)
442     {
443       try
444       {
445         listener.workerCompleted(worker);
446       } catch (RuntimeException e)
447       {
448         e.printStackTrace();
449       }
450     }
451   }
452
453   private void notifyCancelled(AlignCalcWorkerI worker)
454   {
455     for (AlignCalcListener listener : listeners)
456     {
457       try
458       {
459         listener.workerCancelled(worker);
460       } catch (RuntimeException e)
461       {
462         e.printStackTrace();
463       }
464     }
465   }
466
467   private void notifyExceptional(AlignCalcWorkerI worker,
468           Throwable throwable)
469   {
470     for (AlignCalcListener listener : listeners)
471     {
472       try
473       {
474         listener.workerExceptional(worker, throwable);
475       } catch (RuntimeException e)
476       {
477         e.printStackTrace();
478       }
479     }
480   }
481
482   @Override
483   public void addAlignCalcListener(AlignCalcListener listener)
484   {
485     listeners.add(listener);
486   }
487
488   @Override
489   public void removeAlignCalcListener(AlignCalcListener listener)
490   {
491     listeners.remove(listener);
492   }
493
494   @Override
495   public void shutdown()
496   {
497     executor.shutdownNow();
498     listeners.clear();
499     registered.clear();
500   }
501
502 }