package jalview.ws2.actions; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import jalview.bin.Cache; import jalview.util.ArrayUtils; import jalview.util.MathUtils; import jalview.ws.params.ArgumentI; import jalview.ws2.actions.api.TaskEventListener; import jalview.ws2.actions.api.TaskI; import jalview.ws2.api.Credentials; import jalview.ws2.api.JobStatus; import jalview.ws2.api.WebServiceJobHandle; import jalview.ws2.client.api.WebServiceClientI; import jalview.ws2.helpers.DelegateJobEventListener; import jalview.ws2.helpers.TaskEventSupport; import static java.lang.String.format; /** * An abstract base class for non-interactive tasks which implements common * tasks methods. Additionally, it manages task execution in a polling loop. * Subclasses are only required to implement {@link #prepare()} and * {@link #done()} methods. * * @author mmwarowny * * @param * the type of jobs managed by the task * @param * the type of result provided by the task */ public abstract class AbstractPollableTask implements TaskI { private final long uid = MathUtils.getUID(); protected final WebServiceClientI client; protected final List args; protected final Credentials credentials; private final TaskEventSupport eventHandler; protected JobStatus taskStatus = null; private Future future = null; protected List jobs = Collections.emptyList(); protected R result; protected AbstractPollableTask(WebServiceClientI client, List args, Credentials credentials, TaskEventListener eventListener) { this.client = client; this.args = args; this.credentials = credentials; this.eventHandler = new TaskEventSupport(this, eventListener); } public long getUid() { return uid; } /** * Start the task using provided scheduled executor service. It creates a * polling loop running at set intervals. * * @param executor * executor to run the polling loop with */ public void start(ScheduledExecutorService executor) { if (future != null) throw new IllegalStateException("task already started"); var runnable = new Runnable() { private int stage = STAGE_PREPARE; private static final int STAGE_PREPARE = 0; private static final int STAGE_START = 1; private static final int STAGE_POLL = 2; private static final int STAGE_FINALIZE = 3; private static final int STAGE_DONE = 4; private int retryCount = 0; private static final int MAX_RETRY = 5; /** * A polling loop run periodically which carries the task through its * consecutive execution stages. */ @Override public void run() { if (stage == STAGE_PREPARE) { // first stage - the input data is collected and the jobs are created try { jobs = prepare(); } catch (ServiceInputInvalidException e) { stage = STAGE_DONE; setStatus(JobStatus.INVALID); eventHandler.fireTaskException(e); throw new CompletionException(e); } stage = STAGE_START; setStatus(JobStatus.READY); eventHandler.fireTaskStarted(jobs); var jobListener = new DelegateJobEventListener<>(eventHandler); for (var job : jobs) { job.addPropertyChagneListener(jobListener); } } try { if (stage == STAGE_START) { // second stage - jobs are submitted to the server startJobs(); stage = STAGE_POLL; setStatus(JobStatus.SUBMITTED); } if (stage == STAGE_POLL) { // third stage - jobs are poolled until all of them are completed if (pollJobs()) { stage = STAGE_FINALIZE; } updateGlobalStatus(); } if (stage == STAGE_FINALIZE) { // final stage - results are collected and stored result = done(); eventHandler.fireTaskCompleted(result); stage = STAGE_DONE; } retryCount = 0; } catch (IOException e) { eventHandler.fireTaskException(e); if (++retryCount > MAX_RETRY) { stage = STAGE_DONE; cancelJobs(); setStatus(JobStatus.SERVER_ERROR); throw new CompletionException(e); } } if (stage == STAGE_DONE) { // finalization - terminating the future task throw new CancellationException("task terminated"); } } }; if (taskStatus != JobStatus.CANCELLED) future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS); } @Override public JobStatus getStatus() { return taskStatus; } /** * Set the status of the task and notify the event handler. * * @param status * new task status */ protected void setStatus(JobStatus status) { if (this.taskStatus != status) { this.taskStatus = status; eventHandler.fireTaskStatusChanged(status); } } /** * Update task status according to the overall status of its jobs. The rules * of setting the status are following: *
    *
  • task is invalid if all jobs are invalid
  • *
  • task is completed if all but invalid jobs are completed
  • *
  • task is ready, submitted or queued if at least one job is ready, * submitted or queued an none proceeded to the next stage excluding * completed.
  • *
  • task is running if at least one job is running and none are failed or * cancelled
  • *
  • task is cancelled if at least one job is cancelled and none failed
  • *
  • task is failed or server error if at least one job is failed or server * error
  • *
*/ private void updateGlobalStatus() { int precedence = -1; for (BaseJob job : jobs) { JobStatus status = job.getStatus(); int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status); if (precedence < jobPrecedence) precedence = jobPrecedence; } if (precedence >= 0) { setStatus(JobStatus.statusPrecedence[precedence]); } } @Override public void cancel() { setStatus(JobStatus.CANCELLED); if (future != null) future.cancel(false); cancelJobs(); } @Override public List getSubJobs() { return jobs; } /** * Collect and process input sequences for submission and return the list of * jobs to be submitted. * * @return list of jobs to be submitted * @throws ServiceInputInvalidException * input is invalid and the task should not be started */ protected abstract List prepare() throws ServiceInputInvalidException; /** * Submit all valid jobs to the server and store their job handles. * * @throws IOException * if server error occurred */ protected void startJobs() throws IOException { for (BaseJob job : jobs) { if (job.isInputValid() && job.getStatus() == JobStatus.READY) { WebServiceJobHandle serverJob = client.submit(job.getInputSequences(), args, credentials); job.setServerJob(serverJob); job.setStatus(JobStatus.SUBMITTED); } } } /** * Poll all running jobs and update their status and logs. Polling is repeated * periodically until this method return true when all jobs are done. * * @return {@code true] if all jobs are done @throws IOException if server * error occurred */ protected boolean pollJobs() throws IOException { boolean allDone = true; for (BaseJob job : jobs) { if (job.isInputValid() && !job.getStatus().isDone()) { WebServiceJobHandle serverJob = job.getServerJob(); job.setStatus(client.getStatus(serverJob)); job.setLog(client.getLog(serverJob)); job.setErrorLog(client.getErrorLog(serverJob)); } allDone &= job.isCompleted(); } return allDone; } /** * Fetch and process the outputs produced by jobs and return the final result * of the task. The method is called once all jobs have finished execution. If * this method raises {@link IOException} it will be called again after a * delay. All IO operations should happen before data processing, so * potentially expensive computation is avoided in case of an error. * * @return final result of the computation * @throws IOException * if server error occurred */ protected abstract R done() throws IOException; /** * Cancel all running jobs. Used in case of task failure to cleanup the * resources or when the task has been cancelled. */ protected void cancelJobs() { for (BaseJob job : jobs) { if (!job.isCompleted()) { try { if (job.getServerJob() != null) { client.cancel(job.getServerJob()); } job.setStatus(JobStatus.CANCELLED); } catch (IOException e) { Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e); } } } } @Override public R getResult() { return result; } }