-package jalview.ws2.actions;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import jalview.bin.Cache;
-import jalview.bin.Console;
-import jalview.util.ArrayUtils;
-import jalview.util.MathUtils;
-import jalview.ws.params.ArgumentI;
-import jalview.ws2.actions.api.TaskEventListener;
-import jalview.ws2.actions.api.TaskI;
-import jalview.ws2.api.Credentials;
-import jalview.ws2.api.JobStatus;
-import jalview.ws2.api.WebServiceJobHandle;
-import jalview.ws2.client.api.WebServiceClientI;
-import jalview.ws2.helpers.DelegateJobEventListener;
-import jalview.ws2.helpers.TaskEventSupport;
-import static java.lang.String.format;
-
-/**
- * An abstract base class for non-interactive tasks which implements common
- * tasks methods. Additionally, it manages task execution in a polling loop.
- * Subclasses are only required to implement {@link #prepare()} and
- * {@link #done()} methods.
- *
- * @author mmwarowny
- *
- * @param <T>
- * the type of jobs managed by the task
- * @param <R>
- * the type of result provided by the task
- */
-public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
-{
- private final long uid = MathUtils.getUID();
-
- protected final WebServiceClientI client;
-
- protected final List<ArgumentI> args;
-
- protected final Credentials credentials;
-
- private final TaskEventSupport<R> eventHandler;
-
- protected JobStatus taskStatus = JobStatus.CREATED;
-
- private Future<?> future = null;
-
- protected List<T> jobs = Collections.emptyList();
-
- protected R result;
-
- protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
- Credentials credentials, TaskEventListener<R> eventListener)
- {
- this.client = client;
- this.args = args;
- this.credentials = credentials;
- this.eventHandler = new TaskEventSupport<R>(this, eventListener);
- }
-
- public long getUid()
- {
- return uid;
- }
-
- /**
- * Start the task using provided scheduled executor service. It creates a
- * polling loop running at set intervals.
- *
- * @param executor
- * executor to run the polling loop with
- */
- public void start(ScheduledExecutorService executor)
- {
- if (future != null)
- throw new IllegalStateException("task already started");
- var runnable = new Runnable()
- {
- private int stage = STAGE_PREPARE;
-
- private static final int STAGE_PREPARE = 0;
-
- private static final int STAGE_START = 1;
-
- private static final int STAGE_POLL = 2;
-
- private static final int STAGE_FINALIZE = 3;
-
- private static final int STAGE_DONE = 4;
-
- private int retryCount = 0;
-
- private static final int MAX_RETRY = 5;
-
- /**
- * A polling loop run periodically which carries the task through its
- * consecutive execution stages.
- */
- @Override
- public void run()
- {
- if (stage == STAGE_PREPARE)
- {
- // first stage - the input data is collected and the jobs are created
- try
- {
- jobs = prepare();
- } catch (ServiceInputInvalidException e)
- {
- stage = STAGE_DONE;
- setStatus(JobStatus.INVALID);
- eventHandler.fireTaskException(e);
- throw new CompletionException(e);
- }
- stage = STAGE_START;
- setStatus(JobStatus.READY);
- eventHandler.fireTaskStarted(jobs);
- var jobListener = new DelegateJobEventListener<>(eventHandler);
- for (var job : jobs)
- {
- job.addPropertyChagneListener(jobListener);
- }
- }
- try
- {
- if (stage == STAGE_START)
- {
- // second stage - jobs are submitted to the server
- startJobs();
- stage = STAGE_POLL;
- setStatus(JobStatus.SUBMITTED);
- }
- if (stage == STAGE_POLL)
- {
- // third stage - jobs are poolled until all of them are completed
- if (pollJobs())
- {
- stage = STAGE_FINALIZE;
- }
- updateGlobalStatus();
- }
- if (stage == STAGE_FINALIZE)
- {
- // final stage - results are collected and stored
- result = done();
- eventHandler.fireTaskCompleted(result);
- stage = STAGE_DONE;
- }
- retryCount = 0;
- } catch (IOException e)
- {
- eventHandler.fireTaskException(e);
- if (++retryCount > MAX_RETRY)
- {
- stage = STAGE_DONE;
- cancelJobs();
- setStatus(JobStatus.SERVER_ERROR);
- throw new CompletionException(e);
- }
- }
- if (stage == STAGE_DONE)
- {
- // finalization - terminating the future task
- throw new CancellationException("task terminated");
- }
- }
- };
- if (taskStatus != JobStatus.CANCELLED)
- future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
- }
-
- @Override
- public JobStatus getStatus()
- {
- return taskStatus;
- }
-
- /**
- * Set the status of the task and notify the event handler.
- *
- * @param status
- * new task status
- */
- protected void setStatus(JobStatus status)
- {
- if (this.taskStatus != status)
- {
- this.taskStatus = status;
- eventHandler.fireTaskStatusChanged(status);
- }
- }
-
- /**
- * Update task status according to the overall status of its jobs. The rules
- * of setting the status are following:
- * <ul>
- * <li>task is invalid if all jobs are invalid</li>
- * <li>task is completed if all but invalid jobs are completed</li>
- * <li>task is ready, submitted or queued if at least one job is ready,
- * submitted or queued an none proceeded to the next stage excluding
- * completed.</li>
- * <li>task is running if at least one job is running and none are failed or
- * cancelled</li>
- * <li>task is cancelled if at least one job is cancelled and none failed</li>
- * <li>task is failed or server error if at least one job is failed or server
- * error</li>
- * </ul>
- */
- private void updateGlobalStatus()
- {
- int precedence = -1;
- for (BaseJob job : jobs)
- {
- JobStatus status = job.getStatus();
- int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
- if (precedence < jobPrecedence)
- precedence = jobPrecedence;
- }
- if (precedence >= 0)
- {
- setStatus(JobStatus.statusPrecedence[precedence]);
- }
- }
-
- @Override
- public void cancel()
- {
- setStatus(JobStatus.CANCELLED);
- if (future != null)
- future.cancel(false);
- cancelJobs();
- }
-
- @Override
- public List<? extends BaseJob> getSubJobs()
- {
- return jobs;
- }
-
- /**
- * Collect and process input sequences for submission and return the list of
- * jobs to be submitted.
- *
- * @return list of jobs to be submitted
- * @throws ServiceInputInvalidException
- * input is invalid and the task should not be started
- */
- protected abstract List<T> prepare() throws ServiceInputInvalidException;
-
- /**
- * Submit all valid jobs to the server and store their job handles.
- *
- * @throws IOException
- * if server error occurred
- */
- protected void startJobs() throws IOException
- {
- for (BaseJob job : jobs)
- {
- if (job.isInputValid() && job.getStatus() == JobStatus.READY)
- {
- WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
- args, credentials);
- job.setServerJob(serverJob);
- job.setStatus(JobStatus.SUBMITTED);
- }
- }
- }
-
- /**
- * Poll all running jobs and update their status and logs. Polling is repeated
- * periodically until this method return true when all jobs are done.
- *
- * @return {@code true] if all jobs are done @throws IOException if server
- * error occurred
- */
- protected boolean pollJobs() throws IOException
- {
- boolean allDone = true;
- for (BaseJob job : jobs)
- {
- if (job.isInputValid() && !job.getStatus().isDone())
- {
- WebServiceJobHandle serverJob = job.getServerJob();
- job.setStatus(client.getStatus(serverJob));
- job.setLog(client.getLog(serverJob));
- job.setErrorLog(client.getErrorLog(serverJob));
- }
- allDone &= job.isCompleted();
- }
- return allDone;
- }
-
- /**
- * Fetch and process the outputs produced by jobs and return the final result
- * of the task. The method is called once all jobs have finished execution. If
- * this method raises {@link IOException} it will be called again after a
- * delay. All IO operations should happen before data processing, so
- * potentially expensive computation is avoided in case of an error.
- *
- * @return final result of the computation
- * @throws IOException
- * if server error occurred
- */
- protected abstract R done() throws IOException;
-
- /**
- * Cancel all running jobs. Used in case of task failure to cleanup the
- * resources or when the task has been cancelled.
- */
- protected void cancelJobs()
- {
- for (BaseJob job : jobs)
- {
- if (!job.isCompleted())
- {
- try
- {
- if (job.getServerJob() != null)
- {
- client.cancel(job.getServerJob());
- }
- job.setStatus(JobStatus.CANCELLED);
- } catch (IOException e)
- {
- Console.error(format("failed to cancel job %s", job.getServerJob()), e);
- }
- }
- }
- }
-
- @Override
- public R getResult()
- {
- return result;
- }
-
- @Override
- public String toString()
- {
- var status = taskStatus != null ? taskStatus.name() : "UNSET";
- return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status);
- }
-}