dcaff10201c4c404f9fd05f87a9893beebcd9a8a
[jalview.git] / src / jalview / workers / AlignCalcManager2.java
1 package jalview.workers;
2
3 import java.util.List;
4 import java.util.Objects;
5 import java.util.Optional;
6 import java.util.Set;
7 import java.util.concurrent.Callable;
8 import java.util.concurrent.CancellationException;
9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.FutureTask;
14 import java.util.stream.Collectors;
15
16 import jalview.api.AlignCalcListener;
17 import jalview.api.AlignCalcManagerI2;
18 import jalview.api.AlignCalcWorkerI;
19 import jalview.bin.Cache;
20 import jalview.datamodel.AlignmentAnnotation;
21
22 import static java.util.Collections.synchronizedList;
23 import static java.util.Collections.synchronizedSet;
24 import static java.util.Collections.unmodifiableList;
25
26 import java.util.ArrayList;
27 import java.util.HashSet;
28
29 import static java.lang.String.format;
30
31 public class AlignCalcManager2 implements AlignCalcManagerI2
32 {
33   class AlignCalcTask extends FutureTask<Void>
34   {
35     final AlignCalcWorkerI worker;
36
37     public AlignCalcTask(AlignCalcWorkerI worker)
38     {
39       super(new Callable<Void>() {
40         public Void call() throws Exception {
41             Cache.log.debug(format("Worker %s started%n", worker.getClass().getName()));
42             notifyStarted(worker);
43             worker.run();
44             return null;
45         }
46       });
47       this.worker = worker;
48     }
49
50     public AlignCalcWorkerI getWorker()
51     {
52       return worker;
53     }
54
55     @Override
56     protected void done()
57     {
58       boolean success = false;
59       Throwable exception = null;
60       try
61       {
62         get();
63         success = true;
64       }
65       catch (ExecutionException e)
66       {
67         exception = e.getCause();
68         if (exception instanceof OutOfMemoryError) {
69           disableWorker(getWorker());
70         }
71       } catch (Throwable e)
72       {
73         exception = e;
74       }
75       finally {
76         inProgress.remove(getWorker());
77         tasks.remove(this);
78       }
79       if (success)
80       {
81         Cache.log.debug(format("Worker %s finished%n", getWorker().getClass().getName()));
82         notifyCompleted(worker);
83       }
84       else if (exception != null){
85         Cache.log.warn(format("Worker %s failed%n", getWorker().getClass().getName()));
86         exception.printStackTrace();
87         notifyExceptional(worker, exception);
88       }
89     }
90   }
91
92   // main executor for running workers one-by-one
93   private final ExecutorService executor = Executors.newSingleThreadExecutor();
94   
95   private final List<AlignCalcListener> listeners = new CopyOnWriteArrayList<AlignCalcListener>();
96
97   // list of all registered workers (other collections are subsets of this)
98   private final List<AlignCalcWorkerI> registered = synchronizedList(new ArrayList<>());
99
100   // list of tasks holding queued and running workers
101   private final List<AlignCalcTask> tasks = synchronizedList(new ArrayList<>());
102   
103   // the collection of currently running workers
104   private final Set<AlignCalcWorkerI> inProgress = synchronizedSet(new HashSet<>());
105
106   // the collection of workers that will not be started
107   private final Set<AlignCalcWorkerI> disabled = synchronizedSet(new HashSet<>());
108
109   /*
110    * Register the worker with this manager and scheduler for execution.
111    */
112   @Override
113   public void registerWorker(AlignCalcWorkerI worker)
114   {
115     Objects.requireNonNull(worker);
116     synchronized (registered)
117     {
118       if (!registered.contains(worker))
119         registered.add(worker);
120     }
121     startWorker(worker);
122   }
123
124   @Override
125   public List<AlignCalcWorkerI> getWorkers()
126   {
127     return unmodifiableList(new ArrayList<>(registered));
128   }
129   
130   @Override
131   public List<AlignCalcWorkerI> getWorkersOfClass(
132           Class<? extends AlignCalcWorkerI> cls)
133   {
134     synchronized (registered)
135     {
136       return registered.stream()
137               .filter(worker -> worker.getClass().equals(cls))
138               .collect(Collectors.toUnmodifiableList());
139     }
140   }
141   
142   @Override
143   public void removeWorker(AlignCalcWorkerI worker)
144   {
145     registered.remove(worker);
146     disabled.remove(worker);
147   }
148
149   @Override
150   public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
151   {
152     synchronized (registered)
153     {
154       for (var worker : registered)
155       {
156         if (worker.getClass().equals(cls))
157         {
158           removeWorker(worker);
159         }
160       }
161     }
162   }
163
164   @Override
165   public void removeWorkerForAnnotation(AlignmentAnnotation annot)
166   {
167     synchronized (registered)
168     {
169       for (var worker : registered)
170       {
171         if (worker.involves(annot) && worker.isDeletable())
172         {
173           removeWorker(worker);
174         }
175       }
176     }
177   }
178
179   @Override
180   public void disableWorker(AlignCalcWorkerI worker)
181   {
182     disabled.add(worker);
183   }
184
185   @Override
186   public void enableWorker(AlignCalcWorkerI worker)
187   {
188     disabled.remove(worker);
189   }
190
191   @Override
192   public void restartWorkers()
193   {
194     synchronized (registered)
195     {
196       for (AlignCalcWorkerI worker : registered)
197       {
198         if (!isDisabled(worker))
199           startWorker(worker);
200       }
201     }
202   }
203
204   @Override
205   public void startWorker(AlignCalcWorkerI worker)
206   {
207     Objects.requireNonNull(worker);
208     AlignCalcTask newTask = new AlignCalcTask(worker);
209     synchronized (inProgress)
210     {
211       cancelWorker(worker);
212       inProgress.add(worker);
213       tasks.add(newTask);
214     }
215     notifyQueued(worker);
216     executor.execute(newTask);
217   }
218   
219   @Override
220   public void cancelWorker(AlignCalcWorkerI worker)
221   {
222     if (isWorking(worker)) 
223     {
224       synchronized (tasks) 
225       {
226         Optional<AlignCalcTask> oldTask = tasks.stream()
227             .filter(task -> task.getWorker().equals(worker))
228             .findFirst();
229         if (oldTask.isPresent()) {
230           oldTask.get().cancel(true);
231         }
232       }
233     }
234   }
235
236   @Override
237   public boolean isDisabled(AlignCalcWorkerI worker)
238   {
239     return disabled.contains(worker);
240   }
241
242   @Override
243   public boolean isWorking(AlignCalcWorkerI worker)
244   {
245     return inProgress.contains(worker);
246   }
247
248   @Override
249   public boolean isWorking()
250   {
251     return !inProgress.isEmpty();
252   }
253   
254   @Override
255   public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
256   {
257     synchronized (inProgress)
258     {
259       for (AlignCalcWorkerI worker : inProgress)
260       {
261         if (worker.involves(annot))
262         {
263           return true;
264         }
265       }
266     }
267     return false;
268   }
269
270   private void notifyQueued(AlignCalcWorkerI worker)
271   {
272     for (AlignCalcListener listener : listeners)
273     {
274       listener.workerQueued(worker);
275     }
276   }
277
278   private void notifyStarted(AlignCalcWorkerI worker)
279   {
280     for (AlignCalcListener listener : listeners)
281     {
282       listener.workerStarted(worker);
283     }
284   }
285
286   private void notifyCompleted(AlignCalcWorkerI worker)
287   {
288     for (AlignCalcListener listener : listeners)
289     {
290       try {
291         listener.workerCompleted(worker);
292       } catch (RuntimeException e)
293       {
294         e.printStackTrace();
295       }
296     }
297   }
298
299   private void notifyExceptional(AlignCalcWorkerI worker,
300           Throwable throwable)
301   {
302     for (AlignCalcListener listener : listeners)
303     {
304       try {
305         listener.workerExceptional(worker, throwable);
306       } catch (RuntimeException e)
307       {
308         e.printStackTrace();
309       }
310     }
311   }
312
313   @Override
314   public void addAlignCalcListener(AlignCalcListener listener)
315   {
316     listeners.add(listener);
317   }
318   
319   @Override
320   public void removeAlignCalcListener(AlignCalcListener listener)
321   {
322     listeners.remove(listener);
323   }
324
325 }