Refactoring: rename duplicated Util classes
[jabaws.git] / engine / compbio / engine / cluster / drmaa / ClusterSession.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.cluster.drmaa;\r
20 \r
21 import java.io.File;\r
22 import java.io.IOException;\r
23 import java.util.Calendar;\r
24 import java.util.List;\r
25 import java.util.concurrent.CopyOnWriteArrayList;\r
26 \r
27 import org.apache.log4j.Logger;\r
28 import org.ggf.drmaa.DrmaaException;\r
29 import org.ggf.drmaa.InvalidJobException;\r
30 import org.ggf.drmaa.JobInfo;\r
31 import org.ggf.drmaa.Session;\r
32 import org.ggf.drmaa.SessionFactory;\r
33 \r
34 import compbio.engine.ClusterJobId;\r
35 import compbio.engine.Job;\r
36 import compbio.engine.Configurator;\r
37 import compbio.engine.client.ConfiguredExecutable;\r
38 import compbio.engine.client.PathValidator;\r
39 import compbio.engine.client.EngineUtil;\r
40 import compbio.engine.conf.PropertyHelperManager;\r
41 import compbio.metadata.JobStatus;\r
42 import compbio.metadata.ResultNotAvailableException;\r
43 import compbio.util.FileUtil;\r
44 import compbio.util.PropertyHelper;\r
45 import compbio.util.Util;\r
46 \r
47 public final class ClusterSession {\r
48 \r
49         private static final Logger log = Logger.getLogger(ClusterSession.class);\r
50 \r
51         private static final PropertyHelper ph = PropertyHelperManager\r
52                         .getPropertyHelper();\r
53 \r
54         public static final String JOBID = "JOBID";\r
55         // TaskId (getTaskDirectory()) -> ConfiguredExecutable<?> map\r
56         // Cluster jobId is only stored in a file\r
57         // static final Map<JobId, ConfiguredExecutable<?>> jobs = new\r
58         // ConcurrentHashMap<JobId, ConfiguredExecutable<?>>();\r
59 \r
60         static final List<Job> jobs = new CopyOnWriteArrayList<Job>();\r
61         private static boolean open = true;\r
62 \r
63         // Keep this at the end of other static initializers to avoid making\r
64         // incomplete instance!\r
65         private static final ClusterSession INSTANCE = new ClusterSession();\r
66 \r
67         private final Session session;\r
68         // can be used in init method to reconnect the the session\r
69         private final String sContact;\r
70 \r
71         // TODO deside whether the task list is needed!\r
72         // private static BufferedWriter tasks;\r
73 \r
74         private ClusterSession() {\r
75                 log.debug("Initializing session "\r
76                                 + Util.datef.format(Calendar.getInstance().getTime()));\r
77                 SessionFactory factory = SessionFactory.getFactory();\r
78                 session = factory.getSession();\r
79                 sContact = session.getContact();\r
80                 try {\r
81                         /*\r
82                          * String tasksFileName = ph.getProperty("cluster.tasks.file"); File\r
83                          * taskFile = new File(tasksFileName); if(!taskFile.exists()) {\r
84                          * taskFile.createNewFile(); } tasks = new BufferedWriter(new\r
85                          * PrintWriter(taskFile));\r
86                          */\r
87                         session.init(null);\r
88                         // Make sure that the session is going to be properly closed\r
89                         Runtime.getRuntime().addShutdownHook(new Thread() {\r
90                                 @Override\r
91                                 public void run() {\r
92                                         /*\r
93                                          * try { if(tasks!=null) { tasks.close(); } }\r
94                                          * catch(IOException e) { log.error(e.getMessage()); }\r
95                                          */\r
96                                         close();\r
97                                 }\r
98                         });\r
99 \r
100                 } catch (DrmaaException e) {\r
101                         log.error(e.getMessage());\r
102                 }\r
103                 /*\r
104                  * throw new RuntimeException("Could not create task file! " +\r
105                  * "Please check that Engine.cluster.properties " +\r
106                  * "file is provided and contains cluster.tasks.file property. " +\r
107                  * "This property should contain the file name " +\r
108                  * "for storing tasks ids! Cause: " + e.getMessage());\r
109                  */\r
110 \r
111         }\r
112 \r
113         synchronized static ClusterSession getInstance() {\r
114                 return INSTANCE;\r
115         }\r
116 \r
117         public Session getSession() {\r
118                 return INSTANCE.session;\r
119         }\r
120 \r
121         public void close() {\r
122                 try {\r
123                         if (open) {\r
124                                 session.exit();\r
125                                 open = false;\r
126                                 log.debug("Closing the session at: "\r
127                                                 + Util.datef.format(Calendar.getInstance().getTime()));\r
128                         }\r
129                 } catch (DrmaaException dre) {\r
130                         // Cannot recover at this point, just log\r
131                         dre.printStackTrace();\r
132                 }\r
133         }\r
134 \r
135         void addJob(String jobId, ConfiguredExecutable<?> executable) {\r
136                 String taskDirectory = executable.getTaskId();\r
137                 assert !PathValidator.isValidDirectory(taskDirectory) : "Directory provided is not valid! Directory: "\r
138                                 + taskDirectory;\r
139                 assert !Util.isEmpty(jobId);\r
140 \r
141                 EngineUtil.writeStatFile(executable.getWorkDirectory(), JobStatus.SUBMITTED.toString());\r
142                 EngineUtil.writeFile(executable.getWorkDirectory(), JOBID, jobId, false);\r
143                 log.debug("Adding taskId: " + taskDirectory + " to cluster job list");\r
144                 assert EngineUtil.isValidJobId(taskDirectory);\r
145                 jobs.add(new Job(taskDirectory, jobId, executable));\r
146         }\r
147 \r
148         public void removeJob(String taskId) {\r
149                 assert !Util.isEmpty(taskId);\r
150                 assert EngineUtil.isValidJobId(taskId);\r
151                 removeJobFromListbyTaskId(taskId);\r
152         }\r
153 \r
154         /*\r
155          * public List<JobInfo> waitForJobs(List<String> jobIds) throws\r
156          * DrmaaException { return waitForJobs(jobIds,\r
157          * Session.TIMEOUT_WAIT_FOREVER); }\r
158          * \r
159          * public List<JobInfo> waitForJobs(List<String> jobIds, long waitingTime)\r
160          * throws DrmaaException { if (!open) { throw new\r
161          * IllegalStateException("Session is already closed!"); } assert jobIds !=\r
162          * null && jobIds.size() > 1;\r
163          * \r
164          * session.synchronize(jobIds, waitingTime, false); List<JobInfo> jobsInfo =\r
165          * new ArrayList<JobInfo>(jobIds.size()); for (String jobId : jobIds) {\r
166          * jobsInfo.add(waitForJob(jobId)); } return jobsInfo; }\r
167          */\r
168 \r
169         /*\r
170          * public List<JobInfo> waitForAll() throws DrmaaException { assert\r
171          * jobs.size() > 0; return waitForJobs(new\r
172          * ArrayList<String>(ClusterSession.jobs.keySet())); }\r
173          * \r
174          * \r
175          * public void waitForAll_DropStatistics() throws DrmaaException { assert\r
176          * jobs.size() > 0; session.synchronize(new ArrayList<String>(Collections\r
177          * .unmodifiableCollection(ClusterSession.jobs.keySet())),\r
178          * Session.TIMEOUT_WAIT_FOREVER, true); }\r
179          */\r
180 \r
181         public JobInfo waitForJob(String taskId) throws DrmaaException, IOException {\r
182                 // String clusterJobId = ClusterSession.getClusterJobId(jobId);\r
183                 return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER);\r
184         }\r
185 \r
186         public static ClusterJobId getClusterJobId(String taskId)\r
187                         throws IOException {\r
188                 Job job = Job.getByTaskId(taskId, jobs);\r
189                 if (job != null) {\r
190                         return job.getJobId();\r
191                 }\r
192                 // The job must have been removed from the task list use work\r
193                 // directory to find out jobid\r
194                 String workDir = compbio.engine.Configurator.getWorkDirectory(taskId);\r
195                 assert !Util.isEmpty(workDir);\r
196                 File file = new File(workDir, JOBID);\r
197                 log.debug("Looking up cluster jobid by the task id " + taskId\r
198                                 + " File path is " + file.getAbsolutePath());\r
199                 assert file.exists();\r
200                 return new ClusterJobId(FileUtil.readFileToString(file));\r
201         }\r
202 \r
203         public JobInfo waitForJob(String jobId, long waitingTime)\r
204                         throws DrmaaException, IOException {\r
205                 ClusterJobId cjobId = getClusterJobId(jobId);\r
206                 JobInfo status = session.wait(cjobId.getJobId(), waitingTime);\r
207                 // Once the job has been waited for it will be finished\r
208                 // Next time it will not be found in the session, so removed from the\r
209                 // job list\r
210                 EngineUtil.writeStatFile(Configurator.getWorkDirectory(jobId), JobStatus.FINISHED.toString());\r
211 \r
212                 return status;\r
213         }\r
214 \r
215         private static void removeJobFromListbyTaskId(String taskId) {\r
216                 assert !Util.isEmpty(taskId);\r
217                 Job job = Job.getByTaskId(taskId, jobs);\r
218                 if (job != null) {\r
219                         log.debug("Removing taskId" + taskId + " from cluster job list");\r
220                         jobs.remove(job);\r
221                 }\r
222         }\r
223 \r
224         public ConfiguredExecutable<?> getResults(String taskId)\r
225                         throws DrmaaException, ResultNotAvailableException {\r
226 \r
227                 EngineUtil.isValidJobId(taskId);\r
228                 try {\r
229                         JobInfo status = waitForJob(taskId);\r
230                 } catch (InvalidJobException e) {\r
231                         // Its OK to continue, the job may have already completed normally\r
232                         log.warn("Could not find the cluster job with id " + taskId\r
233                                         + " perhaps it has completed", e.getCause());\r
234                 } catch (IOException e) {\r
235                         log.error("Could not read JOBID file for the job " + taskId\r
236                                         + " Message " + e.getLocalizedMessage(), e.getCause());\r
237                 }\r
238                 // Once the job has been waited for it will be finished\r
239                 // Next time it will not be found in the session, so removed from the\r
240                 // job list\r
241                 ConfiguredExecutable<?> exec = null;\r
242                 Job job = Job.getByTaskId(taskId, jobs);\r
243                 if (job != null) {\r
244                         exec = job.getConfExecutable();\r
245                         removeJobFromListbyTaskId(taskId);\r
246                 } else {\r
247                         // If task was not find in the list of jobs, than it must have been\r
248                         // collected already\r
249                         // Resurrect the job to find out there the output is\r
250                         exec = EngineUtil.loadExecutable(taskId);\r
251                 }\r
252                 if (exec != null) {\r
253                         EngineUtil.writeMarker(exec.getWorkDirectory(),\r
254                                         JobStatus.COLLECTED);\r
255                 }\r
256                 return exec;\r
257         }\r
258 \r
259         public static StatisticManager getStatistics(JobInfo status)\r
260                         throws DrmaaException {\r
261                 return new StatisticManager(status);\r
262         }\r
263 \r
264         static void logStatistics(JobInfo status) throws DrmaaException {\r
265                 log.info(getStatistics(status).getAllStats());\r
266         }\r
267 \r
268         /**\r
269          * Apparently completed jobs cannot be found! If this happened most likely\r
270          * that the job is not running any more and Most likely it has been\r
271          * cancelled, finished or failed.\r
272          * \r
273          * @throws InvalidJobException\r
274          *             if the job is no longer in the queue or running. basically it\r
275          *             will throw this exception for all finished or cancelled jobs\r
276          */\r
277         public int getJobStatus(ClusterJobId jobId) throws DrmaaException,\r
278                         InvalidJobException {\r
279                 return session.getJobProgramStatus(jobId.getJobId());\r
280         }\r
281 \r
282         /**\r
283          * Method for getting jobs status by quering the cluster, It returns status\r
284          * in therms of a Sessions, not a JobStatus Should only be used for testing!\r
285          * \r
286          * @param status\r
287          * @return job status\r
288          * @throws DrmaaException\r
289          */\r
290         @Deprecated\r
291         public static String getJobStatus(final int status) throws DrmaaException {\r
292                 String statusString = null;\r
293                 switch (status) {\r
294                         case Session.UNDETERMINED :\r
295                                 statusString = "Job status cannot be determined\n";\r
296                                 break;\r
297                         case Session.QUEUED_ACTIVE :\r
298                                 statusString = "Job is queued and active\n";\r
299                                 break;\r
300                         case Session.SYSTEM_ON_HOLD :\r
301                                 statusString = "Job is queued and in system hold\n";\r
302                                 break;\r
303                         case Session.USER_ON_HOLD :\r
304                                 statusString = "Job is queued and in user hold\n";\r
305                                 break;\r
306                         case Session.USER_SYSTEM_ON_HOLD :\r
307                                 statusString = "Job is queued and in user and system hold\n";\r
308                                 break;\r
309                         case Session.RUNNING :\r
310                                 statusString = "Job is running\n";\r
311                                 break;\r
312                         case Session.SYSTEM_SUSPENDED :\r
313                                 statusString = "Job is system suspended\n";\r
314                                 break;\r
315                         case Session.USER_SUSPENDED :\r
316                                 statusString = "Job is user suspended\n";\r
317                                 break;\r
318                         case Session.USER_SYSTEM_SUSPENDED :\r
319                                 statusString = "Job is user and system suspended\n";\r
320                                 break;\r
321                         case Session.DONE :\r
322                                 statusString = "Job finished normally\n";\r
323                                 break;\r
324                         case Session.FAILED :\r
325                                 statusString = "Job finished, but failed\n";\r
326                                 break;\r
327                 }\r
328                 return statusString;\r
329         }\r
330 \r
331 }\r