From dfe85e81db67f4fa98b00be50fd3ec6f250977bd Mon Sep 17 00:00:00 2001 From: Mateusz Warowny Date: Fri, 19 Nov 2021 16:36:02 +0100 Subject: [PATCH] JAL-3878 Create pollable task and executor it can be run with. --- src/jalview/viewmodel/AlignmentViewport.java | 10 ++ src/jalview/ws2/PollableTaskI.java | 51 ++++++++ src/jalview/ws2/PollableTaskListenerI.java | 57 +++++++++ src/jalview/ws2/PollingTaskExecutor.java | 164 ++++++++++++++++++++++++++ 4 files changed, 282 insertions(+) create mode 100644 src/jalview/ws2/PollableTaskI.java create mode 100644 src/jalview/ws2/PollableTaskListenerI.java create mode 100644 src/jalview/ws2/PollingTaskExecutor.java diff --git a/src/jalview/viewmodel/AlignmentViewport.java b/src/jalview/viewmodel/AlignmentViewport.java index 1a08b1b..0cae401 100644 --- a/src/jalview/viewmodel/AlignmentViewport.java +++ b/src/jalview/viewmodel/AlignmentViewport.java @@ -62,6 +62,7 @@ import jalview.workers.ComplementConsensusThread; import jalview.workers.ConsensusThread; import jalview.workers.InformationThread; import jalview.workers.StrucConsensusThread; +import jalview.ws2.PollingTaskExecutor; import java.awt.Color; import java.beans.PropertyChangeSupport; @@ -994,6 +995,13 @@ public abstract class AlignmentViewport } return false; } + + private PollingTaskExecutor wsExecutor = new PollingTaskExecutor(); + + public PollingTaskExecutor getWSExecutor() + { + return wsExecutor; + } public void setAlignment(AlignmentI align) { @@ -1024,6 +1032,8 @@ public abstract class AlignmentViewport gapcounts = null; calculator.shutdown(); calculator = null; + wsExecutor.shutdown(); + wsExecutor = null; residueShading = null; // may hold a reference to Consensus changeSupport = null; ranges = null; diff --git a/src/jalview/ws2/PollableTaskI.java b/src/jalview/ws2/PollableTaskI.java new file mode 100644 index 0000000..d097974 --- /dev/null +++ b/src/jalview/ws2/PollableTaskI.java @@ -0,0 +1,51 @@ +package jalview.ws2; + +/** + * The {@code PollableTaskI} interface should be implemented by classes + * representing a background task that must be polled repeatedly to check for + * completion. Those are typically jobs that run on a remote host and need to be + * periodically checked for status updates. + * + * The life-cycle of a task consist of calling {@link #start} method once to + * start the process, followed by repeated calls to {@link #poll} that should + * check for execution status and finally {@link #done} method that finalizes + * the process. + * + * The instances can be started with {@link PollingTaskExecutor} which manages + * start up, polling and finalization of the task using a thread executor. + * + * @author mmwarowny + * + */ +public interface PollableTaskI +{ + /** + * Called by the executor once and the beginning to start the task. May throw + * any exception, in such case the task will be interrupted. + * + * @throws Exception + */ + void start() throws Exception; + + /** + * Called repeatedly by the executor to check for task completion. The + * implementation should check the remote host for job status updates and + * return true when the task is finished. If any exception is thrown, the task + * is interrupted. + * + * @return whether the task is done + * @throws Exception + */ + boolean poll() throws Exception; + + /** + * @return whether the task is done + */ + boolean isDone(); + + /** + * Called once the task is done running ({@link #poll} returned true) to + * finalize the task and collect the results. + */ + void done(); +} diff --git a/src/jalview/ws2/PollableTaskListenerI.java b/src/jalview/ws2/PollableTaskListenerI.java new file mode 100644 index 0000000..d69b2d9 --- /dev/null +++ b/src/jalview/ws2/PollableTaskListenerI.java @@ -0,0 +1,57 @@ +package jalview.ws2; + +/** + * Classes listening to the pollable task events must implement + * {@link PollableTaskListenerI}. They can be added to the + * {@link PollingTaskExecutor} to respond to the task execution events. + * + * @author mmwarowny + * + */ +public interface PollableTaskListenerI +{ + /** + * Called when a new task is submitted for execution after its + * {@link PollableTask#start} method was called successfully. + * + * @param task + * submitted task + */ + public void submitted(PollableTaskI task); + + /** + * Called when a new task failed to start and raised an uncaught exception. + * + * @param task + * task that failed + * @param e + * raised exception + */ + public void submissionFailed(PollableTaskI task, Exception e); + + /** + * Called when polling resulted in an uncaught exception. + * + * @param task + * task that failed + * @param e + * raised exception + */ + public void pollFailed(PollableTaskI task, Exception e); + + /** + * Called when a task is cancelled. + * + * @param task + * cancelled task + */ + public void cancelled(PollableTaskI task); + + /** + * Called when the task finished execution either successfully or not. + * + * @param task + * finished task + */ + public void done(PollableTaskI task); +} diff --git a/src/jalview/ws2/PollingTaskExecutor.java b/src/jalview/ws2/PollingTaskExecutor.java new file mode 100644 index 0000000..c003e49 --- /dev/null +++ b/src/jalview/ws2/PollingTaskExecutor.java @@ -0,0 +1,164 @@ +package jalview.ws2; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import jalview.bin.Cache; + +/** + * An object that executes submitted {@link PollableTaskI} tasks using + * {@link SchedulekExecutorservice}. The task is first started using its + * {@link PollableTaskI#start} method and then repeatedly polled every second + * with {@link PollableTaskI#poll}. + * + * The {@link PollingTaskExecutor} automates the process of running tasks and + * provides convenient interface to listen to events associated with task + * execution. + * + * @author mmwarowny + * + */ +public class PollingTaskExecutor +{ + private ScheduledExecutorService executor = Executors + .newSingleThreadScheduledExecutor(); + + /** + * Submit the task for execution. Calls task's {@code start} method and, if + * started successfully, schedules next poll after one second. + * + * @param task + * task to submit + */ + public void submit(final PollableTaskI task) + { + executor.submit(() -> { + try + { + task.start(); + wsThreadSupport.submitted(task); + } catch (Exception e) + { + Cache.log.error("Failed to submit web service jobs.", e); + wsThreadSupport.submissionFailed(task, e); + return; + } + executor.schedule(() -> poll(task), 1, TimeUnit.SECONDS); + }); + } + + /** + * Poll the task by calling it's {@code poll} method. If not finished, the + * next poll is scheduled to happen after one second, otherwise task's + * {@code done} method is called immediately. + * + * @param task + * task to poll + */ + private void poll(PollableTaskI task) + { + boolean done; + try + { + done = task.poll(); + } catch (Exception e) + { + Cache.log.error("Failed to poll task.", e); + wsThreadSupport.pollFailed(task, e); + return; + } + if (!done) + { + executor.schedule(() -> poll(task), 1, TimeUnit.SECONDS); + } + else + { + task.done(); + wsThreadSupport.done(task); + } + } + + private WebServiceThreadSupport wsThreadSupport = new WebServiceThreadSupport(); + + /** + * Add listener of the task related events. + * + * @param listener + * listener to add + */ + public void addThreadListener(PollableTaskListenerI listener) + { + wsThreadSupport.addListener(listener); + } + + /** + * @param listener + * listener to be removed + */ + public void removeThreadListener(PollableTaskListenerI listener) + { + wsThreadSupport.removeListener(listener); + } + + public void shutdown() + { + executor.shutdownNow(); + } + +} + +class WebServiceThreadSupport implements PollableTaskListenerI +{ + List listeners = new CopyOnWriteArrayList<>(); + + @Override + public void submitted(PollableTaskI task) + { + for (var listener : listeners) + listener.submitted(task); + } + + @Override + public void submissionFailed(PollableTaskI task, Exception e) + { + for (var listener : listeners) + listener.submissionFailed(task, e); + } + + @Override + public void pollFailed(PollableTaskI task, Exception e) + { + for (var listener : listeners) + listener.pollFailed(task, e); + } + + @Override + public void cancelled(PollableTaskI task) + { + for (var listener : listeners) + listener.cancelled(task); + } + + @Override + public void done(PollableTaskI task) + { + for (var listener : listeners) + listener.done(task); + } + + public void addListener(PollableTaskListenerI listener) + { + if (!listeners.contains(listener)) + { + listeners.add(listener); + } + } + + public void removeListener(PollableTaskListenerI listener) + { + listeners.remove(listener); + } +} -- 1.7.10.2