From: Mateusz Warowny Date: Fri, 19 Nov 2021 16:51:40 +0000 (+0100) Subject: JAL-3878 Create abstract workers and their utility classes. X-Git-Url: http://source.jalview.org/gitweb/?a=commitdiff_plain;h=d9764d4ad24b07e7405cffded0aa45503a784856;p=jalview.git JAL-3878 Create abstract workers and their utility classes. --- diff --git a/src/jalview/ws2/operations/AbstractOperation.java b/src/jalview/ws2/operations/AbstractOperation.java index 36a86e3..f3a6e16 100644 --- a/src/jalview/ws2/operations/AbstractOperation.java +++ b/src/jalview/ws2/operations/AbstractOperation.java @@ -32,6 +32,12 @@ public abstract class AbstractOperation implements Operation } @Override + public WebServiceI getWebService() + { + return service; + } + + @Override public String getName() { return service.getName(); diff --git a/src/jalview/ws2/operations/AbstractPollableWorker.java b/src/jalview/ws2/operations/AbstractPollableWorker.java new file mode 100644 index 0000000..973199e --- /dev/null +++ b/src/jalview/ws2/operations/AbstractPollableWorker.java @@ -0,0 +1,75 @@ +package jalview.ws2.operations; + +import static java.lang.String.format; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import jalview.bin.Cache; +import jalview.ws2.PollableTaskI; +import jalview.ws2.WSJob; +import jalview.ws2.WSJobStatus; + +public abstract class AbstractPollableWorker + extends AbstractWorker implements PollableTaskI +{ + + private Map exceptionCount = new HashMap<>(); + + private static final int MAX_RETRY = 5; + + @Override + public boolean poll() + { + boolean done = true; + for (WSJob job : getJobs()) + { + if (!job.getStatus().isDone() && !job.getStatus().isFailed()) + { + Cache.log.debug(format("Polling job %s.", job)); + try + { + getOperation().getWebService().updateProgress(job); + exceptionCount.remove(job.getUid()); + } catch (IOException e) + { + Cache.log.error(format("Polling job %s failed.", job), e); + listeners.firePollException(job, e); + int count = exceptionCount.getOrDefault(job.getUid(), + MAX_RETRY); + if (--count <= 0) + { + job.setStatus(WSJobStatus.SERVER_ERROR); + Cache.log.warn(format( + "Attempts limit exceeded. Droping job %s.", job)); + } + exceptionCount.put(job.getUid(), count); + } catch (OutOfMemoryError e) + { + job.setStatus(WSJobStatus.BROKEN); + Cache.log.error( + format("Out of memory when retrieving job %s", job), e); + } + Cache.log.debug( + format("Job %s status is %s", job, job.getStatus())); + } + done &= job.getStatus().isDone() || job.getStatus().isFailed(); + } + return done; + } + + @Override + public boolean isDone() + { + if (getJobs().size() == 0) + return false; + for (WSJob job : getJobs()) + { + if (!job.getStatus().isDone() && !job.getStatus().isFailed()) + return false; + } + return true; + } + +} diff --git a/src/jalview/ws2/operations/AbstractWorker.java b/src/jalview/ws2/operations/AbstractWorker.java new file mode 100644 index 0000000..926b7ce --- /dev/null +++ b/src/jalview/ws2/operations/AbstractWorker.java @@ -0,0 +1,20 @@ +package jalview.ws2.operations; + +import jalview.util.MathUtils; +import jalview.ws2.WSJob; + +public abstract class AbstractWorker implements WebServiceWorkerI +{ + protected long uid = MathUtils.getUID(); + public long getUID() + { + return uid; + } + + protected WebServiceWorkerListenersList listeners = new WebServiceWorkerListenersList(this); + + public void addListener(WebServiceWorkerListener listener) + { + listeners.addListener(listener); + } +} diff --git a/src/jalview/ws2/operations/Operation.java b/src/jalview/ws2/operations/Operation.java index 8b432ba..5cf2cb5 100644 --- a/src/jalview/ws2/operations/Operation.java +++ b/src/jalview/ws2/operations/Operation.java @@ -1,6 +1,7 @@ package jalview.ws2.operations; import jalview.ws.params.ParamDatastoreI; +import jalview.ws2.WebServiceI; import jalview.ws2.gui.MenuEntryProviderI; /** @@ -18,6 +19,13 @@ import jalview.ws2.gui.MenuEntryProviderI; public interface Operation { /** + * Get the web service instance used to communicate with the web client. + * + * @return web service client instance + */ + public WebServiceI getWebService(); + + /** * Get the name of the operation. Typically fetched from the server. * * @return operation name diff --git a/src/jalview/ws2/operations/WSJobList.java b/src/jalview/ws2/operations/WSJobList.java new file mode 100644 index 0000000..76bc015 --- /dev/null +++ b/src/jalview/ws2/operations/WSJobList.java @@ -0,0 +1,64 @@ +package jalview.ws2.operations; + +import java.util.ArrayList; + +import jalview.ws2.WSJob; + +public class WSJobList extends ArrayList +{ + private static final long serialVersionUID = -1684855135603987602L; + + public int countSubmitted() { + int count = 0; + for (WSJob job : this) { + if (job.getStatus().isSubmitted()) { + count++; + } + } + return count; + } + + public int countQueuing() { + int count = 0; + for (WSJob job : this) { + if (job.getStatus().isQueuing()) count++; + } + return count; + } + + public int countRunning() { + int count = 0; + for (WSJob job : this) { + if (job.getStatus().isRunning()) count++; + } + return count; + } + + public int countDone() { + int count = 0; + for (WSJob job : this) { + if (job.getStatus().isDone()) count++; + } + return count; + } + + public int countFailed() { + int count = 0; + for (WSJob job : this) { + if (job.getStatus().isFailed()) count++; + } + return count; + } + + public int countSuccessful() { + return countDone() - countFailed() - countCancelled(); + } + + public int countCancelled() { + int count = 0; + for (WSJob job : this) { + if (job.getStatus().isCancelled()) count++; + } + return count; + } +} diff --git a/src/jalview/ws2/operations/WebServiceWorkerI.java b/src/jalview/ws2/operations/WebServiceWorkerI.java new file mode 100644 index 0000000..5ec7e97 --- /dev/null +++ b/src/jalview/ws2/operations/WebServiceWorkerI.java @@ -0,0 +1,10 @@ +package jalview.ws2.operations; + +import jalview.ws2.WSJob; + +public interface WebServiceWorkerI +{ + Operation getOperation(); + + WSJobList getJobs(); +} diff --git a/src/jalview/ws2/operations/WebServiceWorkerListener.java b/src/jalview/ws2/operations/WebServiceWorkerListener.java new file mode 100644 index 0000000..1044284 --- /dev/null +++ b/src/jalview/ws2/operations/WebServiceWorkerListener.java @@ -0,0 +1,74 @@ +package jalview.ws2.operations; + +import jalview.ws2.WSJob; + +/** + * The listener interface for receiving signals from the + * {@link WebServiceWorkerI} about the state changes or new sub-jobs spawned. + * + * The {@link WebServiceWorkerListener} objects created from that interface are + * then registered with a worker objects using + * {@link WebServiceWorkerI#addListener} method. When an event occurs, a + * relevant method in the listener is invoked with the worker that emitted the + * signal as a first parameter and, optionally, followed by additional event + * details. + * + * @author mmwarowny + * + */ +public interface WebServiceWorkerListener +{ + /** + * Called when the worker started successfully. + * + * @param source + * worker that emitted the signal + */ + void workerStarted(WebServiceWorkerI source); + + /** + * Called when the worker failed to start the jobs. + * + * @param source + * worker that emitted the signal + */ + void workerNotStarted(WebServiceWorkerI source); + + /** + * Called when the worker creates a new job + * + * @param source + * worker that emitted the signal + * @param job + * newly created job + */ + void jobCreated(WebServiceWorkerI source, WSJob job); + + /** + * Called when polling the job results in an exception. + * + * @param source + * worker that emitted the signal + * @param job + * polled job + * @param e + * exception that occurred + */ + void pollException(WebServiceWorkerI source, WSJob job, Exception e); + + /** + * Called when the polling has finished and the finalization process begun. + * + * @param source + * worker that emitted the signal + */ + void workerCompleting(WebServiceWorkerI source); + + /** + * Called when the worker completes its work. + * + * @param source + * worker that emitted the signal + */ + void workerCompleted(WebServiceWorkerI source); +} diff --git a/src/jalview/ws2/operations/WebServiceWorkerListenersList.java b/src/jalview/ws2/operations/WebServiceWorkerListenersList.java new file mode 100644 index 0000000..acfffb3 --- /dev/null +++ b/src/jalview/ws2/operations/WebServiceWorkerListenersList.java @@ -0,0 +1,144 @@ +package jalview.ws2.operations; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +import jalview.ws2.WSJob; + +/** + * Utility class that manages a list of {@link WebServiceWorkerListener} and + * dispatches signals to them. An instance of this class can be used as a member + * field of the {@link WebServiceWorkerI} object to easily store listeners and + * delegate signals to them. Firing any signal with this object will invoke a + * corresponding method on all registered listeners in order they were + * registered. + * + * @author mmwarowny + * + */ +public class WebServiceWorkerListenersList +{ + private WebServiceWorkerI owner; + + private List listeners = new CopyOnWriteArrayList<>(); + + /** + * Constructs a listeners list object with the worker which will be given + * to the listeners as the source of the signals. + * + * @param worker the worker to be given as the source for signals + */ + public WebServiceWorkerListenersList(WebServiceWorkerI worker) + { + this.owner = worker; + } + + /** + * Add listener to the listeners list. The listener will be notified of + * any signals triggered with this object. + * + * @param listener listener to add + */ + public void addListener(WebServiceWorkerListener listener) + { + listeners.add(listener); + } + + /** + * Remove listener from the lsiteners list. The listener will no longer be + * notified of the emitted signals. + * + * @param listener listener to remove + */ + public void removeListener(WebServiceWorkerListener listener) + { + listeners.remove(listener); + } + + /** + * Emit "worker started" signal to all listeners. + */ + public void fireWorkerStarted() + { + for (var listener : listeners) + listener.workerStarted(owner); + } + + /** + * Emit "worker not started" signal to all listeners. + */ + public void fireWorkerNotStarted() + { + for (var listener : listeners) + listener.workerNotStarted(owner); + } + + /** + * Emit "job created" signal to all listeners passing the job that has been + * created to them. + * + * @param job newly created job + */ + public void fireJobCreated(WSJob job) + { + for (var listener : listeners) + listener.jobCreated(owner, job); + } + + /** + * Emit "poll exception" signal to all listener when an exception occurred + * during job polling. The job which caused an exception as well as the exception + * are passed to all listeners. + * + * @param job polled job + * @param e exception that occurred + */ + public void firePollException(WSJob job, Exception e) + { + for (var listener : listeners) + listener.pollException(owner, job, e); + } + + /** + * Emit "worker completing" signal to all listeners when the worker starts + * collecting and parsing the results. + */ + public void fireWorkerCompleting() + { + for (var listener : listeners) + listener.workerCompleting(owner); + } + + /** + * Emit "worker completed" signal to all listeners indicating that the worker + * finished processing data and finalized the jobs. + */ + public void fireWorkerCompleted() + { + for (var listener : listeners) + listener.workerCompleted(owner); + } + + /** + * Get the list of registered listeners. The returned list should not be + * modified externally and its content may change when workers are added + * or removed from the list. + * @return + */ + public List getListeners() + { + return listeners; + } + + /** + * Execute an operation for each listener in the listeners list. + * + * @param consumer listener object consumer + */ + public void forEach(Consumer consumer) + { + for (var listener : listeners) + consumer.accept(listener); + } +}