/* Copyright (c) 2009 Peter Troshin
 *
 *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
 *
 *  This library is free software; you can redistribute it and/or modify it under the terms of the
 *  Apache License version 2 as published by the Apache Software Foundation
 *
 *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
 *  License for more details.
 *
 *  A copy of the license is in apache_license.txt. It is also available here:
 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
 *
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */

package compbio.engine.cluster.drmaa;

import java.io.File;
import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.log4j.Logger;
import org.ggf.drmaa.DrmaaException;
import org.ggf.drmaa.InvalidJobException;
import org.ggf.drmaa.JobInfo;
import org.ggf.drmaa.Session;
import org.ggf.drmaa.SessionFactory;

import compbio.engine.ClusterJobId;
import compbio.engine.Job;
import compbio.engine.client.ConfiguredExecutable;
import compbio.engine.client.PathValidator;
import compbio.engine.conf.PropertyHelperManager;
import compbio.metadata.JobStatus;
import compbio.metadata.ResultNotAvailableException;
import compbio.util.FileUtil;
import compbio.util.PropertyHelper;
import compbio.util.Util;

public final class ClusterSession {

    private static final Logger log = Logger.getLogger(ClusterSession.class);

    private static final PropertyHelper ph = PropertyHelperManager
            .getPropertyHelper();

    public static final String JOBID = "JOBID";
    // TaskId (getTaskDirectory()) -> ConfiguredExecutable<?> map
    // Cluster jobId is only stored in a file
    // static final Map<JobId, ConfiguredExecutable<?>> jobs = new
    // ConcurrentHashMap<JobId, ConfiguredExecutable<?>>();

    static final List<Job> jobs = new CopyOnWriteArrayList<Job>();
    private static boolean open = true;
    // Keep this at the end of the other static initializers to avoid creating
    // an incomplete instance!
    private static final ClusterSession INSTANCE = new ClusterSession();

    private final Session session;
    // can be used in the init method to reconnect to the session
    private final String sContact;

    // TODO decide whether the task list is needed!
    // private static BufferedWriter tasks;

    private ClusterSession() {
        log.debug("Initializing session "
                + Util.datef.format(Calendar.getInstance().getTime()));
        SessionFactory factory = SessionFactory.getFactory();
        session = factory.getSession();
        sContact = session.getContact();
        try {
            /*
             * String tasksFileName = ph.getProperty("cluster.tasks.file"); File
             * taskFile = new File(tasksFileName); if(!taskFile.exists()) {
             * taskFile.createNewFile(); } tasks = new BufferedWriter(new
             * PrintWriter(taskFile));
             */
            session.init(null);
            // Make sure that the session is going to be properly closed
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    /*
                     * try { if(tasks!=null) { tasks.close(); } }
                     * catch(IOException e) { log.error(e.getMessage()); }
                     */
                    close();
                }
            });

        } catch (DrmaaException e) {
            log.error(e.getMessage());
        }
        /*
         * throw new RuntimeException("Could not create task file! " +
         * "Please check that Engine.cluster.properties " +
         * "file is provided and contains cluster.tasks.file property. " +
         * "This property should contain the file name " +
         * "for storing tasks ids! Cause: " + e.getMessage());
         */

    }

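    /**
     * @return the single instance of the ClusterSession
     */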
    synchronized static ClusterSession getInstance() {
        return INSTANCE;
    }

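    /**
     * @return the DRMAA {@link Session} backing this ClusterSession
     */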
    public Session getSession() {
        return INSTANCE.session;
    }

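    /**
     * Terminates the DRMAA session if it is still open. Called automatically
     * from a JVM shutdown hook registered by the constructor.
     */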
    public void close() {
        try {
            if (open) {
                session.exit();
                open = false;
                log.debug("Closing the session at: "
                        + Util.datef.format(Calendar.getInstance().getTime()));
            }
        } catch (DrmaaException dre) {
            // Cannot recover at this point, just log
            log.error(dre.getMessage(), dre);
        }
    }

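    /**
     * Registers a submitted cluster job: records the SUBMITTED status and the
     * cluster job id in the job's work directory and adds the job to the
     * in-memory job list.
     *
     * @param jobId
     *            the cluster job id returned by the DRMAA session
     * @param executable
     *            the configured executable that was submitted
     */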
    void addJob(String jobId, ConfiguredExecutable<?> executable) {
        String taskDirectory = executable.getTaskId();
        assert !PathValidator.isValidDirectory(taskDirectory) : "Expected a task id, not a directory path! Value: "
                + taskDirectory;
        assert !Util.isEmpty(jobId);

        compbio.engine.client.Util.writeStatFile(executable.getWorkDirectory(),
                JobStatus.SUBMITTED.toString());
        compbio.engine.client.Util.writeFile(executable.getWorkDirectory(),
                JOBID, jobId, false);
        log.debug("Adding taskId: " + taskDirectory + " to cluster job list");
        assert compbio.engine.client.Util.isValidJobId(taskDirectory);
        jobs.add(new Job(taskDirectory, jobId, executable));
    }

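    /**
     * Removes the job with the given task id from the in-memory job list.
     *
     * @param taskId
     *            the task id of the job to remove
     */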
    public void removeJob(String taskId) {
        assert !Util.isEmpty(taskId);
        assert compbio.engine.client.Util.isValidJobId(taskId);
        removeJobFromListbyTaskId(taskId);
    }

    /*
     * public List<JobInfo> waitForJobs(List<String> jobIds) throws
     * DrmaaException { return waitForJobs(jobIds,
     * Session.TIMEOUT_WAIT_FOREVER); }
     *
     * public List<JobInfo> waitForJobs(List<String> jobIds, long waitingTime)
     * throws DrmaaException { if (!open) { throw new
     * IllegalStateException("Session is already closed!"); } assert jobIds !=
     * null && jobIds.size() > 1;
     *
     * session.synchronize(jobIds, waitingTime, false); List<JobInfo> jobsInfo =
     * new ArrayList<JobInfo>(jobIds.size()); for (String jobId : jobIds) {
     * jobsInfo.add(waitForJob(jobId)); } return jobsInfo; }
     */

    /*
     * public List<JobInfo> waitForAll() throws DrmaaException { assert
     * jobs.size() > 0; return waitForJobs(new
     * ArrayList<String>(ClusterSession.jobs.keySet())); }
     *
     *
     * public void waitForAll_DropStatistics() throws DrmaaException { assert
     * jobs.size() > 0; session.synchronize(new ArrayList<String>(Collections
     * .unmodifiableCollection(ClusterSession.jobs.keySet())),
     * Session.TIMEOUT_WAIT_FOREVER, true); }
     */

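    /**
     * Blocks until the cluster job associated with the given task id has
     * finished, waiting indefinitely.
     *
     * @param taskId
     *            the task id of the job to wait for
     * @return the {@link JobInfo} describing the completed job
     * @throws DrmaaException
     *             if the DRMAA session reports an error
     * @throws IOException
     *             if the cluster job id cannot be read from the work directory
     */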
    public JobInfo waitForJob(String taskId) throws DrmaaException, IOException {
        // String clusterJobId = ClusterSession.getClusterJobId(jobId);
        return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER);
    }

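    /**
     * Resolves the cluster job id for the given task id, first from the
     * in-memory job list and, failing that, from the JOBID file in the job's
     * work directory.
     *
     * @param taskId
     *            the task id to look up
     * @return the corresponding {@link ClusterJobId}
     * @throws IOException
     *             if the JOBID file cannot be read
     */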
    public static ClusterJobId getClusterJobId(String taskId)
            throws IOException {
        Job job = Job.getByTaskId(taskId, jobs);
        if (job != null) {
            return job.getJobId();
        }
        // The job must have been removed from the task list; use the work
        // directory to find out the cluster job id
        String workDir = compbio.engine.Configurator.getWorkDirectory(taskId);
        assert !Util.isEmpty(workDir);
        File file = new File(workDir, JOBID);
        log.debug("Looking up cluster jobid by the task id " + taskId
                + " File path is " + file.getAbsolutePath());
        assert file.exists();
        return new ClusterJobId(FileUtil.readFileToString(file));
    }

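    /**
     * Blocks until the cluster job associated with the given task id has
     * finished or the waiting time has elapsed, then records the FINISHED
     * status in the job's work directory.
     *
     * @param jobId
     *            the task id of the job to wait for
     * @param waitingTime
     *            the maximum time to wait, or Session.TIMEOUT_WAIT_FOREVER
     * @return the {@link JobInfo} describing the completed job
     * @throws DrmaaException
     *             if the DRMAA session reports an error
     * @throws IOException
     *             if the cluster job id cannot be read from the work directory
     */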
    public JobInfo waitForJob(String jobId, long waitingTime)
            throws DrmaaException, IOException {
        ClusterJobId cjobId = getClusterJobId(jobId);
        JobInfo status = session.wait(cjobId.getJobId(), waitingTime);
        // Once the job has been waited for it is finished and will no longer
        // be found in the session, so mark it as FINISHED
        compbio.engine.client.Util.writeStatFile(
                compbio.engine.Configurator.getWorkDirectory(jobId),
                JobStatus.FINISHED.toString());

        return status;
    }

    private static void removeJobFromListbyTaskId(String taskId) {
        assert !Util.isEmpty(taskId);
        Job job = Job.getByTaskId(taskId, jobs);
        if (job != null) {
            log.debug("Removing taskId " + taskId + " from cluster job list");
            jobs.remove(job);
        }
    }

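    /**
     * Waits for the cluster job associated with the given task id to finish,
     * removes it from the in-memory job list (or resurrects it from the work
     * directory if it has already been collected) and marks it as COLLECTED.
     *
     * @param taskId
     *            the task id of the job to collect
     * @return the configured executable holding the results, or null if it
     *         could not be restored
     * @throws DrmaaException
     *             if the DRMAA session reports an error
     * @throws ResultNotAvailableException
     *             if the results cannot be loaded from the work directory
     */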
    public ConfiguredExecutable<?> getResults(String taskId)
            throws DrmaaException, ResultNotAvailableException {

        assert compbio.engine.client.Util.isValidJobId(taskId);
        try {
            waitForJob(taskId);
        } catch (InvalidJobException e) {
            // It is OK to continue, the job may have already completed normally
            log.warn("Could not find the cluster job with id " + taskId
                    + ", perhaps it has already completed", e);
        } catch (IOException e) {
            log.error("Could not read the JOBID file for the job " + taskId
                    + " Message: " + e.getLocalizedMessage(), e);
        }
        // Once the job has been waited for it is finished and will no longer
        // be found in the session, so remove it from the job list
        ConfiguredExecutable<?> exec = null;
        Job job = Job.getByTaskId(taskId, jobs);
        if (job != null) {
            exec = job.getConfExecutable();
            removeJobFromListbyTaskId(taskId);
        } else {
            // If the task was not found in the job list, then it must have
            // already been collected. Resurrect the job to find out where the
            // output is.
            exec = compbio.engine.client.Util.loadExecutable(taskId);
        }
        if (exec != null) {
            compbio.engine.client.Util.writeMarker(exec.getWorkDirectory(),
                    JobStatus.COLLECTED);
        }
        return exec;
    }

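    /**
     * Wraps the job information returned by the cluster into a
     * {@link StatisticManager}.
     *
     * @param status
     *            the JobInfo returned by the DRMAA session
     * @return a StatisticManager for the given job information
     * @throws DrmaaException
     *             if the statistics cannot be obtained from the job information
     */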
    public static StatisticManager getStatistics(JobInfo status)
            throws DrmaaException {
        return new StatisticManager(status);
    }

    static void logStatistics(JobInfo status) throws DrmaaException {
        log.info(getStatistics(status).getAllStats());
    }

    /**
     * Queries the cluster for the status of the job. Completed jobs cannot be
     * found this way: if the job is no longer queued or running, it has most
     * likely been cancelled, has finished or has failed.
     *
     * @param jobId
     *            the cluster job id to query
     * @return the DRMAA program status, one of the Session status constants
     * @throws DrmaaException
     *             if the DRMAA session reports an error
     * @throws InvalidJobException
     *             if the job is no longer in the queue or running. Basically it
     *             will throw this exception for all finished or cancelled jobs
     */
    public int getJobStatus(ClusterJobId jobId) throws DrmaaException,
            InvalidJobException {
        return session.getJobProgramStatus(jobId.getJobId());
    }

    /**
     * Translates a job status obtained by querying the cluster into a human
     * readable string. The status is expressed in terms of the DRMAA Session
     * constants, not a JobStatus. Should only be used for testing!
     *
     * @param status
     *            one of the Session status constants
     * @return a human readable description of the job status, or null if the
     *         status is not recognised
     * @throws DrmaaException
     */
    @Deprecated
    public static String getJobStatus(final int status) throws DrmaaException {
        String statusString = null;
        switch (status) {
            case Session.UNDETERMINED :
                statusString = "Job status cannot be determined\n";
                break;
            case Session.QUEUED_ACTIVE :
                statusString = "Job is queued and active\n";
                break;
            case Session.SYSTEM_ON_HOLD :
                statusString = "Job is queued and in system hold\n";
                break;
            case Session.USER_ON_HOLD :
                statusString = "Job is queued and in user hold\n";
                break;
            case Session.USER_SYSTEM_ON_HOLD :
                statusString = "Job is queued and in user and system hold\n";
                break;
            case Session.RUNNING :
                statusString = "Job is running\n";
                break;
            case Session.SYSTEM_SUSPENDED :
                statusString = "Job is system suspended\n";
                break;
            case Session.USER_SUSPENDED :
                statusString = "Job is user suspended\n";
                break;
            case Session.USER_SYSTEM_SUSPENDED :
                statusString = "Job is user and system suspended\n";
                break;
            case Session.DONE :
                statusString = "Job finished normally\n";
                break;
            case Session.FAILED :
                statusString = "Job finished, but failed\n";
                break;
        }
        return statusString;
    }

}