/* Copyright (c) 2009 Peter Troshin
 *
 *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
 *
 *  This library is free software; you can redistribute it and/or modify it under the terms of the
 *  Apache License version 2 as published by the Apache Software Foundation
 *
 *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
 *  License for more details.
 *
 *  A copy of the license is in apache_license.txt. It is also available here:
 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
 *
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */
package compbio.engine.cluster.drmaa;

import java.io.File;
import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.log4j.Logger;
import org.ggf.drmaa.DrmaaException;
import org.ggf.drmaa.InvalidJobException;
import org.ggf.drmaa.JobInfo;
import org.ggf.drmaa.Session;
import org.ggf.drmaa.SessionFactory;

import compbio.engine.ClusterJobId;
import compbio.engine.Job;
import compbio.engine.Configurator;
import compbio.engine.client.ConfiguredExecutable;
import compbio.engine.client.PathValidator;
import compbio.engine.client.EngineUtil;
import compbio.engine.conf.PropertyHelperManager;
import compbio.metadata.JobStatus;
import compbio.metadata.ResultNotAvailableException;
import compbio.util.FileUtil;
import compbio.util.PropertyHelper;
import compbio.util.Util;

/**
 * Singleton wrapper around a DRMAA {@link Session}.
 *
 * The single instance is created during class initialisation; it obtains a
 * session from the {@link SessionFactory}, initialises it and registers a JVM
 * shutdown hook that calls {@link #close()}. The class also keeps an
 * in-memory list of submitted jobs and mirrors the cluster job id and the job
 * status into files in the job work directory.
 */
public final class ClusterSession {

    private static final Logger log = Logger.getLogger(ClusterSession.class);

    private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper();

    /** Name of the file, in the job work directory, that stores the cluster job id. */
    public static final String JOBID = "JOBID";

    // Jobs known to this session, looked up by task id (getTaskDirectory()).
    // The cluster job id is additionally stored in the JOBID file.
    // (An earlier revision kept these in a ConcurrentHashMap instead of a list.)
    static final List<Job> jobs = new CopyOnWriteArrayList<Job>();

    // True until the session has been exited successfully. Only read and
    // written inside the synchronized close() method.
    private static boolean open = true;

    // Keep this at the end of other static initializers to avoid making
    // incomplete instance!
    private static final ClusterSession INSTANCE = new ClusterSession();

    private final Session session;

    // can be used in init method to reconnect to the session
    private final String sContact;

    // TODO decide whether the task list is needed!
    // private static BufferedWriter tasks;

    /**
     * Obtains the DRMAA session, initialises it and, if initialisation
     * succeeded, registers a shutdown hook that closes the session. A failure
     * to initialise is logged and otherwise ignored.
     */
    private ClusterSession() {
        log.debug("Initializing session " + Util.datef.format(Calendar.getInstance().getTime()));
        SessionFactory factory = SessionFactory.getFactory();
        session = factory.getSession();
        sContact = session.getContact();
        try {
            /*
             * String tasksFileName = ph.getProperty("cluster.tasks.file"); File
             * taskFile = new File(tasksFileName); if(!taskFile.exists()) {
             * taskFile.createNewFile(); } tasks = new BufferedWriter(new
             * PrintWriter(taskFile));
             */
            session.init(null);
            // Make sure that the session is going to be properly closed
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    /*
                     * try { if(tasks!=null) { tasks.close(); } }
                     * catch(IOException e) { log.error(e.getMessage()); }
                     */
                    close();
                }
            });
        } catch (DrmaaException e) {
            // Pass the exception itself so that the stack trace is not lost
            log.error(e.getMessage(), e);
        }
        /*
         * throw new RuntimeException("Could not create task file! " +
         * "Please check that Engine.cluster.properties " +
         * "file is provided and contains cluster.tasks.file property. " +
         * "This property should contain the file name " +
         * "for storing tasks ids! Cause: " + e.getMessage());
         */
    }

    /**
     * @return the single instance of this class
     */
    synchronized static ClusterSession getInstance() {
        return INSTANCE;
    }

    /**
     * @return the underlying DRMAA session
     */
    public Session getSession() {
        return session;
    }

    /**
     * Exits the DRMAA session if it has not been exited already. Subsequent
     * calls after a successful exit do nothing.
     *
     * The method is synchronized because it is invoked both by clients and by
     * the shutdown hook thread registered in the constructor; without the
     * lock both could observe {@code open == true} and exit the session
     * twice. A failure to exit is logged; the session then stays marked as
     * open.
     */
    public synchronized void close() {
        if (!open) {
            return;
        }
        try {
            session.exit();
            open = false;
            log.debug("Closing the session at: "
                    + Util.datef.format(Calendar.getInstance().getTime()));
        } catch (DrmaaException dre) {
            // Cannot recover at this point, just log
            log.error("Could not close the cluster session cleanly: " + dre.getMessage(), dre);
        }
    }

    /**
     * Registers a freshly submitted job: writes the SUBMITTED status and the
     * cluster job id (file {@link #JOBID}) into the executable's work
     * directory and adds the job to the in-memory job list.
     *
     * @param jobId
     *            the cluster job id
     * @param executable
     *            the executable that was submitted
     */
    void addJob(String jobId, ConfiguredExecutable executable) {
        String taskDirectory = executable.getTaskId();
        // NOTE(review): this asserts that the task id is NOT a valid
        // directory, while the message reads as if the opposite was meant.
        // Left unchanged - confirm the intent against PathValidator.
        assert !PathValidator.isValidDirectory(taskDirectory) : "Directory provided is not valid! Directory: "
                + taskDirectory;
        assert !Util.isEmpty(jobId);

        EngineUtil.writeStatFile(executable.getWorkDirectory(), JobStatus.SUBMITTED.toString());
        EngineUtil.writeFile(executable.getWorkDirectory(), JOBID, jobId, false);
        log.debug("Adding taskId: " + taskDirectory + " to cluster job list");
        assert EngineUtil.isValidJobId(taskDirectory);
        jobs.add(new Job(taskDirectory, jobId, executable));
    }

    /**
     * Removes the job with the given task id from the in-memory job list, if
     * it is there.
     *
     * @param taskId
     *            the task id of the job to remove
     */
    public void removeJob(String taskId) {
        assert !Util.isEmpty(taskId);
        assert EngineUtil.isValidJobId(taskId);
        removeJobFromListbyTaskId(taskId);
    }

    /*
     * public List waitForJobs(List jobIds) throws
     * DrmaaException { return waitForJobs(jobIds,
     * Session.TIMEOUT_WAIT_FOREVER); }
     *
     * public List waitForJobs(List jobIds, long waitingTime)
     * throws DrmaaException { if (!open) { throw new
     * IllegalStateException("Session is already closed!"); } assert jobIds !=
     * null && jobIds.size() > 1;
     *
     * session.synchronize(jobIds, waitingTime, false); List jobsInfo =
     * new ArrayList(jobIds.size()); for (String jobId : jobIds) {
     * jobsInfo.add(waitForJob(jobId)); } return jobsInfo; }
     */

    /*
     * public List waitForAll() throws DrmaaException { assert
     * jobs.size() > 0; return waitForJobs(new
     * ArrayList(ClusterSession.jobs.keySet())); }
     *
     *
     * public void waitForAll_DropStatistics() throws DrmaaException { assert
     * jobs.size() > 0; session.synchronize(new ArrayList(Collections
     * .unmodifiableCollection(ClusterSession.jobs.keySet())),
     * Session.TIMEOUT_WAIT_FOREVER, true); }
     */

    /**
     * Waits for the job with the given task id using
     * {@link Session#TIMEOUT_WAIT_FOREVER} as the timeout.
     *
     * @param taskId
     *            the task id of the job
     * @return the job information returned by the session
     * @throws DrmaaException
     *             if the session reports an error while waiting
     * @throws IOException
     *             if the cluster job id cannot be read
     */
    public JobInfo waitForJob(String taskId) throws DrmaaException, IOException {
        // String clusterJobId = ClusterSession.getClusterJobId(jobId);
        return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER);
    }

    /**
     * Looks up the cluster job id for a task id. The in-memory job list is
     * consulted first; if the job is not there, the id is read from the
     * {@link #JOBID} file in the task's work directory.
     *
     * @param taskId
     *            the task id of the job
     * @return the cluster job id
     * @throws IOException
     *             if the JOBID file cannot be read
     */
    public static ClusterJobId getClusterJobId(String taskId) throws IOException {
        Job job = Job.getByTaskId(taskId, jobs);
        if (job != null) {
            return job.getJobId();
        }
        // The job must have been removed from the task list, use the work
        // directory to find out the job id
        String workDir = Configurator.getWorkDirectory(taskId);
        assert !Util.isEmpty(workDir);
        File file = new File(workDir, JOBID);
        log.debug("Looking up cluster jobid by the task id " + taskId + " File path is "
                + file.getAbsolutePath());
        assert file.exists();
        return new ClusterJobId(FileUtil.readFileToString(file));
    }

    /**
     * Waits for the job with the given task id and, once the wait returns,
     * writes the FINISHED status into the job's work directory.
     *
     * @param jobId
     *            the task id of the job (despite the name, this is resolved
     *            to the cluster job id via {@link #getClusterJobId(String)})
     * @param waitingTime
     *            the timeout passed to {@link Session#wait(String, long)}
     * @return the job information returned by the session
     * @throws DrmaaException
     *             if the session reports an error while waiting
     * @throws IOException
     *             if the cluster job id cannot be read
     */
    public JobInfo waitForJob(String jobId, long waitingTime)
            throws DrmaaException, IOException {
        ClusterJobId cjobId = getClusterJobId(jobId);
        JobInfo status = session.wait(cjobId.getJobId(), waitingTime);
        // The wait has returned: record the FINISHED status in the work
        // directory. The job list itself is not touched here.
        EngineUtil.writeStatFile(Configurator.getWorkDirectory(jobId),
                JobStatus.FINISHED.toString());
        return status;
    }

    /**
     * Removes the job with the given task id from the job list; does nothing
     * if no such job is listed.
     */
    private static void removeJobFromListbyTaskId(String taskId) {
        assert !Util.isEmpty(taskId);
        Job job = Job.getByTaskId(taskId, jobs);
        if (job != null) {
            log.debug("Removing taskId" + taskId + " from cluster job list");
            jobs.remove(job);
        }
    }

    /**
     * Waits for the job to finish and returns its executable. If the job is
     * still in the job list its executable is taken from there and the job is
     * removed from the list; otherwise the executable is loaded via
     * {@link EngineUtil#loadExecutable}. When an executable was obtained, a
     * COLLECTED marker is written into its work directory.
     *
     * An {@link InvalidJobException} or {@link IOException} raised while
     * waiting is logged and does not abort the collection.
     *
     * @param taskId
     *            the task id of the job
     * @return the executable of the job, may be {@code null} if it was not
     *         in the job list and loading it returned {@code null}
     * @throws DrmaaException
     *             if the session reports an error other than an invalid job
     * @throws ResultNotAvailableException
     *             declared for callers; not thrown directly by this method
     */
    public ConfiguredExecutable getResults(String taskId)
            throws DrmaaException, ResultNotAvailableException {
        // Same precondition check as in addJob() and removeJob(); previously
        // the result of this call was silently discarded.
        assert EngineUtil.isValidJobId(taskId);
        try {
            waitForJob(taskId);
        } catch (InvalidJobException e) {
            // It is OK to continue, the job may have already completed normally
            log.warn("Could not find the cluster job with id " + taskId
                    + " perhaps it has completed", e);
        } catch (IOException e) {
            log.error("Could not read JOBID file for the job " + taskId + " Message "
                    + e.getLocalizedMessage(), e);
        }
        ConfiguredExecutable exec = null;
        Job job = Job.getByTaskId(taskId, jobs);
        if (job != null) {
            exec = job.getConfExecutable();
            removeJobFromListbyTaskId(taskId);
        } else {
            // If the task was not found in the list of jobs, then it must
            // have been collected already.
            // Resurrect the job to find out where the output is
            exec = EngineUtil.loadExecutable(taskId);
        }
        if (exec != null) {
            EngineUtil.writeMarker(exec.getWorkDirectory(), JobStatus.COLLECTED);
        }
        return exec;
    }

    /**
     * Wraps the given job information into a {@link StatisticManager}.
     *
     * @param status
     *            the job information
     * @return a new statistic manager for the job information
     * @throws DrmaaException
     *             if the statistic manager cannot be created
     */
    public static StatisticManager getStatistics(JobInfo status) throws DrmaaException {
        return new StatisticManager(status);
    }

    /**
     * Logs all statistics of the given job information at INFO level.
     */
    static void logStatistics(JobInfo status) throws DrmaaException {
        log.info(getStatistics(status).getAllStats());
    }

    /**
     * Queries the session for the program status of a job.
     *
     * Apparently completed jobs cannot be found! If this happens, most likely
     * the job is not running any more: it has been cancelled, finished or
     * failed.
     *
     * @param jobId
     *            the cluster job id
     * @return the status as one of the {@link Session} status constants
     * @throws InvalidJobException
     *             if the job is no longer in the queue or running. Basically
     *             it will throw this exception for all finished or cancelled
     *             jobs
     * @throws DrmaaException
     *             if the session reports any other error
     */
    public int getJobStatus(ClusterJobId jobId) throws DrmaaException, InvalidJobException {
        return session.getJobProgramStatus(jobId.getJobId());
    }

    /**
     * Translates a {@link Session} status constant into a human readable
     * message. The status is expressed in terms of the Session, not of a
     * JobStatus. Should only be used for testing!
     *
     * @param status
     *            one of the {@link Session} job status constants
     * @return the message for the status, or {@code null} if the status is
     *         not one of the constants handled below
     * @throws DrmaaException
     *             declared for compatibility; not thrown by this method
     * @deprecated should only be used for testing
     */
    @Deprecated
    public static String getJobStatus(final int status) throws DrmaaException {
        String statusString = null;
        switch (status) {
        case Session.UNDETERMINED:
            statusString = "Job status cannot be determined\n";
            break;
        case Session.QUEUED_ACTIVE:
            statusString = "Job is queued and active\n";
            break;
        case Session.SYSTEM_ON_HOLD:
            statusString = "Job is queued and in system hold\n";
            break;
        case Session.USER_ON_HOLD:
            statusString = "Job is queued and in user hold\n";
            break;
        case Session.USER_SYSTEM_ON_HOLD:
            statusString = "Job is queued and in user and system hold\n";
            break;
        case Session.RUNNING:
            statusString = "Job is running\n";
            break;
        case Session.SYSTEM_SUSPENDED:
            statusString = "Job is system suspended\n";
            break;
        case Session.USER_SUSPENDED:
            statusString = "Job is user suspended\n";
            break;
        case Session.USER_SYSTEM_SUSPENDED:
            statusString = "Job is user and system suspended\n";
            break;
        case Session.DONE:
            statusString = "Job finished normally\n";
            break;
        case Session.FAILED:
            statusString = "Job finished, but failed\n";
            break;
        }
        return statusString;
    }
}