1 /* Copyright (c) 2009 Peter Troshin
\r
3 * JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
\r
5 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
6 * Apache License version 2 as published by the Apache Software Foundation
\r
8 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
9 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
10 * License for more details.
\r
12 * A copy of the license is in apache_license.txt. It is also available here:
\r
13 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
15 * Any republication or derived work distributed in source code form
\r
16 * must include this copyright and license notice.
\r
19 package compbio.engine.cluster.drmaa;
\r
21 import java.io.IOException;
\r
22 import java.util.Collections;
\r
23 import java.util.List;
\r
24 import java.util.Map;
\r
26 import org.apache.log4j.Logger;
\r
27 import org.ggf.drmaa.DrmaaException;
\r
28 import org.ggf.drmaa.InvalidJobException;
\r
29 import org.ggf.drmaa.JobInfo;
\r
30 import org.ggf.drmaa.JobTemplate;
\r
31 import org.ggf.drmaa.Session;
\r
33 import compbio.engine.Cleaner;
\r
34 import compbio.engine.ClusterJobId;
\r
35 import compbio.engine.Configurator;
\r
36 import compbio.engine.SyncExecutor;
\r
38 import compbio.engine.client.ConfiguredExecutable;
\r
39 import compbio.engine.client.Executable;
\r
40 import compbio.engine.client.PathValidator;
\r
41 import compbio.engine.client.PipedExecutable;
\r
42 import compbio.engine.client.Util;
\r
43 import compbio.engine.client.Executable.ExecProvider;
\r
44 import compbio.metadata.JobExecutionException;
\r
45 import compbio.metadata.JobStatus;
\r
46 import compbio.metadata.JobSubmissionException;
\r
47 import compbio.metadata.ResultNotAvailableException;
\r
/**
 * Single cluster job runner class.
 *
 * TODO: after a call to submitJob() no setters really work, as the job
 * template gets deleted; this needs to be taken into account in this
 * class design!
 */
59 public class JobRunner implements SyncExecutor {
\r
61 final JobTemplate jobtempl;
\r
62 static ClusterSession clustSession = ClusterSession.getInstance();
\r
63 static Session session = clustSession.getSession();
\r
64 static final Logger log = Logger.getLogger(JobRunner.class);
\r
65 final ConfiguredExecutable<?> confExecutable;
\r
66 private final String workDirectory;
\r
69 public JobRunner(ConfiguredExecutable<?> confExec)
\r
70 throws JobSubmissionException {
\r
72 String command = confExec.getCommand(ExecProvider.Cluster);
\r
73 PathValidator.validateExecutable(command);
\r
74 log.debug("Setting command " + command);
\r
76 jobtempl = session.createJobTemplate();
\r
77 jobtempl.setRemoteCommand(command);
\r
78 jobtempl.setJoinFiles(false);
\r
79 setJobName(confExec.getExecutable().getClass().getSimpleName());
\r
81 this.workDirectory = confExec.getWorkDirectory();
\r
82 assert !compbio.util.Util.isEmpty(workDirectory);
\r
84 // Tell the job where to get/put things
\r
85 jobtempl.setWorkingDirectory(this.workDirectory);
\r
88 * Set environment variables for the process if any
\r
90 Map<String, String> jobEnv = confExec.getEnvironment();
\r
91 if (jobEnv != null && !jobEnv.isEmpty()) {
\r
92 setJobEnvironmentVariables(jobEnv);
\r
94 List<String> args = confExec.getParameters().getCommands();
\r
95 // Set optional parameters
\r
96 if (args != null && args.size() > 0) {
\r
97 jobtempl.setArgs(args);
\r
101 * If executable need in/out data to be piped into it
\r
103 if (confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
104 setPipes(confExec);
\r
108 * If executable require special cluster configuration parameters to
\r
109 * be set e.g. queue, ram, time etc
\r
111 setNativeSpecs(confExec.getExecutable());
\r
114 log.trace("using arguments: " + jobtempl.getArgs());
\r
115 this.confExecutable = confExec;
\r
116 // Save run configuration
\r
117 confExec.saveRunConfiguration();
\r
119 } catch (DrmaaException e) {
\r
120 log.error(e.getLocalizedMessage(), e.getCause());
\r
121 throw new JobSubmissionException(e);
\r
122 } catch (IOException e) {
\r
123 log.error(e.getLocalizedMessage(), e.getCause());
\r
124 throw new JobSubmissionException(e);
\r
129 void setPipes(ConfiguredExecutable<?> executable) throws DrmaaException {
\r
131 String output = executable.getOutput();
\r
132 String error = executable.getError();
\r
133 // Standard drmaa path format is hostname:path
\r
134 // to avoid supplying hostnames with all local paths just prepend colon
\r
136 // Input and output can be null as in and out files may be defined in
\r
139 * Use this for piping input into the process if (input != null) { if
\r
140 * (!input.contains(":")) { input = makeLocalPath(input);
\r
141 * log.trace("converting input to " + input); }
\r
142 * jobtempl.setInputPath(input); log.debug("use Input: " +
\r
143 * jobtempl.getInputPath()); }
\r
145 if (output != null) {
\r
146 if (!output.contains(":")) {
\r
147 output = makeLocalPath(output);
\r
149 jobtempl.setOutputPath(output);
\r
150 log.debug("Output to: " + jobtempl.getOutputPath());
\r
152 if (error != null) {
\r
153 if (!error.contains(":")) {
\r
154 error = makeLocalPath(error);
\r
156 jobtempl.setErrorPath(error);
\r
157 log.debug("Output errors to: " + jobtempl.getErrorPath());
\r
162 void setNativeSpecs(Executable<?> executable) throws DrmaaException {
\r
163 String nativeSpecs = executable.getClusterJobSettings();
\r
164 if(!compbio.util.Util.isEmpty(nativeSpecs)) {
\r
165 log.debug("Using cluster job settings: " + nativeSpecs);
\r
166 jobtempl.setNativeSpecification(nativeSpecs);
\r
170 void setEmail(String email) {
\r
171 log.trace("Setting email to:" + email);
\r
173 jobtempl.setEmail(Collections.singleton(email));
\r
174 jobtempl.setBlockEmail(false);
\r
175 } catch (DrmaaException e) {
\r
176 log.debug(e.getLocalizedMessage());
\r
177 throw new IllegalArgumentException(e);
\r
181 void setJobName(String name) {
\r
182 log.trace("Setting job name to:" + name);
\r
184 jobtempl.setJobName(name);
\r
185 } catch (DrmaaException e) {
\r
186 log.debug(e.getLocalizedMessage());
\r
187 throw new IllegalArgumentException(e);
\r
191 @SuppressWarnings("unchecked")
\r
192 void setJobEnvironmentVariables(Map<String, String> env_variables) {
\r
193 assert env_variables != null && !env_variables.isEmpty();
\r
195 log.trace("Setting job environment to:" + env_variables);
\r
196 Map<String, String> sysEnv = jobtempl.getJobEnvironment();
\r
197 if (sysEnv != null && !sysEnv.isEmpty()) {
\r
198 Util.mergeEnvVariables(sysEnv, env_variables);
\r
200 sysEnv = env_variables;
\r
202 jobtempl.setJobEnvironment(sysEnv);
\r
204 } catch (DrmaaException e) {
\r
205 log.debug(e.getLocalizedMessage());
\r
206 throw new IllegalArgumentException(e);
\r
210 private static String makeLocalPath(String path) {
\r
214 public boolean deepClean() {
\r
215 throw new UnsupportedOperationException();
\r
218 * remove all files from these this.jobtempl.getInputPath();
\r
219 * this.jobtempl.getOutputPath(); this.jobtempl.getWorkingDirectory();
\r
221 // executable.getInputFiles();
\r
225 * This will never return clust.engine.JobStatus.CANCELLED as for sun grid
\r
226 * engine cancelled job is the same as failed. Cancelled jobs needs to be
\r
227 * tracked manually!
\r
229 static compbio.metadata.JobStatus getJobStatus(String jobId) {
\r
231 ClusterJobId clusterJobId = ClusterSession.getClusterJobId(jobId);
\r
232 switch (clustSession.getJobStatus(clusterJobId)) {
\r
234 compbio.engine.client.Util.writeStatFile(Configurator.getWorkDirectory(jobId),
\r
235 JobStatus.FINISHED.toString());
\r
237 return compbio.metadata.JobStatus.FINISHED;
\r
238 case Session.FAILED:
\r
239 compbio.engine.client.Util.writeMarker(Configurator.getWorkDirectory(jobId),
\r
242 return compbio.metadata.JobStatus.FAILED;
\r
244 case Session.RUNNING:
\r
245 // will not persist this status as temporary
\r
246 return compbio.metadata.JobStatus.RUNNING;
\r
248 case Session.SYSTEM_SUSPENDED:
\r
249 case Session.USER_SYSTEM_SUSPENDED:
\r
250 case Session.USER_SUSPENDED:
\r
251 case Session.USER_SYSTEM_ON_HOLD:
\r
252 case Session.USER_ON_HOLD:
\r
253 case Session.SYSTEM_ON_HOLD:
\r
254 case Session.QUEUED_ACTIVE:
\r
255 // will not persist this status as temporary
\r
256 return compbio.metadata.JobStatus.PENDING;
\r
259 // It is possible that the this status is returned for a job that is almost completed
\r
260 // when a state is changed from RUNNING to DONE
\r
261 // It looks like a bug in DRMAA SGE implementation
\r
262 return compbio.metadata.JobStatus.UNDEFINED;
\r
264 } catch (InvalidJobException e) {
\r
265 log.info("Job " + jobId + " could not be located by DRMAA "
\r
266 + e.getLocalizedMessage(), e.getCause());
\r
267 log.info("Attempting to determine the status by marker files");
\r
268 return getRecordedJobStatus(jobId);
\r
269 } catch (DrmaaException e) {
\r
271 "Exception in DRMAA system while quering the job status: "
\r
272 + e.getLocalizedMessage(), e.getCause());
\r
273 } catch (IOException e) {
\r
274 log.error("Could not read JOBID for taskId: " + jobId
\r
275 + " Message: " + e.getLocalizedMessage(), e.getCause());
\r
278 return JobStatus.UNDEFINED;
\r
281 static JobStatus getRecordedJobStatus(String jobId) {
\r
283 * Job has already been removed from the task list, so it running
\r
284 * status could not be determined. Most likely it has been
\r
285 * cancelled, finished or failed.
\r
287 String workDir = Configurator.getWorkDirectory(jobId);
\r
288 if (Util.isMarked(workDir, JobStatus.FINISHED)
\r
289 || Util.isMarked(workDir, JobStatus.COLLECTED)) {
\r
290 return JobStatus.FINISHED;
\r
292 if (Util.isMarked(workDir, JobStatus.CANCELLED)) {
\r
293 return JobStatus.CANCELLED;
\r
295 if (Util.isMarked(workDir, JobStatus.FAILED)) {
\r
296 return JobStatus.FAILED;
\r
298 return JobStatus.UNDEFINED;
\r
303 public boolean cleanup() {
\r
305 * TODO there is two additional files created by sun grid engine which
\r
306 * are named as follows: output this.getWorkDirectory() +
\r
307 * executable.getClass().getSimpleName() + "." + "o" + this.jobId; error
\r
308 * this.getWorkDirectory() + executable.getClass().getSimpleName() + "."
\r
309 * + "e" + this.jobId; individual executable does not know about these
\r
310 * two unless it implements PipedExecutable which need to collect data
\r
311 * from these streams Thus this method will fail to remove the task
\r
312 * directory completely
\r
314 return Cleaner.deleteFiles(confExecutable);
\r
317 JobInfo waitForJob(String jobId) throws JobExecutionException {
\r
318 assert Util.isValidJobId(jobId);
\r
319 return ClusterUtil.waitForResult(clustSession, jobId);
\r
322 boolean cancelJob(String jobId) {
\r
323 assert Util.isValidJobId(jobId);
\r
324 return compbio.engine.cluster.drmaa.ClusterUtil.cancelJob(jobId,
\r
329 public boolean cancelJob() {
\r
330 return cancelJob(this.jobId);
\r
333 String submitJob() throws JobSubmissionException {
\r
337 jobId = session.runJob(jobtempl);
\r
338 log.info("submitted single job with jobids:");
\r
339 log.info("\t \"" + jobId + "\"");
\r
340 session.deleteJobTemplate(jobtempl);
\r
341 clustSession.addJob(jobId, confExecutable);
\r
342 } catch (DrmaaException e) {
\r
343 e.printStackTrace();
\r
344 throw new JobSubmissionException(e);
\r
347 return this.confExecutable.getTaskId();
\r
350 public String getWorkDirectory() {
\r
351 return this.workDirectory;
\r
355 public void executeJob() throws JobSubmissionException {
\r
356 this.jobId = submitJob();
\r
360 * This method will block before the calculation has completed and then
\r
361 * return the object containing a job execution statistics
\r
364 * @throws JobExecutionException
\r
366 public JobInfo getJobInfo() throws JobExecutionException {
\r
367 return waitForJob(this.jobId);
\r
371 public ConfiguredExecutable<?> waitForResult() throws JobExecutionException {
\r
372 ConfiguredExecutable<?> confExec;
\r
374 confExec = new AsyncJobRunner().getResults(this.jobId);
\r
375 if (confExec == null) {
\r
376 log.warn("Could not find results of job " + this.jobId);
\r
378 } catch (ResultNotAvailableException e) {
\r
379 log.error(e.getMessage(), e.getCause());
\r
380 throw new JobExecutionException(e);
\r
386 public compbio.metadata.JobStatus getJobStatus() {
\r
387 return getJobStatus(this.jobId);
\r
390 public static JobRunner getInstance(ConfiguredExecutable<?> executable)
\r
391 throws JobSubmissionException {
\r
392 return new JobRunner(executable);
\r