1 package jalview.workers;
3 import java.util.HashMap;
6 import java.util.NoSuchElementException;
7 import java.util.Objects;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.Executors;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.ScheduledExecutorService;
12 import java.util.concurrent.TimeUnit;
14 import static java.lang.String.format;
15 import static java.util.Collections.synchronizedMap;
16 import static java.util.Collections.unmodifiableList;
18 import java.util.ArrayList;
20 import jalview.api.AlignCalcListener;
21 import jalview.api.AlignCalcManagerI2;
22 import jalview.api.AlignCalcWorkerI;
23 import jalview.api.PollableAlignCalcWorkerI;
24 import jalview.bin.Cache;
25 import jalview.datamodel.AlignmentAnnotation;
27 public class AlignCalcManager2 implements AlignCalcManagerI2
29 private abstract class WorkerManager
31 static final int IDLE = 0;
32 static final int QUEUED = 1;
33 static final int RUNNING = 2;
34 static final int CANCELLING = 3;
36 protected volatile int state = IDLE;
37 protected volatile boolean enabled = true;
39 protected final AlignCalcWorkerI worker;
41 WorkerManager(AlignCalcWorkerI worker)
46 AlignCalcWorkerI getWorker()
56 void setEnabled(boolean enabled)
58 this.enabled = enabled;
61 synchronized protected void setState(int state)
81 else if (state == QUEUED)
83 // job already queued, do nothing
85 else if (state == RUNNING)
90 else if (state == CANCELLING)
96 protected abstract void submit();
98 abstract void cancel();
102 private class SimpleWorkerManager extends WorkerManager
104 private Future<?> task = null;
106 SimpleWorkerManager(AlignCalcWorkerI worker)
112 protected void submit()
114 if (task != null && !(task.isDone() || task.isCancelled()))
116 throw new IllegalStateException(
117 "Cannot submit new task if the prevoius one is still running");
119 Cache.log.debug(format("Worker %s queued",
120 worker.getClass().getName()));
122 task = executor.submit(() -> {
126 Cache.log.debug(format("Worker %s started",
127 worker.getClass().getName()));
129 Cache.log.debug(format("Worker %s finished",
130 worker.getClass().getName()));
132 catch (InterruptedException e)
134 Cache.log.debug(format("Worker %s interrupted",
135 worker.getClass().getName()));
139 Cache.log.debug(format("Worker %s failed",
140 worker.getClass().getName()), th);
144 // fixme: should not set to idle if another task is already queued for execution
151 synchronized void cancel()
153 if (task == null || state == IDLE || state == CANCELLING)
157 Cache.log.debug(format("Cancelling worker %s",
158 worker.getClass().getName()));
159 setState(CANCELLING);
161 if (task.isCancelled())
169 private class PollableWorkerManager extends WorkerManager
171 private final PollableAlignCalcWorkerI worker;
172 private Future<?> task = null;
174 PollableWorkerManager(PollableAlignCalcWorkerI worker)
177 this.worker = worker;
180 protected void submit()
182 if (task != null && !(task.isDone() || task.isCancelled()))
184 throw new IllegalStateException(
185 "Cannot submit new task if the prevoius one is still running");
187 Cache.log.debug(format("Worker %s queued",
188 worker.getClass().getName()));
190 final var runnable = new Runnable()
192 private boolean started = false;
193 private boolean completed = false;
194 Future<?> future = null;
203 Cache.log.debug(format("Worker %s started",
204 worker.getClass().getName()));
213 Cache.log.debug(format("Worker %s finished",
214 worker.getClass().getName()));
219 } catch (Throwable th)
221 Cache.log.debug(format("Worker %s failed",
222 worker.getClass().getName()), th);
230 future.cancel(false);
232 catch (NullPointerException ignored)
234 // extremely unlikely to happen
239 runnable.future = task = executor.scheduleWithFixedDelay(
240 runnable, 10, 1000, TimeUnit.MILLISECONDS);
243 synchronized protected void cancel()
245 if (task == null || state == IDLE || state == CANCELLING)
249 Cache.log.debug(format("Cancelling worker %s",
250 worker.getClass().getName()));
251 setState(CANCELLING);
253 if (task.isCancelled())
257 executor.submit(() -> {
264 private final ScheduledExecutorService executor =
265 Executors.newSingleThreadScheduledExecutor();
266 private final Map<AlignCalcWorkerI, WorkerManager> registered =
267 synchronizedMap(new HashMap<>());
269 private final List<AlignCalcListener> listeners =
270 new CopyOnWriteArrayList<>();
274 public void registerWorker(AlignCalcWorkerI worker)
276 Objects.requireNonNull(worker);
277 WorkerManager manager = (worker instanceof PollableAlignCalcWorkerI) ?
278 new PollableWorkerManager((PollableAlignCalcWorkerI) worker) :
279 new SimpleWorkerManager(worker);
280 registered.putIfAbsent(worker, manager);
285 public List<AlignCalcWorkerI> getWorkers()
287 return List.copyOf(registered.keySet());
291 public List<AlignCalcWorkerI> getWorkersOfClass(
292 Class<? extends AlignCalcWorkerI> cls)
294 List<AlignCalcWorkerI> collected = new ArrayList<>();
295 for (var worker : getWorkers())
297 if (worker.getClass().equals(cls))
299 collected.add(worker);
302 return unmodifiableList(collected);
306 public void removeWorker(AlignCalcWorkerI worker)
308 registered.remove(worker);
312 public void removeWorkerForAnnotation(AlignmentAnnotation annot)
314 synchronized (registered)
316 for (var worker : getWorkers())
318 if (worker.involves(annot) && worker.isDeletable())
320 removeWorker(worker);
327 public void removeWorkersOfClass(Class<? extends AlignCalcWorkerI> cls)
329 synchronized (registered)
331 for (var worker : getWorkers())
333 if (worker.getClass().equals(cls))
335 removeWorker(worker);
342 public void disableWorker(AlignCalcWorkerI worker)
344 // Null pointer check might be needed
345 registered.get(worker).setEnabled(false);
349 public void enableWorker(AlignCalcWorkerI worker)
351 // Null pointer check might be needed
352 registered.get(worker).setEnabled(true);
356 public boolean isDisabled(AlignCalcWorkerI worker)
358 if (registered.containsKey(worker))
360 return !registered.get(worker).isEnabled();
369 public boolean isWorking(AlignCalcWorkerI worker)
371 if (!registered.containsKey(worker))
377 return registered.get(worker).getState() == WorkerManager.RUNNING;
382 public boolean isWorkingWithAnnotation(AlignmentAnnotation annot)
384 synchronized (registered)
386 for (var entry : registered.entrySet())
388 if (entry.getKey().involves(annot) &&
389 entry.getValue().getState() == WorkerManager.RUNNING)
399 public boolean isWorking()
401 synchronized (registered)
403 for (var manager : registered.values())
405 if (manager.getState() == WorkerManager.RUNNING)
415 public void startWorker(AlignCalcWorkerI worker)
417 Objects.requireNonNull(worker);
418 var manager = registered.get(worker);
421 throw new NoSuchElementException();
427 public void restartWorkers()
429 synchronized (registered)
431 for (var manager : registered.values())
439 public void cancelWorker(AlignCalcWorkerI worker)
441 Objects.requireNonNull(worker);
442 var manager = registered.get(worker);
445 throw new NoSuchElementException();
450 private void notifyQueued(AlignCalcWorkerI worker)
452 for (AlignCalcListener listener : listeners)
454 listener.workerQueued(worker);
458 private void notifyStarted(AlignCalcWorkerI worker)
460 for (AlignCalcListener listener : listeners)
462 listener.workerStarted(worker);
466 private void notifyCompleted(AlignCalcWorkerI worker)
468 for (AlignCalcListener listener : listeners)
472 listener.workerCompleted(worker);
473 } catch (RuntimeException e)
480 private void notifyCancelled(AlignCalcWorkerI worker)
482 for (AlignCalcListener listener : listeners)
486 listener.workerCancelled(worker);
487 } catch (RuntimeException e)
494 private void notifyExceptional(AlignCalcWorkerI worker,
497 for (AlignCalcListener listener : listeners)
501 listener.workerExceptional(worker, throwable);
502 } catch (RuntimeException e)
510 public void addAlignCalcListener(AlignCalcListener listener)
512 listeners.add(listener);
516 public void removeAlignCalcListener(AlignCalcListener listener)
518 listeners.remove(listener);
522 public void shutdown()
524 executor.shutdownNow();