d36ca899dd36541e24adf518ec15db42f9718ed1
[jabaws.git] / engine / compbio / engine / cluster / drmaa / ClusterRunner.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.cluster.drmaa;\r
20 \r
21 import java.io.IOException;\r
22 import java.util.Collections;\r
23 import java.util.List;\r
24 import java.util.Map;\r
25 \r
26 import org.apache.log4j.Logger;\r
27 import org.ggf.drmaa.DrmaaException;\r
28 import org.ggf.drmaa.InvalidJobException;\r
29 import org.ggf.drmaa.JobInfo;\r
30 import org.ggf.drmaa.JobTemplate;\r
31 import org.ggf.drmaa.Session;\r
32 \r
33 import compbio.engine.Cleaner;\r
34 import compbio.engine.ClusterJobId;\r
35 import compbio.engine.Configurator;\r
36 import compbio.engine.SyncExecutor;\r
37 \r
38 import compbio.engine.client.ConfiguredExecutable;\r
39 import compbio.engine.client.Executable;\r
40 import compbio.engine.client.PathValidator;\r
41 import compbio.engine.client.PipedExecutable;\r
42 import compbio.engine.client.EngineUtil;\r
43 import compbio.engine.client.Executable.ExecProvider;\r
44 import compbio.metadata.JobExecutionException;\r
45 import compbio.metadata.JobStatus;\r
46 import compbio.metadata.JobSubmissionException;\r
47 import compbio.metadata.ResultNotAvailableException;\r
48 \r
49 /**\r
50  * Single cluster job runner class\r
51  * \r
52  * @author pvtroshin\r
53  * @date August 2009\r
54  * \r
55  *       TODO after call to submitJob() no setters really work as the job\r
56  *       template gets deleted, this needs to be taken into account in this\r
57  *       class design!\r
58  */\r
59 public class ClusterRunner implements SyncExecutor {\r
60 \r
61         final JobTemplate jobtempl;\r
62         static ClusterSession clustSession = ClusterSession.getInstance();\r
63         static Session session = clustSession.getSession();\r
64         static final Logger log = Logger.getLogger(ClusterRunner.class);\r
65         final ConfiguredExecutable<?> confExecutable;\r
66         private final String workDirectory;\r
67         String jobId;\r
68 \r
69         public ClusterRunner(ConfiguredExecutable<?> confExec)\r
70                         throws JobSubmissionException {\r
71                 try {\r
72                         String command = confExec.getCommand(ExecProvider.Cluster);\r
73                         PathValidator.validateExecutable(command);\r
74                         log.debug("Setting command " + command);\r
75 \r
76                         jobtempl = session.createJobTemplate();\r
77                         jobtempl.setRemoteCommand(command);\r
78                         jobtempl.setJoinFiles(false);\r
79                         setJobName(confExec.getExecutable().getClass().getSimpleName());\r
80 \r
81                         this.workDirectory = confExec.getWorkDirectory();\r
82                         assert !compbio.util.Util.isEmpty(workDirectory);\r
83 \r
84                         // Tell the job where to get/put things\r
85                         jobtempl.setWorkingDirectory(this.workDirectory);\r
86 \r
87                         // Set environment variables for the process if any\r
88                         Map<String, String> jobEnv = confExec.getEnvironment();\r
89                         if (jobEnv != null && !jobEnv.isEmpty()) {\r
90                                 setJobEnvironmentVariables(jobEnv);\r
91                         }\r
92                         List<String> args = confExec.getParameters().getCommands();\r
93                         // Set optional parameters\r
94                         if (args != null && args.size() > 0) {\r
95                                 jobtempl.setArgs(args);\r
96                         }\r
97 \r
98                         //If executable need in/out data to be piped into it\r
99                         if (confExec.getExecutable() instanceof PipedExecutable<?>) {\r
100                                 setPipes(confExec);\r
101                         }\r
102 \r
103                         // If executable require special cluster configuration parameters to\r
104                         // be set e.g. queue, ram, time etc\r
105                         setNativeSpecs(confExec.getExecutable());\r
106 \r
107                         log.trace("using arguments: " + jobtempl.getArgs());\r
108                         this.confExecutable = confExec;\r
109                         // Save run configuration\r
110                         confExec.saveRunConfiguration();\r
111 \r
112                 } catch (DrmaaException e) {\r
113                         log.error(e.getLocalizedMessage(), e.getCause());\r
114                         throw new JobSubmissionException(e);\r
115                 } catch (IOException e) {\r
116                         log.error(e.getLocalizedMessage(), e.getCause());\r
117                         throw new JobSubmissionException(e);\r
118                 } \r
119 \r
120         }\r
121 \r
122         void setPipes(ConfiguredExecutable<?> executable) throws DrmaaException {\r
123 \r
124                 String output = executable.getOutput();\r
125                 String error = executable.getError();\r
126                 // Standard drmaa path format is hostname:path\r
127                 // to avoid supplying hostnames with all local paths just prepend colon\r
128                 // to the path\r
129                 // Input and output can be null as in and out files may be defined in\r
130                 // parameters\r
131                 /*\r
132                  * Use this for piping input into the process if (input != null) { if\r
133                  * (!input.contains(":")) { input = makeLocalPath(input);\r
134                  * log.trace("converting input to " + input); }\r
135                  * jobtempl.setInputPath(input); log.debug("use Input: " +\r
136                  * jobtempl.getInputPath()); }\r
137                  */\r
138                 if (output != null) {\r
139                         if (!output.contains(":")) {\r
140                                 output = makeLocalPath(output);\r
141                         }\r
142                         jobtempl.setOutputPath(output);\r
143                         log.debug("Output to: " + jobtempl.getOutputPath());\r
144                 }\r
145                 if (error != null) {\r
146                         if (!error.contains(":")) {\r
147                                 error = makeLocalPath(error);\r
148                         }\r
149                         jobtempl.setErrorPath(error);\r
150                         log.debug("Output errors to: " + jobtempl.getErrorPath());\r
151                 }\r
152 \r
153         }\r
154 \r
155         void setNativeSpecs(Executable<?> executable) throws DrmaaException {\r
156                 String nativeSpecs = executable.getClusterJobSettings(); \r
157                 if(!compbio.util.Util.isEmpty(nativeSpecs)) {\r
158                         log.debug("Using cluster job settings: " + nativeSpecs);\r
159                         jobtempl.setNativeSpecification(nativeSpecs);\r
160                 }\r
161         }\r
162 \r
163         void setEmail(String email) {\r
164                 log.trace("Setting email to:" + email);\r
165                 try {\r
166                         jobtempl.setEmail(Collections.singleton(email));\r
167                         jobtempl.setBlockEmail(false);\r
168                 } catch (DrmaaException e) {\r
169                         log.debug(e.getLocalizedMessage());\r
170                         throw new IllegalArgumentException(e);\r
171                 }\r
172         }\r
173 \r
174         void setJobName(String name) {\r
175                 log.trace("Setting job name to:" + name);\r
176                 try {\r
177                         jobtempl.setJobName(name);\r
178                 } catch (DrmaaException e) {\r
179                         log.debug(e.getLocalizedMessage());\r
180                         throw new IllegalArgumentException(e);\r
181                 }\r
182         }\r
183 \r
184         @SuppressWarnings("unchecked")\r
185         void setJobEnvironmentVariables(Map<String, String> env_variables) {\r
186                 assert env_variables != null && !env_variables.isEmpty();\r
187                 try {\r
188                         log.trace("Setting job environment to:" + env_variables);\r
189                         Map<String, String> sysEnv = jobtempl.getJobEnvironment();\r
190                         if (sysEnv != null && !sysEnv.isEmpty()) {\r
191                                 EngineUtil.mergeEnvVariables(sysEnv, env_variables);\r
192                         } else {\r
193                                 sysEnv = env_variables;\r
194                         }\r
195                         jobtempl.setJobEnvironment(sysEnv);\r
196 \r
197                 } catch (DrmaaException e) {\r
198                         log.debug(e.getLocalizedMessage());\r
199                         throw new IllegalArgumentException(e);\r
200                 }\r
201         }\r
202 \r
203         private static String makeLocalPath(String path) {\r
204                 return ":" + path;\r
205         }\r
206 \r
207         public boolean deepClean() {\r
208                 throw new UnsupportedOperationException();\r
209                 // TODO\r
210                 /*\r
211                  * remove all files from these this.jobtempl.getInputPath();\r
212                  * this.jobtempl.getOutputPath(); this.jobtempl.getWorkingDirectory();\r
213                  */\r
214                 // executable.getInputFiles();\r
215         }\r
216 \r
217         /**\r
218          * This will never return clust.engine.JobStatus.CANCELLED as for sun grid\r
219          * engine cancelled job is the same as failed. Cancelled jobs needs to be\r
220          * tracked manually!\r
221          */\r
222         static compbio.metadata.JobStatus getJobStatus(String jobId) {\r
223                 try {\r
224                         ClusterJobId clusterJobId = ClusterSession.getClusterJobId(jobId);\r
225                         switch (clustSession.getJobStatus(clusterJobId)) {\r
226                         case Session.DONE:\r
227                                 EngineUtil.writeStatFile(Configurator.getWorkDirectory(jobId), JobStatus.FINISHED.toString());\r
228                                 return compbio.metadata.JobStatus.FINISHED;\r
229 \r
230                         case Session.FAILED:\r
231                                 EngineUtil.writeMarker(Configurator.getWorkDirectory(jobId), JobStatus.FAILED);\r
232                                 return compbio.metadata.JobStatus.FAILED;\r
233 \r
234                         case Session.RUNNING:\r
235                                 // will not persist this status as temporary\r
236                                 return compbio.metadata.JobStatus.RUNNING;\r
237 \r
238                         case Session.SYSTEM_SUSPENDED:\r
239                         case Session.USER_SYSTEM_SUSPENDED:\r
240                         case Session.USER_SUSPENDED:\r
241                         case Session.USER_SYSTEM_ON_HOLD:\r
242                         case Session.USER_ON_HOLD:\r
243                         case Session.SYSTEM_ON_HOLD:\r
244                         case Session.QUEUED_ACTIVE:\r
245                                 // will not persist this status as temporary\r
246                                 return compbio.metadata.JobStatus.PENDING;\r
247 \r
248                         default:\r
249                                 // It is possible that the this status is returned for a job that is almost completed\r
250                                 // when a state is changed from RUNNING to DONE\r
251                                 // It looks like a bug in DRMAA SGE implementation \r
252                                 return compbio.metadata.JobStatus.UNDEFINED;\r
253                         }\r
254                 } catch (InvalidJobException e) {\r
255                         log.info("Job " + jobId + " could not be located by DRMAA "\r
256                                         + e.getLocalizedMessage(), e.getCause());\r
257                         log.info("Attempting to determine the status by marker files");\r
258                         return getRecordedJobStatus(jobId);\r
259                 } catch (DrmaaException e) {\r
260                         log.error(\r
261                                         "Exception in DRMAA system while quering the job status: "\r
262                                                         + e.getLocalizedMessage(), e.getCause());\r
263                 } catch (IOException e) {\r
264                         log.error("Could not read JOBID for taskId: " + jobId\r
265                                         + " Message: " + e.getLocalizedMessage(), e.getCause());\r
266                 }\r
267 \r
268                 return JobStatus.UNDEFINED;\r
269         }\r
270 \r
271         static JobStatus getRecordedJobStatus(String jobId) { \r
272                 /*\r
273                  * Job has already been removed from the task list, so it running\r
274                  * status could not be determined. Most likely it has been\r
275                  * cancelled, finished or failed.\r
276                  */\r
277                 String workDir = Configurator.getWorkDirectory(jobId);\r
278                 if (EngineUtil.isMarked(workDir, JobStatus.FINISHED)\r
279                                 || EngineUtil.isMarked(workDir, JobStatus.COLLECTED)) {\r
280                         return JobStatus.FINISHED;\r
281                 }\r
282                 if (EngineUtil.isMarked(workDir, JobStatus.CANCELLED)) {\r
283                         return JobStatus.CANCELLED;\r
284                 }\r
285                 if (EngineUtil.isMarked(workDir, JobStatus.FAILED)) {\r
286                         return JobStatus.FAILED;\r
287                 }\r
288                 return JobStatus.UNDEFINED; \r
289         }\r
290         \r
291         \r
292         @Override\r
293         public boolean cleanup() {\r
294                 /*\r
295                  * TODO there is two additional files created by sun grid engine which\r
296                  * are named as follows: output this.getWorkDirectory() +\r
297                  * executable.getClass().getSimpleName() + "." + "o" + this.jobId; error\r
298                  * this.getWorkDirectory() + executable.getClass().getSimpleName() + "."\r
299                  * + "e" + this.jobId; individual executable does not know about these\r
300                  * two unless it implements PipedExecutable which need to collect data\r
301                  * from these streams Thus this method will fail to remove the task\r
302                  * directory completely\r
303                  */\r
304                 return Cleaner.deleteFiles(confExecutable);\r
305         }\r
306 \r
307         JobInfo waitForJob(String jobId) throws JobExecutionException {\r
308                 assert EngineUtil.isValidJobId(jobId);\r
309                 return ClusterEngineUtil.waitForResult(clustSession, jobId);\r
310         }\r
311 \r
312         boolean cancelJob(String jobId) {\r
313                 assert EngineUtil.isValidJobId(jobId);\r
314                 return compbio.engine.cluster.drmaa.ClusterEngineUtil.cancelJob(jobId,\r
315                                 clustSession);\r
316         }\r
317 \r
318         @Override\r
319         public boolean cancelJob() {\r
320                 return cancelJob(this.jobId);\r
321         }\r
322 \r
323         String submitJob() throws JobSubmissionException {\r
324 \r
325                 String jobId;\r
326                 try {\r
327                         jobId = session.runJob(jobtempl);\r
328                         log.info("submitted single job with jobids:");\r
329                         log.info("\t \"" + jobId + "\"");\r
330                         session.deleteJobTemplate(jobtempl);\r
331                         clustSession.addJob(jobId, confExecutable);\r
332                 } catch (DrmaaException e) {\r
333                         e.printStackTrace();\r
334                         throw new JobSubmissionException(e);\r
335                 }\r
336 \r
337                 return this.confExecutable.getTaskId();\r
338         }\r
339 \r
340         public String getWorkDirectory() {\r
341                 return this.workDirectory;\r
342         }\r
343 \r
344         @Override\r
345         public void executeJob() throws JobSubmissionException {\r
346                 this.jobId = submitJob();\r
347         }\r
348 \r
349         /**\r
350          * This method will block before the calculation has completed and then\r
351          * return the object containing a job execution statistics\r
352          * \r
353          * @return\r
354          * @throws JobExecutionException\r
355          */\r
356         public JobInfo getJobInfo() throws JobExecutionException {\r
357                 return waitForJob(this.jobId);\r
358         }\r
359 \r
360         @Override\r
361         public ConfiguredExecutable<?> waitForResult() throws JobExecutionException {\r
362                 ConfiguredExecutable<?> confExec;\r
363                 try {\r
364                         confExec = new AsyncClusterRunner().getResults(this.jobId);\r
365                         if (confExec == null) {\r
366                                 log.warn("Could not find results of job " + this.jobId);\r
367                         }\r
368                 } catch (ResultNotAvailableException e) {\r
369                         log.error(e.getMessage(), e.getCause());\r
370                         throw new JobExecutionException(e);\r
371                 }\r
372                 return confExec;\r
373         }\r
374 \r
375         @Override\r
376         public compbio.metadata.JobStatus getJobStatus() {\r
377                 return getJobStatus(this.jobId);\r
378         }\r
379 \r
380         public static ClusterRunner getInstance(ConfiguredExecutable<?> executable)\r
381                         throws JobSubmissionException {\r
382                 return new ClusterRunner(executable);\r
383         }\r
384 \r
385 } // class end\r