}
@Override
+ public WebServiceI getWebService()
+ {
+ return service;
+ }
+
+ @Override
public String getName()
{
return service.getName();
--- /dev/null
+package jalview.ws2.operations;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import jalview.bin.Cache;
+import jalview.ws2.PollableTaskI;
+import jalview.ws2.WSJob;
+import jalview.ws2.WSJobStatus;
+
+public abstract class AbstractPollableWorker
+ extends AbstractWorker implements PollableTaskI
+{
+
+ private Map<Long, Integer> exceptionCount = new HashMap<>();
+
+ private static final int MAX_RETRY = 5;
+
+ @Override
+ public boolean poll()
+ {
+ boolean done = true;
+ for (WSJob job : getJobs())
+ {
+ if (!job.getStatus().isDone() && !job.getStatus().isFailed())
+ {
+ Cache.log.debug(format("Polling job %s.", job));
+ try
+ {
+ getOperation().getWebService().updateProgress(job);
+ exceptionCount.remove(job.getUid());
+ } catch (IOException e)
+ {
+ Cache.log.error(format("Polling job %s failed.", job), e);
+ listeners.firePollException(job, e);
+ int count = exceptionCount.getOrDefault(job.getUid(),
+ MAX_RETRY);
+ if (--count <= 0)
+ {
+ job.setStatus(WSJobStatus.SERVER_ERROR);
+ Cache.log.warn(format(
+ "Attempts limit exceeded. Droping job %s.", job));
+ }
+ exceptionCount.put(job.getUid(), count);
+ } catch (OutOfMemoryError e)
+ {
+ job.setStatus(WSJobStatus.BROKEN);
+ Cache.log.error(
+ format("Out of memory when retrieving job %s", job), e);
+ }
+ Cache.log.debug(
+ format("Job %s status is %s", job, job.getStatus()));
+ }
+ done &= job.getStatus().isDone() || job.getStatus().isFailed();
+ }
+ return done;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ if (getJobs().size() == 0)
+ return false;
+ for (WSJob job : getJobs())
+ {
+ if (!job.getStatus().isDone() && !job.getStatus().isFailed())
+ return false;
+ }
+ return true;
+ }
+
+}
--- /dev/null
+package jalview.ws2.operations;
+
+import jalview.util.MathUtils;
+import jalview.ws2.WSJob;
+
+public abstract class AbstractWorker implements WebServiceWorkerI
+{
+ protected long uid = MathUtils.getUID();
+ public long getUID()
+ {
+ return uid;
+ }
+
+ protected WebServiceWorkerListenersList listeners = new WebServiceWorkerListenersList(this);
+
+ public void addListener(WebServiceWorkerListener listener)
+ {
+ listeners.addListener(listener);
+ }
+}
package jalview.ws2.operations;
import jalview.ws.params.ParamDatastoreI;
+import jalview.ws2.WebServiceI;
import jalview.ws2.gui.MenuEntryProviderI;
/**
public interface Operation
{
/**
+ * Get the web service instance used to communicate with the web client.
+ *
+ * @return web service client instance
+ */
+ public WebServiceI getWebService();
+
+ /**
* Get the name of the operation. Typically fetched from the server.
*
* @return operation name
--- /dev/null
+package jalview.ws2.operations;
+
+import java.util.ArrayList;
+
+import jalview.ws2.WSJob;
+
+public class WSJobList<T extends WSJob> extends ArrayList<T>
+{
+ private static final long serialVersionUID = -1684855135603987602L;
+
+ public int countSubmitted() {
+ int count = 0;
+ for (WSJob job : this) {
+ if (job.getStatus().isSubmitted()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public int countQueuing() {
+ int count = 0;
+ for (WSJob job : this) {
+ if (job.getStatus().isQueuing()) count++;
+ }
+ return count;
+ }
+
+ public int countRunning() {
+ int count = 0;
+ for (WSJob job : this) {
+ if (job.getStatus().isRunning()) count++;
+ }
+ return count;
+ }
+
+ public int countDone() {
+ int count = 0;
+ for (WSJob job : this) {
+ if (job.getStatus().isDone()) count++;
+ }
+ return count;
+ }
+
+ public int countFailed() {
+ int count = 0;
+ for (WSJob job : this) {
+ if (job.getStatus().isFailed()) count++;
+ }
+ return count;
+ }
+
+ public int countSuccessful() {
+ return countDone() - countFailed() - countCancelled();
+ }
+
+ public int countCancelled() {
+ int count = 0;
+ for (WSJob job : this) {
+ if (job.getStatus().isCancelled()) count++;
+ }
+ return count;
+ }
+}
--- /dev/null
+package jalview.ws2.operations;
+
+import jalview.ws2.WSJob;
+
+public interface WebServiceWorkerI
+{
+ Operation getOperation();
+
+ WSJobList<? extends WSJob> getJobs();
+}
--- /dev/null
+package jalview.ws2.operations;
+
+import jalview.ws2.WSJob;
+
+/**
+ * The listener interface for receiving signals from the
+ * {@link WebServiceWorkerI} about the state changes or new sub-jobs spawned.
+ *
+ * The {@link WebServiceWorkerListener} objects created from that interface are
+ * then registered with a worker objects using
+ * {@link WebServiceWorkerI#addListener} method. When an event occurs, a
+ * relevant method in the listener is invoked with the worker that emitted the
+ * signal as a first parameter and, optionally, followed by additional event
+ * details.
+ *
+ * @author mmwarowny
+ *
+ */
+public interface WebServiceWorkerListener
+{
+ /**
+ * Called when the worker started successfully.
+ *
+ * @param source
+ * worker that emitted the signal
+ */
+ void workerStarted(WebServiceWorkerI source);
+
+ /**
+ * Called when the worker failed to start the jobs.
+ *
+ * @param source
+ * worker that emitted the signal
+ */
+ void workerNotStarted(WebServiceWorkerI source);
+
+ /**
+ * Called when the worker creates a new job
+ *
+ * @param source
+ * worker that emitted the signal
+ * @param job
+ * newly created job
+ */
+ void jobCreated(WebServiceWorkerI source, WSJob job);
+
+ /**
+ * Called when polling the job results in an exception.
+ *
+ * @param source
+ * worker that emitted the signal
+ * @param job
+ * polled job
+ * @param e
+ * exception that occurred
+ */
+ void pollException(WebServiceWorkerI source, WSJob job, Exception e);
+
+ /**
+ * Called when the polling has finished and the finalization process begun.
+ *
+ * @param source
+ * worker that emitted the signal
+ */
+ void workerCompleting(WebServiceWorkerI source);
+
+ /**
+ * Called when the worker completes its work.
+ *
+ * @param source
+ * worker that emitted the signal
+ */
+ void workerCompleted(WebServiceWorkerI source);
+}
--- /dev/null
+package jalview.ws2.operations;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
+
+import jalview.ws2.WSJob;
+
+/**
+ * Utility class that manages a list of {@link WebServiceWorkerListener} and
+ * dispatches signals to them. An instance of this class can be used as a member
+ * field of the {@link WebServiceWorkerI} object to easily store listeners and
+ * delegate signals to them. Firing any signal with this object will invoke a
+ * corresponding method on all registered listeners in order they were
+ * registered.
+ *
+ * @author mmwarowny
+ *
+ */
+public class WebServiceWorkerListenersList
+{
+ private WebServiceWorkerI owner;
+
+ private List<WebServiceWorkerListener> listeners = new CopyOnWriteArrayList<>();
+
+ /**
+ * Constructs a listeners list object with the worker which will be given
+ * to the listeners as the source of the signals.
+ *
+ * @param worker the worker to be given as the source for signals
+ */
+ public WebServiceWorkerListenersList(WebServiceWorkerI worker)
+ {
+ this.owner = worker;
+ }
+
+ /**
+ * Add listener to the listeners list. The listener will be notified of
+ * any signals triggered with this object.
+ *
+ * @param listener listener to add
+ */
+ public void addListener(WebServiceWorkerListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ /**
+ * Remove listener from the lsiteners list. The listener will no longer be
+ * notified of the emitted signals.
+ *
+ * @param listener listener to remove
+ */
+ public void removeListener(WebServiceWorkerListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Emit "worker started" signal to all listeners.
+ */
+ public void fireWorkerStarted()
+ {
+ for (var listener : listeners)
+ listener.workerStarted(owner);
+ }
+
+ /**
+ * Emit "worker not started" signal to all listeners.
+ */
+ public void fireWorkerNotStarted()
+ {
+ for (var listener : listeners)
+ listener.workerNotStarted(owner);
+ }
+
+ /**
+ * Emit "job created" signal to all listeners passing the job that has been
+ * created to them.
+ *
+ * @param job newly created job
+ */
+ public void fireJobCreated(WSJob job)
+ {
+ for (var listener : listeners)
+ listener.jobCreated(owner, job);
+ }
+
+ /**
+ * Emit "poll exception" signal to all listener when an exception occurred
+ * during job polling. The job which caused an exception as well as the exception
+ * are passed to all listeners.
+ *
+ * @param job polled job
+ * @param e exception that occurred
+ */
+ public void firePollException(WSJob job, Exception e)
+ {
+ for (var listener : listeners)
+ listener.pollException(owner, job, e);
+ }
+
+ /**
+ * Emit "worker completing" signal to all listeners when the worker starts
+ * collecting and parsing the results.
+ */
+ public void fireWorkerCompleting()
+ {
+ for (var listener : listeners)
+ listener.workerCompleting(owner);
+ }
+
+ /**
+ * Emit "worker completed" signal to all listeners indicating that the worker
+ * finished processing data and finalized the jobs.
+ */
+ public void fireWorkerCompleted()
+ {
+ for (var listener : listeners)
+ listener.workerCompleted(owner);
+ }
+
+ /**
+ * Get the list of registered listeners. The returned list should not be
+ * modified externally and its content may change when workers are added
+ * or removed from the list.
+ * @return
+ */
+ public List<WebServiceWorkerListener> getListeners()
+ {
+ return listeners;
+ }
+
+ /**
+ * Execute an operation for each listener in the listeners list.
+ *
+ * @param consumer listener object consumer
+ */
+ public void forEach(Consumer<WebServiceWorkerListener> consumer)
+ {
+ for (var listener : listeners)
+ consumer.accept(listener);
+ }
+}