package jalview.ws2.actions; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import jalview.bin.Console; import jalview.util.ArrayUtils; import jalview.util.MathUtils; import jalview.ws.params.ArgumentI; import jalview.ws2.actions.api.TaskEventListener; import jalview.ws2.actions.api.TaskI; import jalview.ws2.api.Credentials; import jalview.ws2.api.JobStatus; import jalview.ws2.client.api.WebServiceClientI; import jalview.ws2.helpers.DelegateJobEventListener; import jalview.ws2.helpers.TaskEventSupport; import static java.lang.String.format; public abstract class BaseTask implements TaskI { protected final long uid = MathUtils.getUID(); protected final WebServiceClientI webClient; protected final List args; protected final Credentials credentials; private final TaskEventSupport eventHandler; protected JobStatus status = JobStatus.CREATED; protected List jobs = Collections.emptyList(); protected R result = null; protected Runnable cancelAction = () -> { }; protected BaseTask(WebServiceClientI webClient, List args, Credentials credentials) { this.webClient = webClient; this.args = args; this.credentials = credentials; this.eventHandler = new TaskEventSupport<>(this); } @Override public final long getUid() { return uid; } @Override public final JobStatus getStatus() { return status; } @Override public final List getSubJobs() { return jobs; } @Override public final void addTaskEventListener(TaskEventListener listener) { eventHandler.addListener(listener); } @Override public final void removeTaskEventListener(TaskEventListener listener) { eventHandler.addListener(listener); } @Override public final R getResult() { return result; } @Override public final void init() throws Exception { try { jobs = prepareJobs(); } catch (ServiceInputInvalidException e) { setStatus(JobStatus.INVALID); eventHandler.fireTaskException(e); throw e; } setStatus(JobStatus.READY); eventHandler.fireTaskStarted(jobs); var jobListener = new DelegateJobEventListener<>(eventHandler); for (var job : jobs) job.addPropertyChangeListener(jobListener); submitJobs(jobs); } static final int MAX_SUBMIT_RETRY = 5; protected final void submitJobs(List jobs) throws IOException { var retryCounter = 0; while (true) { try { submitJobs0(jobs); setStatus(JobStatus.SUBMITTED); break; } catch (IOException e) { eventHandler.fireTaskException(e); if (++retryCounter > MAX_SUBMIT_RETRY) { cancel(); setStatus(JobStatus.SERVER_ERROR); throw e; } } } } private final void submitJobs0(List jobs) throws IOException { IOException exception = null; for (BaseJob job : jobs) { if (job.getStatus() != JobStatus.READY || !job.isInputValid()) continue; try { var jobRef = webClient.submit(job.getInputSequences(), args, credentials); job.setServerJob(jobRef); job.setStatus(JobStatus.SUBMITTED); } catch (IOException e) { exception = e; } } if (exception != null) throw exception; } /** * Poll all running jobs and update their status and logs. Polling is repeated * periodically until this method return true when all jobs are done. * * @return {@code true] if all jobs are done @throws IOException if server * error occurred */ @Override public final boolean poll() throws IOException { boolean allDone = true; IOException exception = null; for (BaseJob job : jobs) { if (job.isInputValid() && !job.getStatus().isDone()) { var serverJob = job.getServerJob(); try { job.setStatus(webClient.getStatus(serverJob)); job.setLog(webClient.getLog(serverJob)); job.setErrorLog(webClient.getErrorLog(serverJob)); } catch (IOException e) { exception = e; } } allDone &= job.isCompleted(); } updateGlobalStatus(); if (exception != null) throw exception; return allDone; } @Override public final void complete() throws IOException { for (var job : jobs) { if (!job.isCompleted()) { // a fallback in case the executor decides to finish prematurely cancelJob(job); job.setStatus(JobStatus.SERVER_ERROR); } } updateGlobalStatus(); try { result = collectResult(jobs); eventHandler.fireTaskCompleted(result); } catch (Exception e) { eventHandler.fireTaskException(e); setStatus(JobStatus.SERVER_ERROR); throw e; } } /** * Cancel all running jobs. Used in case of task failure to cleanup the * resources or when the task has been cancelled. */ @Override public final void cancel() { cancelAction.run(); for (T job : jobs) { cancelJob(job); } setStatus(JobStatus.CANCELLED); } private final void cancelJob(T job) { if (!job.isCompleted()) { try { if (job.getServerJob() != null) webClient.cancel(job.getServerJob()); job.setStatus(JobStatus.CANCELLED); } catch (IOException e) { Console.error(format("failed to cancel job %s", job.getServerJob()), e); } } } protected final void setStatus(JobStatus status) { Objects.requireNonNull(status); if (this.status != status) { this.status = status; eventHandler.fireTaskStatusChanged(status); } } protected abstract List prepareJobs() throws ServiceInputInvalidException; protected abstract R collectResult(List jobs) throws IOException; /** * Update task status according to the overall status of its jobs. The rules * of setting the status are following: *
    *
  • task is invalid if all jobs are invalid
  • *
  • task is completed if all but invalid jobs are completed
  • *
  • task is ready, submitted or queued if at least one job is ready, * submitted or queued an none proceeded to the next stage excluding * completed.
  • *
  • task is running if at least one job is running and none are failed or * cancelled
  • *
  • task is cancelled if at least one job is cancelled and none failed
  • *
  • task is failed or server error if at least one job is failed or server * error
  • *
*/ protected final void updateGlobalStatus() { int precedence = -1; for (BaseJob job : jobs) { JobStatus status = job.getStatus(); int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status); if (precedence < jobPrecedence) precedence = jobPrecedence; } if (precedence >= 0) { setStatus(JobStatus.statusPrecedence[precedence]); } } /** * Set the action that will be run when the {@link #cancel()} method is * invoked. The action should typically stop the executor polling the task and * release resources and threads running the task. * * @param action * runnable to be executed when the task is cancelled */ public void setCancelAction(Runnable action) { Objects.requireNonNull(action); this.cancelAction = action; } @Override public String toString() { var statusName = status != null ? status.name() : "UNSET"; return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, statusName); } }