e8b17b9604e596935f61b02e55db086c34cd577f
[jabaws.git] / engine / compbio / engine / cluster / drmaa / ClusterSession.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.cluster.drmaa;\r
20 \r
21 import java.io.File;\r
22 import java.io.IOException;\r
23 import java.util.Calendar;\r
24 import java.util.List;\r
25 import java.util.concurrent.CopyOnWriteArrayList;\r
26 \r
27 import org.apache.log4j.Logger;\r
28 import org.ggf.drmaa.DrmaaException;\r
29 import org.ggf.drmaa.InvalidJobException;\r
30 import org.ggf.drmaa.JobInfo;\r
31 import org.ggf.drmaa.Session;\r
32 import org.ggf.drmaa.SessionFactory;\r
33 \r
34 import compbio.engine.ClusterJobId;\r
35 import compbio.engine.Job;\r
36 import compbio.engine.client.ConfiguredExecutable;\r
37 import compbio.engine.client.PathValidator;\r
38 import compbio.engine.conf.PropertyHelperManager;\r
39 import compbio.metadata.JobStatus;\r
40 import compbio.metadata.ResultNotAvailableException;\r
41 import compbio.util.FileUtil;\r
42 import compbio.util.PropertyHelper;\r
43 import compbio.util.Util;\r
44 \r
45 public final class ClusterSession {\r
46 \r
47         private static final Logger log = Logger.getLogger(ClusterSession.class);\r
48 \r
49         private static final PropertyHelper ph = PropertyHelperManager\r
50                         .getPropertyHelper();\r
51 \r
52         public static final String JOBID = "JOBID";\r
53         // TaskId (getTaskDirectory()) -> ConfiguredExecutable<?> map\r
54         // Cluster jobId is only stored in a file\r
55         // static final Map<JobId, ConfiguredExecutable<?>> jobs = new\r
56         // ConcurrentHashMap<JobId, ConfiguredExecutable<?>>();\r
57 \r
58         static final List<Job> jobs = new CopyOnWriteArrayList<Job>();\r
59         private static boolean open = true;\r
60 \r
61         // Keep this at the end of other static initializers to avoid making\r
62         // incomplete instance!\r
63         private static final ClusterSession INSTANCE = new ClusterSession();\r
64 \r
65         private final Session session;\r
66         // can be used in init method to reconnect the the session\r
67         private final String sContact;\r
68 \r
69         // TODO deside whether the task list is needed!\r
70         // private static BufferedWriter tasks;\r
71 \r
72         private ClusterSession() {\r
73                 log.debug("Initializing session "\r
74                                 + Util.datef.format(Calendar.getInstance().getTime()));\r
75                 SessionFactory factory = SessionFactory.getFactory();\r
76                 session = factory.getSession();\r
77                 sContact = session.getContact();\r
78                 try {\r
79                         /*\r
80                          * String tasksFileName = ph.getProperty("cluster.tasks.file"); File\r
81                          * taskFile = new File(tasksFileName); if(!taskFile.exists()) {\r
82                          * taskFile.createNewFile(); } tasks = new BufferedWriter(new\r
83                          * PrintWriter(taskFile));\r
84                          */\r
85                         session.init(null);\r
86                         // Make sure that the session is going to be properly closed\r
87                         Runtime.getRuntime().addShutdownHook(new Thread() {\r
88                                 @Override\r
89                                 public void run() {\r
90                                         /*\r
91                                          * try { if(tasks!=null) { tasks.close(); } }\r
92                                          * catch(IOException e) { log.error(e.getMessage()); }\r
93                                          */\r
94                                         close();\r
95                                 }\r
96                         });\r
97 \r
98                 } catch (DrmaaException e) {\r
99                         log.error(e.getMessage());\r
100                 }\r
101                 /*\r
102                  * throw new RuntimeException("Could not create task file! " +\r
103                  * "Please check that Engine.cluster.properties " +\r
104                  * "file is provided and contains cluster.tasks.file property. " +\r
105                  * "This property should contain the file name " +\r
106                  * "for storing tasks ids! Cause: " + e.getMessage());\r
107                  */\r
108 \r
109         }\r
110 \r
111         synchronized static ClusterSession getInstance() {\r
112                 return INSTANCE;\r
113         }\r
114 \r
115         public Session getSession() {\r
116                 return INSTANCE.session;\r
117         }\r
118 \r
119         public void close() {\r
120                 try {\r
121                         if (open) {\r
122                                 session.exit();\r
123                                 open = false;\r
124                                 log.debug("Closing the session at: "\r
125                                                 + Util.datef.format(Calendar.getInstance().getTime()));\r
126                         }\r
127                 } catch (DrmaaException dre) {\r
128                         // Cannot recover at this point, just log\r
129                         dre.printStackTrace();\r
130                 }\r
131         }\r
132 \r
133         void addJob(String jobId, ConfiguredExecutable<?> executable) {\r
134                 String taskDirectory = executable.getTaskId();\r
135                 assert !PathValidator.isValidDirectory(taskDirectory) : "Directory provided is not valid! Directory: "\r
136                                 + taskDirectory;\r
137                 assert !Util.isEmpty(jobId);\r
138 \r
139                 compbio.engine.client.Util.writeStatFile(executable.getWorkDirectory(),\r
140                                 JobStatus.SUBMITTED.toString());\r
141                 compbio.engine.client.Util.writeFile(executable.getWorkDirectory(),\r
142                                 JOBID, jobId, false);\r
143                 log.debug("Adding taskId: " + taskDirectory + " to cluster job list");\r
144                 assert compbio.engine.client.Util.isValidJobId(taskDirectory);\r
145                 jobs.add(new Job(taskDirectory, jobId, executable));\r
146         }\r
147 \r
148         public void removeJob(String taskId) {\r
149                 assert !Util.isEmpty(taskId);\r
150                 assert compbio.engine.client.Util.isValidJobId(taskId);\r
151                 removeJobFromListbyTaskId(taskId);\r
152         }\r
153 \r
154         /*\r
155          * public List<JobInfo> waitForJobs(List<String> jobIds) throws\r
156          * DrmaaException { return waitForJobs(jobIds,\r
157          * Session.TIMEOUT_WAIT_FOREVER); }\r
158          * \r
159          * public List<JobInfo> waitForJobs(List<String> jobIds, long waitingTime)\r
160          * throws DrmaaException { if (!open) { throw new\r
161          * IllegalStateException("Session is already closed!"); } assert jobIds !=\r
162          * null && jobIds.size() > 1;\r
163          * \r
164          * session.synchronize(jobIds, waitingTime, false); List<JobInfo> jobsInfo =\r
165          * new ArrayList<JobInfo>(jobIds.size()); for (String jobId : jobIds) {\r
166          * jobsInfo.add(waitForJob(jobId)); } return jobsInfo; }\r
167          */\r
168 \r
169         /*\r
170          * public List<JobInfo> waitForAll() throws DrmaaException { assert\r
171          * jobs.size() > 0; return waitForJobs(new\r
172          * ArrayList<String>(ClusterSession.jobs.keySet())); }\r
173          * \r
174          * \r
175          * public void waitForAll_DropStatistics() throws DrmaaException { assert\r
176          * jobs.size() > 0; session.synchronize(new ArrayList<String>(Collections\r
177          * .unmodifiableCollection(ClusterSession.jobs.keySet())),\r
178          * Session.TIMEOUT_WAIT_FOREVER, true); }\r
179          */\r
180 \r
181         public JobInfo waitForJob(String taskId) throws DrmaaException, IOException {\r
182                 // String clusterJobId = ClusterSession.getClusterJobId(jobId);\r
183                 return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER);\r
184         }\r
185 \r
186         public static ClusterJobId getClusterJobId(String taskId)\r
187                         throws IOException {\r
188                 Job job = Job.getByTaskId(taskId, jobs);\r
189                 if (job != null) {\r
190                         return job.getJobId();\r
191                 }\r
192                 // The job must have been removed from the task list use work\r
193                 // directory to find out jobid\r
194                 String workDir = compbio.engine.Configurator.getWorkDirectory(taskId);\r
195                 assert !Util.isEmpty(workDir);\r
196                 File file = new File(workDir, JOBID);\r
197                 log.debug("Looking up cluster jobid by the task id " + taskId\r
198                                 + " File path is " + file.getAbsolutePath());\r
199                 assert file.exists();\r
200                 return new ClusterJobId(FileUtil.readFileToString(file));\r
201         }\r
202 \r
203         public JobInfo waitForJob(String jobId, long waitingTime)\r
204                         throws DrmaaException, IOException {\r
205                 ClusterJobId cjobId = getClusterJobId(jobId);\r
206                 JobInfo status = session.wait(cjobId.getJobId(), waitingTime);\r
207                 // Once the job has been waited for it will be finished\r
208                 // Next time it will not be found in the session, so removed from the\r
209                 // job list\r
210                 compbio.engine.client.Util.writeStatFile(compbio.engine.Configurator\r
211                                 .getWorkDirectory(jobId), JobStatus.FINISHED.toString());\r
212 \r
213                 return status;\r
214         }\r
215 \r
216         private static void removeJobFromListbyTaskId(String taskId) {\r
217                 assert !Util.isEmpty(taskId);\r
218                 Job job = Job.getByTaskId(taskId, jobs);\r
219                 if (job != null) {\r
220                         log.debug("Removing taskId" + taskId + " from cluster job list");\r
221                         jobs.remove(job);\r
222                 }\r
223         }\r
224 \r
225         public ConfiguredExecutable<?> getResults(String taskId)\r
226                         throws DrmaaException, ResultNotAvailableException {\r
227 \r
228                 compbio.engine.client.Util.isValidJobId(taskId);\r
229                 try {\r
230                         JobInfo status = waitForJob(taskId);\r
231                 } catch (InvalidJobException e) {\r
232                         // Its OK to continue, the job may have already completed normally\r
233                         log.warn("Could not find the cluster job with id " + taskId\r
234                                         + " perhaps it has completed", e.getCause());\r
235                 } catch (IOException e) {\r
236                         log.error("Could not read JOBID file for the job " + taskId\r
237                                         + " Message " + e.getLocalizedMessage(), e.getCause());\r
238                 }\r
239                 // Once the job has been waited for it will be finished\r
240                 // Next time it will not be found in the session, so removed from the\r
241                 // job list\r
242                 ConfiguredExecutable<?> exec = null;\r
243                 Job job = Job.getByTaskId(taskId, jobs);\r
244                 if (job != null) {\r
245                         exec = job.getConfExecutable();\r
246                         removeJobFromListbyTaskId(taskId);\r
247                 } else {\r
248                         // If task was not find in the list of jobs, than it must have been\r
249                         // collected already\r
250                         // Resurrect the job to find out there the output is\r
251                         exec = compbio.engine.client.Util.loadExecutable(taskId);\r
252                 }\r
253                 if (exec != null) {\r
254                         compbio.engine.client.Util.writeMarker(exec.getWorkDirectory(),\r
255                                         JobStatus.COLLECTED);\r
256                 }\r
257                 return exec;\r
258         }\r
259 \r
260         public static StatisticManager getStatistics(JobInfo status)\r
261                         throws DrmaaException {\r
262                 return new StatisticManager(status);\r
263         }\r
264 \r
265         static void logStatistics(JobInfo status) throws DrmaaException {\r
266                 log.info(getStatistics(status).getAllStats());\r
267         }\r
268 \r
269         /**\r
270          * Apparently completed jobs cannot be found! If this happened most likely\r
271          * that the job is not running any more and Most likely it has been\r
272          * cancelled, finished or failed.\r
273          * \r
274          * @throws InvalidJobException\r
275          *             if the job is no longer in the queue or running. basically it\r
276          *             will throw this exception for all finished or cancelled jobs\r
277          */\r
278         public int getJobStatus(ClusterJobId jobId) throws DrmaaException,\r
279                         InvalidJobException {\r
280                 return session.getJobProgramStatus(jobId.getJobId());\r
281         }\r
282 \r
283         /**\r
284          * Method for getting jobs status by quering the cluster, It returns status\r
285          * in therms of a Sessions, not a JobStatus Should only be used for testing!\r
286          * \r
287          * @param status\r
288          * @return\r
289          * @throws DrmaaException\r
290          */\r
291         @Deprecated\r
292         public static String getJobStatus(final int status) throws DrmaaException {\r
293                 String statusString = null;\r
294                 switch (status) {\r
295                 case Session.UNDETERMINED:\r
296                         statusString = "Job status cannot be determined\n";\r
297                         break;\r
298                 case Session.QUEUED_ACTIVE:\r
299                         statusString = "Job is queued and active\n";\r
300                         break;\r
301                 case Session.SYSTEM_ON_HOLD:\r
302                         statusString = "Job is queued and in system hold\n";\r
303                         break;\r
304                 case Session.USER_ON_HOLD:\r
305                         statusString = "Job is queued and in user hold\n";\r
306                         break;\r
307                 case Session.USER_SYSTEM_ON_HOLD:\r
308                         statusString = "Job is queued and in user and system hold\n";\r
309                         break;\r
310                 case Session.RUNNING:\r
311                         statusString = "Job is running\n";\r
312                         break;\r
313                 case Session.SYSTEM_SUSPENDED:\r
314                         statusString = "Job is system suspended\n";\r
315                         break;\r
316                 case Session.USER_SUSPENDED:\r
317                         statusString = "Job is user suspended\n";\r
318                         break;\r
319                 case Session.USER_SYSTEM_SUSPENDED:\r
320                         statusString = "Job is user and system suspended\n";\r
321                         break;\r
322                 case Session.DONE:\r
323                         statusString = "Job finished normally\n";\r
324                         break;\r
325                 case Session.FAILED:\r
326                         statusString = "Job finished, but failed\n";\r
327                         break;\r
328                 }\r
329                 return statusString;\r
330         }\r
331 \r
332 }\r