1 package jalview.ws2.actions;
3 import java.io.IOException;
4 import java.util.Collections;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
12 import jalview.bin.Cache;
13 import jalview.bin.Console;
14 import jalview.util.ArrayUtils;
15 import jalview.util.MathUtils;
16 import jalview.ws.params.ArgumentI;
17 import jalview.ws2.actions.api.TaskEventListener;
18 import jalview.ws2.actions.api.TaskI;
19 import jalview.ws2.api.Credentials;
20 import jalview.ws2.api.JobStatus;
21 import jalview.ws2.api.WebServiceJobHandle;
22 import jalview.ws2.client.api.WebServiceClientI;
23 import jalview.ws2.helpers.DelegateJobEventListener;
24 import jalview.ws2.helpers.TaskEventSupport;
25 import static java.lang.String.format;
28 * An abstract base class for non-interactive tasks which implements common
29 * task methods. Additionally, it manages task execution in a polling loop.
30 * Subclasses are only required to implement {@link #prepare()} and
31 * {@link #done()} methods.
36 * the type of jobs managed by the task
38 * the type of result provided by the task
40 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
42 private final long uid = MathUtils.getUID();
44 protected final WebServiceClientI client;
46 protected final List<ArgumentI> args;
48 protected final Credentials credentials;
50 private final TaskEventSupport<R> eventHandler;
52 protected JobStatus taskStatus = null;
54 private Future<?> future = null;
56 protected List<T> jobs = Collections.emptyList();
60 protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
61 Credentials credentials, TaskEventListener<R> eventListener)
65 this.credentials = credentials;
66 this.eventHandler = new TaskEventSupport<R>(this, eventListener);
75 * Start the task using the provided scheduled executor service. It creates a
76 * polling loop running at set intervals.
79 * executor to run the polling loop with
81 public void start(ScheduledExecutorService executor)
84 throw new IllegalStateException("task already started");
85 var runnable = new Runnable()
87 private int stage = STAGE_PREPARE;
89 private static final int STAGE_PREPARE = 0;
91 private static final int STAGE_START = 1;
93 private static final int STAGE_POLL = 2;
95 private static final int STAGE_FINALIZE = 3;
97 private static final int STAGE_DONE = 4;
99 private int retryCount = 0;
101 private static final int MAX_RETRY = 5;
104 * A polling loop run periodically which carries the task through its
105 * consecutive execution stages.
110 if (stage == STAGE_PREPARE)
112 // first stage - the input data is collected and the jobs are created
116 } catch (ServiceInputInvalidException e)
119 setStatus(JobStatus.INVALID);
120 eventHandler.fireTaskException(e);
121 throw new CompletionException(e);
124 setStatus(JobStatus.READY);
125 eventHandler.fireTaskStarted(jobs);
126 var jobListener = new DelegateJobEventListener<>(eventHandler);
129 job.addPropertyChagneListener(jobListener);
134 if (stage == STAGE_START)
136 // second stage - jobs are submitted to the server
139 setStatus(JobStatus.SUBMITTED);
141 if (stage == STAGE_POLL)
143 // third stage - jobs are polled until all of them are completed
146 stage = STAGE_FINALIZE;
148 updateGlobalStatus();
150 if (stage == STAGE_FINALIZE)
152 // final stage - results are collected and stored
154 eventHandler.fireTaskCompleted(result);
158 } catch (IOException e)
160 eventHandler.fireTaskException(e);
161 if (++retryCount > MAX_RETRY)
165 setStatus(JobStatus.SERVER_ERROR);
166 throw new CompletionException(e);
169 if (stage == STAGE_DONE)
171 // finalization - terminating the future task
172 throw new CancellationException("task terminated");
176 if (taskStatus != JobStatus.CANCELLED)
177 future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
181 public JobStatus getStatus()
187 * Set the status of the task and notify the event handler.
192 protected void setStatus(JobStatus status)
194 if (this.taskStatus != status)
196 this.taskStatus = status;
197 eventHandler.fireTaskStatusChanged(status);
202 * Update task status according to the overall status of its jobs. The rules
203 * for setting the status are as follows:
205 * <li>task is invalid if all jobs are invalid</li>
206 * <li>task is completed if all but invalid jobs are completed</li>
207 * <li>task is ready, submitted or queued if at least one job is ready,
208 * submitted or queued and none proceeded to the next stage excluding
210 * <li>task is running if at least one job is running and none are failed or
212 * <li>task is cancelled if at least one job is cancelled and none failed</li>
213 * <li>task is failed or server error if at least one job is failed or server
217 private void updateGlobalStatus()
220 for (BaseJob job : jobs)
222 JobStatus status = job.getStatus();
223 int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
224 if (precedence < jobPrecedence)
225 precedence = jobPrecedence;
229 setStatus(JobStatus.statusPrecedence[precedence]);
236 setStatus(JobStatus.CANCELLED);
238 future.cancel(false);
243 public List<? extends BaseJob> getSubJobs()
249 * Collect and process input sequences for submission and return the list of
250 * jobs to be submitted.
252 * @return list of jobs to be submitted
253 * @throws ServiceInputInvalidException
254 * input is invalid and the task should not be started
256 protected abstract List<T> prepare() throws ServiceInputInvalidException;
259 * Submit all valid jobs to the server and store their job handles.
261 * @throws IOException
262 * if server error occurred
264 protected void startJobs() throws IOException
266 for (BaseJob job : jobs)
268 if (job.isInputValid() && job.getStatus() == JobStatus.READY)
270 WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
272 job.setServerJob(serverJob);
273 job.setStatus(JobStatus.SUBMITTED);
279 * Poll all running jobs and update their status and logs. Polling is repeated
280 * periodically until this method returns true when all jobs are done.
282 * @return {@code true} if all jobs are done @throws IOException if server
285 protected boolean pollJobs() throws IOException
287 boolean allDone = true;
288 for (BaseJob job : jobs)
290 if (job.isInputValid() && !job.getStatus().isDone())
292 WebServiceJobHandle serverJob = job.getServerJob();
293 job.setStatus(client.getStatus(serverJob));
294 job.setLog(client.getLog(serverJob));
295 job.setErrorLog(client.getErrorLog(serverJob));
297 allDone &= job.isCompleted();
303 * Fetch and process the outputs produced by jobs and return the final result
304 * of the task. The method is called once all jobs have finished execution. If
305 * this method raises {@link IOException} it will be called again after a
306 * delay. All IO operations should happen before data processing, so
307 * potentially expensive computation is avoided in case of an error.
309 * @return final result of the computation
310 * @throws IOException
311 * if server error occurred
313 protected abstract R done() throws IOException;
316 * Cancel all running jobs. Used in case of task failure to clean up the
317 * resources or when the task has been cancelled.
319 protected void cancelJobs()
321 for (BaseJob job : jobs)
323 if (!job.isCompleted())
327 if (job.getServerJob() != null)
329 client.cancel(job.getServerJob());
331 job.setStatus(JobStatus.CANCELLED);
332 } catch (IOException e)
334 Console.error(format("failed to cancel job %s", job.getServerJob()), e);
347 public String toString()
349 var status = taskStatus != null ? taskStatus.name() : "UNSET";
350 return String.format("%s(%x, %s)", getClass().getSimpleName(), uid, status);