From: Mateusz Warowny Date: Tue, 4 Jul 2023 08:00:15 +0000 (+0200) Subject: JAL-4199 Replace AbstractPollingTask with BaseTask X-Git-Url: http://source.jalview.org/gitweb/?p=jalview.git;a=commitdiff_plain;h=eadb6b338471b984b187352ca34f6313b214cdae JAL-4199 Replace AbstractPollingTask with BaseTask BaseTask no longer implements execution system and its use is more generic. Execution of polling tasks is realised by PollingTaskExecutor. --- diff --git a/src/jalview/datamodel/ContiguousI.java b/src/jalview/datamodel/ContiguousI.java index a9b1372..bc72984 100644 --- a/src/jalview/datamodel/ContiguousI.java +++ b/src/jalview/datamodel/ContiguousI.java @@ -20,9 +20,23 @@ */ package jalview.datamodel; +import java.util.Collection; + public interface ContiguousI { int getBegin(); // todo want long for genomic positions? int getEnd(); + + public static int[] toStartEndArray(Collection ranges) + { + int[] startend = new int[ranges.size() * 2]; + int i = 0; + for (var range : ranges) + { + startend[i++] = range.getBegin(); + startend[i++] = range.getEnd(); + } + return startend; + } } diff --git a/src/jalview/ws2/actions/AbstractPollableTask.java b/src/jalview/ws2/actions/AbstractPollableTask.java deleted file mode 100644 index b61711c..0000000 --- a/src/jalview/ws2/actions/AbstractPollableTask.java +++ /dev/null @@ -1,352 +0,0 @@ -package jalview.ws2.actions; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import jalview.bin.Cache; -import jalview.bin.Console; -import jalview.util.ArrayUtils; -import jalview.util.MathUtils; -import jalview.ws.params.ArgumentI; -import jalview.ws2.actions.api.TaskEventListener; -import jalview.ws2.actions.api.TaskI; -import jalview.ws2.api.Credentials; -import jalview.ws2.api.JobStatus; -import jalview.ws2.api.WebServiceJobHandle; -import jalview.ws2.client.api.WebServiceClientI; -import jalview.ws2.helpers.DelegateJobEventListener; -import jalview.ws2.helpers.TaskEventSupport; -import static java.lang.String.format; - -/** - * An abstract base class for non-interactive tasks which implements common - * tasks methods. Additionally, it manages task execution in a polling loop. - * Subclasses are only required to implement {@link #prepare()} and - * {@link #done()} methods. - * - * @author mmwarowny - * - * @param - * the type of jobs managed by the task - * @param - * the type of result provided by the task - */ -public abstract class AbstractPollableTask implements TaskI -{ - private final long uid = MathUtils.getUID(); - - protected final WebServiceClientI client; - - protected final List args; - - protected final Credentials credentials; - - private final TaskEventSupport eventHandler; - - protected JobStatus taskStatus = JobStatus.CREATED; - - private Future future = null; - - protected List jobs = Collections.emptyList(); - - protected R result; - - protected AbstractPollableTask(WebServiceClientI client, List args, - Credentials credentials, TaskEventListener eventListener) - { - this.client = client; - this.args = args; - this.credentials = credentials; - this.eventHandler = new TaskEventSupport(this, eventListener); - } - - public long getUid() - { - return uid; - } - - /** - * Start the task using provided scheduled executor service. It creates a - * polling loop running at set intervals. - * - * @param executor - * executor to run the polling loop with - */ - public void start(ScheduledExecutorService executor) - { - if (future != null) - throw new IllegalStateException("task already started"); - var runnable = new Runnable() - { - private int stage = STAGE_PREPARE; - - private static final int STAGE_PREPARE = 0; - - private static final int STAGE_START = 1; - - private static final int STAGE_POLL = 2; - - private static final int STAGE_FINALIZE = 3; - - private static final int STAGE_DONE = 4; - - private int retryCount = 0; - - private static final int MAX_RETRY = 5; - - /** - * A polling loop run periodically which carries the task through its - * consecutive execution stages. - */ - @Override - public void run() - { - if (stage == STAGE_PREPARE) - { - // first stage - the input data is collected and the jobs are created - try - { - jobs = prepare(); - } catch (ServiceInputInvalidException e) - { - stage = STAGE_DONE; - setStatus(JobStatus.INVALID); - eventHandler.fireTaskException(e); - throw new CompletionException(e); - } - stage = STAGE_START; - setStatus(JobStatus.READY); - eventHandler.fireTaskStarted(jobs); - var jobListener = new DelegateJobEventListener<>(eventHandler); - for (var job : jobs) - { - job.addPropertyChagneListener(jobListener); - } - } - try - { - if (stage == STAGE_START) - { - // second stage - jobs are submitted to the server - startJobs(); - stage = STAGE_POLL; - setStatus(JobStatus.SUBMITTED); - } - if (stage == STAGE_POLL) - { - // third stage - jobs are poolled until all of them are completed - if (pollJobs()) - { - stage = STAGE_FINALIZE; - } - updateGlobalStatus(); - } - if (stage == STAGE_FINALIZE) - { - // final stage - results are collected and stored - result = done(); - eventHandler.fireTaskCompleted(result); - stage = STAGE_DONE; - } - retryCount = 0; - } catch (IOException e) - { - eventHandler.fireTaskException(e); - if (++retryCount > MAX_RETRY) - { - stage = STAGE_DONE; - cancelJobs(); - setStatus(JobStatus.SERVER_ERROR); - throw new CompletionException(e); - } - } - if (stage == STAGE_DONE) - { - // finalization - terminating the future task - throw new CancellationException("task terminated"); - } - } - }; - if (taskStatus != JobStatus.CANCELLED) - future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS); - } - - @Override - public JobStatus getStatus() - { - return taskStatus; - } - - /** - * Set the status of the task and notify the event handler. - * - * @param status - * new task status - */ - protected void setStatus(JobStatus status) - { - if (this.taskStatus != status) - { - this.taskStatus = status; - eventHandler.fireTaskStatusChanged(status); - } - } - - /** - * Update task status according to the overall status of its jobs. The rules - * of setting the status are following: - *
    - *
  • task is invalid if all jobs are invalid
  • - *
  • task is completed if all but invalid jobs are completed
  • - *
  • task is ready, submitted or queued if at least one job is ready, - * submitted or queued an none proceeded to the next stage excluding - * completed.
  • - *
  • task is running if at least one job is running and none are failed or - * cancelled
  • - *
  • task is cancelled if at least one job is cancelled and none failed
  • - *
  • task is failed or server error if at least one job is failed or server - * error
  • - *
- */ - private void updateGlobalStatus() - { - int precedence = -1; - for (BaseJob job : jobs) - { - JobStatus status = job.getStatus(); - int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status); - if (precedence < jobPrecedence) - precedence = jobPrecedence; - } - if (precedence >= 0) - { - setStatus(JobStatus.statusPrecedence[precedence]); - } - } - - @Override - public void cancel() - { - setStatus(JobStatus.CANCELLED); - if (future != null) - future.cancel(false); - cancelJobs(); - } - - @Override - public List getSubJobs() - { - return jobs; - } - - /** - * Collect and process input sequences for submission and return the list of - * jobs to be submitted. - * - * @return list of jobs to be submitted - * @throws ServiceInputInvalidException - * input is invalid and the task should not be started - */ - protected abstract List prepare() throws ServiceInputInvalidException; - - /** - * Submit all valid jobs to the server and store their job handles. - * - * @throws IOException - * if server error occurred - */ - protected void startJobs() throws IOException - { - for (BaseJob job : jobs) - { - if (job.isInputValid() && job.getStatus() == JobStatus.READY) - { - WebServiceJobHandle serverJob = client.submit(job.getInputSequences(), - args, credentials); - job.setServerJob(serverJob); - job.setStatus(JobStatus.SUBMITTED); - } - } - } - - /** - * Poll all running jobs and update their status and logs. Polling is repeated - * periodically until this method return true when all jobs are done. - * - * @return {@code true] if all jobs are done @throws IOException if server - * error occurred - */ - protected boolean pollJobs() throws IOException - { - boolean allDone = true; - for (BaseJob job : jobs) - { - if (job.isInputValid() && !job.getStatus().isDone()) - { - WebServiceJobHandle serverJob = job.getServerJob(); - job.setStatus(client.getStatus(serverJob)); - job.setLog(client.getLog(serverJob)); - job.setErrorLog(client.getErrorLog(serverJob)); - } - allDone &= job.isCompleted(); - } - return allDone; - } - - /** - * Fetch and process the outputs produced by jobs and return the final result - * of the task. The method is called once all jobs have finished execution. If - * this method raises {@link IOException} it will be called again after a - * delay. All IO operations should happen before data processing, so - * potentially expensive computation is avoided in case of an error. - * - * @return final result of the computation - * @throws IOException - * if server error occurred - */ - protected abstract R done() throws IOException; - - /** - * Cancel all running jobs. Used in case of task failure to cleanup the - * resources or when the task has been cancelled. - */ - protected void cancelJobs() - { - for (BaseJob job : jobs) - { - if (!job.isCompleted()) - { - try - { - if (job.getServerJob() != null) - { - client.cancel(job.getServerJob()); - } - job.setStatus(JobStatus.CANCELLED); - } catch (IOException e) - { - Console.error(format("failed to cancel job %s", job.getServerJob()), e); - } - } - } - } - - @Override - public R getResult() - { - return result; - } - - @Override - public String toString() - { - var status = taskStatus != null ? taskStatus.name() : "UNSET"; - return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status); - } -} diff --git a/src/jalview/ws2/actions/BaseTask.java b/src/jalview/ws2/actions/BaseTask.java new file mode 100644 index 0000000..40f96b5 --- /dev/null +++ b/src/jalview/ws2/actions/BaseTask.java @@ -0,0 +1,303 @@ +package jalview.ws2.actions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import jalview.bin.Console; +import jalview.util.ArrayUtils; +import jalview.util.MathUtils; +import jalview.ws.params.ArgumentI; +import jalview.ws2.actions.api.TaskEventListener; +import jalview.ws2.actions.api.TaskI; +import jalview.ws2.api.Credentials; +import jalview.ws2.api.JobStatus; +import jalview.ws2.client.api.WebServiceClientI; +import jalview.ws2.helpers.DelegateJobEventListener; +import jalview.ws2.helpers.TaskEventSupport; + +import static java.lang.String.format; + +public abstract class BaseTask implements TaskI +{ + protected final long uid = MathUtils.getUID(); + + protected final WebServiceClientI webClient; + + protected final List args; + + protected final Credentials credentials; + + private final TaskEventSupport eventHandler; + + protected JobStatus status = JobStatus.CREATED; + + protected List jobs = Collections.emptyList(); + + protected R result = null; + + protected Runnable cancelAction = () -> { + }; + + protected BaseTask(WebServiceClientI webClient, List args, + Credentials credentials) + { + this.webClient = webClient; + this.args = args; + this.credentials = credentials; + this.eventHandler = new TaskEventSupport<>(this); + } + + @Override + public final long getUid() + { + return uid; + } + + @Override + public final JobStatus getStatus() + { + return status; + } + + @Override + public final List getSubJobs() + { + return jobs; + } + + @Override + public final void addTaskEventListener(TaskEventListener listener) + { + eventHandler.addListener(listener); + } + + @Override + public final void removeTaskEventListener(TaskEventListener listener) + { + eventHandler.addListener(listener); + } + + @Override + public final R getResult() + { + return result; + } + + @Override + public final void init() throws Exception + { + try + { + jobs = prepareJobs(); + } catch (ServiceInputInvalidException e) + { + setStatus(JobStatus.INVALID); + eventHandler.fireTaskException(e); + throw e; + } + setStatus(JobStatus.READY); + eventHandler.fireTaskStarted(jobs); + var jobListener = new DelegateJobEventListener<>(eventHandler); + for (var job : jobs) + job.addPropertyChangeListener(jobListener); + submitJobs(jobs); + } + + static final int MAX_SUBMIT_RETRY = 5; + + protected final void submitJobs(List jobs) throws IOException + { + var retryCounter = 0; + while (true) + { + try + { + submitJobs0(jobs); + setStatus(JobStatus.SUBMITTED); + break; + } catch (IOException e) + { + eventHandler.fireTaskException(e); + if (++retryCounter > MAX_SUBMIT_RETRY) + { + cancel(); + setStatus(JobStatus.SERVER_ERROR); + throw e; + } + } + } + } + + private final void submitJobs0(List jobs) throws IOException + { + IOException exception = null; + for (BaseJob job : jobs) + { + if (job.getStatus() != JobStatus.READY || !job.isInputValid()) + continue; + try + { + var jobRef = webClient.submit(job.getInputSequences(), args, credentials); + job.setServerJob(jobRef); + job.setStatus(JobStatus.SUBMITTED); + } catch (IOException e) + { + exception = e; + } + } + if (exception != null) + throw exception; + } + + /** + * Poll all running jobs and update their status and logs. Polling is repeated + * periodically until this method return true when all jobs are done. + * + * @return {@code true] if all jobs are done @throws IOException if server + * error occurred + */ + @Override + public final boolean poll() throws IOException + { + boolean allDone = true; + IOException exception = null; + for (BaseJob job : jobs) + { + if (job.isInputValid() && !job.getStatus().isDone()) + { + var serverJob = job.getServerJob(); + try + { + job.setStatus(webClient.getStatus(serverJob)); + job.setLog(webClient.getLog(serverJob)); + job.setErrorLog(webClient.getErrorLog(serverJob)); + } catch (IOException e) + { + exception = e; + } + } + allDone &= job.isCompleted(); + } + updateGlobalStatus(); + if (exception != null) + throw exception; + return allDone; + } + + @Override + public final void complete() throws IOException + { + for (var job : jobs) + { + if (!job.isCompleted()) + { + // a fallback in case the executor decides to finish prematurely + cancelJob(job); + job.setStatus(JobStatus.SERVER_ERROR); + } + } + updateGlobalStatus(); + try { + result = collectResult(jobs); + eventHandler.fireTaskCompleted(result); + } + catch (Exception e) + { + eventHandler.fireTaskException(e); + throw e; + } + } + + /** + * Cancel all running jobs. Used in case of task failure to cleanup the + * resources or when the task has been cancelled. + */ + @Override + public final void cancel() + { + cancelAction.run(); + for (T job : jobs) + { + cancelJob(job); + } + } + + private final void cancelJob(T job) + { + if (!job.isCompleted()) + { + try + { + if (job.getServerJob() != null) + webClient.cancel(job.getServerJob()); + job.setStatus(JobStatus.CANCELLED); + } catch (IOException e) + { + Console.error(format("failed to cancel job %s", job.getServerJob()), e); + } + } + } + + protected final void setStatus(JobStatus status) + { + Objects.requireNonNull(status); + if (this.status != status) + { + this.status = status; + eventHandler.fireTaskStatusChanged(status); + } + } + + protected abstract List prepareJobs() throws ServiceInputInvalidException; + + protected abstract R collectResult(List jobs) throws IOException; + + /** + * Update task status according to the overall status of its jobs. The rules + * of setting the status are following: + *
    + *
  • task is invalid if all jobs are invalid
  • + *
  • task is completed if all but invalid jobs are completed
  • + *
  • task is ready, submitted or queued if at least one job is ready, + * submitted or queued an none proceeded to the next stage excluding + * completed.
  • + *
  • task is running if at least one job is running and none are failed or + * cancelled
  • + *
  • task is cancelled if at least one job is cancelled and none failed
  • + *
  • task is failed or server error if at least one job is failed or server + * error
  • + *
+ */ + protected final void updateGlobalStatus() + { + int precedence = -1; + for (BaseJob job : jobs) + { + JobStatus status = job.getStatus(); + int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status); + if (precedence < jobPrecedence) + precedence = jobPrecedence; + } + if (precedence >= 0) + { + setStatus(JobStatus.statusPrecedence[precedence]); + } + } + + /** + * Set the action that will be run when the {@link #cancel()} method is + * invoked. The action should typically stop the executor polling the task and + * release resources and threads running the task. + * + * @param action + * runnable to be executed when the task is cancelled + */ + public void setCancelAction(Runnable action) + { + Objects.requireNonNull(action); + this.cancelAction = action; + } +} diff --git a/src/jalview/ws2/actions/PollingTaskExecutor.java b/src/jalview/ws2/actions/PollingTaskExecutor.java new file mode 100644 index 0000000..e1775d4 --- /dev/null +++ b/src/jalview/ws2/actions/PollingTaskExecutor.java @@ -0,0 +1,108 @@ +package jalview.ws2.actions; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.WeakHashMap; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import jalview.ws2.actions.api.TaskI; + +public class PollingTaskExecutor +{ + private static final Map executorPool = + Collections.synchronizedMap(new WeakHashMap<>()); + + public static PollingTaskExecutor fromPool(ScheduledExecutorService executor) + { + return executorPool.computeIfAbsent(executor, PollingTaskExecutor::new); + } + + private final ScheduledExecutorService executor; + + public PollingTaskExecutor(ScheduledExecutorService executor) + { + this.executor = executor; + } + + public Future submit(TaskI task) + { + Objects.requireNonNull(task); + return executor.scheduleWithFixedDelay( + new TaskRunnable(task), 2, 2, TimeUnit.SECONDS); + } + + private static class TaskRunnable implements Runnable + { + private final TaskI task; + + private static final int STAGE_INIT = 0; + + private static final int STAGE_POLLING = 2; + + private static final int STAGE_FINISHED = 3; + + private static final int STAGE_STOPPED = 4; + + private int stage = STAGE_INIT; + + private static final int MAX_POLL_RETRY = 5; + + private int pollRetryCount = 0; + + private TaskRunnable(TaskI task) + { + this.task = task; + } + + @Override + public void run() + { + if (task.getStatus().isDone()) + { + stage = STAGE_STOPPED; + } + if (stage == STAGE_INIT) + { + try + { + task.init(); + stage = STAGE_POLLING; + } catch (Exception e) + { + stage = STAGE_STOPPED; + throw new CompletionException(e); + } + } + try + { + if (stage == STAGE_POLLING && task.poll()) + { + stage = STAGE_FINISHED; + } + if (stage == STAGE_FINISHED) + { + task.complete(); + stage = STAGE_STOPPED; + } + } catch (Exception e) + { + if (++pollRetryCount > MAX_POLL_RETRY || e instanceof RuntimeException) + { + task.cancel(); + stage = STAGE_STOPPED; + throw new CompletionException(e); + } + } + if (stage == STAGE_STOPPED) + { + throw new CancellationException(); + } + } + } +} diff --git a/src/jalview/ws2/actions/alignment/AlignmentTask.java b/src/jalview/ws2/actions/alignment/AlignmentTask.java index 6a0c4dd..60ca384 100644 --- a/src/jalview/ws2/actions/alignment/AlignmentTask.java +++ b/src/jalview/ws2/actions/alignment/AlignmentTask.java @@ -25,7 +25,7 @@ import jalview.datamodel.HiddenColumns; import jalview.datamodel.Sequence; import jalview.datamodel.SequenceI; import jalview.ws.params.ArgumentI; -import jalview.ws2.actions.AbstractPollableTask; +import jalview.ws2.actions.BaseTask; import jalview.ws2.actions.ServiceInputInvalidException; import jalview.ws2.actions.api.TaskEventListener; import jalview.ws2.api.Credentials; @@ -39,7 +39,7 @@ import jalview.ws2.client.api.AlignmentWebServiceClientI; * @author mmwarowny * */ -class AlignmentTask extends AbstractPollableTask +class AlignmentTask extends BaseTask { /* task parameters set in the constructor */ private final AlignmentWebServiceClientI client; @@ -62,13 +62,12 @@ class AlignmentTask extends AbstractPollableTask AlignmentTask(AlignmentWebServiceClientI client, AlignmentAction action, List args, Credentials credentials, - AlignmentView msa, AlignViewportI viewport, boolean submitGaps, - TaskEventListener eventListener) + AlignViewportI viewport, boolean submitGaps) { - super(client, args, credentials, eventListener); + super(client, args, credentials); this.client = client; this.action = action; - this.msa = msa; + this.msa = viewport.getAlignmentView(true); this.viewport = viewport; this.submitGaps = submitGaps; this.currentView = viewport.getAlignment(); @@ -80,7 +79,7 @@ class AlignmentTask extends AbstractPollableTask } @Override - protected List prepare() throws ServiceInputInvalidException + protected List prepareJobs() throws ServiceInputInvalidException { Console.info(format("starting alignment service %s:%s", client.getClientName(), action.getName())); @@ -107,7 +106,7 @@ class AlignmentTask extends AbstractPollableTask } @Override - protected AlignmentResult done() throws IOException + protected AlignmentResult collectResult(List jobs) throws IOException { IOException lastIOE = null; for (AlignmentJob job : jobs) diff --git a/src/jalview/ws2/actions/annotation/AnnotationTask.java b/src/jalview/ws2/actions/annotation/AnnotationTask.java index 271d9ed..238439a 100644 --- a/src/jalview/ws2/actions/annotation/AnnotationTask.java +++ b/src/jalview/ws2/actions/annotation/AnnotationTask.java @@ -30,8 +30,8 @@ import jalview.util.MathUtils; import jalview.util.Pair; import jalview.workers.AlignCalcWorker; import jalview.ws.params.ArgumentI; -import jalview.ws2.actions.AbstractPollableTask; import jalview.ws2.actions.BaseJob; +import jalview.ws2.actions.BaseTask; import jalview.ws2.actions.ServiceInputInvalidException; import jalview.ws2.actions.api.JobI; import jalview.ws2.actions.api.TaskEventListener; @@ -43,287 +43,55 @@ import jalview.ws2.client.api.AnnotationWebServiceClientI; import jalview.ws2.helpers.DelegateJobEventListener; import jalview.ws2.helpers.TaskEventSupport; -public class AnnotationTask implements TaskI +public class AnnotationTask extends BaseTask { - private final long uid = MathUtils.getUID(); - private AnnotationWebServiceClientI client; private final AnnotationAction action; - private final List args; - - private final Credentials credentials; - private final AlignViewportI viewport; - private final TaskEventSupport eventHandler; - private JobStatus taskStatus = null; private AlignCalcWorkerAdapter worker = null; - private List jobs = Collections.emptyList(); - - private AnnotationResult result = null; - private DelegateJobEventListener jobEventHandler; - private class AlignCalcWorkerAdapter extends AlignCalcWorker - implements PollableAlignCalcWorkerI - { - private boolean restarting = false; - - AlignCalcWorkerAdapter(AlignCalcManagerI2 calcMan) - { - super(viewport, null); - this.calcMan = calcMan; - } - - String getServiceName() - { - return action.getWebService().getName(); - } - - @Override - public void startUp() throws Throwable - { - if (alignViewport.isClosed()) - { - stop(); - throw new IllegalStateException("Starting annotation for closed viewport"); - } - if (restarting) - eventHandler.fireTaskRestarted(); - else - restarting = true; - jobs = Collections.emptyList(); - try - { - jobs = prepare(); - } catch (ServiceInputInvalidException e) - { - setStatus(JobStatus.INVALID); - eventHandler.fireTaskException(e); - throw e; - } - setStatus(JobStatus.READY); - eventHandler.fireTaskStarted(jobs); - for (var job : jobs) - { - job.addPropertyChagneListener(jobEventHandler); - } - try - { - startJobs(); - } catch (IOException e) - { - eventHandler.fireTaskException(e); - cancelJobs(); - setStatus(JobStatus.SERVER_ERROR); - throw e; - } - setStatus(JobStatus.SUBMITTED); - } - - @Override - public boolean poll() throws Throwable - { - boolean done = AnnotationTask.this.poll(); - updateGlobalStatus(); - if (done) - { - retrieveAndProcessResult(); - eventHandler.fireTaskCompleted(result); - } - return done; - } - - private void retrieveAndProcessResult() throws IOException - { - result = retrieveResult(); - updateOurAnnots(result.annotations); - if (result.transferFeatures) - { - final var featureColours = result.featureColours; - final var featureFilters = result.featureFilters; - viewport.applyFeaturesStyle(new FeatureSettingsAdapter() - { - @Override - public FeatureColourI getFeatureColour(String type) - { - return featureColours.get(type); - } - - @Override - public FeatureMatcherSetI getFeatureFilters(String type) - { - return featureFilters.get(type); - } - - @Override - public boolean isFeatureDisplayed(String type) - { - return featureColours.containsKey(type); - } - }); - } - } - - @Override - public void updateAnnotation() - { - var job = jobs.size() > 0 ? jobs.get(0) : null; - if (!calcMan.isWorking(this) && job != null) - { - var ret = updateResultAnnotation(job, job.returnedAnnotations); - updateOurAnnots(ret.get0()); - } - } - - private void updateOurAnnots(List newAnnots) - { - List oldAnnots = ourAnnots != null ? ourAnnots : Collections.emptyList(); - ourAnnots = newAnnots; - AlignmentI alignment = viewport.getAlignment(); - for (AlignmentAnnotation an : oldAnnots) - { - if (!newAnnots.contains(an)) - { - alignment.deleteAnnotation(an); - } - } - oldAnnots.clear(); - for (AlignmentAnnotation an : ourAnnots) - { - viewport.getAlignment().validateAnnotation(an); - } - } - - @Override - public void cancel() - { - cancelJobs(); - } - - void stop() - { - calcMan.disableWorker(this); - super.abortAndDestroy(); - } - - @Override - public void done() - { - for (var job : jobs) - { - if (job.isInputValid() && !job.isCompleted()) - { - /* if done was called but job is not completed then it - * must have been stopped by an exception */ - job.setStatus(JobStatus.SERVER_ERROR); - } - } - updateGlobalStatus(); - // dispose of unfinished jobs just in case - cancelJobs(); - } - - @Override - public String toString() - { - return AnnotationTask.this.toString() + "$AlignCalcWorker@" - + Integer.toHexString(hashCode()); - } - } - public AnnotationTask(AnnotationWebServiceClientI client, AnnotationAction action, List args, Credentials credentials, - AlignViewportI viewport, - TaskEventListener eventListener) + AlignViewportI viewport) { + super(client, args, credentials); this.client = client; this.action = action; - this.args = args; - this.credentials = credentials; this.viewport = viewport; - this.eventHandler = new TaskEventSupport<>(this, eventListener); - this.jobEventHandler = new DelegateJobEventListener<>(this.eventHandler); - } - - @Override - public long getUid() - { - return uid; } - public void start(AlignCalcManagerI2 calcManager) - { - if (this.worker != null) - throw new IllegalStateException("task already started"); - this.worker = new AlignCalcWorkerAdapter(calcManager); - if (taskStatus != JobStatus.CANCELLED) - { - List oldWorkers = calcManager.getWorkersOfClass( - AlignCalcWorkerAdapter.class); - for (var worker : oldWorkers) - { - if (action.getWebService().getName().equalsIgnoreCase( - ((AlignCalcWorkerAdapter) worker).getServiceName())) - { - // remove interactive workers for the same service. - calcManager.removeWorker(worker); - calcManager.cancelWorker(worker); - } - } - if (action.getWebService().isInteractive()) - calcManager.registerWorker(worker); - else - calcManager.startWorker(worker); - } - } - - /* - * The following methods are mostly copied from the {@link AbstractPollableTask} - * TODO: move common functionality to a base class - */ - @Override - public JobStatus getStatus() - { - return taskStatus; - } - - private void setStatus(JobStatus status) - { - if (this.taskStatus != status) - { - Console.debug(String.format("%s status change to %s", this, status.name())); - this.taskStatus = status; - eventHandler.fireTaskStatusChanged(status); - } - } - - private void updateGlobalStatus() - { - int precedence = -1; - for (BaseJob job : jobs) - { - JobStatus status = job.getStatus(); - int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status); - if (precedence < jobPrecedence) - precedence = jobPrecedence; - } - if (precedence >= 0) - { - setStatus(JobStatus.statusPrecedence[precedence]); - } - } - - @Override - public List getSubJobs() - { - return jobs; - } + // public void start(AlignCalcManagerI2 calcManager) + // { + // if (this.worker != null) + // throw new IllegalStateException("task already started"); + // this.worker = new AlignCalcWorkerAdapter(calcManager); + // if (taskStatus != JobStatus.CANCELLED) + // { + // List oldWorkers = calcManager.getWorkersOfClass( + // AlignCalcWorkerAdapter.class); + // for (var worker : oldWorkers) + // { + // if (action.getWebService().getName().equalsIgnoreCase( + // ((AlignCalcWorkerAdapter) worker).getServiceName())) + // { + // // remove interactive workers for the same service. + // calcManager.removeWorker(worker); + // calcManager.cancelWorker(worker); + // } + // } + // if (action.getWebService().isInteractive()) + // calcManager.registerWorker(worker); + // else + // calcManager.startWorker(worker); + // } + // } /** * Create and return a list of annotation jobs from the current state of the @@ -334,7 +102,8 @@ public class AnnotationTask implements TaskI * @throws ServiceInputInvalidException * input data is not valid */ - private List prepare() throws ServiceInputInvalidException + @Override + public List prepareJobs() throws ServiceInputInvalidException { AlignmentI alignment = viewport.getAlignment(); if (alignment == null || alignment.getWidth() <= 0 || @@ -366,38 +135,8 @@ public class AnnotationTask implements TaskI return List.of(job); } - private void startJobs() throws IOException - { - for (BaseJob job : jobs) - { - if (job.isInputValid() && job.getStatus() == JobStatus.READY) - { - var serverJob = client.submit(job.getInputSequences(), - args, credentials); - job.setServerJob(serverJob); - job.setStatus(JobStatus.SUBMITTED); - } - } - } - - private boolean poll() throws IOException - { - boolean allDone = true; - for (BaseJob job : jobs) - { - if (job.isInputValid() && !job.getStatus().isDone()) - { - WebServiceJobHandle serverJob = job.getServerJob(); - job.setStatus(client.getStatus(serverJob)); - job.setLog(client.getLog(serverJob)); - job.setErrorLog(client.getErrorLog(serverJob)); - } - allDone &= job.isCompleted(); - } - return allDone; - } - - private AnnotationResult retrieveResult() throws IOException + @Override + protected AnnotationResult collectResult(List jobs) throws IOException { final Map featureColours = new HashMap<>(); final Map featureFilters = new HashMap<>(); @@ -410,173 +149,129 @@ public class AnnotationTask implements TaskI * sequence, excluding regions not annotated due to gapMap/column * visibility */ - // update calcId if it is not already set on returned annotation - for (AlignmentAnnotation annot : returnedAnnot) + udpateCalcId(returnedAnnot); + int graphGroup = getNextGraphGroup(viewport.getAlignment()); + shiftGraphGroup(returnedAnnot, graphGroup); + List annotations = new ArrayList<>(); + for (AlignmentAnnotation ala : returnedAnnot) { - if (annot.getCalcId() == null || annot.getCalcId().isEmpty()) - { - annot.setCalcId(action.getFullName()); - } - annot.autoCalculated = action.isAlignmentAnalysis() && - action.getWebService().isInteractive(); - } - job.returnedAnnotations = returnedAnnot; - job.featureColours = featureColours; - job.featureFilters = featureFilters; - var ret = updateResultAnnotation(job, returnedAnnot); - var annotations = ret.get0(); - var transferFeatures = ret.get1(); - return new AnnotationResult(annotations, transferFeatures, featureColours, - featureFilters); - } - - private Pair, Boolean> updateResultAnnotation( - AnnotationJob job, List annotations) - { - List newAnnots = new ArrayList<>(); - // update graphGroup for all annotation - /* find a graphGroup greater than any existing one, could be moved - * to Alignment#getNewGraphGroup() - returns next unused graph group */ - int graphGroup = 1; - if (viewport.getAlignment().getAlignmentAnnotation() != null) - { - for (var ala : viewport.getAlignment().getAlignmentAnnotation()) - { - graphGroup = Math.max(graphGroup, ala.graphGroup); - } - } - // update graphGroup in the annotation rows returned form service' - /* TODO: look at sequence annotation rows and update graph groups in the - * case of reference annotation */ - for (AlignmentAnnotation ala : annotations) - { - if (ala.graphGroup > 0) - ala.graphGroup += graphGroup; - SequenceI aseq = null; - // transfer sequence refs and adjust gapMap - if (ala.sequenceRef != null) - { - aseq = job.seqNames.get(ala.sequenceRef.getName()); - } + SequenceI seq = job.seqNames.get(ala.sequenceRef.getName()); + SequenceI aseq = getRootDatasetSequence(seq); + Annotation[] gappedAnnots = createGappedAnnotations(ala.annotations, job.start, job.gapMap); ala.sequenceRef = aseq; - - Annotation[] resAnnot = ala.annotations; - boolean[] gapMap = job.gapMap; - Annotation[] gappedAnnot = new Annotation[Math.max( - viewport.getAlignment().getWidth(), gapMap.length)]; - for (int p = 0, ap = job.start; ap < gappedAnnot.length; ap++) - { - if (gapMap.length > ap && !gapMap[ap]) - gappedAnnot[ap] = new Annotation("", "", ' ', Float.NaN); - else if (p < resAnnot.length) - gappedAnnot[ap] = resAnnot[p++]; - // is this loop exhaustive of resAnnot? - } - ala.annotations = gappedAnnot; + ala.annotations = gappedAnnots; AlignmentAnnotation newAnnot = viewport.getAlignment() .updateFromOrCopyAnnotation(ala); - if (aseq != null) + if (aseq != null) // I suspect it's always true { aseq.addAlignmentAnnotation(newAnnot); newAnnot.adjustForAlignment(); AlignmentAnnotationUtils.replaceAnnotationOnAlignmentWith( newAnnot, newAnnot.label, newAnnot.getCalcId()); } - newAnnots.add(newAnnot); + annotations.add(newAnnot); } - boolean transferFeatures = false; + boolean hasFeatures = false; for (SequenceI sq : job.getInputSequences()) { - if (!sq.getFeatures().hasFeatures() && - (sq.getDBRefs() == null || sq.getDBRefs().size() == 0)) + if (!sq.getFeatures().hasFeatures() && (sq.getDBRefs() == null || sq.getDBRefs().isEmpty())) continue; - transferFeatures = true; + hasFeatures = true; SequenceI seq = job.seqNames.get(sq.getName()); - SequenceI dseq; - int start = job.start, end = job.end; - boolean[] gapMap = job.gapMap; - ContiguousI seqRange = seq.findPositions(start, end); - while ((dseq = seq).getDatasetSequence() != null) - { - seq = seq.getDatasetSequence(); - } - List sourceRange = new ArrayList<>(); - if (gapMap.length >= end) - { - int lastcol = start, col = start; - do - { - if (col == end || !gapMap[col]) - { - if (lastcol <= (col - 1)) - { - seqRange = seq.findPositions(lastcol, col); - sourceRange.add(seqRange); - } - lastcol = col + 1; - } - } while (col++ < end); - } - else - { - sourceRange.add(seq.findPositions(start, end)); - } + SequenceI datasetSeq = getRootDatasetSequence(seq); + List sourceRange = findContiguousRanges(datasetSeq, job.gapMap, job.start, job.end); + int[] sourceStartEnd = ContiguousI.toStartEndArray(sourceRange); + Mapping mp = new Mapping(new MapList( + sourceStartEnd, new int[] + { datasetSeq.getStart(), datasetSeq.getEnd() }, 1, 1)); + datasetSeq.transferAnnotation(sq, mp); + } + + return new AnnotationResult(annotations, hasFeatures, featureColours, featureFilters); + } - int i = 0; - int sourceStartEnd[] = new int[sourceRange.size() * 2]; - for (ContiguousI range : sourceRange) + /** + * Updates calcId on provided annotations if not already set. + */ + public void udpateCalcId(Iterable annotations) + { + for (var annotation : annotations) + { + if (annotation.getCalcId() == null || annotation.getCalcId().isEmpty()) { - sourceStartEnd[i++] = range.getBegin(); - sourceStartEnd[i++] = range.getEnd(); + annotation.setCalcId(action.getFullName()); } - Mapping mp = new Mapping(new MapList( - sourceStartEnd, new int[] - { seq.getStart(), seq.getEnd() }, 1, 1)); - dseq.transferAnnotation(sq, mp); + annotation.autoCalculated = action.isAlignmentAnalysis() && + action.getWebService().isInteractive(); } + } - return new Pair<>(newAnnots, transferFeatures); + private static int getNextGraphGroup(AlignmentI alignment) + { + if (alignment == null || alignment.getAlignmentAnnotation() == null) + return 1; + int graphGroup = 1; + for (AlignmentAnnotation ala : alignment.getAlignmentAnnotation()) + graphGroup = Math.max(graphGroup, ala.graphGroup); + return graphGroup; } - @Override - public AnnotationResult getResult() + private static void shiftGraphGroup(Iterable annotations, int shift) { - return result; + for (AlignmentAnnotation ala : annotations) + { + if (ala.graphGroup > 0) + { + ala.graphGroup += shift; + } + } } - @Override - public void cancel() + private static SequenceI getRootDatasetSequence(SequenceI sequence) { - setStatus(JobStatus.CANCELLED); - if (worker != null) + while (sequence.getDatasetSequence() != null) { - worker.stop(); + sequence = sequence.getDatasetSequence(); } - cancelJobs(); + return sequence; } - public void cancelJobs() + private Annotation[] createGappedAnnotations(Annotation[] annotations, int start, boolean[] gapMap) { - for (BaseJob job : jobs) + var size = Math.max(viewport.getAlignment().getWidth(), gapMap.length); + Annotation[] gappedAnnotations = new Annotation[size]; + for (int p = 0, ap = start; ap < size; ap++) { - if (!job.isCompleted()) + if (gapMap != null && gapMap.length > ap && !gapMap[ap]) { - try - { - if (job.getServerJob() != null) - { - client.cancel(job.getServerJob()); - } - job.setStatus(JobStatus.CANCELLED); - } catch (IOException e) - { - Console.error(String.format( - "failed to cancel job %s", job.getServerJob()), e); - } + gappedAnnotations[ap] = new Annotation("", "", ' ', Float.NaN); + } + else if (p < annotations.length) + { + gappedAnnotations[ap] = annotations[p++]; } } + return gappedAnnotations; + } + + private List findContiguousRanges(SequenceI seq, boolean[] gapMap, int start, int end) + { + if (gapMap == null || gapMap.length < end) + return List.of(seq.findPositions(start, end)); + List ranges = new ArrayList<>(); + int lastcol = start, col = start; + do + { + if (col == end || !gapMap[col]) + { + if (lastcol < col) + ranges.add(seq.findPositions(lastcol, col)); + lastcol = col + 1; + } + } while (++col <= end); + return ranges; } @Override diff --git a/src/jalview/ws2/actions/api/TaskI.java b/src/jalview/ws2/actions/api/TaskI.java index cb84944..5f4d575 100644 --- a/src/jalview/ws2/actions/api/TaskI.java +++ b/src/jalview/ws2/actions/api/TaskI.java @@ -38,6 +38,10 @@ public interface TaskI */ List getSubJobs(); + void addTaskEventListener(TaskEventListener listener); + + void removeTaskEventListener(TaskEventListener listener); + /** * Get the last result of the task or {@code null} if not present. Note that * the result is subject to change for restartable tasks. @@ -46,6 +50,12 @@ public interface TaskI */ T getResult(); + public void init() throws Exception; + + public boolean poll() throws Exception; + + public void complete() throws Exception; + /** * Cancel the task, stop all sub-jobs running on a server and stop all threads * managing this task.