*/
package jalview.datamodel;
+import java.util.Collection;
+
public interface ContiguousI
{
int getBegin(); // todo want long for genomic positions?
int getEnd();
+
+ public static int[] toStartEndArray(Collection<? extends ContiguousI> ranges)
+ {
+ int[] startend = new int[ranges.size() * 2];
+ int i = 0;
+ for (var range : ranges)
+ {
+ startend[i++] = range.getBegin();
+ startend[i++] = range.getEnd();
+ }
+ return startend;
+ }
}
+++ /dev/null
-package jalview.ws2.actions;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import jalview.bin.Cache;
-import jalview.bin.Console;
-import jalview.util.ArrayUtils;
-import jalview.util.MathUtils;
-import jalview.ws.params.ArgumentI;
-import jalview.ws2.actions.api.TaskEventListener;
-import jalview.ws2.actions.api.TaskI;
-import jalview.ws2.api.Credentials;
-import jalview.ws2.api.JobStatus;
-import jalview.ws2.api.WebServiceJobHandle;
-import jalview.ws2.client.api.WebServiceClientI;
-import jalview.ws2.helpers.DelegateJobEventListener;
-import jalview.ws2.helpers.TaskEventSupport;
-import static java.lang.String.format;
-
-/**
- * An abstract base class for non-interactive tasks which implements common
- * tasks methods. Additionally, it manages task execution in a polling loop.
- * Subclasses are only required to implement {@link #prepare()} and
- * {@link #done()} methods.
- *
- * @author mmwarowny
- *
- * @param <T>
- * the type of jobs managed by the task
- * @param <R>
- * the type of result provided by the task
- */
-public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
-{
- private final long uid = MathUtils.getUID();
-
- protected final WebServiceClientI client;
-
- protected final List<ArgumentI> args;
-
- protected final Credentials credentials;
-
- private final TaskEventSupport<R> eventHandler;
-
- protected JobStatus taskStatus = JobStatus.CREATED;
-
- private Future<?> future = null;
-
- protected List<T> jobs = Collections.emptyList();
-
- protected R result;
-
- protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
- Credentials credentials, TaskEventListener<R> eventListener)
- {
- this.client = client;
- this.args = args;
- this.credentials = credentials;
- this.eventHandler = new TaskEventSupport<R>(this, eventListener);
- }
-
- public long getUid()
- {
- return uid;
- }
-
- /**
- * Start the task using provided scheduled executor service. It creates a
- * polling loop running at set intervals.
- *
- * @param executor
- * executor to run the polling loop with
- */
- public void start(ScheduledExecutorService executor)
- {
- if (future != null)
- throw new IllegalStateException("task already started");
- var runnable = new Runnable()
- {
- private int stage = STAGE_PREPARE;
-
- private static final int STAGE_PREPARE = 0;
-
- private static final int STAGE_START = 1;
-
- private static final int STAGE_POLL = 2;
-
- private static final int STAGE_FINALIZE = 3;
-
- private static final int STAGE_DONE = 4;
-
- private int retryCount = 0;
-
- private static final int MAX_RETRY = 5;
-
- /**
- * A polling loop run periodically which carries the task through its
- * consecutive execution stages.
- */
- @Override
- public void run()
- {
- if (stage == STAGE_PREPARE)
- {
- // first stage - the input data is collected and the jobs are created
- try
- {
- jobs = prepare();
- } catch (ServiceInputInvalidException e)
- {
- stage = STAGE_DONE;
- setStatus(JobStatus.INVALID);
- eventHandler.fireTaskException(e);
- throw new CompletionException(e);
- }
- stage = STAGE_START;
- setStatus(JobStatus.READY);
- eventHandler.fireTaskStarted(jobs);
- var jobListener = new DelegateJobEventListener<>(eventHandler);
- for (var job : jobs)
- {
- job.addPropertyChagneListener(jobListener);
- }
- }
- try
- {
- if (stage == STAGE_START)
- {
- // second stage - jobs are submitted to the server
- startJobs();
- stage = STAGE_POLL;
- setStatus(JobStatus.SUBMITTED);
- }
- if (stage == STAGE_POLL)
- {
- // third stage - jobs are poolled until all of them are completed
- if (pollJobs())
- {
- stage = STAGE_FINALIZE;
- }
- updateGlobalStatus();
- }
- if (stage == STAGE_FINALIZE)
- {
- // final stage - results are collected and stored
- result = done();
- eventHandler.fireTaskCompleted(result);
- stage = STAGE_DONE;
- }
- retryCount = 0;
- } catch (IOException e)
- {
- eventHandler.fireTaskException(e);
- if (++retryCount > MAX_RETRY)
- {
- stage = STAGE_DONE;
- cancelJobs();
- setStatus(JobStatus.SERVER_ERROR);
- throw new CompletionException(e);
- }
- }
- if (stage == STAGE_DONE)
- {
- // finalization - terminating the future task
- throw new CancellationException("task terminated");
- }
- }
- };
- if (taskStatus != JobStatus.CANCELLED)
- future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
- }
-
- @Override
- public JobStatus getStatus()
- {
- return taskStatus;
- }
-
- /**
- * Set the status of the task and notify the event handler.
- *
- * @param status
- * new task status
- */
- protected void setStatus(JobStatus status)
- {
- if (this.taskStatus != status)
- {
- this.taskStatus = status;
- eventHandler.fireTaskStatusChanged(status);
- }
- }
-
- /**
- * Update task status according to the overall status of its jobs. The rules
- * of setting the status are following:
- * <ul>
- * <li>task is invalid if all jobs are invalid</li>
- * <li>task is completed if all but invalid jobs are completed</li>
- * <li>task is ready, submitted or queued if at least one job is ready,
- * submitted or queued an none proceeded to the next stage excluding
- * completed.</li>
- * <li>task is running if at least one job is running and none are failed or
- * cancelled</li>
- * <li>task is cancelled if at least one job is cancelled and none failed</li>
- * <li>task is failed or server error if at least one job is failed or server
- * error</li>
- * </ul>
- */
- private void updateGlobalStatus()
- {
- int precedence = -1;
- for (BaseJob job : jobs)
- {
- JobStatus status = job.getStatus();
- int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
- if (precedence < jobPrecedence)
- precedence = jobPrecedence;
- }
- if (precedence >= 0)
- {
- setStatus(JobStatus.statusPrecedence[precedence]);
- }
- }
-
- @Override
- public void cancel()
- {
- setStatus(JobStatus.CANCELLED);
- if (future != null)
- future.cancel(false);
- cancelJobs();
- }
-
- @Override
- public List<? extends BaseJob> getSubJobs()
- {
- return jobs;
- }
-
- /**
- * Collect and process input sequences for submission and return the list of
- * jobs to be submitted.
- *
- * @return list of jobs to be submitted
- * @throws ServiceInputInvalidException
- * input is invalid and the task should not be started
- */
- protected abstract List<T> prepare() throws ServiceInputInvalidException;
-
- /**
- * Submit all valid jobs to the server and store their job handles.
- *
- * @throws IOException
- * if server error occurred
- */
- protected void startJobs() throws IOException
- {
- for (BaseJob job : jobs)
- {
- if (job.isInputValid() && job.getStatus() == JobStatus.READY)
- {
- WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
- args, credentials);
- job.setServerJob(serverJob);
- job.setStatus(JobStatus.SUBMITTED);
- }
- }
- }
-
- /**
- * Poll all running jobs and update their status and logs. Polling is repeated
- * periodically until this method return true when all jobs are done.
- *
- * @return {@code true] if all jobs are done @throws IOException if server
- * error occurred
- */
- protected boolean pollJobs() throws IOException
- {
- boolean allDone = true;
- for (BaseJob job : jobs)
- {
- if (job.isInputValid() && !job.getStatus().isDone())
- {
- WebServiceJobHandle serverJob = job.getServerJob();
- job.setStatus(client.getStatus(serverJob));
- job.setLog(client.getLog(serverJob));
- job.setErrorLog(client.getErrorLog(serverJob));
- }
- allDone &= job.isCompleted();
- }
- return allDone;
- }
-
- /**
- * Fetch and process the outputs produced by jobs and return the final result
- * of the task. The method is called once all jobs have finished execution. If
- * this method raises {@link IOException} it will be called again after a
- * delay. All IO operations should happen before data processing, so
- * potentially expensive computation is avoided in case of an error.
- *
- * @return final result of the computation
- * @throws IOException
- * if server error occurred
- */
- protected abstract R done() throws IOException;
-
- /**
- * Cancel all running jobs. Used in case of task failure to cleanup the
- * resources or when the task has been cancelled.
- */
- protected void cancelJobs()
- {
- for (BaseJob job : jobs)
- {
- if (!job.isCompleted())
- {
- try
- {
- if (job.getServerJob() != null)
- {
- client.cancel(job.getServerJob());
- }
- job.setStatus(JobStatus.CANCELLED);
- } catch (IOException e)
- {
- Console.error(format("failed to cancel job %s", job.getServerJob()), e);
- }
- }
- }
- }
-
- @Override
- public R getResult()
- {
- return result;
- }
-
- @Override
- public String toString()
- {
- var status = taskStatus != null ? taskStatus.name() : "UNSET";
- return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status);
- }
-}
--- /dev/null
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import jalview.bin.Console;
+import jalview.util.ArrayUtils;
+import jalview.util.MathUtils;
+import jalview.ws.params.ArgumentI;
+import jalview.ws2.actions.api.TaskEventListener;
+import jalview.ws2.actions.api.TaskI;
+import jalview.ws2.api.Credentials;
+import jalview.ws2.api.JobStatus;
+import jalview.ws2.client.api.WebServiceClientI;
+import jalview.ws2.helpers.DelegateJobEventListener;
+import jalview.ws2.helpers.TaskEventSupport;
+
+import static java.lang.String.format;
+
+public abstract class BaseTask<T extends BaseJob, R> implements TaskI<R>
+{
+ protected final long uid = MathUtils.getUID();
+
+ protected final WebServiceClientI webClient;
+
+ protected final List<ArgumentI> args;
+
+ protected final Credentials credentials;
+
+ private final TaskEventSupport<R> eventHandler;
+
+ protected JobStatus status = JobStatus.CREATED;
+
+ protected List<T> jobs = Collections.emptyList();
+
+ protected R result = null;
+
+ protected Runnable cancelAction = () -> {
+ };
+
+ protected BaseTask(WebServiceClientI webClient, List<ArgumentI> args,
+ Credentials credentials)
+ {
+ this.webClient = webClient;
+ this.args = args;
+ this.credentials = credentials;
+ this.eventHandler = new TaskEventSupport<>(this);
+ }
+
+ @Override
+ public final long getUid()
+ {
+ return uid;
+ }
+
+ @Override
+ public final JobStatus getStatus()
+ {
+ return status;
+ }
+
+ @Override
+ public final List<? extends BaseJob> getSubJobs()
+ {
+ return jobs;
+ }
+
+ @Override
+ public final void addTaskEventListener(TaskEventListener<R> listener)
+ {
+ eventHandler.addListener(listener);
+ }
+
+ @Override
+ public final void removeTaskEventListener(TaskEventListener<R> listener)
+ {
+ eventHandler.addListener(listener);
+ }
+
+ @Override
+ public final R getResult()
+ {
+ return result;
+ }
+
+ @Override
+ public final void init() throws Exception
+ {
+ try
+ {
+ jobs = prepareJobs();
+ } catch (ServiceInputInvalidException e)
+ {
+ setStatus(JobStatus.INVALID);
+ eventHandler.fireTaskException(e);
+ throw e;
+ }
+ setStatus(JobStatus.READY);
+ eventHandler.fireTaskStarted(jobs);
+ var jobListener = new DelegateJobEventListener<>(eventHandler);
+ for (var job : jobs)
+ job.addPropertyChangeListener(jobListener);
+ submitJobs(jobs);
+ }
+
+ static final int MAX_SUBMIT_RETRY = 5;
+
+ protected final void submitJobs(List<T> jobs) throws IOException
+ {
+ var retryCounter = 0;
+ while (true)
+ {
+ try
+ {
+ submitJobs0(jobs);
+ setStatus(JobStatus.SUBMITTED);
+ break;
+ } catch (IOException e)
+ {
+ eventHandler.fireTaskException(e);
+ if (++retryCounter > MAX_SUBMIT_RETRY)
+ {
+ cancel();
+ setStatus(JobStatus.SERVER_ERROR);
+ throw e;
+ }
+ }
+ }
+ }
+
+ private final void submitJobs0(List<T> jobs) throws IOException
+ {
+ IOException exception = null;
+ for (BaseJob job : jobs)
+ {
+ if (job.getStatus() != JobStatus.READY || !job.isInputValid())
+ continue;
+ try
+ {
+ var jobRef = webClient.submit(job.getInputSequences(), args, credentials);
+ job.setServerJob(jobRef);
+ job.setStatus(JobStatus.SUBMITTED);
+ } catch (IOException e)
+ {
+ exception = e;
+ }
+ }
+ if (exception != null)
+ throw exception;
+ }
+
+ /**
+ * Poll all running jobs and update their status and logs. Polling is repeated
+ * periodically until this method return true when all jobs are done.
+ *
+ * @return {@code true] if all jobs are done @throws IOException if server
+ * error occurred
+ */
+ @Override
+ public final boolean poll() throws IOException
+ {
+ boolean allDone = true;
+ IOException exception = null;
+ for (BaseJob job : jobs)
+ {
+ if (job.isInputValid() && !job.getStatus().isDone())
+ {
+ var serverJob = job.getServerJob();
+ try
+ {
+ job.setStatus(webClient.getStatus(serverJob));
+ job.setLog(webClient.getLog(serverJob));
+ job.setErrorLog(webClient.getErrorLog(serverJob));
+ } catch (IOException e)
+ {
+ exception = e;
+ }
+ }
+ allDone &= job.isCompleted();
+ }
+ updateGlobalStatus();
+ if (exception != null)
+ throw exception;
+ return allDone;
+ }
+
+ @Override
+ public final void complete() throws IOException
+ {
+ for (var job : jobs)
+ {
+ if (!job.isCompleted())
+ {
+ // a fallback in case the executor decides to finish prematurely
+ cancelJob(job);
+ job.setStatus(JobStatus.SERVER_ERROR);
+ }
+ }
+ updateGlobalStatus();
+ try {
+ result = collectResult(jobs);
+ eventHandler.fireTaskCompleted(result);
+ }
+ catch (Exception e)
+ {
+ eventHandler.fireTaskException(e);
+ throw e;
+ }
+ }
+
+ /**
+ * Cancel all running jobs. Used in case of task failure to cleanup the
+ * resources or when the task has been cancelled.
+ */
+ @Override
+ public final void cancel()
+ {
+ cancelAction.run();
+ for (T job : jobs)
+ {
+ cancelJob(job);
+ }
+ }
+
+ private final void cancelJob(T job)
+ {
+ if (!job.isCompleted())
+ {
+ try
+ {
+ if (job.getServerJob() != null)
+ webClient.cancel(job.getServerJob());
+ job.setStatus(JobStatus.CANCELLED);
+ } catch (IOException e)
+ {
+ Console.error(format("failed to cancel job %s", job.getServerJob()), e);
+ }
+ }
+ }
+
+ protected final void setStatus(JobStatus status)
+ {
+ Objects.requireNonNull(status);
+ if (this.status != status)
+ {
+ this.status = status;
+ eventHandler.fireTaskStatusChanged(status);
+ }
+ }
+
+ protected abstract List<T> prepareJobs() throws ServiceInputInvalidException;
+
+ protected abstract R collectResult(List<T> jobs) throws IOException;
+
+ /**
+ * Update task status according to the overall status of its jobs. The rules
+ * of setting the status are following:
+ * <ul>
+ * <li>task is invalid if all jobs are invalid</li>
+ * <li>task is completed if all but invalid jobs are completed</li>
+ * <li>task is ready, submitted or queued if at least one job is ready,
+ * submitted or queued an none proceeded to the next stage excluding
+ * completed.</li>
+ * <li>task is running if at least one job is running and none are failed or
+ * cancelled</li>
+ * <li>task is cancelled if at least one job is cancelled and none failed</li>
+ * <li>task is failed or server error if at least one job is failed or server
+ * error</li>
+ * </ul>
+ */
+ protected final void updateGlobalStatus()
+ {
+ int precedence = -1;
+ for (BaseJob job : jobs)
+ {
+ JobStatus status = job.getStatus();
+ int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
+ if (precedence < jobPrecedence)
+ precedence = jobPrecedence;
+ }
+ if (precedence >= 0)
+ {
+ setStatus(JobStatus.statusPrecedence[precedence]);
+ }
+ }
+
+ /**
+ * Set the action that will be run when the {@link #cancel()} method is
+ * invoked. The action should typically stop the executor polling the task and
+ * release resources and threads running the task.
+ *
+ * @param action
+ * runnable to be executed when the task is cancelled
+ */
+ public void setCancelAction(Runnable action)
+ {
+ Objects.requireNonNull(action);
+ this.cancelAction = action;
+ }
+}
--- /dev/null
+package jalview.ws2.actions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.WeakHashMap;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import jalview.ws2.actions.api.TaskI;
+
+public class PollingTaskExecutor
+{
+ private static final Map<ScheduledExecutorService, PollingTaskExecutor> executorPool =
+ Collections.synchronizedMap(new WeakHashMap<>());
+
+ public static PollingTaskExecutor fromPool(ScheduledExecutorService executor)
+ {
+ return executorPool.computeIfAbsent(executor, PollingTaskExecutor::new);
+ }
+
+ private final ScheduledExecutorService executor;
+
+ public PollingTaskExecutor(ScheduledExecutorService executor)
+ {
+ this.executor = executor;
+ }
+
+ public Future<?> submit(TaskI<?> task)
+ {
+ Objects.requireNonNull(task);
+ return executor.scheduleWithFixedDelay(
+ new TaskRunnable(task), 2, 2, TimeUnit.SECONDS);
+ }
+
+ private static class TaskRunnable implements Runnable
+ {
+ private final TaskI<?> task;
+
+ private static final int STAGE_INIT = 0;
+
+ private static final int STAGE_POLLING = 2;
+
+ private static final int STAGE_FINISHED = 3;
+
+ private static final int STAGE_STOPPED = 4;
+
+ private int stage = STAGE_INIT;
+
+ private static final int MAX_POLL_RETRY = 5;
+
+ private int pollRetryCount = 0;
+
+ private TaskRunnable(TaskI<?> task)
+ {
+ this.task = task;
+ }
+
+ @Override
+ public void run()
+ {
+ if (task.getStatus().isDone())
+ {
+ stage = STAGE_STOPPED;
+ }
+ if (stage == STAGE_INIT)
+ {
+ try
+ {
+ task.init();
+ stage = STAGE_POLLING;
+ } catch (Exception e)
+ {
+ stage = STAGE_STOPPED;
+ throw new CompletionException(e);
+ }
+ }
+ try
+ {
+ if (stage == STAGE_POLLING && task.poll())
+ {
+ stage = STAGE_FINISHED;
+ }
+ if (stage == STAGE_FINISHED)
+ {
+ task.complete();
+ stage = STAGE_STOPPED;
+ }
+ } catch (Exception e)
+ {
+ if (++pollRetryCount > MAX_POLL_RETRY || e instanceof RuntimeException)
+ {
+ task.cancel();
+ stage = STAGE_STOPPED;
+ throw new CompletionException(e);
+ }
+ }
+ if (stage == STAGE_STOPPED)
+ {
+ throw new CancellationException();
+ }
+ }
+ }
+}
import jalview.datamodel.Sequence;
import jalview.datamodel.SequenceI;
import jalview.ws.params.ArgumentI;
-import jalview.ws2.actions.AbstractPollableTask;
+import jalview.ws2.actions.BaseTask;
import jalview.ws2.actions.ServiceInputInvalidException;
import jalview.ws2.actions.api.TaskEventListener;
import jalview.ws2.api.Credentials;
* @author mmwarowny
*
*/
-class AlignmentTask extends AbstractPollableTask<AlignmentJob, AlignmentResult>
+class AlignmentTask extends BaseTask<AlignmentJob, AlignmentResult>
{
/* task parameters set in the constructor */
private final AlignmentWebServiceClientI client;
AlignmentTask(AlignmentWebServiceClientI client, AlignmentAction action,
List<ArgumentI> args, Credentials credentials,
- AlignmentView msa, AlignViewportI viewport, boolean submitGaps,
- TaskEventListener<AlignmentResult> eventListener)
+ AlignViewportI viewport, boolean submitGaps)
{
- super(client, args, credentials, eventListener);
+ super(client, args, credentials);
this.client = client;
this.action = action;
- this.msa = msa;
+ this.msa = viewport.getAlignmentView(true);
this.viewport = viewport;
this.submitGaps = submitGaps;
this.currentView = viewport.getAlignment();
}
@Override
- protected List<AlignmentJob> prepare() throws ServiceInputInvalidException
+ protected List<AlignmentJob> prepareJobs() throws ServiceInputInvalidException
{
Console.info(format("starting alignment service %s:%s",
client.getClientName(), action.getName()));
}
@Override
- protected AlignmentResult done() throws IOException
+ protected AlignmentResult collectResult(List<AlignmentJob> jobs) throws IOException
{
IOException lastIOE = null;
for (AlignmentJob job : jobs)
import jalview.util.Pair;
import jalview.workers.AlignCalcWorker;
import jalview.ws.params.ArgumentI;
-import jalview.ws2.actions.AbstractPollableTask;
import jalview.ws2.actions.BaseJob;
+import jalview.ws2.actions.BaseTask;
import jalview.ws2.actions.ServiceInputInvalidException;
import jalview.ws2.actions.api.JobI;
import jalview.ws2.actions.api.TaskEventListener;
import jalview.ws2.helpers.DelegateJobEventListener;
import jalview.ws2.helpers.TaskEventSupport;
-public class AnnotationTask implements TaskI<AnnotationResult>
+public class AnnotationTask extends BaseTask<AnnotationJob, AnnotationResult>
{
- private final long uid = MathUtils.getUID();
-
private AnnotationWebServiceClientI client;
private final AnnotationAction action;
- private final List<ArgumentI> args;
-
- private final Credentials credentials;
-
private final AlignViewportI viewport;
- private final TaskEventSupport<AnnotationResult> eventHandler;
-
private JobStatus taskStatus = null;
private AlignCalcWorkerAdapter worker = null;
- private List<AnnotationJob> jobs = Collections.emptyList();
-
- private AnnotationResult result = null;
-
private DelegateJobEventListener<AnnotationResult> jobEventHandler;
- private class AlignCalcWorkerAdapter extends AlignCalcWorker
- implements PollableAlignCalcWorkerI
- {
- private boolean restarting = false;
-
- AlignCalcWorkerAdapter(AlignCalcManagerI2 calcMan)
- {
- super(viewport, null);
- this.calcMan = calcMan;
- }
-
- String getServiceName()
- {
- return action.getWebService().getName();
- }
-
- @Override
- public void startUp() throws Throwable
- {
- if (alignViewport.isClosed())
- {
- stop();
- throw new IllegalStateException("Starting annotation for closed viewport");
- }
- if (restarting)
- eventHandler.fireTaskRestarted();
- else
- restarting = true;
- jobs = Collections.emptyList();
- try
- {
- jobs = prepare();
- } catch (ServiceInputInvalidException e)
- {
- setStatus(JobStatus.INVALID);
- eventHandler.fireTaskException(e);
- throw e;
- }
- setStatus(JobStatus.READY);
- eventHandler.fireTaskStarted(jobs);
- for (var job : jobs)
- {
- job.addPropertyChagneListener(jobEventHandler);
- }
- try
- {
- startJobs();
- } catch (IOException e)
- {
- eventHandler.fireTaskException(e);
- cancelJobs();
- setStatus(JobStatus.SERVER_ERROR);
- throw e;
- }
- setStatus(JobStatus.SUBMITTED);
- }
-
- @Override
- public boolean poll() throws Throwable
- {
- boolean done = AnnotationTask.this.poll();
- updateGlobalStatus();
- if (done)
- {
- retrieveAndProcessResult();
- eventHandler.fireTaskCompleted(result);
- }
- return done;
- }
-
- private void retrieveAndProcessResult() throws IOException
- {
- result = retrieveResult();
- updateOurAnnots(result.annotations);
- if (result.transferFeatures)
- {
- final var featureColours = result.featureColours;
- final var featureFilters = result.featureFilters;
- viewport.applyFeaturesStyle(new FeatureSettingsAdapter()
- {
- @Override
- public FeatureColourI getFeatureColour(String type)
- {
- return featureColours.get(type);
- }
-
- @Override
- public FeatureMatcherSetI getFeatureFilters(String type)
- {
- return featureFilters.get(type);
- }
-
- @Override
- public boolean isFeatureDisplayed(String type)
- {
- return featureColours.containsKey(type);
- }
- });
- }
- }
-
- @Override
- public void updateAnnotation()
- {
- var job = jobs.size() > 0 ? jobs.get(0) : null;
- if (!calcMan.isWorking(this) && job != null)
- {
- var ret = updateResultAnnotation(job, job.returnedAnnotations);
- updateOurAnnots(ret.get0());
- }
- }
-
- private void updateOurAnnots(List<AlignmentAnnotation> newAnnots)
- {
- List<AlignmentAnnotation> oldAnnots = ourAnnots != null ? ourAnnots : Collections.emptyList();
- ourAnnots = newAnnots;
- AlignmentI alignment = viewport.getAlignment();
- for (AlignmentAnnotation an : oldAnnots)
- {
- if (!newAnnots.contains(an))
- {
- alignment.deleteAnnotation(an);
- }
- }
- oldAnnots.clear();
- for (AlignmentAnnotation an : ourAnnots)
- {
- viewport.getAlignment().validateAnnotation(an);
- }
- }
-
- @Override
- public void cancel()
- {
- cancelJobs();
- }
-
- void stop()
- {
- calcMan.disableWorker(this);
- super.abortAndDestroy();
- }
-
- @Override
- public void done()
- {
- for (var job : jobs)
- {
- if (job.isInputValid() && !job.isCompleted())
- {
- /* if done was called but job is not completed then it
- * must have been stopped by an exception */
- job.setStatus(JobStatus.SERVER_ERROR);
- }
- }
- updateGlobalStatus();
- // dispose of unfinished jobs just in case
- cancelJobs();
- }
-
- @Override
- public String toString()
- {
- return AnnotationTask.this.toString() + "$AlignCalcWorker@"
- + Integer.toHexString(hashCode());
- }
- }
-
public AnnotationTask(AnnotationWebServiceClientI client,
AnnotationAction action, List<ArgumentI> args, Credentials credentials,
- AlignViewportI viewport,
- TaskEventListener<AnnotationResult> eventListener)
+ AlignViewportI viewport)
{
+ super(client, args, credentials);
this.client = client;
this.action = action;
- this.args = args;
- this.credentials = credentials;
this.viewport = viewport;
- this.eventHandler = new TaskEventSupport<>(this, eventListener);
- this.jobEventHandler = new DelegateJobEventListener<>(this.eventHandler);
- }
-
- @Override
- public long getUid()
- {
- return uid;
}
- public void start(AlignCalcManagerI2 calcManager)
- {
- if (this.worker != null)
- throw new IllegalStateException("task already started");
- this.worker = new AlignCalcWorkerAdapter(calcManager);
- if (taskStatus != JobStatus.CANCELLED)
- {
- List<AlignCalcWorkerI> oldWorkers = calcManager.getWorkersOfClass(
- AlignCalcWorkerAdapter.class);
- for (var worker : oldWorkers)
- {
- if (action.getWebService().getName().equalsIgnoreCase(
- ((AlignCalcWorkerAdapter) worker).getServiceName()))
- {
- // remove interactive workers for the same service.
- calcManager.removeWorker(worker);
- calcManager.cancelWorker(worker);
- }
- }
- if (action.getWebService().isInteractive())
- calcManager.registerWorker(worker);
- else
- calcManager.startWorker(worker);
- }
- }
-
- /*
- * The following methods are mostly copied from the {@link AbstractPollableTask}
- * TODO: move common functionality to a base class
- */
- @Override
- public JobStatus getStatus()
- {
- return taskStatus;
- }
-
- private void setStatus(JobStatus status)
- {
- if (this.taskStatus != status)
- {
- Console.debug(String.format("%s status change to %s", this, status.name()));
- this.taskStatus = status;
- eventHandler.fireTaskStatusChanged(status);
- }
- }
-
- private void updateGlobalStatus()
- {
- int precedence = -1;
- for (BaseJob job : jobs)
- {
- JobStatus status = job.getStatus();
- int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
- if (precedence < jobPrecedence)
- precedence = jobPrecedence;
- }
- if (precedence >= 0)
- {
- setStatus(JobStatus.statusPrecedence[precedence]);
- }
- }
-
- @Override
- public List<? extends JobI> getSubJobs()
- {
- return jobs;
- }
+ // public void start(AlignCalcManagerI2 calcManager)
+ // {
+ // if (this.worker != null)
+ // throw new IllegalStateException("task already started");
+ // this.worker = new AlignCalcWorkerAdapter(calcManager);
+ // if (taskStatus != JobStatus.CANCELLED)
+ // {
+ // List<AlignCalcWorkerI> oldWorkers = calcManager.getWorkersOfClass(
+ // AlignCalcWorkerAdapter.class);
+ // for (var worker : oldWorkers)
+ // {
+ // if (action.getWebService().getName().equalsIgnoreCase(
+ // ((AlignCalcWorkerAdapter) worker).getServiceName()))
+ // {
+ // // remove interactive workers for the same service.
+ // calcManager.removeWorker(worker);
+ // calcManager.cancelWorker(worker);
+ // }
+ // }
+ // if (action.getWebService().isInteractive())
+ // calcManager.registerWorker(worker);
+ // else
+ // calcManager.startWorker(worker);
+ // }
+ // }
/**
* Create and return a list of annotation jobs from the current state of the
* @throws ServiceInputInvalidException
* input data is not valid
*/
- private List<AnnotationJob> prepare() throws ServiceInputInvalidException
+ @Override
+ public List<AnnotationJob> prepareJobs() throws ServiceInputInvalidException
{
AlignmentI alignment = viewport.getAlignment();
if (alignment == null || alignment.getWidth() <= 0 ||
return List.of(job);
}
- private void startJobs() throws IOException
- {
- for (BaseJob job : jobs)
- {
- if (job.isInputValid() && job.getStatus() == JobStatus.READY)
- {
- var serverJob = client.submit(job.getInputSequences(),
- args, credentials);
- job.setServerJob(serverJob);
- job.setStatus(JobStatus.SUBMITTED);
- }
- }
- }
-
- private boolean poll() throws IOException
- {
- boolean allDone = true;
- for (BaseJob job : jobs)
- {
- if (job.isInputValid() && !job.getStatus().isDone())
- {
- WebServiceJobHandle serverJob = job.getServerJob();
- job.setStatus(client.getStatus(serverJob));
- job.setLog(client.getLog(serverJob));
- job.setErrorLog(client.getErrorLog(serverJob));
- }
- allDone &= job.isCompleted();
- }
- return allDone;
- }
-
- private AnnotationResult retrieveResult() throws IOException
+ @Override
+ protected AnnotationResult collectResult(List<AnnotationJob> jobs) throws IOException
{
final Map<String, FeatureColourI> featureColours = new HashMap<>();
final Map<String, FeatureMatcherSetI> featureFilters = new HashMap<>();
* sequence, excluding regions not annotated due to gapMap/column
* visibility */
- // update calcId if it is not already set on returned annotation
- for (AlignmentAnnotation annot : returnedAnnot)
+ udpateCalcId(returnedAnnot);
+ int graphGroup = getNextGraphGroup(viewport.getAlignment());
+ shiftGraphGroup(returnedAnnot, graphGroup);
+ List<AlignmentAnnotation> annotations = new ArrayList<>();
+ for (AlignmentAnnotation ala : returnedAnnot)
{
- if (annot.getCalcId() == null || annot.getCalcId().isEmpty())
- {
- annot.setCalcId(action.getFullName());
- }
- annot.autoCalculated = action.isAlignmentAnalysis() &&
- action.getWebService().isInteractive();
- }
- job.returnedAnnotations = returnedAnnot;
- job.featureColours = featureColours;
- job.featureFilters = featureFilters;
- var ret = updateResultAnnotation(job, returnedAnnot);
- var annotations = ret.get0();
- var transferFeatures = ret.get1();
- return new AnnotationResult(annotations, transferFeatures, featureColours,
- featureFilters);
- }
-
- private Pair<List<AlignmentAnnotation>, Boolean> updateResultAnnotation(
- AnnotationJob job, List<AlignmentAnnotation> annotations)
- {
- List<AlignmentAnnotation> newAnnots = new ArrayList<>();
- // update graphGroup for all annotation
- /* find a graphGroup greater than any existing one, could be moved
- * to Alignment#getNewGraphGroup() - returns next unused graph group */
- int graphGroup = 1;
- if (viewport.getAlignment().getAlignmentAnnotation() != null)
- {
- for (var ala : viewport.getAlignment().getAlignmentAnnotation())
- {
- graphGroup = Math.max(graphGroup, ala.graphGroup);
- }
- }
- // update graphGroup in the annotation rows returned form service'
- /* TODO: look at sequence annotation rows and update graph groups in the
- * case of reference annotation */
- for (AlignmentAnnotation ala : annotations)
- {
- if (ala.graphGroup > 0)
- ala.graphGroup += graphGroup;
- SequenceI aseq = null;
- // transfer sequence refs and adjust gapMap
- if (ala.sequenceRef != null)
- {
- aseq = job.seqNames.get(ala.sequenceRef.getName());
- }
+ SequenceI seq = job.seqNames.get(ala.sequenceRef.getName());
+ SequenceI aseq = getRootDatasetSequence(seq);
+ Annotation[] gappedAnnots = createGappedAnnotations(ala.annotations, job.start, job.gapMap);
ala.sequenceRef = aseq;
-
- Annotation[] resAnnot = ala.annotations;
- boolean[] gapMap = job.gapMap;
- Annotation[] gappedAnnot = new Annotation[Math.max(
- viewport.getAlignment().getWidth(), gapMap.length)];
- for (int p = 0, ap = job.start; ap < gappedAnnot.length; ap++)
- {
- if (gapMap.length > ap && !gapMap[ap])
- gappedAnnot[ap] = new Annotation("", "", ' ', Float.NaN);
- else if (p < resAnnot.length)
- gappedAnnot[ap] = resAnnot[p++];
- // is this loop exhaustive of resAnnot?
- }
- ala.annotations = gappedAnnot;
+ ala.annotations = gappedAnnots;
AlignmentAnnotation newAnnot = viewport.getAlignment()
.updateFromOrCopyAnnotation(ala);
- if (aseq != null)
+ if (aseq != null) // I suspect it's always true
{
aseq.addAlignmentAnnotation(newAnnot);
newAnnot.adjustForAlignment();
AlignmentAnnotationUtils.replaceAnnotationOnAlignmentWith(
newAnnot, newAnnot.label, newAnnot.getCalcId());
}
- newAnnots.add(newAnnot);
+ annotations.add(newAnnot);
}
- boolean transferFeatures = false;
+ boolean hasFeatures = false;
for (SequenceI sq : job.getInputSequences())
{
- if (!sq.getFeatures().hasFeatures() &&
- (sq.getDBRefs() == null || sq.getDBRefs().size() == 0))
+ if (!sq.getFeatures().hasFeatures() && (sq.getDBRefs() == null || sq.getDBRefs().isEmpty()))
continue;
- transferFeatures = true;
+ hasFeatures = true;
SequenceI seq = job.seqNames.get(sq.getName());
- SequenceI dseq;
- int start = job.start, end = job.end;
- boolean[] gapMap = job.gapMap;
- ContiguousI seqRange = seq.findPositions(start, end);
- while ((dseq = seq).getDatasetSequence() != null)
- {
- seq = seq.getDatasetSequence();
- }
- List<ContiguousI> sourceRange = new ArrayList<>();
- if (gapMap.length >= end)
- {
- int lastcol = start, col = start;
- do
- {
- if (col == end || !gapMap[col])
- {
- if (lastcol <= (col - 1))
- {
- seqRange = seq.findPositions(lastcol, col);
- sourceRange.add(seqRange);
- }
- lastcol = col + 1;
- }
- } while (col++ < end);
- }
- else
- {
- sourceRange.add(seq.findPositions(start, end));
- }
+ SequenceI datasetSeq = getRootDatasetSequence(seq);
+ List<ContiguousI> sourceRange = findContiguousRanges(datasetSeq, job.gapMap, job.start, job.end);
+ int[] sourceStartEnd = ContiguousI.toStartEndArray(sourceRange);
+ Mapping mp = new Mapping(new MapList(
+ sourceStartEnd, new int[]
+ { datasetSeq.getStart(), datasetSeq.getEnd() }, 1, 1));
+ datasetSeq.transferAnnotation(sq, mp);
+ }
+
+ return new AnnotationResult(annotations, hasFeatures, featureColours, featureFilters);
+ }
- int i = 0;
- int sourceStartEnd[] = new int[sourceRange.size() * 2];
- for (ContiguousI range : sourceRange)
+ /**
+ * Updates calcId on provided annotations if not already set.
+ */
+ public void udpateCalcId(Iterable<AlignmentAnnotation> annotations)
+ {
+ for (var annotation : annotations)
+ {
+ if (annotation.getCalcId() == null || annotation.getCalcId().isEmpty())
{
- sourceStartEnd[i++] = range.getBegin();
- sourceStartEnd[i++] = range.getEnd();
+ annotation.setCalcId(action.getFullName());
}
- Mapping mp = new Mapping(new MapList(
- sourceStartEnd, new int[]
- { seq.getStart(), seq.getEnd() }, 1, 1));
- dseq.transferAnnotation(sq, mp);
+ annotation.autoCalculated = action.isAlignmentAnalysis() &&
+ action.getWebService().isInteractive();
}
+ }
- return new Pair<>(newAnnots, transferFeatures);
+ private static int getNextGraphGroup(AlignmentI alignment)
+ {
+ if (alignment == null || alignment.getAlignmentAnnotation() == null)
+ return 1;
+ int graphGroup = 1;
+ for (AlignmentAnnotation ala : alignment.getAlignmentAnnotation())
+ graphGroup = Math.max(graphGroup, ala.graphGroup);
+ return graphGroup;
}
- @Override
- public AnnotationResult getResult()
+ private static void shiftGraphGroup(Iterable<AlignmentAnnotation> annotations, int shift)
{
- return result;
+ for (AlignmentAnnotation ala : annotations)
+ {
+ if (ala.graphGroup > 0)
+ {
+ ala.graphGroup += shift;
+ }
+ }
}
- @Override
- public void cancel()
+ private static SequenceI getRootDatasetSequence(SequenceI sequence)
{
- setStatus(JobStatus.CANCELLED);
- if (worker != null)
+ while (sequence.getDatasetSequence() != null)
{
- worker.stop();
+ sequence = sequence.getDatasetSequence();
}
- cancelJobs();
+ return sequence;
}
- public void cancelJobs()
+ private Annotation[] createGappedAnnotations(Annotation[] annotations, int start, boolean[] gapMap)
{
- for (BaseJob job : jobs)
+ var size = Math.max(viewport.getAlignment().getWidth(), gapMap.length);
+ Annotation[] gappedAnnotations = new Annotation[size];
+ for (int p = 0, ap = start; ap < size; ap++)
{
- if (!job.isCompleted())
+ if (gapMap != null && gapMap.length > ap && !gapMap[ap])
{
- try
- {
- if (job.getServerJob() != null)
- {
- client.cancel(job.getServerJob());
- }
- job.setStatus(JobStatus.CANCELLED);
- } catch (IOException e)
- {
- Console.error(String.format(
- "failed to cancel job %s", job.getServerJob()), e);
- }
+ gappedAnnotations[ap] = new Annotation("", "", ' ', Float.NaN);
+ }
+ else if (p < annotations.length)
+ {
+ gappedAnnotations[ap] = annotations[p++];
}
}
+ return gappedAnnotations;
+ }
+
+ private List<ContiguousI> findContiguousRanges(SequenceI seq, boolean[] gapMap, int start, int end)
+ {
+ if (gapMap == null || gapMap.length < end)
+ return List.of(seq.findPositions(start, end));
+ List<ContiguousI> ranges = new ArrayList<>();
+ int lastcol = start, col = start;
+ do
+ {
+ if (col == end || !gapMap[col])
+ {
+ if (lastcol < col)
+ ranges.add(seq.findPositions(lastcol, col));
+ lastcol = col + 1;
+ }
+ } while (++col <= end);
+ return ranges;
}
@Override
*/
List<? extends JobI> getSubJobs();
+ void addTaskEventListener(TaskEventListener<T> listener);
+
+ void removeTaskEventListener(TaskEventListener<T> listener);
+
/**
* Get the last result of the task or {@code null} if not present. Note that
* the result is subject to change for restartable tasks.
*/
T getResult();
+ public void init() throws Exception;
+
+ public boolean poll() throws Exception;
+
+ public void complete() throws Exception;
+
/**
* Cancel the task, stop all sub-jobs running on a server and stop all threads
* managing this task.