JAL-3690 Catch and notify about worker cancellation separately.
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.List;
4 import java.util.Objects;
5 import java.util.Optional;
6 import java.util.Set;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.CancellationException;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.FutureTask;
14 import java.util.stream.Collectors;
15
16 import jalview.api.AlignCalcListener;
17 import jalview.api.AlignCalcManagerI2;
18 import jalview.api.AlignCalcWorkerI;
19 import jalview.bin.Cache;
20 import jalview.datamodel.AlignmentAnnotation;
21
22 import static java.util.Collections.synchronizedList;
23 import static java.util.Collections.synchronizedSet;
24 import static java.util.Collections.unmodifiableList;
25
26 import java.util.ArrayList;
27 import java.util.HashSet;
28
29 import static java.lang.String.format;
30
31 public class AlignCalcManager2 implements AlignCalcManagerI2
32 {
33   class AlignCalcTask extends FutureTask<Void>
34   {
35     final AlignCalcWorkerI worker;
36
37     public AlignCalcTask(AlignCalcWorkerI worker)
38     {
39       super(new Callable<Void>() {
40         public Void call() throws Exception {
41             Cache.log.debug(format("Worker %s started%n", worker.getClass().getName()));
42             notifyStarted(worker);
43             worker.run();
44             return null;
45         }
46       });
47       this.worker = worker;
48     }
49
50     public AlignCalcWorkerI getWorker()
51     {
52       return worker;
53     }
54
55     @Override
56     protected void done()
57     {
58       boolean success = false;
59       Throwable exception = null;
60       try
61       {
62         get();
63         success = true;
64       }
65       catch (CancellationException e) {
66         Cache.log.debug(format("Worker %s cancelled%n", getWorker().getClass().getName()));
67         notifyCancelled(worker);
68       }
69       catch (ExecutionException e)
70       {
71         exception = e.getCause();
72         if (exception instanceof OutOfMemoryError) {
73           disableWorker(getWorker());
74         }
75       } catch (Throwable e)
76       {
77         exception = e;
78       }
79       finally {
80         inProgress.remove(getWorker());
81         tasks.remove(this);
82       }
83       if (success)
84       {
85         Cache.log.debug(format("Worker %s finished%n", getWorker().getClass().getName()));
86         notifyCompleted(worker);
87       }
88       else if (exception != null){
89         Cache.log.warn(format("Worker %s failed%n", getWorker().getClass().getName()));
90         exception.printStackTrace();
91         notifyExceptional(worker, exception);
92       }
93     }
94   }
95
96   // main executor for running workers one-by-one
97   private final ExecutorService executor = Executors.newSingleThreadExecutor();
98   
99   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
100
101   // list of all registered workers (other collections are subsets of this)
102   private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
103
104   // list of tasks holding queued and running workers
105   private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
106   
107   // the collection of currently running workers
108   private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
109
110   // the collection of workers that will not be started
111   private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
112
113   /*
114    * Register the worker with this manager and scheduler for execution.
115    */
116   @Override
117   public void registerWorker(AlignCalcWorkerI worker)
118   {
119     Objects.requireNonNull(worker);
120     synchronized (registered)
121     {
122       if (!registered.contains(worker))
123         registered.add(worker);
124     }
125     startWorker(worker);
126   }
127
128   @Override
129   public List<AlignCalcWorkerI> getWorkers()
130   {
131     return unmodifiableList(new ArrayList<>(registered));
132   }
133   
134   @Override
135   public List<AlignCalcWorkerI> getWorkersOfClass(
136           Class<? extends AlignCalcWorkerI> cls)
137   {
138     synchronized (registered)
139     {
140       return registered.stream()
141               .filter(worker -> worker.getClass().equals(cls))
142               .collect(Collectors.toUnmodifiableList());
143     }
144   }
145   
146   @Override
147   public void removeWorker(AlignCalcWorkerI worker)
148   {
149     registered.remove(worker);
150     disabled.remove(worker);
151   }
152
153   @Override
154   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
155   {
156     synchronized (registered)
157     {
158       for (var worker : registered)
159       {
160         if (worker.getClass().equals(cls))
161         {
162           removeWorker(worker);
163         }
164       }
165     }
166   }
167
168   @Override
169   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
170   {
171     synchronized (registered)
172     {
173       for (var worker : registered)
174       {
175         if (worker.involves(annot) && worker.isDeletable())
176         {
177           removeWorker(worker);
178         }
179       }
180     }
181   }
182
183   @Override
184   public void disableWorker(AlignCalcWorkerI worker)
185   {
186     disabled.add(worker);
187   }
188
189   @Override
190   public void enableWorker(AlignCalcWorkerI worker)
191   {
192     disabled.remove(worker);
193   }
194
195   @Override
196   public void restartWorkers()
197   {
198     synchronized (registered)
199     {
200       for (AlignCalcWorkerI worker : registered)
201       {
202         if (!isDisabled(worker))
203           startWorker(worker);
204       }
205     }
206   }
207
208   @Override
209   public void startWorker(AlignCalcWorkerI worker)
210   {
211     Objects.requireNonNull(worker);
212     AlignCalcTask newTask = new AlignCalcTask(worker);
213     synchronized (inProgress)
214     {
215       cancelWorker(worker);
216       inProgress.add(worker);
217       tasks.add(newTask);
218     }
219     notifyQueued(worker);
220     executor.execute(newTask);
221   }
222   
223   @Override
224   public void cancelWorker(AlignCalcWorkerI worker)
225   {
226     if (isWorking(worker)) 
227     {
228       synchronized (tasks) 
229       {
230         Optional<AlignCalcTask> oldTask = tasks.stream()
231             .filter(task -> task.getWorker().equals(worker))
232             .findFirst();
233         if (oldTask.isPresent()) {
234           oldTask.get().cancel(true);
235         }
236       }
237     }
238   }
239
240   @Override
241   public boolean isDisabled(AlignCalcWorkerI worker)
242   {
243     return disabled.contains(worker);
244   }
245
246   @Override
247   public boolean isWorking(AlignCalcWorkerI worker)
248   {
249     return inProgress.contains(worker);
250   }
251
252   @Override
253   public boolean isWorking()
254   {
255     return !inProgress.isEmpty();
256   }
257   
258   @Override
259   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
260   {
261     synchronized (inProgress)
262     {
263       for (AlignCalcWorkerI worker : inProgress)
264       {
265         if (worker.involves(annot))
266         {
267           return true;
268         }
269       }
270     }
271     return false;
272   }
273
274   private void notifyQueued(AlignCalcWorkerI worker)
275   {
276     for (AlignCalcListener listener : listeners)
277     {
278       listener.workerQueued(worker);
279     }
280   }
281
282   private void notifyStarted(AlignCalcWorkerI worker)
283   {
284     for (AlignCalcListener listener : listeners)
285     {
286       listener.workerStarted(worker);
287     }
288   }
289
290   private void notifyCompleted(AlignCalcWorkerI worker)
291   {
292     for (AlignCalcListener listener : listeners)
293     {
294       try {
295         listener.workerCompleted(worker);
296       } catch (RuntimeException e)
297       {
298         e.printStackTrace();
299       }
300     }
301   }
302   
303   private void notifyCancelled(AlignCalcWorkerI worker)
304   {
305     for (AlignCalcListener listener : listeners)
306     {
307       try {
308         listener.workerCancelled(worker);
309       } catch (RuntimeException e)
310       {
311         e.printStackTrace();
312       }
313     }
314   }
315
316   private void notifyExceptional(AlignCalcWorkerI worker,
317           Throwable throwable)
318   {
319     for (AlignCalcListener listener : listeners)
320     {
321       try {
322         listener.workerExceptional(worker, throwable);
323       } catch (RuntimeException e)
324       {
325         e.printStackTrace();
326       }
327     }
328   }
329
330   @Override
331   public void addAlignCalcListener(AlignCalcListener listener)
332   {
333     listeners.add(listener);
334   }
335   
336   @Override
337   public void removeAlignCalcListener(AlignCalcListener listener)
338   {
339     listeners.remove(listener);
340   }
341
342 }