Refactoring (renaming) 2 classes: AsyncJobRunner.java -> AsyncClusterRunner.java...
[jabaws.git] / engine / compbio / engine / cluster / drmaa / ClusterSession.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.cluster.drmaa;\r
20 \r
21 import java.io.File;\r
22 import java.io.IOException;\r
23 import java.util.Calendar;\r
24 import java.util.List;\r
25 import java.util.concurrent.CopyOnWriteArrayList;\r
26 \r
27 import org.apache.log4j.Logger;\r
28 import org.ggf.drmaa.DrmaaException;\r
29 import org.ggf.drmaa.InvalidJobException;\r
30 import org.ggf.drmaa.JobInfo;\r
31 import org.ggf.drmaa.Session;\r
32 import org.ggf.drmaa.SessionFactory;\r
33 \r
34 import compbio.engine.ClusterJobId;\r
35 import compbio.engine.Job;\r
36 import compbio.engine.Configurator;\r
37 import compbio.engine.client.ConfiguredExecutable;\r
38 import compbio.engine.client.PathValidator;\r
39 import compbio.engine.client.EngineUtil;\r
40 import compbio.engine.conf.PropertyHelperManager;\r
41 import compbio.metadata.JobStatus;\r
42 import compbio.metadata.ResultNotAvailableException;\r
43 import compbio.util.FileUtil;\r
44 import compbio.util.PropertyHelper;\r
45 import compbio.util.Util;\r
46 \r
47 public final class ClusterSession {\r
48 \r
49         private static final Logger log = Logger.getLogger(ClusterSession.class);\r
50 \r
51         private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
52 \r
53         public static final String JOBID = "JOBID";\r
54         // TaskId (getTaskDirectory()) -> ConfiguredExecutable<?> map\r
55         // Cluster jobId is only stored in a file\r
56         // static final Map<JobId, ConfiguredExecutable<?>> jobs = new\r
57         // ConcurrentHashMap<JobId, ConfiguredExecutable<?>>();\r
58 \r
59         static final List<Job> jobs = new CopyOnWriteArrayList<Job>();\r
60         private static boolean open = true;\r
61 \r
62         // Keep this at the end of other static initializers to avoid making\r
63         // incomplete instance!\r
64         private static final ClusterSession INSTANCE = new ClusterSession();\r
65 \r
66         private final Session session;\r
67         // can be used in init method to reconnect the the session\r
68         private final String sContact;\r
69 \r
70         // TODO deside whether the task list is needed!\r
71         // private static BufferedWriter tasks;\r
72 \r
73         private ClusterSession() {\r
74                 log.debug("Initializing session " + Util.datef.format(Calendar.getInstance().getTime()));\r
75                 SessionFactory factory = SessionFactory.getFactory();\r
76                 session = factory.getSession();\r
77                 sContact = session.getContact();\r
78                 try {\r
79                         /*\r
80                          * String tasksFileName = ph.getProperty("cluster.tasks.file"); File\r
81                          * taskFile = new File(tasksFileName); if(!taskFile.exists()) {\r
82                          * taskFile.createNewFile(); } tasks = new BufferedWriter(new\r
83                          * PrintWriter(taskFile));\r
84                          */\r
85                         session.init(null);\r
86                         // Make sure that the session is going to be properly closed\r
87                         Runtime.getRuntime().addShutdownHook(new Thread() {\r
88                                 @Override\r
89                                 public void run() {\r
90                                         /*\r
91                                          * try { if(tasks!=null) { tasks.close(); } }\r
92                                          * catch(IOException e) { log.error(e.getMessage()); }\r
93                                          */\r
94                                         close();\r
95                                 }\r
96                         });\r
97 \r
98                 } catch (DrmaaException e) {\r
99                         log.error(e.getMessage());\r
100                 }\r
101                 /*\r
102                  * throw new RuntimeException("Could not create task file! " +\r
103                  * "Please check that Engine.cluster.properties " +\r
104                  * "file is provided and contains cluster.tasks.file property. " +\r
105                  * "This property should contain the file name " +\r
106                  * "for storing tasks ids! Cause: " + e.getMessage());\r
107                  */\r
108 \r
109         }\r
110 \r
111         synchronized static ClusterSession getInstance() {\r
112                 return INSTANCE;\r
113         }\r
114 \r
115         public Session getSession() {\r
116                 return INSTANCE.session;\r
117         }\r
118 \r
119         public void close() {\r
120                 try {\r
121                         if (open) {\r
122                                 session.exit();\r
123                                 open = false;\r
124                                 log.debug("Closing the session at: " + Util.datef.format(Calendar.getInstance().getTime()));\r
125                         }\r
126                 } catch (DrmaaException dre) {\r
127                         // Cannot recover at this point, just log\r
128                         dre.printStackTrace();\r
129                 }\r
130         }\r
131 \r
132         void addJob(String jobId, ConfiguredExecutable<?> executable) {\r
133                 String taskDirectory = executable.getTaskId();\r
134                 assert !PathValidator.isValidDirectory(taskDirectory) : "Directory provided is not valid! Directory: "\r
135                                 + taskDirectory;\r
136                 assert !Util.isEmpty(jobId);\r
137 \r
138                 EngineUtil.writeStatFile(executable.getWorkDirectory(), JobStatus.SUBMITTED.toString());\r
139                 EngineUtil.writeFile(executable.getWorkDirectory(), JOBID, jobId, false);\r
140                 log.debug("Adding taskId: " + taskDirectory + " to cluster job list");\r
141                 assert EngineUtil.isValidJobId(taskDirectory);\r
142                 jobs.add(new Job(taskDirectory, jobId, executable));\r
143         }\r
144 \r
145         public void removeJob(String taskId) {\r
146                 assert !Util.isEmpty(taskId);\r
147                 assert EngineUtil.isValidJobId(taskId);\r
148                 removeJobFromListbyTaskId(taskId);\r
149         }\r
150 \r
151         /*\r
152          * public List<JobInfo> waitForJobs(List<String> jobIds) throws\r
153          * DrmaaException { return waitForJobs(jobIds,\r
154          * Session.TIMEOUT_WAIT_FOREVER); }\r
155          * \r
156          * public List<JobInfo> waitForJobs(List<String> jobIds, long waitingTime)\r
157          * throws DrmaaException { if (!open) { throw new\r
158          * IllegalStateException("Session is already closed!"); } assert jobIds !=\r
159          * null && jobIds.size() > 1;\r
160          * \r
161          * session.synchronize(jobIds, waitingTime, false); List<JobInfo> jobsInfo =\r
162          * new ArrayList<JobInfo>(jobIds.size()); for (String jobId : jobIds) {\r
163          * jobsInfo.add(waitForJob(jobId)); } return jobsInfo; }\r
164          */\r
165 \r
166         /*\r
167          * public List<JobInfo> waitForAll() throws DrmaaException { assert\r
168          * jobs.size() > 0; return waitForJobs(new\r
169          * ArrayList<String>(ClusterSession.jobs.keySet())); }\r
170          * \r
171          * \r
172          * public void waitForAll_DropStatistics() throws DrmaaException { assert\r
173          * jobs.size() > 0; session.synchronize(new ArrayList<String>(Collections\r
174          * .unmodifiableCollection(ClusterSession.jobs.keySet())),\r
175          * Session.TIMEOUT_WAIT_FOREVER, true); }\r
176          */\r
177 \r
178         public JobInfo waitForJob(String taskId) throws DrmaaException, IOException {\r
179                 // String clusterJobId = ClusterSession.getClusterJobId(jobId);\r
180                 return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER);\r
181         }\r
182 \r
183         public static ClusterJobId getClusterJobId(String taskId) throws IOException {\r
184                 Job job = Job.getByTaskId(taskId, jobs);\r
185                 if (job != null) {\r
186                         return job.getJobId();\r
187                 }\r
188                 // The job must have been removed from the task list use work\r
189                 // directory to find out jobid\r
190                 String workDir = compbio.engine.Configurator.getWorkDirectory(taskId);\r
191                 assert !Util.isEmpty(workDir);\r
192                 File file = new File(workDir, JOBID);\r
193                 log.debug("Looking up cluster jobid by the task id " + taskId + " File path is " + file.getAbsolutePath());\r
194                 assert file.exists();\r
195                 return new ClusterJobId(FileUtil.readFileToString(file));\r
196         }\r
197 \r
198         public JobInfo waitForJob(String jobId, long waitingTime) throws DrmaaException, IOException {\r
199                 ClusterJobId cjobId = getClusterJobId(jobId);\r
200                 JobInfo status = session.wait(cjobId.getJobId(), waitingTime);\r
201                 // Once the job has been waited for it will be finished\r
202                 // Next time it will not be found in the session, so removed from the\r
203                 // job list\r
204                 EngineUtil.writeStatFile(Configurator.getWorkDirectory(jobId), JobStatus.FINISHED.toString());\r
205 \r
206                 return status;\r
207         }\r
208 \r
209         private static void removeJobFromListbyTaskId(String taskId) {\r
210                 assert !Util.isEmpty(taskId);\r
211                 Job job = Job.getByTaskId(taskId, jobs);\r
212                 if (job != null) {\r
213                         log.debug("Removing taskId" + taskId + " from cluster job list");\r
214                         jobs.remove(job);\r
215                 }\r
216         }\r
217 \r
218         public ConfiguredExecutable<?> getResults(String taskId) throws DrmaaException, ResultNotAvailableException {\r
219 \r
220                 EngineUtil.isValidJobId(taskId);\r
221                 try {\r
222                         JobInfo status = waitForJob(taskId);\r
223                 } catch (InvalidJobException e) {\r
224                         // Its OK to continue, the job may have already completed normally\r
225                         log.warn("Could not find the cluster job with id " + taskId + " perhaps it has completed", e.getCause());\r
226                 } catch (IOException e) {\r
227                         log.error("Could not read JOBID file for the job " + taskId + " Message " + e.getLocalizedMessage(),\r
228                                         e.getCause());\r
229                 }\r
230                 // Once the job has been waited for it will be finished\r
231                 // Next time it will not be found in the session, so removed from the\r
232                 // job list\r
233                 ConfiguredExecutable<?> exec = null;\r
234                 Job job = Job.getByTaskId(taskId, jobs);\r
235                 if (job != null) {\r
236                         exec = job.getConfExecutable();\r
237                         removeJobFromListbyTaskId(taskId);\r
238                 } else {\r
239                         // If task was not find in the list of jobs, than it must have been\r
240                         // collected already\r
241                         // Resurrect the job to find out there the output is\r
242                         exec = EngineUtil.loadExecutable(taskId);\r
243                 }\r
244                 if (exec != null) {\r
245                         EngineUtil.writeMarker(exec.getWorkDirectory(), JobStatus.COLLECTED);\r
246                 }\r
247                 return exec;\r
248         }\r
249 \r
250         public static StatisticManager getStatistics(JobInfo status) throws DrmaaException {\r
251                 return new StatisticManager(status);\r
252         }\r
253 \r
254         static void logStatistics(JobInfo status) throws DrmaaException {\r
255                 log.info(getStatistics(status).getAllStats());\r
256         }\r
257 \r
258         /**\r
259          * Apparently completed jobs cannot be found! If this happened most likely\r
260          * that the job is not running any more and Most likely it has been\r
261          * cancelled, finished or failed.\r
262          * \r
263          * @throws InvalidJobException\r
264          *             if the job is no longer in the queue or running. basically it\r
265          *             will throw this exception for all finished or cancelled jobs\r
266          */\r
267         public int getJobStatus(ClusterJobId jobId) throws DrmaaException, InvalidJobException {\r
268                 return session.getJobProgramStatus(jobId.getJobId());\r
269         }\r
270 \r
271         /**\r
272          * Method for getting jobs status by quering the cluster, It returns status\r
273          * in therms of a Sessions, not a JobStatus Should only be used for testing!\r
274          * \r
275          * @param status\r
276          * @return job status\r
277          * @throws DrmaaException\r
278          */\r
279         @Deprecated\r
280         public static String getJobStatus(final int status) throws DrmaaException {\r
281                 String statusString = null;\r
282                 switch (status) {\r
283                 case Session.UNDETERMINED:\r
284                         statusString = "Job status cannot be determined\n";\r
285                         break;\r
286                 case Session.QUEUED_ACTIVE:\r
287                         statusString = "Job is queued and active\n";\r
288                         break;\r
289                 case Session.SYSTEM_ON_HOLD:\r
290                         statusString = "Job is queued and in system hold\n";\r
291                         break;\r
292                 case Session.USER_ON_HOLD:\r
293                         statusString = "Job is queued and in user hold\n";\r
294                         break;\r
295                 case Session.USER_SYSTEM_ON_HOLD:\r
296                         statusString = "Job is queued and in user and system hold\n";\r
297                         break;\r
298                 case Session.RUNNING:\r
299                         statusString = "Job is running\n";\r
300                         break;\r
301                 case Session.SYSTEM_SUSPENDED:\r
302                         statusString = "Job is system suspended\n";\r
303                         break;\r
304                 case Session.USER_SUSPENDED:\r
305                         statusString = "Job is user suspended\n";\r
306                         break;\r
307                 case Session.USER_SYSTEM_SUSPENDED:\r
308                         statusString = "Job is user and system suspended\n";\r
309                         break;\r
310                 case Session.DONE:\r
311                         statusString = "Job finished normally\n";\r
312                         break;\r
313                 case Session.FAILED:\r
314                         statusString = "Job finished, but failed\n";\r
315                         break;\r
316                 }\r
317                 return statusString;\r
318         }\r
319 \r
320 }\r