engine/compbio/engine/cluster/drmaa/JobRunner.java
/* Copyright (c) 2009 Peter Troshin
 *
 *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
 *
 *  This library is free software; you can redistribute it and/or modify it under the terms of the
 *  Apache License version 2 as published by the Apache Software Foundation
 *
 *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
 *  License for more details.
 *
 *  A copy of the license is in apache_license.txt. It is also available here:
 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
 *
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */

package compbio.engine.cluster.drmaa;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;
import org.ggf.drmaa.DrmaaException;
import org.ggf.drmaa.InvalidJobException;
import org.ggf.drmaa.JobInfo;
import org.ggf.drmaa.JobTemplate;
import org.ggf.drmaa.Session;

import compbio.engine.Cleaner;
import compbio.engine.ClusterJobId;
import compbio.engine.Configurator;
import compbio.engine.SyncExecutor;

import compbio.engine.client.ConfiguredExecutable;
import compbio.engine.client.Executable;
import compbio.engine.client.PathValidator;
import compbio.engine.client.PipedExecutable;
import compbio.engine.client.EngineUtil;
import compbio.engine.client.Executable.ExecProvider;
import compbio.metadata.JobExecutionException;
import compbio.metadata.JobStatus;
import compbio.metadata.JobSubmissionException;
import compbio.metadata.ResultNotAvailableException;

/**
 * Single cluster job runner class
 *
 * @author pvtroshin
 * @date August 2009
 *
 *       TODO: after the call to submitJob() none of the setters have any
 *       effect, as the job template is deleted on submission. This needs to be
 *       taken into account in the design of this class!
 */
public class JobRunner implements SyncExecutor {

	final JobTemplate jobtempl;
	static ClusterSession clustSession = ClusterSession.getInstance();
	static Session session = clustSession.getSession();
	static final Logger log = Logger.getLogger(JobRunner.class);
	final ConfiguredExecutable<?> confExecutable;
	private final String workDirectory;
	String jobId;

	public JobRunner(ConfiguredExecutable<?> confExec)
			throws JobSubmissionException {
		try {
			String command = confExec.getCommand(ExecProvider.Cluster);
			PathValidator.validateExecutable(command);
			log.debug("Setting command " + command);

			jobtempl = session.createJobTemplate();
			jobtempl.setRemoteCommand(command);
			jobtempl.setJoinFiles(false);
			setJobName(confExec.getExecutable().getClass().getSimpleName());

			this.workDirectory = confExec.getWorkDirectory();
			assert !compbio.util.Util.isEmpty(workDirectory);

			// Tell the job where to get/put things
			jobtempl.setWorkingDirectory(this.workDirectory);

			/*
			 * Set environment variables for the process, if any
			 */
			Map<String, String> jobEnv = confExec.getEnvironment();
			if (jobEnv != null && !jobEnv.isEmpty()) {
				setJobEnvironmentVariables(jobEnv);
			}
			List<String> args = confExec.getParameters().getCommands();
			// Set optional parameters
			if (args != null && args.size() > 0) {
				jobtempl.setArgs(args);
			}

			/*
			 * If the executable needs its in/out data to be piped into it
			 */
			if (confExec.getExecutable() instanceof PipedExecutable<?>) {
				setPipes(confExec);
			}

			/*
			 * If the executable requires special cluster configuration
			 * parameters to be set, e.g. queue, RAM, time etc.
			 */
			setNativeSpecs(confExec.getExecutable());

			log.trace("using arguments: " + jobtempl.getArgs());
			this.confExecutable = confExec;
			// Save the run configuration
			confExec.saveRunConfiguration();

		} catch (DrmaaException e) {
			log.error(e.getLocalizedMessage(), e.getCause());
			throw new JobSubmissionException(e);
		} catch (IOException e) {
			log.error(e.getLocalizedMessage(), e.getCause());
			throw new JobSubmissionException(e);
		}

	}

	void setPipes(ConfiguredExecutable<?> executable) throws DrmaaException {

		String output = executable.getOutput();
		String error = executable.getError();
		// The standard DRMAA path format is hostname:path; to avoid supplying
		// hostnames with all local paths, just prepend a colon to the path.
		// Input and output can be null, as the in/out files may be defined in
		// the parameters instead.
		/*
		 * Use this for piping input into the process:
		 *
		 * if (input != null) {
		 *     if (!input.contains(":")) {
		 *         input = makeLocalPath(input);
		 *         log.trace("converting input to " + input);
		 *     }
		 *     jobtempl.setInputPath(input);
		 *     log.debug("use Input: " + jobtempl.getInputPath());
		 * }
		 */
		if (output != null) {
			if (!output.contains(":")) {
				output = makeLocalPath(output);
			}
			jobtempl.setOutputPath(output);
			log.debug("Output to: " + jobtempl.getOutputPath());
		}
		if (error != null) {
			if (!error.contains(":")) {
				error = makeLocalPath(error);
			}
			jobtempl.setErrorPath(error);
			log.debug("Output errors to: " + jobtempl.getErrorPath());
		}

	}

	void setNativeSpecs(Executable<?> executable) throws DrmaaException {
		String nativeSpecs = executable.getClusterJobSettings();
		if (!compbio.util.Util.isEmpty(nativeSpecs)) {
			log.debug("Using cluster job settings: " + nativeSpecs);
			jobtempl.setNativeSpecification(nativeSpecs);
		}
	}
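
	/*
	 * Illustrative note for setNativeSpecs above: the native specification is
	 * passed verbatim to the DRM (Sun Grid Engine here), so it carries
	 * site-specific qsub options. A typical value might look like
	 *
	 *   "-q bigmem.q -l h_vmem=8G -l h_rt=24:00:00"
	 *
	 * The queue name and resource limits are hypothetical examples and depend
	 * entirely on the local cluster configuration.
	 */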

	void setEmail(String email) {
		log.trace("Setting email to:" + email);
		try {
			jobtempl.setEmail(Collections.singleton(email));
			jobtempl.setBlockEmail(false);
		} catch (DrmaaException e) {
			log.debug(e.getLocalizedMessage());
			throw new IllegalArgumentException(e);
		}
	}

	void setJobName(String name) {
		log.trace("Setting job name to:" + name);
		try {
			jobtempl.setJobName(name);
		} catch (DrmaaException e) {
			log.debug(e.getLocalizedMessage());
			throw new IllegalArgumentException(e);
		}
	}

	@SuppressWarnings("unchecked")
	void setJobEnvironmentVariables(Map<String, String> env_variables) {
		assert env_variables != null && !env_variables.isEmpty();
		try {
			log.trace("Setting job environment to:" + env_variables);
			Map<String, String> sysEnv = jobtempl.getJobEnvironment();
			if (sysEnv != null && !sysEnv.isEmpty()) {
				EngineUtil.mergeEnvVariables(sysEnv, env_variables);
			} else {
				sysEnv = env_variables;
			}
			jobtempl.setJobEnvironment(sysEnv);

		} catch (DrmaaException e) {
			log.debug(e.getLocalizedMessage());
			throw new IllegalArgumentException(e);
		}
	}

	private static String makeLocalPath(String path) {
		return ":" + path;
	}

	public boolean deepClean() {
		throw new UnsupportedOperationException();
		// TODO remove all files from:
		// this.jobtempl.getInputPath();
		// this.jobtempl.getOutputPath();
		// this.jobtempl.getWorkingDirectory();
		// executable.getInputFiles();
	}

	/**
	 * This will never return clust.engine.JobStatus.CANCELLED, as for Sun Grid
	 * Engine a cancelled job is the same as a failed one. Cancelled jobs need
	 * to be tracked manually!
	 */
	static compbio.metadata.JobStatus getJobStatus(String jobId) {
		try {
			ClusterJobId clusterJobId = ClusterSession.getClusterJobId(jobId);
			switch (clustSession.getJobStatus(clusterJobId)) {
			case Session.DONE:
				EngineUtil.writeStatFile(Configurator.getWorkDirectory(jobId),
						JobStatus.FINISHED.toString());
				return compbio.metadata.JobStatus.FINISHED;

			case Session.FAILED:
				EngineUtil.writeMarker(Configurator.getWorkDirectory(jobId),
						JobStatus.FAILED);
				return compbio.metadata.JobStatus.FAILED;

			case Session.RUNNING:
				// will not persist this status as it is temporary
				return compbio.metadata.JobStatus.RUNNING;

			case Session.SYSTEM_SUSPENDED:
			case Session.USER_SYSTEM_SUSPENDED:
			case Session.USER_SUSPENDED:
			case Session.USER_SYSTEM_ON_HOLD:
			case Session.USER_ON_HOLD:
			case Session.SYSTEM_ON_HOLD:
			case Session.QUEUED_ACTIVE:
				// will not persist this status as it is temporary
				return compbio.metadata.JobStatus.PENDING;

			default:
				// It is possible that this status is returned for a job that is
				// almost completed, when its state changes from RUNNING to DONE.
				// It looks like a bug in the DRMAA SGE implementation.
				return compbio.metadata.JobStatus.UNDEFINED;
			}
		} catch (InvalidJobException e) {
			log.info("Job " + jobId + " could not be located by DRMAA "
					+ e.getLocalizedMessage(), e.getCause());
			log.info("Attempting to determine the status by marker files");
			return getRecordedJobStatus(jobId);
		} catch (DrmaaException e) {
			log.error("Exception in DRMAA system while querying the job status: "
					+ e.getLocalizedMessage(), e.getCause());
		} catch (IOException e) {
			log.error("Could not read JOBID for taskId: " + jobId + " Message: "
					+ e.getLocalizedMessage(), e.getCause());
		}

		return JobStatus.UNDEFINED;
	}

	static JobStatus getRecordedJobStatus(String jobId) {
		/*
		 * The job has already been removed from the task list, so its running
		 * status could not be determined. Most likely it has been cancelled,
		 * finished or failed.
		 */
		String workDir = Configurator.getWorkDirectory(jobId);
		if (EngineUtil.isMarked(workDir, JobStatus.FINISHED)
				|| EngineUtil.isMarked(workDir, JobStatus.COLLECTED)) {
			return JobStatus.FINISHED;
		}
		if (EngineUtil.isMarked(workDir, JobStatus.CANCELLED)) {
			return JobStatus.CANCELLED;
		}
		if (EngineUtil.isMarked(workDir, JobStatus.FAILED)) {
			return JobStatus.FAILED;
		}
		return JobStatus.UNDEFINED;
	}

	@Override
	public boolean cleanup() {
		/*
		 * TODO: there are two additional files created by Sun Grid Engine,
		 * named as follows:
		 *
		 * output: this.getWorkDirectory() +
		 *     executable.getClass().getSimpleName() + "." + "o" + this.jobId
		 * error:  this.getWorkDirectory() +
		 *     executable.getClass().getSimpleName() + "." + "e" + this.jobId
		 *
		 * An individual executable does not know about these two unless it
		 * implements PipedExecutable, which needs to collect data from these
		 * streams. Thus this method will fail to remove the task directory
		 * completely.
		 */
		return Cleaner.deleteFiles(confExecutable);
	}
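
	/**
	 * A minimal sketch, not called by cleanup() above, of how the two SGE
	 * stream files described in its TODO could be removed. The file name
	 * pattern (job name + ".o"/".e" + cluster job number, in the working
	 * directory) is an assumption based on that note and on the job name set
	 * in the constructor; verify it against a real cluster before relying on
	 * it.
	 */
	boolean deleteClusterStreamFiles(String clusterJobId) {
		String base = workDirectory + java.io.File.separator
				+ confExecutable.getExecutable().getClass().getSimpleName() + ".";
		java.io.File out = new java.io.File(base + "o" + clusterJobId);
		java.io.File err = new java.io.File(base + "e" + clusterJobId);
		// Treat an already-missing file as successfully cleaned up
		return (!out.exists() || out.delete()) && (!err.exists() || err.delete());
	}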

	JobInfo waitForJob(String jobId) throws JobExecutionException {
		assert EngineUtil.isValidJobId(jobId);
		return ClusterEngineUtil.waitForResult(clustSession, jobId);
	}

	boolean cancelJob(String jobId) {
		assert EngineUtil.isValidJobId(jobId);
		return compbio.engine.cluster.drmaa.ClusterEngineUtil.cancelJob(jobId,
				clustSession);
	}

	@Override
	public boolean cancelJob() {
		return cancelJob(this.jobId);
	}

	String submitJob() throws JobSubmissionException {

		String jobId;
		try {
			jobId = session.runJob(jobtempl);
			log.info("submitted single job with jobid:");
			log.info("\t \"" + jobId + "\"");
			session.deleteJobTemplate(jobtempl);
			clustSession.addJob(jobId, confExecutable);
		} catch (DrmaaException e) {
			log.error(e.getLocalizedMessage(), e);
			throw new JobSubmissionException(e);
		}

		return this.confExecutable.getTaskId();
	}
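
	/*
	 * Note on submitJob() above: the returned value is the engine task id
	 * (confExecutable.getTaskId()), not the DRMAA job id printed in the log.
	 * The mapping between the two is recorded by clustSession.addJob(jobId,
	 * confExecutable) and is read back, for example in getJobStatus(String),
	 * via ClusterSession.getClusterJobId(taskId).
	 */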

	public String getWorkDirectory() {
		return this.workDirectory;
	}

	@Override
	public void executeJob() throws JobSubmissionException {
		this.jobId = submitJob();
	}
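
	/**
	 * Illustrative sketch only (not used anywhere in the engine): typical
	 * synchronous use of this runner. The ConfiguredExecutable would come from
	 * the engine configuration code, and the email address is a made-up
	 * example.
	 */
	static ConfiguredExecutable<?> runSynchronously(ConfiguredExecutable<?> confExec)
			throws JobSubmissionException, JobExecutionException {
		JobRunner runner = JobRunner.getInstance(confExec);
		// Setters such as setEmail must be called before executeJob(): the job
		// template is deleted on submission (see the class-level TODO above)
		runner.setEmail("user@example.org");
		runner.executeJob();
		// Blocks until the cluster job has finished and results are collected
		return runner.waitForResult();
	}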

	/**
	 * This method will block until the calculation has completed and then
	 * return an object containing the job execution statistics
	 *
	 * @return the DRMAA JobInfo for the completed job
	 * @throws JobExecutionException
	 */
	public JobInfo getJobInfo() throws JobExecutionException {
		return waitForJob(this.jobId);
	}

	@Override
	public ConfiguredExecutable<?> waitForResult() throws JobExecutionException {
		ConfiguredExecutable<?> confExec;
		try {
			confExec = new AsyncJobRunner().getResults(this.jobId);
			if (confExec == null) {
				log.warn("Could not find results of job " + this.jobId);
			}
		} catch (ResultNotAvailableException e) {
			log.error(e.getMessage(), e.getCause());
			throw new JobExecutionException(e);
		}
		return confExec;
	}

	@Override
	public compbio.metadata.JobStatus getJobStatus() {
		return getJobStatus(this.jobId);
	}

	public static JobRunner getInstance(ConfiguredExecutable<?> executable)
			throws JobSubmissionException {
		return new JobRunner(executable);
	}

} // class end