JAL - 3690 AlignCalc rebuilt - FutureTask-based manager
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.List;
4 import java.util.Set;
5 import java.util.concurrent.Callable;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CopyOnWriteArrayList;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.ExecutorService;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.FutureTask;
12 import java.util.concurrent.atomic.AtomicInteger;
13 import java.util.stream.Collectors;
14
15 import jalview.api.AlignCalcListener;
16 import jalview.api.AlignCalcManagerI;
17 import jalview.api.AlignCalcManagerI2;
18 import jalview.api.AlignCalcWorkerI;
19 import jalview.datamodel.AlignmentAnnotation;
20
21 import static java.util.Collections.synchronizedList;
22 import static java.util.Collections.synchronizedSet;
23 import static java.util.Collections.unmodifiableList;
24
25 import java.util.ArrayList;
26 import java.util.HashSet;
27
28 public class AlignCalcManager2 implements AlignCalcManagerI2
29 {
30   class AlignCalcTask extends FutureTask<Void>
31   {
32     final AlignCalcWorkerI worker;
33
34     public AlignCalcTask(AlignCalcWorkerI worker)
35     {
36       super(new Callable<Void>() {
37         public Void call() throws Exception {
38             notifyStarted(worker);
39             worker.run();
40             return null;
41         }
42       });
43       this.worker = worker;
44     }
45
46     public AlignCalcWorkerI getWorker()
47     {
48       return worker;
49     }
50
51     @Override
52     protected void done()
53     {
54       boolean success = false;
55       Throwable exception = null;
56       try
57       {
58         get();
59         success = true;
60       } catch (ExecutionException e)
61       {
62         exception = e.getCause();
63         if (exception instanceof OutOfMemoryError) {
64           disableWorker(getWorker());
65         }
66       } catch (Throwable e)
67       {
68         exception = e;
69       }
70       finally {
71         inProgress.remove(getWorker());
72         tasks.remove(this);
73       }
74       if (success)
75         notifyCompleted(worker);
76       else
77         notifyExceptional(worker, exception);
78     }
79   }
80
81   // main executor for running workers one-by-one
82   private final ExecutorService executor = Executors.newSingleThreadExecutor();
83   
84   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
85
86   // list of all registered workers (other collections are subsets of this)
87   private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
88
89   // list of tasks holding queued and running workers
90   private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
91   
92   // the collection of currently running workers
93   private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
94
95   // the collection of workers that will not be started
96   private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
97
98   /*
99    * Register the worker with this manager and scheduler for execution.
100    */
101   @Override
102   public void registerWorker(AlignCalcWorkerI worker)
103   {
104     synchronized (registered)
105     {
106       if (!registered.contains(worker))
107         registered.add(worker);
108     }
109     startWorker(worker);
110   }
111
112   @Override
113   public List<AlignCalcWorkerI> getWorkers()
114   {
115     return unmodifiableList(new ArrayList<>(registered));
116   }
117   
118   @Override
119   public List<AlignCalcWorkerI> getWorkersOfClass(
120           Class<? extends AlignCalcWorkerI> cls)
121   {
122     synchronized (registered)
123     {
124       return registered.stream()
125               .filter(worker -> worker.getClass().equals(cls))
126               .collect(Collectors.toUnmodifiableList());
127     }
128   }
129   
130   @Override
131   public void removeWorker(AlignCalcWorkerI worker)
132   {
133     registered.remove(worker);
134     disabled.remove(worker);
135   }
136
137   @Override
138   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
139   {
140     synchronized (registered)
141     {
142       for (var it = registered.iterator(); it.hasNext();)
143       {
144         var worker = it.next();
145         if (worker.getClass().equals(cls))
146         {
147           it.remove();
148           disabled.remove(worker);
149         }
150       }
151     }
152   }
153
154   @Override
155   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
156   {
157     synchronized (registered)
158     {
159       for (var it = registered.iterator(); it.hasNext();)
160       {
161         var worker = it.next();
162         if (worker.involves(annot) && worker.isDeletable())
163         {
164           it.remove();
165           disabled.remove(worker);
166         }
167       }
168     }
169   }
170
171   @Override
172   public void disableWorker(AlignCalcWorkerI worker)
173   {
174     assert registered.contains(worker);
175     disabled.add(worker);
176   }
177
178   @Override
179   public void enableWorker(AlignCalcWorkerI worker)
180   {
181     disabled.remove(worker);
182   }
183
184   @Override
185   public void restartWorkers()
186   {
187     synchronized (registered)
188     {
189       for (AlignCalcWorkerI worker : registered)
190       {
191         if (!isDisabled(worker))
192           startWorker(worker);
193       }
194     }
195   }
196
197   @Override
198   public void startWorker(AlignCalcWorkerI worker)
199   {
200     assert registered.contains(worker);
201     synchronized (tasks) {
202       for (var task : tasks)
203       {
204         if (task.getWorker().equals(worker))
205           task.cancel(true);
206       }
207     }
208     AlignCalcTask newTask = new AlignCalcTask(worker);
209     tasks.add(newTask);
210     notifyQueued(worker);
211     executor.execute(newTask);
212   }
213   
214   @Override
215   public void cancelWorker(AlignCalcWorkerI worker)
216   {
217     if (isWorking(worker)) 
218     {
219       synchronized (tasks) 
220       {
221         for (var task : tasks)
222         {
223           if (task.getWorker().equals(worker))
224             task.cancel(true);
225         }
226       }
227     }
228   }
229
230   @Override
231   public boolean isDisabled(AlignCalcWorkerI worker)
232   {
233     return disabled.contains(worker);
234   }
235
236   @Override
237   public boolean isWorking(AlignCalcWorkerI worker)
238   {
239     return inProgress.contains(worker);
240   }
241
242   @Override
243   public boolean isWorking()
244   {
245     return !inProgress.isEmpty();
246   }
247   
248   @Override
249   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
250   {
251     synchronized (inProgress)
252     {
253       for (AlignCalcWorkerI worker : inProgress)
254       {
255         if (worker.involves(annot))
256         {
257           return true;
258         }
259       }
260     }
261     return false;
262   }
263
264   private void notifyQueued(AlignCalcWorkerI worker)
265   {
266     for (AlignCalcListener listener : listeners)
267     {
268       listener.workerQueued(worker);
269     }
270   }
271
272   private void notifyStarted(AlignCalcWorkerI worker)
273   {
274     for (AlignCalcListener listener : listeners)
275     {
276       listener.workerStarted(worker);
277     }
278   }
279
280   private void notifyCompleted(AlignCalcWorkerI worker)
281   {
282     for (AlignCalcListener listener : listeners)
283     {
284       try {
285         listener.workerCompleted(worker);
286       } catch (RuntimeException e)
287       {
288         e.printStackTrace();
289       }
290     }
291   }
292
293   private void notifyExceptional(AlignCalcWorkerI worker,
294           Throwable throwable)
295   {
296     for (AlignCalcListener listener : listeners)
297     {
298       try {
299         listener.workerExceptional(worker, throwable);
300       } catch (RuntimeException e)
301       {
302         e.printStackTrace();
303       }
304     }
305   }
306
307   @Override
308   public void addAlignCalcListener(AlignCalcListener listener)
309   {
310     listeners.add(listener);
311   }
312   
313   @Override
314   public void removeAlignCalcListener(AlignCalcListener listener)
315   {
316     listeners.remove(listener);
317   }
318
319 }