1 /* Copyright (c) 2009 Peter Troshin
\r
3 * JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
\r
5 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
6 * Apache License version 2 as published by the Apache Software Foundation
\r
8 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
9 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
10 * License for more details.
\r
12 * A copy of the license is in apache_license.txt. It is also available here:
\r
13 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */
19 package compbio.engine.cluster.drmaa;
\r
21 import java.io.File;
\r
22 import java.io.IOException;
\r
23 import java.util.Calendar;
\r
24 import java.util.List;
\r
25 import java.util.concurrent.CopyOnWriteArrayList;
\r
27 import org.apache.log4j.Logger;
\r
28 import org.ggf.drmaa.DrmaaException;
\r
29 import org.ggf.drmaa.InvalidJobException;
\r
30 import org.ggf.drmaa.JobInfo;
\r
31 import org.ggf.drmaa.Session;
\r
32 import org.ggf.drmaa.SessionFactory;
\r
34 import compbio.engine.ClusterJobId;
\r
35 import compbio.engine.Job;
\r
36 import compbio.engine.client.ConfiguredExecutable;
\r
37 import compbio.engine.client.PathValidator;
\r
38 import compbio.engine.conf.PropertyHelperManager;
\r
39 import compbio.metadata.JobStatus;
\r
40 import compbio.metadata.ResultNotAvailableException;
\r
41 import compbio.util.FileUtil;
\r
42 import compbio.util.PropertyHelper;
\r
43 import compbio.util.Util;
\r
45 public final class ClusterSession {
\r
47 private static final Logger log = Logger.getLogger(ClusterSession.class);
\r
49 private static final PropertyHelper ph = PropertyHelperManager
\r
50 .getPropertyHelper();
\r
52 public static final String JOBID = "JOBID";
\r
53 // TaskId (getTaskDirectory()) -> ConfiguredExecutable<?> map
\r
54 // Cluster jobId is only stored in a file
\r
55 // static final Map<JobId, ConfiguredExecutable<?>> jobs = new
\r
56 // ConcurrentHashMap<JobId, ConfiguredExecutable<?>>();
\r
58 static final List<Job> jobs = new CopyOnWriteArrayList<Job>();
\r
59 private static boolean open = true;
\r
61 // Keep this at the end of other static initializers to avoid making
\r
62 // incomplete instance!
\r
63 private static final ClusterSession INSTANCE = new ClusterSession();
\r
65 private final Session session;
\r
66 // can be used in init method to reconnect the the session
\r
67 private final String sContact;
\r
69 // TODO deside whether the task list is needed!
\r
70 // private static BufferedWriter tasks;
\r
72 private ClusterSession() {
\r
73 log.debug("Initializing session "
\r
74 + Util.datef.format(Calendar.getInstance().getTime()));
\r
75 SessionFactory factory = SessionFactory.getFactory();
\r
76 session = factory.getSession();
\r
77 sContact = session.getContact();
\r
80 * String tasksFileName = ph.getProperty("cluster.tasks.file"); File
\r
81 * taskFile = new File(tasksFileName); if(!taskFile.exists()) {
\r
82 * taskFile.createNewFile(); } tasks = new BufferedWriter(new
\r
83 * PrintWriter(taskFile));
\r
86 // Make sure that the session is going to be properly closed
\r
87 Runtime.getRuntime().addShutdownHook(new Thread() {
\r
91 * try { if(tasks!=null) { tasks.close(); } }
\r
92 * catch(IOException e) { log.error(e.getMessage()); }
\r
98 } catch (DrmaaException e) {
\r
99 log.error(e.getMessage());
\r
102 * throw new RuntimeException("Could not create task file! " +
\r
103 * "Please check that Engine.cluster.properties " +
\r
104 * "file is provided and contains cluster.tasks.file property. " +
\r
105 * "This property should contain the file name " +
\r
106 * "for storing tasks ids! Cause: " + e.getMessage());
\r
111 synchronized static ClusterSession getInstance() {
\r
115 public Session getSession() {
\r
116 return INSTANCE.session;
\r
119 public void close() {
\r
124 log.debug("Closing the session at: "
\r
125 + Util.datef.format(Calendar.getInstance().getTime()));
\r
127 } catch (DrmaaException dre) {
\r
128 // Cannot recover at this point, just log
\r
129 dre.printStackTrace();
\r
133 void addJob(String jobId, ConfiguredExecutable<?> executable) {
\r
134 String taskDirectory = executable.getTaskId();
\r
135 assert !PathValidator.isValidDirectory(taskDirectory) : "Directory provided is not valid! Directory: "
\r
137 assert !Util.isEmpty(jobId);
\r
139 compbio.engine.client.Util.writeStatFile(executable.getWorkDirectory(),
\r
140 JobStatus.SUBMITTED.toString());
\r
141 compbio.engine.client.Util.writeFile(executable.getWorkDirectory(),
\r
142 JOBID, jobId, false);
\r
143 log.debug("Adding taskId: " + taskDirectory + " to cluster job list");
\r
144 assert compbio.engine.client.Util.isValidJobId(taskDirectory);
\r
145 jobs.add(new Job(taskDirectory, jobId, executable));
\r
148 public void removeJob(String taskId) {
\r
149 assert !Util.isEmpty(taskId);
\r
150 assert compbio.engine.client.Util.isValidJobId(taskId);
\r
151 removeJobFromListbyTaskId(taskId);
\r
    /*
     * Commented-out batch wait helpers, kept for reference (comment
     * delimiters restored after file corruption):
     *
     * public List<JobInfo> waitForJobs(List<String> jobIds) throws
     * DrmaaException { return waitForJobs(jobIds,
     * Session.TIMEOUT_WAIT_FOREVER); }
     *
     * public List<JobInfo> waitForJobs(List<String> jobIds, long waitingTime)
     * throws DrmaaException { if (!open) { throw new
     * IllegalStateException("Session is already closed!"); } assert jobIds !=
     * null && jobIds.size() > 1;
     *
     * session.synchronize(jobIds, waitingTime, false); List<JobInfo> jobsInfo =
     * new ArrayList<JobInfo>(jobIds.size()); for (String jobId : jobIds) {
     * jobsInfo.add(waitForJob(jobId)); } return jobsInfo; }
     *
     * public List<JobInfo> waitForAll() throws DrmaaException { assert
     * jobs.size() > 0; return waitForJobs(new
     * ArrayList<String>(ClusterSession.jobs.keySet())); }
     *
     * public void waitForAll_DropStatistics() throws DrmaaException { assert
     * jobs.size() > 0; session.synchronize(new ArrayList<String>(Collections
     * .unmodifiableCollection(ClusterSession.jobs.keySet())),
     * Session.TIMEOUT_WAIT_FOREVER, true); }
     */
\r
181 public JobInfo waitForJob(String taskId) throws DrmaaException, IOException {
\r
182 // String clusterJobId = ClusterSession.getClusterJobId(jobId);
\r
183 return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER);
\r
186 public static ClusterJobId getClusterJobId(String taskId)
\r
187 throws IOException {
\r
188 Job job = Job.getByTaskId(taskId, jobs);
\r
190 return job.getJobId();
\r
192 // The job must have been removed from the task list use work
\r
193 // directory to find out jobid
\r
194 String workDir = compbio.engine.Configurator.getWorkDirectory(taskId);
\r
195 assert !Util.isEmpty(workDir);
\r
196 File file = new File(workDir, JOBID);
\r
197 log.debug("Looking up cluster jobid by the task id " + taskId
\r
198 + " File path is " + file.getAbsolutePath());
\r
199 assert file.exists();
\r
200 return new ClusterJobId(FileUtil.readFileToString(file));
\r
203 public JobInfo waitForJob(String jobId, long waitingTime)
\r
204 throws DrmaaException, IOException {
\r
205 ClusterJobId cjobId = getClusterJobId(jobId);
\r
206 JobInfo status = session.wait(cjobId.getJobId(), waitingTime);
\r
207 // Once the job has been waited for it will be finished
\r
208 // Next time it will not be found in the session, so removed from the
\r
210 compbio.engine.client.Util.writeStatFile(
\r
211 compbio.engine.Configurator.getWorkDirectory(jobId),
\r
212 JobStatus.FINISHED.toString());
\r
217 private static void removeJobFromListbyTaskId(String taskId) {
\r
218 assert !Util.isEmpty(taskId);
\r
219 Job job = Job.getByTaskId(taskId, jobs);
\r
221 log.debug("Removing taskId" + taskId + " from cluster job list");
\r
226 public ConfiguredExecutable<?> getResults(String taskId)
\r
227 throws DrmaaException, ResultNotAvailableException {
\r
229 compbio.engine.client.Util.isValidJobId(taskId);
\r
231 JobInfo status = waitForJob(taskId);
\r
232 } catch (InvalidJobException e) {
\r
233 // Its OK to continue, the job may have already completed normally
\r
234 log.warn("Could not find the cluster job with id " + taskId
\r
235 + " perhaps it has completed", e.getCause());
\r
236 } catch (IOException e) {
\r
237 log.error("Could not read JOBID file for the job " + taskId
\r
238 + " Message " + e.getLocalizedMessage(), e.getCause());
\r
240 // Once the job has been waited for it will be finished
\r
241 // Next time it will not be found in the session, so removed from the
\r
243 ConfiguredExecutable<?> exec = null;
\r
244 Job job = Job.getByTaskId(taskId, jobs);
\r
246 exec = job.getConfExecutable();
\r
247 removeJobFromListbyTaskId(taskId);
\r
249 // If task was not find in the list of jobs, than it must have been
\r
250 // collected already
\r
251 // Resurrect the job to find out there the output is
\r
252 exec = compbio.engine.client.Util.loadExecutable(taskId);
\r
254 if (exec != null) {
\r
255 compbio.engine.client.Util.writeMarker(exec.getWorkDirectory(),
\r
256 JobStatus.COLLECTED);
\r
261 public static StatisticManager getStatistics(JobInfo status)
\r
262 throws DrmaaException {
\r
263 return new StatisticManager(status);
\r
266 static void logStatistics(JobInfo status) throws DrmaaException {
\r
267 log.info(getStatistics(status).getAllStats());
\r
271 * Apparently completed jobs cannot be found! If this happened most likely
\r
272 * that the job is not running any more and Most likely it has been
\r
273 * cancelled, finished or failed.
\r
275 * @throws InvalidJobException
\r
276 * if the job is no longer in the queue or running. basically it
\r
277 * will throw this exception for all finished or cancelled jobs
\r
279 public int getJobStatus(ClusterJobId jobId) throws DrmaaException,
\r
280 InvalidJobException {
\r
281 return session.getJobProgramStatus(jobId.getJobId());
\r
285 * Method for getting jobs status by quering the cluster, It returns status
\r
286 * in therms of a Sessions, not a JobStatus Should only be used for testing!
\r
289 * @return job status
\r
290 * @throws DrmaaException
\r
293 public static String getJobStatus(final int status) throws DrmaaException {
\r
294 String statusString = null;
\r
296 case Session.UNDETERMINED :
\r
297 statusString = "Job status cannot be determined\n";
\r
299 case Session.QUEUED_ACTIVE :
\r
300 statusString = "Job is queued and active\n";
\r
302 case Session.SYSTEM_ON_HOLD :
\r
303 statusString = "Job is queued and in system hold\n";
\r
305 case Session.USER_ON_HOLD :
\r
306 statusString = "Job is queued and in user hold\n";
\r
308 case Session.USER_SYSTEM_ON_HOLD :
\r
309 statusString = "Job is queued and in user and system hold\n";
\r
311 case Session.RUNNING :
\r
312 statusString = "Job is running\n";
\r
314 case Session.SYSTEM_SUSPENDED :
\r
315 statusString = "Job is system suspended\n";
\r
317 case Session.USER_SUSPENDED :
\r
318 statusString = "Job is user suspended\n";
\r
320 case Session.USER_SYSTEM_SUSPENDED :
\r
321 statusString = "Job is user and system suspended\n";
\r
323 case Session.DONE :
\r
324 statusString = "Job finished normally\n";
\r
326 case Session.FAILED :
\r
327 statusString = "Job finished, but failed\n";
\r
330 return statusString;
\r