Get rid of NativeClusterJob interface as pretty much every jobs best to use some...
[jabaws.git] / engine / compbio / engine / cluster / drmaa / JobRunner.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.cluster.drmaa;\r
20 \r
21 import java.io.IOException;\r
22 import java.util.Collections;\r
23 import java.util.List;\r
24 import java.util.Map;\r
25 \r
26 import org.apache.log4j.Logger;\r
27 import org.ggf.drmaa.DrmaaException;\r
28 import org.ggf.drmaa.InvalidJobException;\r
29 import org.ggf.drmaa.JobInfo;\r
30 import org.ggf.drmaa.JobTemplate;\r
31 import org.ggf.drmaa.Session;\r
32 \r
33 import compbio.engine.Cleaner;\r
34 import compbio.engine.ClusterJobId;\r
35 import compbio.engine.Configurator;\r
36 import compbio.engine.SyncExecutor;\r
37 \r
38 import compbio.engine.client.ConfiguredExecutable;\r
39 import compbio.engine.client.Executable;\r
40 import compbio.engine.client.PathValidator;\r
41 import compbio.engine.client.PipedExecutable;\r
42 import compbio.engine.client.Util;\r
43 import compbio.engine.client.Executable.ExecProvider;\r
44 import compbio.metadata.JobExecutionException;\r
45 import compbio.metadata.JobStatus;\r
46 import compbio.metadata.JobSubmissionException;\r
47 import compbio.metadata.ResultNotAvailableException;\r
48 \r
49 /**\r
50  * Single cluster job runner class\r
51  * \r
52  * @author pvtroshin\r
53  * @date August 2009\r
54  * \r
55  *       TODO after call to submitJob() no setters really work as the job\r
56  *       template gets deleted, this needs to be taken into account in this\r
57  *       class design!\r
58  */\r
59 public class JobRunner implements SyncExecutor {\r
60 \r
61         final JobTemplate jobtempl;\r
62         static ClusterSession clustSession = ClusterSession.getInstance();\r
63         static Session session = clustSession.getSession();\r
64         static final Logger log = Logger.getLogger(JobRunner.class);\r
65         final ConfiguredExecutable<?> confExecutable;\r
66         private final String workDirectory;\r
67         String jobId;\r
68 \r
69         public JobRunner(ConfiguredExecutable<?> confExec)\r
70                         throws JobSubmissionException {\r
71                 try {\r
72                         String command = confExec.getCommand(ExecProvider.Cluster);\r
73                         PathValidator.validateExecutable(command);\r
74                         log.debug("Setting command " + command);\r
75 \r
76                         jobtempl = session.createJobTemplate();\r
77                         jobtempl.setRemoteCommand(command);\r
78                         jobtempl.setJoinFiles(false);\r
79                         setJobName(confExec.getExecutable().getClass().getSimpleName());\r
80 \r
81                         this.workDirectory = confExec.getWorkDirectory();\r
82                         assert !compbio.util.Util.isEmpty(workDirectory);\r
83 \r
84                         // Tell the job where to get/put things\r
85                         jobtempl.setWorkingDirectory(this.workDirectory);\r
86 \r
87                         /*\r
88                          * Set environment variables for the process if any\r
89                          */\r
90                         Map<String, String> jobEnv = confExec.getEnvironment();\r
91                         if (jobEnv != null && !jobEnv.isEmpty()) {\r
92                                 setJobEnvironmentVariables(jobEnv);\r
93                         }\r
94                         List<String> args = confExec.getParameters().getCommands();\r
95                         // Set optional parameters\r
96                         if (args != null && args.size() > 0) {\r
97                                 jobtempl.setArgs(args);\r
98                         }\r
99 \r
100                         /*\r
101                          * If executable need in/out data to be piped into it\r
102                          */\r
103                         if (confExec.getExecutable() instanceof PipedExecutable<?>) {\r
104                                 setPipes(confExec);\r
105                         }\r
106 \r
107                         /*\r
108                          * If executable require special cluster configuration parameters to\r
109                          * be set e.g. queue, ram, time etc\r
110                          */\r
111                         setNativeSpecs(confExec.getExecutable());\r
112 \r
113 \r
114                         log.trace("using arguments: " + jobtempl.getArgs());\r
115                         this.confExecutable = confExec;\r
116                         // Save run configuration\r
117                         confExec.saveRunConfiguration();\r
118 \r
119                 } catch (DrmaaException e) {\r
120                         log.error(e.getLocalizedMessage(), e.getCause());\r
121                         throw new JobSubmissionException(e);\r
122                 } catch (IOException e) {\r
123                         log.error(e.getLocalizedMessage(), e.getCause());\r
124                         throw new JobSubmissionException(e);\r
125                 } \r
126 \r
127         }\r
128 \r
129         void setPipes(ConfiguredExecutable<?> executable) throws DrmaaException {\r
130 \r
131                 String output = executable.getOutput();\r
132                 String error = executable.getError();\r
133                 // Standard drmaa path format is hostname:path\r
134                 // to avoid supplying hostnames with all local paths just prepend colon\r
135                 // to the path\r
136                 // Input and output can be null as in and out files may be defined in\r
137                 // parameters\r
138                 /*\r
139                  * Use this for piping input into the process if (input != null) { if\r
140                  * (!input.contains(":")) { input = makeLocalPath(input);\r
141                  * log.trace("converting input to " + input); }\r
142                  * jobtempl.setInputPath(input); log.debug("use Input: " +\r
143                  * jobtempl.getInputPath()); }\r
144                  */\r
145                 if (output != null) {\r
146                         if (!output.contains(":")) {\r
147                                 output = makeLocalPath(output);\r
148                         }\r
149                         jobtempl.setOutputPath(output);\r
150                         log.debug("Output to: " + jobtempl.getOutputPath());\r
151                 }\r
152                 if (error != null) {\r
153                         if (!error.contains(":")) {\r
154                                 error = makeLocalPath(error);\r
155                         }\r
156                         jobtempl.setErrorPath(error);\r
157                         log.debug("Output errors to: " + jobtempl.getErrorPath());\r
158                 }\r
159 \r
160         }\r
161 \r
162         void setNativeSpecs(Executable<?> executable) throws DrmaaException {\r
163                 String nativeSpecs = executable.getClusterJobSettings(); \r
164                 if(!compbio.util.Util.isEmpty(nativeSpecs)) {\r
165                         log.debug("Using cluster job settings: " + nativeSpecs);\r
166                         jobtempl.setNativeSpecification(nativeSpecs);\r
167                 }\r
168         }\r
169 \r
170         void setEmail(String email) {\r
171                 log.trace("Setting email to:" + email);\r
172                 try {\r
173                         jobtempl.setEmail(Collections.singleton(email));\r
174                         jobtempl.setBlockEmail(false);\r
175                 } catch (DrmaaException e) {\r
176                         log.debug(e.getLocalizedMessage());\r
177                         throw new IllegalArgumentException(e);\r
178                 }\r
179         }\r
180 \r
181         void setJobName(String name) {\r
182                 log.trace("Setting job name to:" + name);\r
183                 try {\r
184                         jobtempl.setJobName(name);\r
185                 } catch (DrmaaException e) {\r
186                         log.debug(e.getLocalizedMessage());\r
187                         throw new IllegalArgumentException(e);\r
188                 }\r
189         }\r
190 \r
191         @SuppressWarnings("unchecked")\r
192         void setJobEnvironmentVariables(Map<String, String> env_variables) {\r
193                 assert env_variables != null && !env_variables.isEmpty();\r
194                 try {\r
195                         log.trace("Setting job environment to:" + env_variables);\r
196                         Map<String, String> sysEnv = jobtempl.getJobEnvironment();\r
197                         if (sysEnv != null && !sysEnv.isEmpty()) {\r
198                                 Util.mergeEnvVariables(sysEnv, env_variables);\r
199                         } else {\r
200                                 sysEnv = env_variables;\r
201                         }\r
202                         jobtempl.setJobEnvironment(sysEnv);\r
203 \r
204                 } catch (DrmaaException e) {\r
205                         log.debug(e.getLocalizedMessage());\r
206                         throw new IllegalArgumentException(e);\r
207                 }\r
208         }\r
209 \r
210         private static String makeLocalPath(String path) {\r
211                 return ":" + path;\r
212         }\r
213 \r
214         public boolean deepClean() {\r
215                 throw new UnsupportedOperationException();\r
216                 // TODO\r
217                 /*\r
218                  * remove all files from these this.jobtempl.getInputPath();\r
219                  * this.jobtempl.getOutputPath(); this.jobtempl.getWorkingDirectory();\r
220                  */\r
221                 // executable.getInputFiles();\r
222         }\r
223 \r
224         /**\r
225          * This will never return clust.engine.JobStatus.CANCELLED as for sun grid\r
226          * engine cancelled job is the same as failed. Cancelled jobs needs to be\r
227          * tracked manually!\r
228          */\r
229         static compbio.metadata.JobStatus getJobStatus(String jobId) {\r
230                 try {\r
231                         ClusterJobId clusterJobId = ClusterSession.getClusterJobId(jobId);\r
232                         switch (clustSession.getJobStatus(clusterJobId)) {\r
233                         case Session.DONE:\r
234                                 compbio.engine.client.Util.writeStatFile(Configurator.getWorkDirectory(jobId),\r
235                                                 JobStatus.FINISHED.toString());\r
236 \r
237                                 return compbio.metadata.JobStatus.FINISHED;\r
238                         case Session.FAILED:\r
239                                 compbio.engine.client.Util.writeMarker(Configurator.getWorkDirectory(jobId),\r
240                                                 JobStatus.FAILED);\r
241 \r
242                                 return compbio.metadata.JobStatus.FAILED;\r
243 \r
244                         case Session.RUNNING:\r
245                                 // will not persist this status as temporary\r
246                                 return compbio.metadata.JobStatus.RUNNING;\r
247 \r
248                         case Session.SYSTEM_SUSPENDED:\r
249                         case Session.USER_SYSTEM_SUSPENDED:\r
250                         case Session.USER_SUSPENDED:\r
251                         case Session.USER_SYSTEM_ON_HOLD:\r
252                         case Session.USER_ON_HOLD:\r
253                         case Session.SYSTEM_ON_HOLD:\r
254                         case Session.QUEUED_ACTIVE:\r
255                                 // will not persist this status as temporary\r
256                                 return compbio.metadata.JobStatus.PENDING;\r
257 \r
258                         default:\r
259                                 // It is possible that the this status is returned for a job that is almost completed\r
260                                 // when a state is changed from RUNNING to DONE\r
261                                 // It looks like a bug in DRMAA SGE implementation \r
262                                 return compbio.metadata.JobStatus.UNDEFINED;\r
263                         }\r
264                 } catch (InvalidJobException e) {\r
265                         log.info("Job " + jobId + " could not be located by DRMAA "\r
266                                         + e.getLocalizedMessage(), e.getCause());\r
267                         log.info("Attempting to determine the status by marker files");\r
268                         return getRecordedJobStatus(jobId);\r
269                 } catch (DrmaaException e) {\r
270                         log.error(\r
271                                         "Exception in DRMAA system while quering the job status: "\r
272                                                         + e.getLocalizedMessage(), e.getCause());\r
273                 } catch (IOException e) {\r
274                         log.error("Could not read JOBID for taskId: " + jobId\r
275                                         + " Message: " + e.getLocalizedMessage(), e.getCause());\r
276                 }\r
277 \r
278                 return JobStatus.UNDEFINED;\r
279         }\r
280 \r
281         static JobStatus getRecordedJobStatus(String jobId) { \r
282                 /*\r
283                  * Job has already been removed from the task list, so it running\r
284                  * status could not be determined. Most likely it has been\r
285                  * cancelled, finished or failed.\r
286                  */\r
287                 String workDir = Configurator.getWorkDirectory(jobId);\r
288                 if (Util.isMarked(workDir, JobStatus.FINISHED)\r
289                                 || Util.isMarked(workDir, JobStatus.COLLECTED)) {\r
290                         return JobStatus.FINISHED;\r
291                 }\r
292                 if (Util.isMarked(workDir, JobStatus.CANCELLED)) {\r
293                         return JobStatus.CANCELLED;\r
294                 }\r
295                 if (Util.isMarked(workDir, JobStatus.FAILED)) {\r
296                         return JobStatus.FAILED;\r
297                 }\r
298                 return JobStatus.UNDEFINED; \r
299         }\r
300         \r
301         \r
302         @Override\r
303         public boolean cleanup() {\r
304                 /*\r
305                  * TODO there is two additional files created by sun grid engine which\r
306                  * are named as follows: output this.getWorkDirectory() +\r
307                  * executable.getClass().getSimpleName() + "." + "o" + this.jobId; error\r
308                  * this.getWorkDirectory() + executable.getClass().getSimpleName() + "."\r
309                  * + "e" + this.jobId; individual executable does not know about these\r
310                  * two unless it implements PipedExecutable which need to collect data\r
311                  * from these streams Thus this method will fail to remove the task\r
312                  * directory completely\r
313                  */\r
314                 return Cleaner.deleteFiles(confExecutable);\r
315         }\r
316 \r
317         JobInfo waitForJob(String jobId) throws JobExecutionException {\r
318                 assert Util.isValidJobId(jobId);\r
319                 return ClusterUtil.waitForResult(clustSession, jobId);\r
320         }\r
321 \r
322         boolean cancelJob(String jobId) {\r
323                 assert Util.isValidJobId(jobId);\r
324                 return compbio.engine.cluster.drmaa.ClusterUtil.cancelJob(jobId,\r
325                                 clustSession);\r
326         }\r
327 \r
328         @Override\r
329         public boolean cancelJob() {\r
330                 return cancelJob(this.jobId);\r
331         }\r
332 \r
333         String submitJob() throws JobSubmissionException {\r
334 \r
335                 String jobId;\r
336                 try {\r
337                         jobId = session.runJob(jobtempl);\r
338                         log.info("submitted single job with jobids:");\r
339                         log.info("\t \"" + jobId + "\"");\r
340                         session.deleteJobTemplate(jobtempl);\r
341                         clustSession.addJob(jobId, confExecutable);\r
342                 } catch (DrmaaException e) {\r
343                         e.printStackTrace();\r
344                         throw new JobSubmissionException(e);\r
345                 }\r
346 \r
347                 return this.confExecutable.getTaskId();\r
348         }\r
349 \r
350         public String getWorkDirectory() {\r
351                 return this.workDirectory;\r
352         }\r
353 \r
354         @Override\r
355         public void executeJob() throws JobSubmissionException {\r
356                 this.jobId = submitJob();\r
357         }\r
358 \r
359         /**\r
360          * This method will block before the calculation has completed and then\r
361          * return the object containing a job execution statistics\r
362          * \r
363          * @return\r
364          * @throws JobExecutionException\r
365          */\r
366         public JobInfo getJobInfo() throws JobExecutionException {\r
367                 return waitForJob(this.jobId);\r
368         }\r
369 \r
370         @Override\r
371         public ConfiguredExecutable<?> waitForResult() throws JobExecutionException {\r
372                 ConfiguredExecutable<?> confExec;\r
373                 try {\r
374                         confExec = new AsyncJobRunner().getResults(this.jobId);\r
375                         if (confExec == null) {\r
376                                 log.warn("Could not find results of job " + this.jobId);\r
377                         }\r
378                 } catch (ResultNotAvailableException e) {\r
379                         log.error(e.getMessage(), e.getCause());\r
380                         throw new JobExecutionException(e);\r
381                 }\r
382                 return confExec;\r
383         }\r
384 \r
385         @Override\r
386         public compbio.metadata.JobStatus getJobStatus() {\r
387                 return getJobStatus(this.jobId);\r
388         }\r
389 \r
390         public static JobRunner getInstance(ConfiguredExecutable<?> executable)\r
391                         throws JobSubmissionException {\r
392                 return new JobRunner(executable);\r
393         }\r
394 \r
395 } // class end\r