[jabaws.git] / engine / compbio / engine / cluster / drmaa / JobRunner.java
/* Copyright (c) 2009 Peter Troshin
 *
 *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
 *
 *  This library is free software; you can redistribute it and/or modify it under the terms of the
 *  Apache License version 2 as published by the Apache Software Foundation
 *
 *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
 *  License for more details.
 *
 *  A copy of the license is in apache_license.txt. It is also available here:
 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
 *
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */

package compbio.engine.cluster.drmaa;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;
import org.ggf.drmaa.DrmaaException;
import org.ggf.drmaa.InvalidJobException;
import org.ggf.drmaa.JobInfo;
import org.ggf.drmaa.JobTemplate;
import org.ggf.drmaa.Session;

import compbio.engine.Cleaner;
import compbio.engine.ClusterJobId;
import compbio.engine.Configurator;
import compbio.engine.SyncExecutor;
import compbio.engine.client.ClusterNativeSpecExecutable;
import compbio.engine.client.ConfiguredExecutable;
import compbio.engine.client.Executable;
import compbio.engine.client.PathValidator;
import compbio.engine.client.PipedExecutable;
import compbio.engine.client.Util;
import compbio.engine.client.Executable.ExecProvider;
import compbio.metadata.JobExecutionException;
import compbio.metadata.JobStatus;
import compbio.metadata.JobSubmissionException;
import compbio.metadata.ResultNotAvailableException;

/**
 * Single cluster job runner class
 * 
 * @author pvtroshin
 * @date August 2009
 * 
 *       TODO: after the call to submitJob() none of the setters has any
 *       effect, as the job template is deleted on submission. This needs to
 *       be taken into account in the design of this class!
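 * 
 *       An illustrative usage sketch only; the getConfiguredExecutable()
 *       helper is a hypothetical placeholder for however the
 *       ConfiguredExecutable is obtained, it is not part of this class:
 * 
 *       <pre>
 *       ConfiguredExecutable&lt;?&gt; confExec = getConfiguredExecutable();
 *       JobRunner runner = JobRunner.getInstance(confExec);
 *       runner.executeJob(); // submits the job to the cluster
 *       compbio.metadata.JobStatus status = runner.getJobStatus();
 *       ConfiguredExecutable&lt;?&gt; result = runner.waitForResult(); // blocks
 *       </pre>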
 */
public class JobRunner implements SyncExecutor {

	final JobTemplate jobtempl;
	static ClusterSession clustSession = ClusterSession.getInstance();
	static Session session = clustSession.getSession();
	static final Logger log = Logger.getLogger(JobRunner.class);
	final ConfiguredExecutable<?> confExecutable;
	private final String workDirectory;
	String jobId;

	public JobRunner(ConfiguredExecutable<?> confExec)
			throws JobSubmissionException {
		try {
			String command = confExec.getCommand(ExecProvider.Cluster);
			PathValidator.validateExecutable(command);
			log.debug("Setting command " + command);

			jobtempl = session.createJobTemplate();
			jobtempl.setRemoteCommand(command);
			jobtempl.setJoinFiles(false);
			setJobName(confExec.getExecutable().getClass().getSimpleName());

			this.workDirectory = confExec.getWorkDirectory();
			assert !compbio.util.Util.isEmpty(workDirectory);

			// Tell the job where to get/put things
			jobtempl.setWorkingDirectory(this.workDirectory);

			/*
			 * Set environment variables for the process, if any
			 */
			Map<String, String> jobEnv = confExec.getEnvironment();
			if (jobEnv != null && !jobEnv.isEmpty()) {
				setJobEnvironmentVariables(jobEnv);
			}
			List<String> args = confExec.getParameters().getCommands();
			// Set optional parameters
			if (args != null && args.size() > 0) {
				jobtempl.setArgs(args);
			}

			/*
			 * If the executable needs its in/out data to be piped in and out
			 * of it
			 */
			if (confExec.getExecutable() instanceof PipedExecutable<?>) {
				setPipes(confExec);
			}

			/*
			 * If the executable requires special cluster configuration
			 * parameters to be set, e.g. queue, RAM, time etc.
			 */
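			// An illustrative example only (queue and resource names below are
			// assumptions, not taken from this code): a native specification
			// for Sun Grid Engine might look like "-q bigmem.q -l h_vmem=8G".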
			if (confExec.getExecutable() instanceof ClusterNativeSpecExecutable<?>) {
				setNativeSpecs(confExec.getExecutable());
			}

			log.trace("using arguments: " + jobtempl.getArgs());
			this.confExecutable = confExec;
			// Save run configuration
			confExec.saveRunConfiguration();

		} catch (DrmaaException e) {
			log.error(e.getLocalizedMessage(), e.getCause());
			throw new JobSubmissionException(e);
		} catch (IOException e) {
			log.error(e.getLocalizedMessage(), e.getCause());
			throw new JobSubmissionException(e);
		}

	}

	void setPipes(ConfiguredExecutable<?> executable) throws DrmaaException {

		String output = executable.getOutput();
		String error = executable.getError();
		// The standard DRMAA path format is hostname:path. To avoid supplying
		// hostnames for all local paths, just prepend a colon to the path.
		// Input and output can be null, as the in and out files may be defined
		// in the parameters instead.
		/*
		 * Use this for piping input into the process:
		 * 
		 * if (input != null) {
		 *     if (!input.contains(":")) {
		 *         input = makeLocalPath(input);
		 *         log.trace("converting input to " + input);
		 *     }
		 *     jobtempl.setInputPath(input);
		 *     log.debug("use Input: " + jobtempl.getInputPath());
		 * }
		 */
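		// Illustrative example only (the path below is made up): a local path
		// such as /homes/user/jobdir/output.txt is passed to DRMAA as
		// ":/homes/user/jobdir/output.txt", i.e. with an empty hostname part.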
		if (output != null) {
			if (!output.contains(":")) {
				output = makeLocalPath(output);
			}
			jobtempl.setOutputPath(output);
			log.debug("Output to: " + jobtempl.getOutputPath());
		}
		if (error != null) {
			if (!error.contains(":")) {
				error = makeLocalPath(error);
			}
			jobtempl.setErrorPath(error);
			log.debug("Output errors to: " + jobtempl.getErrorPath());
		}

	}

	void setNativeSpecs(Executable<?> executable) throws DrmaaException {
		jobtempl.setNativeSpecification(((ClusterNativeSpecExecutable<?>) executable)
				.getNativeSpecs());
	}

	void setEmail(String email) {
		log.trace("Setting email to: " + email);
		try {
			jobtempl.setEmail(Collections.singleton(email));
			jobtempl.setBlockEmail(false);
		} catch (DrmaaException e) {
			log.debug(e.getLocalizedMessage());
			throw new IllegalArgumentException(e);
		}
	}

	void setJobName(String name) {
		log.trace("Setting job name to: " + name);
		try {
			jobtempl.setJobName(name);
		} catch (DrmaaException e) {
			log.debug(e.getLocalizedMessage());
			throw new IllegalArgumentException(e);
		}
	}

	@SuppressWarnings("unchecked")
	void setJobEnvironmentVariables(Map<String, String> env_variables) {
		assert env_variables != null && !env_variables.isEmpty();
		try {
			log.trace("Setting job environment to: " + env_variables);
			// Merge the job-specific variables into the environment already
			// defined on the job template, if there is one
			Map<String, String> sysEnv = jobtempl.getJobEnvironment();
			if (sysEnv != null && !sysEnv.isEmpty()) {
				Util.mergeEnvVariables(sysEnv, env_variables);
			} else {
				sysEnv = env_variables;
			}
			jobtempl.setJobEnvironment(sysEnv);

		} catch (DrmaaException e) {
			log.debug(e.getLocalizedMessage());
			throw new IllegalArgumentException(e);
		}
	}

	private static String makeLocalPath(String path) {
		return ":" + path;
	}

	public boolean deepClean() {
		throw new UnsupportedOperationException();
		// TODO
		/*
		 * Remove all files from these locations: this.jobtempl.getInputPath(),
		 * this.jobtempl.getOutputPath(), this.jobtempl.getWorkingDirectory()
		 */
		// executable.getInputFiles();
	}

	/**
	 * This will never return clust.engine.JobStatus.CANCELLED as for Sun Grid
	 * Engine a cancelled job is the same as a failed one. Cancelled jobs need
	 * to be tracked manually!
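	 * 
	 *       An illustrative way to tell the two apart (a sketch only; it
	 *       assumes that the code cancelling a job writes the CANCELLED marker
	 *       file to the job's work directory):
	 * 
	 *       <pre>
	 *       JobStatus status = getJobStatus(taskId);
	 *       if (status == JobStatus.FAILED
	 *                       &amp;&amp; getRecordedJobStatus(taskId) == JobStatus.CANCELLED) {
	 *               // the job was cancelled rather than genuinely failed
	 *       }
	 *       </pre>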
	 */
	static compbio.metadata.JobStatus getJobStatus(String jobId) {
		try {
			ClusterJobId clusterJobId = ClusterSession.getClusterJobId(jobId);
			switch (clustSession.getJobStatus(clusterJobId)) {
			case Session.DONE:
				compbio.engine.client.Util.writeStatFile(Configurator.getWorkDirectory(jobId),
						JobStatus.FINISHED.toString());

				return compbio.metadata.JobStatus.FINISHED;
			case Session.FAILED:
				compbio.engine.client.Util.writeMarker(Configurator.getWorkDirectory(jobId),
						JobStatus.FAILED);

				return compbio.metadata.JobStatus.FAILED;

			case Session.RUNNING:
				// will not persist this status as it is temporary
				return compbio.metadata.JobStatus.RUNNING;

			case Session.SYSTEM_SUSPENDED:
			case Session.USER_SYSTEM_SUSPENDED:
			case Session.USER_SUSPENDED:
			case Session.USER_SYSTEM_ON_HOLD:
			case Session.USER_ON_HOLD:
			case Session.SYSTEM_ON_HOLD:
			case Session.QUEUED_ACTIVE:
				// will not persist this status as it is temporary
				return compbio.metadata.JobStatus.PENDING;

			default:
				// It is possible that this status is returned for a job that is
				// almost completed, when its state changes from RUNNING to DONE.
				// It looks like a bug in the DRMAA SGE implementation.
				return compbio.metadata.JobStatus.UNDEFINED;
			}
		} catch (InvalidJobException e) {
			log.info("Job " + jobId + " could not be located by DRMAA: "
					+ e.getLocalizedMessage(), e.getCause());
			log.info("Attempting to determine the status by marker files");
			return getRecordedJobStatus(jobId);
		} catch (DrmaaException e) {
			log.error(
					"Exception in DRMAA system while querying the job status: "
							+ e.getLocalizedMessage(), e.getCause());
		} catch (IOException e) {
			log.error("Could not read JOBID for taskId: " + jobId
					+ " Message: " + e.getLocalizedMessage(), e.getCause());
		}

		return JobStatus.UNDEFINED;
	}

	static JobStatus getRecordedJobStatus(String jobId) {
		/*
		 * The job has already been removed from the task list, so its running
		 * status could not be determined. Most likely it has been cancelled,
		 * finished or failed.
		 */
		String workDir = Configurator.getWorkDirectory(jobId);
		if (Util.isMarked(workDir, JobStatus.FINISHED)
				|| Util.isMarked(workDir, JobStatus.COLLECTED)) {
			return JobStatus.FINISHED;
		}
		if (Util.isMarked(workDir, JobStatus.CANCELLED)) {
			return JobStatus.CANCELLED;
		}
		if (Util.isMarked(workDir, JobStatus.FAILED)) {
			return JobStatus.FAILED;
		}
		return JobStatus.UNDEFINED;
	}


	@Override
	public boolean cleanup() {
		/*
		 * TODO There are two additional files created by Sun Grid Engine,
		 * named as follows:
		 * output: this.getWorkDirectory() + executable.getClass().getSimpleName() + "." + "o" + this.jobId
		 * error:  this.getWorkDirectory() + executable.getClass().getSimpleName() + "." + "e" + this.jobId
		 * An individual executable does not know about these two unless it
		 * implements PipedExecutable, which needs to collect data from these
		 * streams. Thus this method will fail to remove the task directory
		 * completely.
		 */
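		// For illustration only (the names below are made up): a task whose
		// executable class is Mafft and whose SGE job id is 12345 would leave
		// files named Mafft.o12345 and Mafft.e12345 in the work directory.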
		return Cleaner.deleteFiles(confExecutable);
	}

	JobInfo waitForJob(String jobId) throws JobExecutionException {
		assert Util.isValidJobId(jobId);
		return ClusterUtil.waitForResult(clustSession, jobId);
	}

	boolean cancelJob(String jobId) {
		assert Util.isValidJobId(jobId);
		return compbio.engine.cluster.drmaa.ClusterUtil.cancelJob(jobId,
				clustSession);
	}

	@Override
	public boolean cancelJob() {
		return cancelJob(this.jobId);
	}

	String submitJob() throws JobSubmissionException {

		String jobId;
		try {
			jobId = session.runJob(jobtempl);
			log.info("submitted single job with job id:");
			log.info("\t \"" + jobId + "\"");
			session.deleteJobTemplate(jobtempl);
			clustSession.addJob(jobId, confExecutable);
		} catch (DrmaaException e) {
			log.error(e.getLocalizedMessage(), e);
			throw new JobSubmissionException(e);
		}
		// Return the engine task id rather than the cluster job id, which was
		// registered with clustSession above
		return this.confExecutable.getTaskId();
	}

	public String getWorkDirectory() {
		return this.workDirectory;
	}

	@Override
	public void executeJob() throws JobSubmissionException {
		this.jobId = submitJob();
	}

	/**
	 * This method blocks until the calculation has completed and then returns
	 * an object containing the job execution statistics.
	 * 
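	 * An illustrative sketch only; the org.ggf.drmaa.JobInfo accessors shown
	 * are standard DRMAA, but the surrounding code is an assumption:
	 * 
	 * <pre>
	 * JobInfo info = runner.getJobInfo();
	 * if (info.hasExited()) {
	 *         log.info("exit status: " + info.getExitStatus());
	 * }
	 * log.info("resource usage: " + info.getResourceUsage());
	 * </pre>
	 * 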
	 * @return the DRMAA JobInfo for the completed job
	 * @throws JobExecutionException
	 */
	public JobInfo getJobInfo() throws JobExecutionException {
		return waitForJob(this.jobId);
	}

	@Override
	public ConfiguredExecutable<?> waitForResult() throws JobExecutionException {
		ConfiguredExecutable<?> confExec;
		try {
			confExec = new AsyncJobRunner().getResults(this.jobId);
			if (confExec == null) {
				log.warn("Could not find results of job " + this.jobId);
			}
		} catch (ResultNotAvailableException e) {
			log.error(e.getMessage(), e.getCause());
			throw new JobExecutionException(e);
		}
		return confExec;
	}

	@Override
	public compbio.metadata.JobStatus getJobStatus() {
		return getJobStatus(this.jobId);
	}

	public static JobRunner getInstance(ConfiguredExecutable<?> executable)
			throws JobSubmissionException {
		return new JobRunner(executable);
	}

} // class end