JAL-3690 - introduce shutdown method that cleans up manager's resources.
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.List;
4 import java.util.Objects;
5 import java.util.Optional;
6 import java.util.Set;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.CancellationException;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.FutureTask;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.atomic.AtomicInteger;
16
17 import jalview.api.AlignCalcListener;
18 import jalview.api.AlignCalcManagerI2;
19 import jalview.api.AlignCalcWorkerI;
20 import jalview.bin.Cache;
21 import jalview.datamodel.AlignmentAnnotation;
22
23 import static java.util.Collections.synchronizedList;
24 import static java.util.Collections.synchronizedSet;
25 import static java.util.Collections.unmodifiableList;
26
27 import java.util.ArrayList;
28 import java.util.HashSet;
29
30 import static java.lang.String.format;
31
32 public class AlignCalcManager2 implements AlignCalcManagerI2
33 {
34   class AlignCalcTask extends FutureTask<Void>
35   {
36     final AlignCalcWorkerI worker;
37
38     public AlignCalcTask(AlignCalcWorkerI worker)
39     {
40       super(new Callable<Void>()
41       {
42         public Void call() throws Exception
43         {
44           Cache.log.debug(format("Worker %s started%n",
45                   worker.getClass().getName()));
46           notifyStarted(worker);
47           worker.run();
48           return null;
49         }
50       });
51       this.worker = worker;
52     }
53
54     public AlignCalcWorkerI getWorker()
55     {
56       return worker;
57     }
58
59     @Override
60     protected void done()
61     {
62       boolean success = false;
63       Throwable exception = null;
64       try
65       {
66         get();
67         success = true;
68       } catch (CancellationException e)
69       {
70         Cache.log.debug(format("Worker %s cancelled%n",
71                 getWorker().getClass().getName()));
72         notifyCancelled(worker);
73       } catch (ExecutionException e)
74       {
75         exception = e.getCause();
76         if (exception instanceof OutOfMemoryError)
77         {
78           disableWorker(getWorker());
79         }
80       } catch (Throwable e)
81       {
82         exception = e;
83       } finally
84       {
85         inProgress.remove(getWorker());
86         tasks.remove(this);
87       }
88       if (success)
89       {
90         Cache.log.debug(format("Worker %s finished%n",
91                 getWorker().getClass().getName()));
92         notifyCompleted(worker);
93       }
94       else if (exception != null)
95       {
96         Cache.log.warn(format("Worker %s failed%n",
97                 getWorker().getClass().getName()));
98         exception.printStackTrace();
99         notifyExceptional(worker, exception);
100       }
101     }
102   }
103
104   private static class CalcManagerThreadFactory implements ThreadFactory
105   {
106     private static final AtomicInteger threadNumber = new AtomicInteger(1);
107
108     private final ThreadGroup group;
109
110     private static final String namePrefix = "AlignCalcManager-pool-thread-";
111
112     CalcManagerThreadFactory()
113     {
114       var securityManager = System.getSecurityManager();
115       if (securityManager != null)
116       {
117         group = securityManager.getThreadGroup();
118       }
119       else
120       {
121         group = Thread.currentThread().getThreadGroup();
122       }
123     }
124
125     @Override
126     public Thread newThread(Runnable r)
127     {
128       Thread t = new Thread(group, r,
129               namePrefix + threadNumber.getAndIncrement(), 0);
130       t.setDaemon(false);
131       t.setPriority(Thread.NORM_PRIORITY);
132       return t;
133     }
134   }
135
136   // main executor for running workers one-by-one
137   private final ExecutorService executor = Executors
138           .newSingleThreadExecutor(new CalcManagerThreadFactory());
139
140   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
141
142   // list of all registered workers (other collections are subsets of this)
143   private final List<AlignCalcWorkerI> registered = synchronizedList(
144           new ArrayList<>());
145
146   // list of tasks holding queued and running workers
147   private final List<AlignCalcTask> tasks = synchronizedList(
148           new ArrayList<>());
149
150   // the collection of currently running workers
151   private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(
152           new HashSet<>());
153
154   // the collection of workers that will not be started
155   private final Set<AlignCalcWorkerI> disabled = synchronizedSet(
156           new HashSet<>());
157
158   /*
159    * Register the worker with this manager and scheduler for execution.
160    */
161   @Override
162   public void registerWorker(AlignCalcWorkerI worker)
163   {
164     Objects.requireNonNull(worker);
165     synchronized (registered)
166     {
167       if (!registered.contains(worker))
168         registered.add(worker);
169     }
170     startWorker(worker);
171   }
172
173   @Override
174   public List<AlignCalcWorkerI> getWorkers()
175   {
176     return unmodifiableList(new ArrayList<>(registered));
177   }
178
179   @Override
180   public List<AlignCalcWorkerI> getWorkersOfClass(
181           Class<? extends AlignCalcWorkerI> cls)
182   {
183     synchronized (registered)
184     {
185       List<AlignCalcWorkerI> collected = new ArrayList<>();
186       for (var worker : registered)
187       {
188         if (worker.getClass().equals(cls))
189         {
190           collected.add(worker);
191         }
192       }
193       return unmodifiableList(collected);
194     }
195   }
196
197   @Override
198   public void removeWorker(AlignCalcWorkerI worker)
199   {
200     registered.remove(worker);
201     disabled.remove(worker);
202   }
203
204   @Override
205   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
206   {
207     synchronized (registered)
208     {
209       for (var worker : registered)
210       {
211         if (worker.getClass().equals(cls))
212         {
213           removeWorker(worker);
214         }
215       }
216     }
217   }
218
219   @Override
220   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
221   {
222     synchronized (registered)
223     {
224       for (var worker : registered)
225       {
226         if (worker.involves(annot) && worker.isDeletable())
227         {
228           removeWorker(worker);
229         }
230       }
231     }
232   }
233
234   @Override
235   public void disableWorker(AlignCalcWorkerI worker)
236   {
237     disabled.add(worker);
238   }
239
240   @Override
241   public void enableWorker(AlignCalcWorkerI worker)
242   {
243     disabled.remove(worker);
244   }
245
246   @Override
247   public void restartWorkers()
248   {
249     synchronized (registered)
250     {
251       for (AlignCalcWorkerI worker : registered)
252       {
253         if (!isDisabled(worker))
254           startWorker(worker);
255       }
256     }
257   }
258
259   @Override
260   public void startWorker(AlignCalcWorkerI worker)
261   {
262     Objects.requireNonNull(worker);
263     AlignCalcTask newTask = new AlignCalcTask(worker);
264     synchronized (inProgress)
265     {
266       cancelWorker(worker);
267       inProgress.add(worker);
268       tasks.add(newTask);
269     }
270     notifyQueued(worker);
271     executor.execute(newTask);
272   }
273
274   @Override
275   public void cancelWorker(AlignCalcWorkerI worker)
276   {
277     if (isWorking(worker))
278     {
279       synchronized (tasks)
280       {
281         Optional<AlignCalcTask> oldTask = tasks.stream()
282                 .filter(task -> task.getWorker().equals(worker))
283                 .findFirst();
284         if (oldTask.isPresent())
285         {
286           oldTask.get().cancel(true);
287         }
288       }
289     }
290   }
291
292   @Override
293   public boolean isDisabled(AlignCalcWorkerI worker)
294   {
295     return disabled.contains(worker);
296   }
297
298   @Override
299   public boolean isWorking(AlignCalcWorkerI worker)
300   {
301     return inProgress.contains(worker);
302   }
303
304   @Override
305   public boolean isWorking()
306   {
307     return !inProgress.isEmpty();
308   }
309
310   @Override
311   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
312   {
313     synchronized (inProgress)
314     {
315       for (AlignCalcWorkerI worker : inProgress)
316       {
317         if (worker.involves(annot))
318         {
319           return true;
320         }
321       }
322     }
323     return false;
324   }
325
326   private void notifyQueued(AlignCalcWorkerI worker)
327   {
328     for (AlignCalcListener listener : listeners)
329     {
330       listener.workerQueued(worker);
331     }
332   }
333
334   private void notifyStarted(AlignCalcWorkerI worker)
335   {
336     for (AlignCalcListener listener : listeners)
337     {
338       listener.workerStarted(worker);
339     }
340   }
341
342   private void notifyCompleted(AlignCalcWorkerI worker)
343   {
344     for (AlignCalcListener listener : listeners)
345     {
346       try
347       {
348         listener.workerCompleted(worker);
349       } catch (RuntimeException e)
350       {
351         e.printStackTrace();
352       }
353     }
354   }
355
356   private void notifyCancelled(AlignCalcWorkerI worker)
357   {
358     for (AlignCalcListener listener : listeners)
359     {
360       try
361       {
362         listener.workerCancelled(worker);
363       } catch (RuntimeException e)
364       {
365         e.printStackTrace();
366       }
367     }
368   }
369
370   private void notifyExceptional(AlignCalcWorkerI worker,
371           Throwable throwable)
372   {
373     for (AlignCalcListener listener : listeners)
374     {
375       try
376       {
377         listener.workerExceptional(worker, throwable);
378       } catch (RuntimeException e)
379       {
380         e.printStackTrace();
381       }
382     }
383   }
384
385   @Override
386   public void addAlignCalcListener(AlignCalcListener listener)
387   {
388     listeners.add(listener);
389   }
390
391   @Override
392   public void removeAlignCalcListener(AlignCalcListener listener)
393   {
394     listeners.remove(listener);
395   }
396
397   @Override
398   public void shutdown()
399   {
400     executor.shutdownNow();
401     listeners.clear();
402     registered.clear();
403     tasks.clear();
404     inProgress.clear();
405     disabled.clear();
406   }
407
408 }