From: Mateusz Warowny Date: Thu, 3 Mar 2022 13:05:37 +0000 (+0100) Subject: JAL-3878 Create abstract base classes for non-interactive tasks X-Git-Url: http://source.jalview.org/gitweb/?a=commitdiff_plain;h=e0b3ba626fc64b59266c72089eb517a3e4559ee7;p=jalview.git JAL-3878 Create abstract base classes for non-interactive tasks --- diff --git a/src/jalview/ws2/actions/AbstractPollableTask.java b/src/jalview/ws2/actions/AbstractPollableTask.java new file mode 100644 index 0000000..d1db921 --- /dev/null +++ b/src/jalview/ws2/actions/AbstractPollableTask.java @@ -0,0 +1,373 @@ +package jalview.ws2.actions; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import jalview.bin.Cache; +import jalview.ws.params.ArgumentI; +import jalview.ws2.actions.api.TaskEventListener; +import jalview.ws2.actions.api.TaskI; +import jalview.ws2.api.Credentials; +import jalview.ws2.api.JobStatus; +import jalview.ws2.api.WebServiceJobHandle; +import jalview.ws2.client.api.WebServiceClientI; +import jalview.ws2.helpers.DelegateJobEventListener; +import jalview.ws2.helpers.TaskEventSupport; +import static java.lang.String.format; + +/** + * An abstract base class for non-interactive tasks which implements common + * tasks methods. Additionally, it manages task execution in a polling loop. + * Subclasses are only required to implement {@link #prepare()} and + * {@link #done()} methods. + * + * @author mmwarowny + * + * @param + * the type of jobs managed by the task + * @param + * the type of result provided by the task + */ +public abstract class AbstractPollableTask implements TaskI +{ + protected final WebServiceClientI client; + + protected final List args; + + protected final Credentials credentials; + + private final TaskEventSupport eventHandler; + + protected JobStatus taskStatus = null; + + private Future future = null; + + protected List jobs = Collections.emptyList(); + + protected R result; + + protected AbstractPollableTask(WebServiceClientI client, List args, + Credentials credentials, TaskEventListener eventListener) + { + this.client = client; + this.args = args; + this.credentials = credentials; + this.eventHandler = new TaskEventSupport(this, eventListener); + } + + /** + * Start the task using provided scheduled executor service. It creates a + * polling loop running at set intervals. + * + * @param executor + * executor to run the polling loop with + */ + public void start(ScheduledExecutorService executor) + { + if (future != null) + throw new IllegalStateException("task already started"); + var runnable = new Runnable() + { + private int stage = STAGE_PREPARE; + + private static final int STAGE_PREPARE = 0; + + private static final int STAGE_START = 1; + + private static final int STAGE_POLL = 2; + + private static final int STAGE_FINALIZE = 3; + + private static final int STAGE_DONE = 4; + + private int retryCount = 0; + + private static final int MAX_RETRY = 5; + + /** + * A polling loop run periodically which carries the task through its + * consecutive execution stages. + */ + @Override + public void run() + { + if (stage == STAGE_PREPARE) + { + // first stage - the input data is collected and the jobs are created + try + { + jobs = prepare(); + } catch (ServiceInputInvalidException e) + { + stage = STAGE_DONE; + setStatus(JobStatus.INVALID); + eventHandler.fireTaskException(e); + throw new CompletionException(e); + } + stage = STAGE_START; + setStatus(JobStatus.READY); + eventHandler.fireTaskStarted(jobs); + var jobListener = new DelegateJobEventListener<>(eventHandler); + for (var job : jobs) + { + job.addPropertyChagneListener(jobListener); + } + } + try + { + if (stage == STAGE_START) + { + // second stage - jobs are submitted to the server + startJobs(); + stage = STAGE_POLL; + setStatus(JobStatus.SUBMITTED); + } + if (stage == STAGE_POLL) + { + // third stage - jobs are poolled until all of them are completed + if (pollJobs()) + { + stage = STAGE_FINALIZE; + } + updateGlobalStatus(); + } + if (stage == STAGE_FINALIZE) + { + // final stage - results are collected and stored + result = done(); + eventHandler.fireTaskCompleted(result); + stage = STAGE_DONE; + } + retryCount = 0; + } catch (IOException e) + { + eventHandler.fireTaskException(e); + if (++retryCount > MAX_RETRY) + { + stage = STAGE_DONE; + cancelJobs(); + setStatus(JobStatus.SERVER_ERROR); + throw new CompletionException(e); + } + } + if (stage == STAGE_DONE) + { + // finalization - terminating the future task + throw new CancellationException("task terminated"); + } + } + }; + if (taskStatus != JobStatus.CANCELLED) + future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS); + } + + @Override + public JobStatus getStatus() + { + return taskStatus; + } + + /** + * Set the status of the task and notify the event handler. + * + * @param status + * new task status + */ + protected void setStatus(JobStatus status) + { + if (this.taskStatus != status) + { + this.taskStatus = status; + eventHandler.fireTaskStatusChanged(status); + } + } + + /** + * Update task status according to the overall status of its jobs. The rules + * of setting the status are following: + *
    + *
  • task is invalid if all jobs are invalid
  • + *
  • task is completed if all but invalid jobs are completed
  • + *
  • task is ready, submitted or queued if at least one job is ready, + * submitted or queued an none proceeded to the next stage excluding + * completed.
  • + *
  • task is running if at least one job is running and none are failed or + * cancelled
  • + *
  • task is cancelled if at least one job is cancelled and none failed
  • + *
  • task is failed or server error if at least one job is failed or server + * error
  • + *
+ */ + private void updateGlobalStatus() + { + JobStatus newStatus = taskStatus; + int currentPrecedence = getPrecedenceOf(newStatus); + for (BaseJob job : jobs) + { + JobStatus status = job.getStatus(); + int jobPrecedence = getPrecedenceOf(status); + if (currentPrecedence < jobPrecedence) + { + currentPrecedence = jobPrecedence; + newStatus = status; + } + } + setStatus(newStatus); + } + + /** + * A precedence order of job statuses used to compute the overall task status. + */ + private static JobStatus[] statusPrecedence = { + JobStatus.INVALID, // all must be invalid for task to be invalid + JobStatus.COMPLETED, // all but invalid must be completed for task to be + // completed + JobStatus.UNKNOWN, // unknown prevents successful completion but not + // running or failure + JobStatus.READY, + JobStatus.SUBMITTED, + JobStatus.QUEUED, + JobStatus.RUNNING, + JobStatus.CANCELLED, // if any is terminated unsuccessfully, the task is + // failed + JobStatus.FAILED, + JobStatus.SERVER_ERROR + }; + + /** + * @param status + * status to find the precedence of + * @return precedence of the status + */ + private static int getPrecedenceOf(JobStatus status) + { + final int len = statusPrecedence.length; + for (int i = 0; i < len; i++) + { + if (statusPrecedence[i] == status) + { + return i; + } + } + return -1; + } + + @Override + public void cancel() + { + setStatus(JobStatus.CANCELLED); + if (future != null) + future.cancel(false); + cancelJobs(); + } + + @Override + public List getSubJobs() + { + return jobs; + } + + /** + * Collect and process input sequences for submission and return the list of + * jobs to be submitted. + * + * @return list of jobs to be submitted + * @throws ServiceInputInvalidException + * input is invalid and the task should not be started + */ + protected abstract List prepare() throws ServiceInputInvalidException; + + /** + * Submit all valid jobs to the server and store their job handles. + * + * @throws IOException + * if server error occurred + */ + protected void startJobs() throws IOException + { + for (BaseJob job : jobs) + { + if (job.isInputValid() && job.getStatus() == JobStatus.READY) + { + WebServiceJobHandle serverJob = client.submit(job.getInputSequences(), + args, credentials); + job.setServerJob(serverJob); + job.setStatus(JobStatus.SUBMITTED); + } + } + } + + /** + * Poll all running jobs and update their status and logs. Polling is repeated + * periodically until this method return true when all jobs are done. + * + * @return {@code true] if all jobs are done @throws IOException if server + * error occurred + */ + protected boolean pollJobs() throws IOException + { + boolean allDone = true; + for (BaseJob job : jobs) + { + if (job.isInputValid() && !job.getStatus().isDone()) + { + WebServiceJobHandle serverJob = job.getServerJob(); + job.setStatus(client.getStatus(serverJob)); + job.setLog(client.getLog(serverJob)); + job.setErrorLog(client.getErrorLog(serverJob)); + } + allDone &= job.isCompleted(); + } + return allDone; + } + + /** + * Fetch and process the outputs produced by jobs and return the final result + * of the task. The method is called once all jobs have finished execution. If + * this method raises {@link IOException} it will be called again after a + * delay. All IO operations should happen before data processing, so + * potentially expensive computation is avoided in case of an error. + * + * @return final result of the computation + * @throws IOException + * if server error occurred + */ + protected abstract R done() throws IOException; + + /** + * Cancel all running jobs. Used in case of task failure to cleanup the + * resources or when the task has been cancelled. + */ + protected void cancelJobs() + { + for (BaseJob job : jobs) + { + if (!job.isCompleted()) + { + try + { + if (job.getServerJob() != null) + { + client.cancel(job.getServerJob()); + } + job.setStatus(JobStatus.CANCELLED); + } catch (IOException e) + { + Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e); + } + } + } + } + + @Override + public R getResult() + { + return result; + } +} diff --git a/src/jalview/ws2/actions/BaseJob.java b/src/jalview/ws2/actions/BaseJob.java new file mode 100644 index 0000000..906b625 --- /dev/null +++ b/src/jalview/ws2/actions/BaseJob.java @@ -0,0 +1,193 @@ +package jalview.ws2.actions; + +import java.beans.PropertyChangeListener; +import java.beans.PropertyChangeSupport; +import java.util.List; + +import jalview.datamodel.SequenceI; +import jalview.util.MathUtils; +import jalview.ws2.actions.api.JobI; +import jalview.ws2.actions.api.TaskEventListener; +import jalview.ws2.api.JobStatus; +import jalview.ws2.api.WebServiceJobHandle; + +/** + * Basic implementation of the {@link JobI} interface which stores internal job + * id, status, log and error log and provides getters to those fields. + * Additionally, it stores sequences that will be submitted as job input and the + * handle to the job on the server. Extending classes can add extra fields in + * order to associate additional data with the job. + * + * Observers can be registered with this bean-like object to listen to changes + * to {@code status}, {@code log}, and {@code errorLog} properties. Typically, + * the events are delegated to the {@link TaskEventListener} objects observing + * the task that created this job. + * + * @author mmwarowny + */ +public abstract class BaseJob implements JobI +{ + protected final long internalId = MathUtils.getUID(); + + protected final List inputSeqs; + + protected JobStatus status = null; + + protected String log = ""; + + protected String errorLog = ""; + + /* FIXME: server job is not specific to the BaseJob and should preferably + * be managed by classes using clients (tasks). */ + protected WebServiceJobHandle serverJob; + + public BaseJob(List inputSeqs) + { + this.inputSeqs = inputSeqs; + } + + /** + * {@inheritDoc} + */ + @Override + public final long getInternalId() + { + return internalId; + } + + /** + * Return the list of input sequences associated with this job. + * + * @return input sequences + */ + public List getInputSequences() + { + return inputSeqs; + } + + /** + * Check if inputs make a valid job. + * + * @return {@code true} if the input is valid. + */ + public abstract boolean isInputValid(); + + /** + * Check if the job is completed, This includes jobs with invalid input, + * successful and unsuccessful termination. + * + * @return {@code true} if job is completed successfully or not + */ + public boolean isCompleted() + { + return !isInputValid() || getStatus().isDone(); + } + + @Override + public final JobStatus getStatus() + { + return status; + } + + /** + * Set new status of the job and notify listeners of the change. Job status is + * managed internally by tasks and should not be modified outside the task + * which created this job. + * + * @param status + * new job status + */ + public final void setStatus(JobStatus status) + { + JobStatus oldStatus = this.status; + this.status = status; + pcs.firePropertyChange("status", oldStatus, status); + } + + @Override + public final String getLog() + { + return log; + } + + /** + * Set log text and notify listeners of the change. Log is managed by tasks + * which created the job and should not be modified by other classes. + * + * @param log + * new log + */ + final void setLog(String log) + { + String oldLog = this.log; + this.log = log; + pcs.firePropertyChange("log", oldLog, log); + } + + @Override + public final String getErrorLog() + { + return errorLog; + } + + /** + * Set error log text and notify listeners of the change. Error log is managed + * by tasks which created the job and should not be modified by other classes. + * + * @param errorLog + */ + final void setErrorLog(String errorLog) + { + String oldLog = this.errorLog; + this.errorLog = errorLog; + pcs.firePropertyChange("errorLog", oldLog, errorLog); + } + + /** + * Return the job handle that identifies this job running on the server or + * {@code null} if the job was not submitted. + * + * @return server job handle + */ + public final WebServiceJobHandle getServerJob() + { + return serverJob; + } + + /** + * Set the server job handle once the job was submitted to the server. The + * handler is managed by the task which created this job and should not be + * modified by other classes. + * + * @param job + */ + final void setServerJob(WebServiceJobHandle job) + { + this.serverJob = job; + } + + private final PropertyChangeSupport pcs = new PropertyChangeSupport(this); + + /** + * Register an observer that will be notified of changes to status, log and + * error log. + * + * @param listener + * property change listener + */ + public final void addPropertyChagneListener(PropertyChangeListener listener) + { + pcs.addPropertyChangeListener(listener); + } + + /** + * Remove the property listener from this object. + * + * @param listener + * listener to remove + */ + public final void removePropertyChangeListener(PropertyChangeListener listener) + { + pcs.removePropertyChangeListener(listener); + } +} diff --git a/src/jalview/ws2/actions/ServiceInputInvalidException.java b/src/jalview/ws2/actions/ServiceInputInvalidException.java new file mode 100644 index 0000000..c816d61 --- /dev/null +++ b/src/jalview/ws2/actions/ServiceInputInvalidException.java @@ -0,0 +1,31 @@ +package jalview.ws2.actions; + +/** + * An exception thrown to indicate that the input is invalid and the service + * cannot be started. + * + * @author mmwarowny + * + */ +public class ServiceInputInvalidException extends Exception +{ + /** + * + */ + private static final long serialVersionUID = 174066679057181584L; + + public ServiceInputInvalidException(String message) + { + super(message); + } + + public ServiceInputInvalidException(Throwable cause) + { + super(cause); + } + + public ServiceInputInvalidException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/src/jalview/ws2/helpers/DelegateJobEventListener.java b/src/jalview/ws2/helpers/DelegateJobEventListener.java new file mode 100644 index 0000000..45afefc --- /dev/null +++ b/src/jalview/ws2/helpers/DelegateJobEventListener.java @@ -0,0 +1,73 @@ +package jalview.ws2.helpers; + +import java.beans.PropertyChangeEvent; +import java.beans.PropertyChangeListener; + +import jalview.ws2.actions.BaseJob; +import jalview.ws2.actions.api.JobI; +import jalview.ws2.api.JobStatus; + +/** + * A property change listener to be used by web service tasks that delegates all + * sub-job related events from {@link BaseJob} subclasses to + * {@link TaskEventSupport}. Tasks can create one instance of this class with + * their event handler as a delegate and add it as a property change listener to + * each sub-job supporting property change listeners. It ensures that an + * appropriate {@code fireSubJob*Changed} method of the delegate object will be + * called whenever a {@link PropertyChagneEvent} is emitted by the sub-job. + * + * @author mmwarowny + * + * @param + * result type of the task + */ +public class DelegateJobEventListener implements PropertyChangeListener +{ + private final TaskEventSupport delegate; + + public DelegateJobEventListener(TaskEventSupport delegate) + { + this.delegate = delegate; + } + + @Override + public void propertyChange(PropertyChangeEvent evt) + { + switch (evt.getPropertyName()) + { + case "status": + statusChanged(evt); + break; + case "log": + logChanged(evt); + break; + case "errorLog": + errorLogChanged(evt); + break; + default: + throw new AssertionError(String.format( + "illegal property name \"%s\"", evt.getPropertyName())); + } + } + + private void statusChanged(PropertyChangeEvent evt) + { + JobI job = (JobI) evt.getSource(); + JobStatus status = (JobStatus) evt.getNewValue(); + delegate.fireSubJobStatusChanged(job, status); + } + + private void logChanged(PropertyChangeEvent evt) + { + JobI job = (JobI) evt.getSource(); + String log = (String) evt.getNewValue(); + delegate.fireSubJobLogChanged(job, log); + } + + private void errorLogChanged(PropertyChangeEvent evt) + { + JobI job = (JobI) evt.getSource(); + String errorLog = (String) evt.getNewValue(); + delegate.fireSubJobErrorLogChanged(job, errorLog); + } +} diff --git a/src/jalview/ws2/helpers/TaskEventSupport.java b/src/jalview/ws2/helpers/TaskEventSupport.java new file mode 100644 index 0000000..611eefb --- /dev/null +++ b/src/jalview/ws2/helpers/TaskEventSupport.java @@ -0,0 +1,60 @@ +package jalview.ws2.helpers; + +import java.util.List; + +import jalview.ws2.actions.api.JobI; +import jalview.ws2.actions.api.TaskEventListener; +import jalview.ws2.actions.api.TaskI; +import jalview.ws2.api.JobStatus; + +public class TaskEventSupport +{ + TaskI source; + TaskEventListener handler; + + public TaskEventSupport(TaskI source, TaskEventListener handler) + { + this.source = source; + this.handler = handler; + } + + public void fireTaskStarted(List subJobs) + { + handler.taskStarted(source, subJobs); + } + + public void fireTaskStatusChanged(JobStatus status) + { + handler.taskStatusChanged(source, status); + } + + public void fireTaskCompleted(T result) + { + handler.taskCompleted(source, result); + } + + public void fireTaskException(Exception e) + { + handler.taskException(source, e); + } + + public void taskRestarted() + { + handler.taskRestarted(source); + } + + public void fireSubJobStatusChanged(JobI job, JobStatus status) + { + handler.subJobStatusChanged(source, job, status); + } + + public void fireSubJobLogChanged(JobI job, String log) + { + handler.subJobLogChanged(source, job, log); + } + + public void fireSubJobErrorLogChanged(JobI job, String log) + { + handler.subJobErrorLogChanged(source, job, log); + } +}