1 /* Copyright (c) 2009 Peter Troshin
\r
3 * Java Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
\r
5 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
6 * Apache License version 2 as published by the Apache Software Foundation
\r
8 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
9 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
10 * License for more details.
\r
12 * A copy of the license is in apache_license.txt. It is also available here:
\r
13 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
15 * Any republication or derived work distributed in source code form
\r
16 * must include this copyright and license notice.
\r
19 package compbio.engine.cluster.drmaa;
\r
21 import java.io.IOException;
\r
22 import java.util.Collections;
\r
23 import java.util.List;
\r
24 import java.util.Map;
\r
26 import org.apache.log4j.Logger;
\r
27 import org.ggf.drmaa.DrmaaException;
\r
28 import org.ggf.drmaa.InvalidJobException;
\r
29 import org.ggf.drmaa.JobInfo;
\r
30 import org.ggf.drmaa.JobTemplate;
\r
31 import org.ggf.drmaa.Session;
\r
33 import compbio.engine.Cleaner;
\r
34 import compbio.engine.ClusterJobId;
\r
35 import compbio.engine.Configurator;
\r
36 import compbio.engine.SyncExecutor;
\r
38 import compbio.engine.client.ConfiguredExecutable;
\r
39 import compbio.engine.client.Executable;
\r
40 import compbio.engine.client.PathValidator;
\r
41 import compbio.engine.client.PipedExecutable;
\r
42 import compbio.engine.client.EngineUtil;
\r
43 import compbio.engine.client.Executable.ExecProvider;
\r
44 import compbio.metadata.JobExecutionException;
\r
45 import compbio.metadata.JobStatus;
\r
46 import compbio.metadata.JobSubmissionException;
\r
47 import compbio.metadata.ResultNotAvailableException;
\r
50 * Single cluster job runner class
\r
55 * TODO after call to submitJob() no setters really work as the job
\r
56 * template gets deleted, this needs to be taken into account in this
\r
59 public class ClusterRunner implements SyncExecutor {
\r
61 final JobTemplate jobtempl;
\r
62 static ClusterSession clustSession = ClusterSession.getInstance();
\r
63 static Session session = clustSession.getSession();
\r
64 static final Logger log = Logger.getLogger(ClusterRunner.class);
\r
65 final ConfiguredExecutable<?> confExecutable;
\r
66 private final String workDirectory;
\r
69 public ClusterRunner(ConfiguredExecutable<?> confExec)
\r
70 throws JobSubmissionException {
\r
72 String command = confExec.getCommand(ExecProvider.Cluster);
\r
73 PathValidator.validateExecutable(command);
\r
74 log.debug("Setting command " + command);
\r
76 jobtempl = session.createJobTemplate();
\r
77 jobtempl.setRemoteCommand(command);
\r
78 jobtempl.setJoinFiles(false);
\r
79 setJobName(confExec.getExecutable().getClass().getSimpleName());
\r
81 this.workDirectory = confExec.getWorkDirectory();
\r
82 assert !compbio.util.Util.isEmpty(workDirectory);
\r
84 // Tell the job where to get/put things
\r
85 jobtempl.setWorkingDirectory(this.workDirectory);
\r
87 // Set environment variables for the process if any
\r
88 Map<String, String> jobEnv = confExec.getEnvironment();
\r
89 if (jobEnv != null && !jobEnv.isEmpty()) {
\r
90 setJobEnvironmentVariables(jobEnv);
\r
92 List<String> args = confExec.getParameters().getCommands();
\r
93 // Set optional parameters
\r
94 if (args != null && args.size() > 0) {
\r
95 jobtempl.setArgs(args);
\r
98 //If executable need in/out data to be piped into it
\r
99 if (confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
100 setPipes(confExec);
\r
103 // If executable require special cluster configuration parameters to
\r
104 // be set e.g. queue, ram, time etc
\r
105 setNativeSpecs(confExec.getExecutable());
\r
107 log.trace("using arguments: " + jobtempl.getArgs());
\r
108 this.confExecutable = confExec;
\r
109 // Save run configuration
\r
110 confExec.saveRunConfiguration();
\r
112 } catch (DrmaaException e) {
\r
113 log.error(e.getLocalizedMessage(), e.getCause());
\r
114 throw new JobSubmissionException(e);
\r
115 } catch (IOException e) {
\r
116 log.error(e.getLocalizedMessage(), e.getCause());
\r
117 throw new JobSubmissionException(e);
\r
122 void setPipes(ConfiguredExecutable<?> executable) throws DrmaaException {
\r
124 String output = executable.getOutput();
\r
125 String error = executable.getError();
\r
126 // Standard drmaa path format is hostname:path
\r
127 // to avoid supplying hostnames with all local paths just prepend colon
\r
129 // Input and output can be null as in and out files may be defined in
\r
132 * Use this for piping input into the process if (input != null) { if
\r
133 * (!input.contains(":")) { input = makeLocalPath(input);
\r
134 * log.trace("converting input to " + input); }
\r
135 * jobtempl.setInputPath(input); log.debug("use Input: " +
\r
136 * jobtempl.getInputPath()); }
\r
138 if (output != null) {
\r
139 if (!output.contains(":")) {
\r
140 output = makeLocalPath(output);
\r
142 jobtempl.setOutputPath(output);
\r
143 log.debug("Output to: " + jobtempl.getOutputPath());
\r
145 if (error != null) {
\r
146 if (!error.contains(":")) {
\r
147 error = makeLocalPath(error);
\r
149 jobtempl.setErrorPath(error);
\r
150 log.debug("Output errors to: " + jobtempl.getErrorPath());
\r
155 void setNativeSpecs(Executable<?> executable) throws DrmaaException {
\r
156 String nativeSpecs = executable.getClusterJobSettings();
\r
157 if(!compbio.util.Util.isEmpty(nativeSpecs)) {
\r
158 log.debug("Using cluster job settings: " + nativeSpecs);
\r
159 jobtempl.setNativeSpecification(nativeSpecs);
\r
163 void setEmail(String email) {
\r
164 log.trace("Setting email to:" + email);
\r
166 jobtempl.setEmail(Collections.singleton(email));
\r
167 jobtempl.setBlockEmail(false);
\r
168 } catch (DrmaaException e) {
\r
169 log.debug(e.getLocalizedMessage());
\r
170 throw new IllegalArgumentException(e);
\r
174 void setJobName(String name) {
\r
175 log.trace("Setting job name to:" + name);
\r
177 jobtempl.setJobName(name);
\r
178 } catch (DrmaaException e) {
\r
179 log.debug(e.getLocalizedMessage());
\r
180 throw new IllegalArgumentException(e);
\r
184 @SuppressWarnings("unchecked")
\r
185 void setJobEnvironmentVariables(Map<String, String> env_variables) {
\r
186 assert env_variables != null && !env_variables.isEmpty();
\r
188 log.trace("Setting job environment to:" + env_variables);
\r
189 Map<String, String> sysEnv = jobtempl.getJobEnvironment();
\r
190 if (sysEnv != null && !sysEnv.isEmpty()) {
\r
191 EngineUtil.mergeEnvVariables(sysEnv, env_variables);
\r
193 sysEnv = env_variables;
\r
195 jobtempl.setJobEnvironment(sysEnv);
\r
197 } catch (DrmaaException e) {
\r
198 log.debug(e.getLocalizedMessage());
\r
199 throw new IllegalArgumentException(e);
\r
203 private static String makeLocalPath(String path) {
\r
207 public boolean deepClean() {
\r
208 throw new UnsupportedOperationException();
\r
211 * remove all files from these this.jobtempl.getInputPath();
\r
212 * this.jobtempl.getOutputPath(); this.jobtempl.getWorkingDirectory();
\r
214 // executable.getInputFiles();
\r
218 * This will never return clust.engine.JobStatus.CANCELLED as for sun grid
\r
219 * engine cancelled job is the same as failed. Cancelled jobs needs to be
\r
220 * tracked manually!
\r
222 static compbio.metadata.JobStatus getJobStatus(String jobId) {
\r
224 ClusterJobId clusterJobId = ClusterSession.getClusterJobId(jobId);
\r
225 switch (clustSession.getJobStatus(clusterJobId)) {
\r
227 EngineUtil.writeStatFile(Configurator.getWorkDirectory(jobId), JobStatus.FINISHED.toString());
\r
228 return compbio.metadata.JobStatus.FINISHED;
\r
230 case Session.FAILED:
\r
231 EngineUtil.writeMarker(Configurator.getWorkDirectory(jobId), JobStatus.FAILED);
\r
232 return compbio.metadata.JobStatus.FAILED;
\r
234 case Session.RUNNING:
\r
235 // will not persist this status as temporary
\r
236 return compbio.metadata.JobStatus.RUNNING;
\r
238 case Session.SYSTEM_SUSPENDED:
\r
239 case Session.USER_SYSTEM_SUSPENDED:
\r
240 case Session.USER_SUSPENDED:
\r
241 case Session.USER_SYSTEM_ON_HOLD:
\r
242 case Session.USER_ON_HOLD:
\r
243 case Session.SYSTEM_ON_HOLD:
\r
244 case Session.QUEUED_ACTIVE:
\r
245 // will not persist this status as temporary
\r
246 return compbio.metadata.JobStatus.PENDING;
\r
249 // It is possible that the this status is returned for a job that is almost completed
\r
250 // when a state is changed from RUNNING to DONE
\r
251 // It looks like a bug in DRMAA SGE implementation
\r
252 return compbio.metadata.JobStatus.UNDEFINED;
\r
254 } catch (InvalidJobException e) {
\r
255 log.info("Job " + jobId + " could not be located by DRMAA "
\r
256 + e.getLocalizedMessage(), e.getCause());
\r
257 log.info("Attempting to determine the status by marker files");
\r
258 return getRecordedJobStatus(jobId);
\r
259 } catch (DrmaaException e) {
\r
261 "Exception in DRMAA system while quering the job status: "
\r
262 + e.getLocalizedMessage(), e.getCause());
\r
263 } catch (IOException e) {
\r
264 log.error("Could not read JOBID for taskId: " + jobId
\r
265 + " Message: " + e.getLocalizedMessage(), e.getCause());
\r
268 return JobStatus.UNDEFINED;
\r
271 static JobStatus getRecordedJobStatus(String jobId) {
\r
273 * Job has already been removed from the task list, so it running
\r
274 * status could not be determined. Most likely it has been
\r
275 * cancelled, finished or failed.
\r
277 String workDir = Configurator.getWorkDirectory(jobId);
\r
278 if (EngineUtil.isMarked(workDir, JobStatus.FINISHED)
\r
279 || EngineUtil.isMarked(workDir, JobStatus.COLLECTED)) {
\r
280 return JobStatus.FINISHED;
\r
282 if (EngineUtil.isMarked(workDir, JobStatus.CANCELLED)) {
\r
283 return JobStatus.CANCELLED;
\r
285 if (EngineUtil.isMarked(workDir, JobStatus.FAILED)) {
\r
286 return JobStatus.FAILED;
\r
288 return JobStatus.UNDEFINED;
\r
293 public boolean cleanup() {
\r
295 * TODO there is two additional files created by sun grid engine which
\r
296 * are named as follows: output this.getWorkDirectory() +
\r
297 * executable.getClass().getSimpleName() + "." + "o" + this.jobId; error
\r
298 * this.getWorkDirectory() + executable.getClass().getSimpleName() + "."
\r
299 * + "e" + this.jobId; individual executable does not know about these
\r
300 * two unless it implements PipedExecutable which need to collect data
\r
301 * from these streams Thus this method will fail to remove the task
\r
302 * directory completely
\r
304 return Cleaner.deleteFiles(confExecutable);
\r
307 JobInfo waitForJob(String jobId) throws JobExecutionException {
\r
308 assert EngineUtil.isValidJobId(jobId);
\r
309 return ClusterEngineUtil.waitForResult(clustSession, jobId);
\r
312 boolean cancelJob(String jobId) {
\r
313 assert EngineUtil.isValidJobId(jobId);
\r
314 return compbio.engine.cluster.drmaa.ClusterEngineUtil.cancelJob(jobId,
\r
319 public boolean cancelJob() {
\r
320 return cancelJob(this.jobId);
\r
323 String submitJob() throws JobSubmissionException {
\r
327 jobId = session.runJob(jobtempl);
\r
328 log.info("submitted single job with jobids:");
\r
329 log.info("\t \"" + jobId + "\"");
\r
330 session.deleteJobTemplate(jobtempl);
\r
331 clustSession.addJob(jobId, confExecutable);
\r
332 } catch (DrmaaException e) {
\r
333 e.printStackTrace();
\r
334 throw new JobSubmissionException(e);
\r
337 return this.confExecutable.getTaskId();
\r
340 public String getWorkDirectory() {
\r
341 return this.workDirectory;
\r
345 public void executeJob() throws JobSubmissionException {
\r
346 this.jobId = submitJob();
\r
350 * This method will block before the calculation has completed and then
\r
351 * return the object containing a job execution statistics
\r
354 * @throws JobExecutionException
\r
356 public JobInfo getJobInfo() throws JobExecutionException {
\r
357 return waitForJob(this.jobId);
\r
361 public ConfiguredExecutable<?> waitForResult() throws JobExecutionException {
\r
362 ConfiguredExecutable<?> confExec;
\r
364 confExec = new AsyncClusterRunner().getResults(this.jobId);
\r
365 if (confExec == null) {
\r
366 log.warn("Could not find results of job " + this.jobId);
\r
368 } catch (ResultNotAvailableException e) {
\r
369 log.error(e.getMessage(), e.getCause());
\r
370 throw new JobExecutionException(e);
\r
376 public compbio.metadata.JobStatus getJobStatus() {
\r
377 return getJobStatus(this.jobId);
\r
380 public static ClusterRunner getInstance(ConfiguredExecutable<?> executable)
\r
381 throws JobSubmissionException {
\r
382 return new ClusterRunner(executable);
\r