1 package jalview.ws2.actions;
3 import java.io.IOException;
4 import java.util.Collections;
6 import java.util.concurrent.CancellationException;
7 import java.util.concurrent.CompletionException;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.ScheduledExecutorService;
10 import java.util.concurrent.TimeUnit;
12 import jalview.bin.Cache;
13 import jalview.util.ArrayUtils;
14 import jalview.util.MathUtils;
15 import jalview.ws.params.ArgumentI;
16 import jalview.ws2.actions.api.TaskEventListener;
17 import jalview.ws2.actions.api.TaskI;
18 import jalview.ws2.api.Credentials;
19 import jalview.ws2.api.JobStatus;
20 import jalview.ws2.api.WebServiceJobHandle;
21 import jalview.ws2.client.api.WebServiceClientI;
22 import jalview.ws2.helpers.DelegateJobEventListener;
23 import jalview.ws2.helpers.TaskEventSupport;
24 import static java.lang.String.format;
 * An abstract base class for non-interactive tasks which implements common
 * task methods. Additionally, it manages task execution in a polling loop.
29 * Subclasses are only required to implement {@link #prepare()} and
30 * {@link #done()} methods.
35 * the type of jobs managed by the task
37 * the type of result provided by the task
39 public abstract class AbstractPollableTask<T extends BaseJob, R> implements TaskI<R>
41 private final long uid = MathUtils.getUID();
43 protected final WebServiceClientI client;
45 protected final List<ArgumentI> args;
47 protected final Credentials credentials;
49 private final TaskEventSupport<R> eventHandler;
51 protected JobStatus taskStatus = null;
53 private Future<?> future = null;
55 protected List<T> jobs = Collections.emptyList();
59 protected AbstractPollableTask(WebServiceClientI client, List<ArgumentI> args,
60 Credentials credentials, TaskEventListener<R> eventListener)
64 this.credentials = credentials;
65 this.eventHandler = new TaskEventSupport<R>(this, eventListener);
74 * Start the task using provided scheduled executor service. It creates a
75 * polling loop running at set intervals.
78 * executor to run the polling loop with
80 public void start(ScheduledExecutorService executor)
83 throw new IllegalStateException("task already started");
84 var runnable = new Runnable()
86 private int stage = STAGE_PREPARE;
88 private static final int STAGE_PREPARE = 0;
90 private static final int STAGE_START = 1;
92 private static final int STAGE_POLL = 2;
94 private static final int STAGE_FINALIZE = 3;
96 private static final int STAGE_DONE = 4;
98 private int retryCount = 0;
100 private static final int MAX_RETRY = 5;
103 * A polling loop run periodically which carries the task through its
104 * consecutive execution stages.
109 if (stage == STAGE_PREPARE)
111 // first stage - the input data is collected and the jobs are created
115 } catch (ServiceInputInvalidException e)
118 setStatus(JobStatus.INVALID);
119 eventHandler.fireTaskException(e);
120 throw new CompletionException(e);
123 setStatus(JobStatus.READY);
124 eventHandler.fireTaskStarted(jobs);
125 var jobListener = new DelegateJobEventListener<>(eventHandler);
128 job.addPropertyChagneListener(jobListener);
133 if (stage == STAGE_START)
135 // second stage - jobs are submitted to the server
138 setStatus(JobStatus.SUBMITTED);
140 if (stage == STAGE_POLL)
// third stage - jobs are polled until all of them are completed
145 stage = STAGE_FINALIZE;
147 updateGlobalStatus();
149 if (stage == STAGE_FINALIZE)
151 // final stage - results are collected and stored
153 eventHandler.fireTaskCompleted(result);
157 } catch (IOException e)
159 eventHandler.fireTaskException(e);
160 if (++retryCount > MAX_RETRY)
164 setStatus(JobStatus.SERVER_ERROR);
165 throw new CompletionException(e);
168 if (stage == STAGE_DONE)
170 // finalization - terminating the future task
171 throw new CancellationException("task terminated");
175 if (taskStatus != JobStatus.CANCELLED)
176 future = executor.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
180 public JobStatus getStatus()
186 * Set the status of the task and notify the event handler.
191 protected void setStatus(JobStatus status)
193 if (this.taskStatus != status)
195 this.taskStatus = status;
196 eventHandler.fireTaskStatusChanged(status);
201 * Update task status according to the overall status of its jobs. The rules
202 * of setting the status are following:
204 * <li>task is invalid if all jobs are invalid</li>
205 * <li>task is completed if all but invalid jobs are completed</li>
 * <li>task is ready, submitted or queued if at least one job is ready,
 * submitted or queued and none proceeded to the next stage excluding
209 * <li>task is running if at least one job is running and none are failed or
211 * <li>task is cancelled if at least one job is cancelled and none failed</li>
212 * <li>task is failed or server error if at least one job is failed or server
216 private void updateGlobalStatus()
219 for (BaseJob job : jobs)
221 JobStatus status = job.getStatus();
222 int jobPrecedence = ArrayUtils.indexOf(JobStatus.statusPrecedence, status);
223 if (precedence < jobPrecedence)
224 precedence = jobPrecedence;
228 setStatus(JobStatus.statusPrecedence[precedence]);
235 setStatus(JobStatus.CANCELLED);
237 future.cancel(false);
242 public List<? extends BaseJob> getSubJobs()
/**
 * Collect and process input sequences for submission and return the list of
 * jobs to be submitted.
 *
 * @return list of jobs to be submitted
 * @throws ServiceInputInvalidException
 *           input is invalid and the task should not be started
 */
protected abstract List<T> prepare() throws ServiceInputInvalidException;
258 * Submit all valid jobs to the server and store their job handles.
260 * @throws IOException
261 * if server error occurred
263 protected void startJobs() throws IOException
265 for (BaseJob job : jobs)
267 if (job.isInputValid() && job.getStatus() == JobStatus.READY)
269 WebServiceJobHandle serverJob = client.submit(job.getInputSequences(),
271 job.setServerJob(serverJob);
272 job.setStatus(JobStatus.SUBMITTED);
278 * Poll all running jobs and update their status and logs. Polling is repeated
279 * periodically until this method return true when all jobs are done.
281 * @return {@code true] if all jobs are done @throws IOException if server
284 protected boolean pollJobs() throws IOException
286 boolean allDone = true;
287 for (BaseJob job : jobs)
289 if (job.isInputValid() && !job.getStatus().isDone())
291 WebServiceJobHandle serverJob = job.getServerJob();
292 job.setStatus(client.getStatus(serverJob));
293 job.setLog(client.getLog(serverJob));
294 job.setErrorLog(client.getErrorLog(serverJob));
296 allDone &= job.isCompleted();
/**
 * Fetch and process the outputs produced by jobs and return the final result
 * of the task. The method is called once all jobs have finished execution. If
 * this method raises {@link IOException} it will be called again after a
 * delay. All IO operations should happen before data processing, so
 * potentially expensive computation is avoided in case of an error.
 *
 * @return final result of the computation
 * @throws IOException
 *           if server error occurred
 */
protected abstract R done() throws IOException;
315 * Cancel all running jobs. Used in case of task failure to cleanup the
316 * resources or when the task has been cancelled.
318 protected void cancelJobs()
320 for (BaseJob job : jobs)
322 if (!job.isCompleted())
326 if (job.getServerJob() != null)
328 client.cancel(job.getServerJob());
330 job.setStatus(JobStatus.CANCELLED);
331 } catch (IOException e)
333 Cache.log.error(format("failed to cancel job %s", job.getServerJob()), e);