From: Sasha Sherstnev
Date: Thu, 26 Sep 2013 13:30:04 +0000 (+0100)
Subject: Refactoring (renaming) 2 classes: AsyncJobRunner.java -> AsyncClusterRunner.java...
X-Git-Url: http://source.jalview.org/gitweb/?a=commitdiff_plain;h=b56011664ec5d11bb5d0cd09395e646586888700;p=jabaws.git

Refactoring (renaming) 2 classes: AsyncJobRunner.java -> AsyncClusterRunner.java and JobRunner.java -> ClusterRunner.java
---

diff --git a/engine/compbio/engine/Configurator.java b/engine/compbio/engine/Configurator.java
index 1aa18ea..3feeab6 100644
--- a/engine/compbio/engine/Configurator.java
+++ b/engine/compbio/engine/Configurator.java
@@ -30,8 +30,8 @@ import compbio.engine.client.ConfiguredExecutable;
 import compbio.engine.client.Executable;
 import compbio.engine.client.PathValidator;
 import compbio.engine.client.EngineUtil;
-import compbio.engine.cluster.drmaa.AsyncJobRunner;
-import compbio.engine.cluster.drmaa.JobRunner;
+import compbio.engine.cluster.drmaa.AsyncClusterRunner;
+import compbio.engine.cluster.drmaa.ClusterRunner;
 import compbio.engine.conf.DirectoryManager;
 import compbio.engine.conf.PropertyHelperManager;
 import compbio.engine.local.AsyncLocalRunner;
@@ -51,8 +51,6 @@ public class Configurator {
     public final static String LOCAL_WORK_DIRECTORY = initLocalDirectory();
     public final static String CLUSTER_WORK_DIRECTORY = initClusterWorkDirectory();
-
-
     private static boolean initBooleanValue(String key) {
         assert key != null;
         String status = ph.getProperty(key);
@@ -70,13 +68,10 @@ public class Configurator {
         if (!Util.isEmpty(tmpDir)) {
             tmpDir = tmpDir.trim();
         } else {
-            throw new RuntimeException(
-                    "Cluster work directory must be provided! ");
+            throw new RuntimeException("Cluster work directory must be provided! ");
         }
-        if (LOCAL_WORK_DIRECTORY != null
-                && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {
-            throw new InvalidParameterException(
-                    "Cluster engine output directory must be different of that for local engine!");
+        if (LOCAL_WORK_DIRECTORY != null && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {
+            throw new InvalidParameterException("Cluster engine output directory must be different of that for local engine!");
         }
     }
     return tmpDir;
@@ -105,8 +100,7 @@ public class Configurator {
      * {@link LoadBalancer} for an engine. This method will fall back and return
      * local engine if
      *
-     * 1) No engines are defined in the properties or they have been defined
-     * incorrectly
+     * 1) No engines are defined in the properties or they have been defined incorrectly
      *
      * 2) Execution environment is Windows as the system cannot really run
      * cluster submission from windows
     *
     * @return SyncExecutor backed up by either cluster or local engines
     * @throws JobSubmissionException
     */
-    static Executable.ExecProvider getExecProvider(
-            ConfiguredExecutable executable, List dataSet)
+    static Executable.ExecProvider getExecProvider(ConfiguredExecutable executable, List dataSet)
             throws JobSubmissionException {
         // Look where executable claims to be executed
         Executable.ExecProvider provider = executable.getSupportedRuntimes();
         if (!IS_CLUSTER_ENGINE_ENABLED && !IS_LOCAL_ENGINE_ENABLED) {
             // Both engines disabled!
-            throw new RuntimeException(
-                    "Both engines are disabled! "
-                    + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. "
-                    + "At least one engine must be enabled!");
+            throw new RuntimeException("Both engines are disabled! " + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. 
At least one engine must be enabled!"); } if (provider == Executable.ExecProvider.Local) { if (IS_LOCAL_ENGINE_ENABLED) { return Executable.ExecProvider.Local; } else { - throw new JobSubmissionException( - "Executable can be executed only on locally, but local engine is disabled!"); + throw new JobSubmissionException("Executable can be executed only on locally, but local engine is disabled!"); } } if (provider == Executable.ExecProvider.Cluster) { if (IS_CLUSTER_ENGINE_ENABLED) { return Executable.ExecProvider.Cluster; } else { - throw new JobSubmissionException( - "Executable can be executed only on the cluster, but cluster engine is disabled!"); + throw new JobSubmissionException("Executable can be executed only on the cluster, but cluster engine is disabled!"); } } // We are here if executable can be executed on both Cluster and Local @@ -172,118 +161,99 @@ public class Configurator { return Executable.ExecProvider.Local; } - public static ConfiguredExecutable configureExecutable( - Executable executable) throws JobSubmissionException { + public static ConfiguredExecutable configureExecutable(Executable executable) throws JobSubmissionException { - ConfExecutable confExec = new ConfExecutable(executable, - DirectoryManager.getTaskDirectory(executable.getClass())); + ConfExecutable confExec = new ConfExecutable(executable, DirectoryManager.getTaskDirectory(executable.getClass())); Executable.ExecProvider provider = getExecProvider(confExec, null); confExec.setExecProvider(provider); setupWorkDirectory(confExec, provider); return confExec; } - public static ConfiguredExecutable configureExecutable( - Executable executable, List dataSet) + public static ConfiguredExecutable configureExecutable(Executable executable, List dataSet) throws JobSubmissionException { - ConfExecutable confExec = new ConfExecutable(executable, - DirectoryManager.getTaskDirectory(executable.getClass())); + ConfExecutable confExec = new ConfExecutable(executable, DirectoryManager.getTaskDirectory(executable.getClass())); Executable.ExecProvider provider = getExecProvider(confExec, dataSet); confExec.setExecProvider(provider); setupWorkDirectory(confExec, provider); return confExec; } - static void setupWorkDirectory(ConfExecutable confExec, - Executable.ExecProvider provider) { + static void setupWorkDirectory(ConfExecutable confExec, Executable.ExecProvider provider) { assert provider != null && provider != Executable.ExecProvider.Any; String workDir = ""; if (provider == Executable.ExecProvider.Local) { - workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator - + confExec.getTaskId(); + workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator + confExec.getTaskId(); } else { - workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator - + confExec.getTaskId(); + workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator + confExec.getTaskId(); } // Create working directory for the task File wdir = new File(workDir); wdir.mkdir(); - log.info("Creating working directory for the task in: " - + wdir.getAbsolutePath()); + log.info("Creating working directory for the task in: " + wdir.getAbsolutePath()); // Tell the executable where to get the results confExec.setWorkDirectory(workDir); } - public static ConfiguredExecutable configureExecutable( - Executable executable, Executable.ExecProvider provider) + public static ConfiguredExecutable configureExecutable(Executable executable, Executable.ExecProvider provider) throws JobSubmissionException { if (executable == null) { throw new 
InvalidParameterException("Executable must be provided!"); } - ConfExecutable confExec = new ConfExecutable(executable, - DirectoryManager.getTaskDirectory(executable.getClass())); - if (provider == Executable.ExecProvider.Cluster - && !IS_CLUSTER_ENGINE_ENABLED) { - throw new JobSubmissionException( - "Cluster engine is disabled or not configured!"); + ConfExecutable confExec = new ConfExecutable(executable, DirectoryManager.getTaskDirectory(executable.getClass())); + if (provider == Executable.ExecProvider.Cluster && !IS_CLUSTER_ENGINE_ENABLED) { + throw new JobSubmissionException("Cluster engine is disabled or not configured!"); } - if (provider == Executable.ExecProvider.Local - && !IS_LOCAL_ENGINE_ENABLED) { - throw new JobSubmissionException( - "Local engine is disabled or not configured!"); + if (provider == Executable.ExecProvider.Local && !IS_LOCAL_ENGINE_ENABLED) { + throw new JobSubmissionException("Local engine is disabled or not configured!"); } confExec.setExecProvider(provider); setupWorkDirectory(confExec, provider); return confExec; } - public static AsyncExecutor getAsyncEngine( - ConfiguredExecutable executable, Executable.ExecProvider provider) { + public static AsyncExecutor getAsyncEngine(ConfiguredExecutable executable, Executable.ExecProvider provider) { assert provider != Executable.ExecProvider.Any && provider != null; if (provider == Executable.ExecProvider.Cluster) { - return new AsyncJobRunner(); + return new AsyncClusterRunner(); } return new AsyncLocalRunner(); } - public static SyncExecutor getSyncEngine( - ConfiguredExecutable executable, Executable.ExecProvider provider) + public static SyncExecutor getSyncEngine(ConfiguredExecutable executable, Executable.ExecProvider provider) throws JobSubmissionException { assert provider != Executable.ExecProvider.Any && provider != null; if (provider == Executable.ExecProvider.Cluster) { - return JobRunner.getInstance(executable); + return ClusterRunner.getInstance(executable); } return new LocalRunner(executable); } - public static AsyncExecutor getAsyncEngine( - ConfiguredExecutable executable) { + public static AsyncExecutor getAsyncEngine(ConfiguredExecutable executable) { if (isTargetedForLocalExecution(executable)) { return new AsyncLocalRunner(); } - return new AsyncJobRunner(); + return new AsyncClusterRunner(); } public static AsyncExecutor getAsyncEngine(String taskId) { if (isLocal(taskId)) { return new AsyncLocalRunner(); } - return new AsyncJobRunner(); + return new AsyncClusterRunner(); } - public static SyncExecutor getSyncEngine(ConfiguredExecutable executable) - throws JobSubmissionException { + public static SyncExecutor getSyncEngine(ConfiguredExecutable executable) throws JobSubmissionException { if (isTargetedForLocalExecution(executable)) { return new LocalRunner(executable); } - return JobRunner.getInstance(executable); + return ClusterRunner.getInstance(executable); } - static boolean isTargetedForLocalExecution( - ConfiguredExecutable executable) { + static boolean isTargetedForLocalExecution(ConfiguredExecutable executable) { // In the uncommon case that the cluster and local execution temporary // directories are the same, in this case the method return true anyway diff --git a/engine/compbio/engine/cluster/drmaa/AsyncJobRunner.java b/engine/compbio/engine/cluster/drmaa/AsyncClusterRunner.java similarity index 77% rename from engine/compbio/engine/cluster/drmaa/AsyncJobRunner.java rename to engine/compbio/engine/cluster/drmaa/AsyncClusterRunner.java index 1cd3197..9cd4b4f 100644 --- 
a/engine/compbio/engine/cluster/drmaa/AsyncJobRunner.java
+++ b/engine/compbio/engine/cluster/drmaa/AsyncClusterRunner.java
@@ -39,32 +39,32 @@ import compbio.metadata.ResultNotAvailableException;
  * template gets deleted, this needs to be taken into account in this
  * class design!
  */
-public class AsyncJobRunner implements AsyncExecutor {
+public class AsyncClusterRunner implements AsyncExecutor {

-    private static Logger log = Logger.getLogger(AsyncJobRunner.class);
+    private static Logger log = Logger.getLogger(AsyncClusterRunner.class);

     @Override
-    public String submitJob(ConfiguredExecutable executable)
-            throws JobSubmissionException {
-        JobRunner jr = new JobRunner(executable);
-        jr.submitJob(); // ignore cluster job id as it could be retrieved from fs
+    public String submitJob(ConfiguredExecutable executable) throws JobSubmissionException {
+        ClusterRunner jr = new ClusterRunner(executable);
+        jr.submitJob();
+        // ignore cluster job id as it could be retrieved from fs
         return executable.getTaskId();
     }

     @Override
     public boolean cancelJob(String jobId) {
         ClusterSession clustSession = ClusterSession.getInstance();
-        return compbio.engine.cluster.drmaa.ClusterEngineUtil.cancelJob(jobId,
-                clustSession);
+        return compbio.engine.cluster.drmaa.ClusterEngineUtil.cancelJob(jobId, clustSession);
     }

     /*
-     * This will never return clust.engine.JobStatus.CANCELLED as for sun grid engine
-     * cancelled job is the same as failed. Cancelled jobs needs to be tracked manually!
+     * This will never return clust.engine.JobStatus.CANCELLED as for sun grid
+     * engine cancelled job is the same as failed. Cancelled jobs needs to be
+     * tracked manually!
      */
     @Override
     public compbio.metadata.JobStatus getJobStatus(String jobId) {
-        return JobRunner.getJobStatus(jobId);
+        return ClusterRunner.getJobStatus(jobId);
     }

     @Override
@@ -74,11 +74,8 @@ public class AsyncJobRunner implements AsyncExecutor {
     }

     @Override
-    public ConfiguredExecutable getResults(String jobId)
-            throws ResultNotAvailableException {
-
+    public ConfiguredExecutable getResults(String jobId) throws ResultNotAvailableException {
         assert EngineUtil.isValidJobId(jobId);
-
         ClusterSession csession = ClusterSession.getInstance();
         ConfiguredExecutable exec;
         try {
diff --git a/engine/compbio/engine/cluster/drmaa/JobRunner.java b/engine/compbio/engine/cluster/drmaa/ClusterRunner.java
similarity index 92%
rename from engine/compbio/engine/cluster/drmaa/JobRunner.java
rename to engine/compbio/engine/cluster/drmaa/ClusterRunner.java
index 516c3eb..d36ca89 100644
--- a/engine/compbio/engine/cluster/drmaa/JobRunner.java
+++ b/engine/compbio/engine/cluster/drmaa/ClusterRunner.java
@@ -56,17 +56,17 @@ import compbio.metadata.ResultNotAvailableException;
  * template gets deleted, this needs to be taken into account in this
  * class design!
*/ -public class JobRunner implements SyncExecutor { +public class ClusterRunner implements SyncExecutor { final JobTemplate jobtempl; static ClusterSession clustSession = ClusterSession.getInstance(); static Session session = clustSession.getSession(); - static final Logger log = Logger.getLogger(JobRunner.class); + static final Logger log = Logger.getLogger(ClusterRunner.class); final ConfiguredExecutable confExecutable; private final String workDirectory; String jobId; - public JobRunner(ConfiguredExecutable confExec) + public ClusterRunner(ConfiguredExecutable confExec) throws JobSubmissionException { try { String command = confExec.getCommand(ExecProvider.Cluster); @@ -84,9 +84,7 @@ public class JobRunner implements SyncExecutor { // Tell the job where to get/put things jobtempl.setWorkingDirectory(this.workDirectory); - /* - * Set environment variables for the process if any - */ + // Set environment variables for the process if any Map jobEnv = confExec.getEnvironment(); if (jobEnv != null && !jobEnv.isEmpty()) { setJobEnvironmentVariables(jobEnv); @@ -97,20 +95,15 @@ public class JobRunner implements SyncExecutor { jobtempl.setArgs(args); } - /* - * If executable need in/out data to be piped into it - */ + //If executable need in/out data to be piped into it if (confExec.getExecutable() instanceof PipedExecutable) { setPipes(confExec); } - /* - * If executable require special cluster configuration parameters to - * be set e.g. queue, ram, time etc - */ + // If executable require special cluster configuration parameters to + // be set e.g. queue, ram, time etc setNativeSpecs(confExec.getExecutable()); - log.trace("using arguments: " + jobtempl.getArgs()); this.confExecutable = confExec; // Save run configuration @@ -368,7 +361,7 @@ public class JobRunner implements SyncExecutor { public ConfiguredExecutable waitForResult() throws JobExecutionException { ConfiguredExecutable confExec; try { - confExec = new AsyncJobRunner().getResults(this.jobId); + confExec = new AsyncClusterRunner().getResults(this.jobId); if (confExec == null) { log.warn("Could not find results of job " + this.jobId); } @@ -384,9 +377,9 @@ public class JobRunner implements SyncExecutor { return getJobStatus(this.jobId); } - public static JobRunner getInstance(ConfiguredExecutable executable) + public static ClusterRunner getInstance(ConfiguredExecutable executable) throws JobSubmissionException { - return new JobRunner(executable); + return new ClusterRunner(executable); } } // class end diff --git a/engine/compbio/engine/cluster/drmaa/ClusterSession.java b/engine/compbio/engine/cluster/drmaa/ClusterSession.java index 431c39f..c66ccfb 100644 --- a/engine/compbio/engine/cluster/drmaa/ClusterSession.java +++ b/engine/compbio/engine/cluster/drmaa/ClusterSession.java @@ -48,8 +48,7 @@ public final class ClusterSession { private static final Logger log = Logger.getLogger(ClusterSession.class); - private static final PropertyHelper ph = PropertyHelperManager - .getPropertyHelper(); + private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper(); public static final String JOBID = "JOBID"; // TaskId (getTaskDirectory()) -> ConfiguredExecutable map @@ -72,8 +71,7 @@ public final class ClusterSession { // private static BufferedWriter tasks; private ClusterSession() { - log.debug("Initializing session " - + Util.datef.format(Calendar.getInstance().getTime())); + log.debug("Initializing session " + Util.datef.format(Calendar.getInstance().getTime())); SessionFactory factory = 
SessionFactory.getFactory(); session = factory.getSession(); sContact = session.getContact(); @@ -123,8 +121,7 @@ public final class ClusterSession { if (open) { session.exit(); open = false; - log.debug("Closing the session at: " - + Util.datef.format(Calendar.getInstance().getTime())); + log.debug("Closing the session at: " + Util.datef.format(Calendar.getInstance().getTime())); } } catch (DrmaaException dre) { // Cannot recover at this point, just log @@ -183,8 +180,7 @@ public final class ClusterSession { return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER); } - public static ClusterJobId getClusterJobId(String taskId) - throws IOException { + public static ClusterJobId getClusterJobId(String taskId) throws IOException { Job job = Job.getByTaskId(taskId, jobs); if (job != null) { return job.getJobId(); @@ -194,14 +190,12 @@ public final class ClusterSession { String workDir = compbio.engine.Configurator.getWorkDirectory(taskId); assert !Util.isEmpty(workDir); File file = new File(workDir, JOBID); - log.debug("Looking up cluster jobid by the task id " + taskId - + " File path is " + file.getAbsolutePath()); + log.debug("Looking up cluster jobid by the task id " + taskId + " File path is " + file.getAbsolutePath()); assert file.exists(); return new ClusterJobId(FileUtil.readFileToString(file)); } - public JobInfo waitForJob(String jobId, long waitingTime) - throws DrmaaException, IOException { + public JobInfo waitForJob(String jobId, long waitingTime) throws DrmaaException, IOException { ClusterJobId cjobId = getClusterJobId(jobId); JobInfo status = session.wait(cjobId.getJobId(), waitingTime); // Once the job has been waited for it will be finished @@ -221,19 +215,17 @@ public final class ClusterSession { } } - public ConfiguredExecutable getResults(String taskId) - throws DrmaaException, ResultNotAvailableException { + public ConfiguredExecutable getResults(String taskId) throws DrmaaException, ResultNotAvailableException { EngineUtil.isValidJobId(taskId); try { JobInfo status = waitForJob(taskId); } catch (InvalidJobException e) { // Its OK to continue, the job may have already completed normally - log.warn("Could not find the cluster job with id " + taskId - + " perhaps it has completed", e.getCause()); + log.warn("Could not find the cluster job with id " + taskId + " perhaps it has completed", e.getCause()); } catch (IOException e) { - log.error("Could not read JOBID file for the job " + taskId - + " Message " + e.getLocalizedMessage(), e.getCause()); + log.error("Could not read JOBID file for the job " + taskId + " Message " + e.getLocalizedMessage(), + e.getCause()); } // Once the job has been waited for it will be finished // Next time it will not be found in the session, so removed from the @@ -250,14 +242,12 @@ public final class ClusterSession { exec = EngineUtil.loadExecutable(taskId); } if (exec != null) { - EngineUtil.writeMarker(exec.getWorkDirectory(), - JobStatus.COLLECTED); + EngineUtil.writeMarker(exec.getWorkDirectory(), JobStatus.COLLECTED); } return exec; } - public static StatisticManager getStatistics(JobInfo status) - throws DrmaaException { + public static StatisticManager getStatistics(JobInfo status) throws DrmaaException { return new StatisticManager(status); } @@ -274,8 +264,7 @@ public final class ClusterSession { * if the job is no longer in the queue or running. 
basically it * will throw this exception for all finished or cancelled jobs */ - public int getJobStatus(ClusterJobId jobId) throws DrmaaException, - InvalidJobException { + public int getJobStatus(ClusterJobId jobId) throws DrmaaException, InvalidJobException { return session.getJobProgramStatus(jobId.getJobId()); } @@ -291,39 +280,39 @@ public final class ClusterSession { public static String getJobStatus(final int status) throws DrmaaException { String statusString = null; switch (status) { - case Session.UNDETERMINED : - statusString = "Job status cannot be determined\n"; - break; - case Session.QUEUED_ACTIVE : - statusString = "Job is queued and active\n"; - break; - case Session.SYSTEM_ON_HOLD : - statusString = "Job is queued and in system hold\n"; - break; - case Session.USER_ON_HOLD : - statusString = "Job is queued and in user hold\n"; - break; - case Session.USER_SYSTEM_ON_HOLD : - statusString = "Job is queued and in user and system hold\n"; - break; - case Session.RUNNING : - statusString = "Job is running\n"; - break; - case Session.SYSTEM_SUSPENDED : - statusString = "Job is system suspended\n"; - break; - case Session.USER_SUSPENDED : - statusString = "Job is user suspended\n"; - break; - case Session.USER_SYSTEM_SUSPENDED : - statusString = "Job is user and system suspended\n"; - break; - case Session.DONE : - statusString = "Job finished normally\n"; - break; - case Session.FAILED : - statusString = "Job finished, but failed\n"; - break; + case Session.UNDETERMINED: + statusString = "Job status cannot be determined\n"; + break; + case Session.QUEUED_ACTIVE: + statusString = "Job is queued and active\n"; + break; + case Session.SYSTEM_ON_HOLD: + statusString = "Job is queued and in system hold\n"; + break; + case Session.USER_ON_HOLD: + statusString = "Job is queued and in user hold\n"; + break; + case Session.USER_SYSTEM_ON_HOLD: + statusString = "Job is queued and in user and system hold\n"; + break; + case Session.RUNNING: + statusString = "Job is running\n"; + break; + case Session.SYSTEM_SUSPENDED: + statusString = "Job is system suspended\n"; + break; + case Session.USER_SUSPENDED: + statusString = "Job is user suspended\n"; + break; + case Session.USER_SYSTEM_SUSPENDED: + statusString = "Job is user and system suspended\n"; + break; + case Session.DONE: + statusString = "Job finished normally\n"; + break; + case Session.FAILED: + statusString = "Job finished, but failed\n"; + break; } return statusString; } diff --git a/engine/compbio/engine/local/ExecutableWrapper.java b/engine/compbio/engine/local/ExecutableWrapper.java index b9bf00a..a32daf8 100644 --- a/engine/compbio/engine/local/ExecutableWrapper.java +++ b/engine/compbio/engine/local/ExecutableWrapper.java @@ -51,48 +51,48 @@ import compbio.util.annotation.Immutable; public final class ExecutableWrapper implements Callable> { - public static final String PROC_IN_FILE = "procInput.txt"; - public static final String PROC_OUT_FILE = "procOutput.txt"; - public static final String PROC_ERR_FILE = "procError.txt"; - - private static ExecutorService es; - private final ConfiguredExecutable confExec; - private final ProcessBuilder pbuilder; - - private static final Logger log = Logger.getLogger(ExecutableWrapper.class); - - public ExecutableWrapper(ConfiguredExecutable executable, String workDirectory) throws JobSubmissionException { - this.confExec = executable; - String cmd = null; - try { - cmd = executable.getCommand(ExecProvider.Local); - PathValidator.validateExecutable(cmd); - } catch (IllegalArgumentException 
e) { - log.error(e.getMessage(), e.getCause()); - throw new JobSubmissionException(e); - } - List params = executable.getParameters().getCommands(); - params.add(0, cmd); - - pbuilder = new ProcessBuilder(params); - if (executable.getEnvironment() != null) { - log.debug("Setting command environment variables: " + pbuilder.environment()); - EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment()); - log.debug("Process environment:" + pbuilder.environment()); - } - log.debug("Setting command: " + pbuilder.command()); - PathValidator.validateDirectory(workDirectory); - pbuilder.directory(new File(workDirectory)); - log.debug("Current working directory is " + SysPrefs.getCurrentDirectory()); - log.debug("Setting working directory: " + workDirectory); - // Initialize private executor to dump processes output if any to the file system - synchronized (log) { - if (es == null) { - /* - * Two threads are necessary for the process to write in two streams error and output - * simultaneously and hold the stream until exit. If only one thread is used, the - * second stream may never get access to the thread efficiently deadlocking the proccess! - */ + public static final String PROC_IN_FILE = "procInput.txt"; + public static final String PROC_OUT_FILE = "procOutput.txt"; + public static final String PROC_ERR_FILE = "procError.txt"; + + private static ExecutorService es; + private final ConfiguredExecutable confExec; + private final ProcessBuilder pbuilder; + + private static final Logger log = Logger.getLogger(ExecutableWrapper.class); + + public ExecutableWrapper(ConfiguredExecutable executable, String workDirectory) throws JobSubmissionException { + this.confExec = executable; + String cmd = null; + try { + cmd = executable.getCommand(ExecProvider.Local); + PathValidator.validateExecutable(cmd); + } catch (IllegalArgumentException e) { + log.error(e.getMessage(), e.getCause()); + throw new JobSubmissionException(e); + } + List params = executable.getParameters().getCommands(); + params.add(0, cmd); + + pbuilder = new ProcessBuilder(params); + if (executable.getEnvironment() != null) { + log.debug("Setting command environment variables: " + pbuilder.environment()); + EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment()); + log.debug("Process environment:" + pbuilder.environment()); + } + log.debug("Setting command: " + pbuilder.command()); + PathValidator.validateDirectory(workDirectory); + pbuilder.directory(new File(workDirectory)); + log.debug("Current working directory is " + SysPrefs.getCurrentDirectory()); + log.debug("Setting working directory: " + workDirectory); + // Initialize private executor to dump processes output if any to the file system + synchronized (log) { + if (es == null) { + /* + * Two threads are necessary for the process to write in two streams error and output + * simultaneously and hold the stream until exit. If only one thread is used, the + * second stream may never get access to the thread efficiently deadlocking the proccess! + */ this.es = Executors.newCachedThreadPool(); log.debug("Initializing executor for local processes output dump"); // Make sure that the executors are going to be properly closed @@ -102,122 +102,126 @@ public final class ExecutableWrapper implements shutdownService(); } }); + } } } - } - - /** - * Stops internal executor service which captures streams of native - * executables. This method is intended for stopping service if deployed in - * the web application content. 
There is NO NEED of using this method - * otherwise as the executor service is taken care of internally. - */ - public static final void shutdownService() { - if (es != null) { - es.shutdownNow(); + + /** + * Stops internal executor service which captures streams of native + * executables. This method is intended for stopping service if deployed in + * the web application content. There is NO NEED of using this method + * otherwise as the executor service is taken care of internally. + */ + public static final void shutdownService() { + if (es != null) { + es.shutdownNow(); + } } - } - - /** - * It is vital that output and error streams are captured immediately for - * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its - * own thread ready to capture the output. If executor could not execute - * capture immediately this could lead to the call method to stale, as - * execution could not proceed without output being captured. Every call to - * call() method will use 2 threads - * @throws JobSubmissionException - */ - @Override - public ConfiguredExecutable call() throws IOException { - Process proc = null; - Future errorf = null; - Future outputf = null; - PrintStream errorStream = null; - PrintStream outStream = null; - PrintStream comStream = null; - - try { - log.info("Calculation started at " + System.nanoTime()); - EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString()); - proc = pbuilder.start(); - - // store input command and program environment - comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE)); - comStream.append("# program command\n"); - for (String par : pbuilder.command()) { + + /** + * It is vital that output and error streams are captured immediately for + * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its + * own thread ready to capture the output. If executor could not execute + * capture immediately this could lead to the call method to stale, as + * execution could not proceed without output being captured. Every call to + * call() method will use 2 threads + * @throws JobSubmissionException + */ + @Override + public ConfiguredExecutable call() throws IOException { + Process proc = null; + Future errorf = null; + Future outputf = null; + PrintStream errorStream = null; + PrintStream outStream = null; + PrintStream comStream = null; + + try { + log.info("Calculation started at " + System.nanoTime()); + EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString()); + proc = pbuilder.start(); + + // store input command and program environment + comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE)); + comStream.append("# program command\n"); + for (String par : pbuilder.command()) { comStream.append(par + " "); - } - comStream.append("\n\n# program environment\n"); - for (Entry var : pbuilder.environment().entrySet()) { - comStream.append(var.getKey() + " =\t" + var.getValue() + "\n"); - } - comStream.close(); - - // any error message? - errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError())); - StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR); - - // any output? 
- outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput())); - StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT); - - // kick it off - errorf = es.submit(errorGobbler); - outputf = es.submit(outputGobbler); - - // any error??? - int exitVal = proc.waitFor(); - log.info("Calculation completed at " + System.nanoTime()); - EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString()); - // Let streams to write for a little more - errorf.get(2, TimeUnit.SECONDS); - outputf.get(2, TimeUnit.SECONDS); - - // Close streams - errorStream.close(); - outStream.close(); - log.debug("Local process exit value: " + exitVal); - } catch (ExecutionException e) { - // Log and ignore this is not important - log.trace("Native Process output threw exception: " + e.getMessage()); - } catch (TimeoutException e) { - // Log and ignore this is not important - log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage()); - } catch (InterruptedException e) { - log.error("Native Process was interrupted aborting: " + e.getMessage()); - proc.destroy(); - errorf.cancel(true); - outputf.cancel(true); - // restore interruption status - Thread.currentThread().interrupt(); - } finally { - // just to make sure that we do not left anything running - if (proc != null) { + } + comStream.append("\n\n# program environment\n"); + for (Entry var : pbuilder.environment().entrySet()) { + comStream.append(var.getKey() + " =\t" + var.getValue() + "\n"); + } + comStream.close(); + + // any error message? + errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError())); + StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR); + + // any output? + outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput())); + StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT); + + // kick it off + errorf = es.submit(errorGobbler); + outputf = es.submit(outputGobbler); + + // any error??? 
+ int exitVal = proc.waitFor(); + //proc.getClass(); + log.info("Calculation completed at " + System.nanoTime()); + EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString()); + + // Let streams to write for a little more + errorf.get(2, TimeUnit.SECONDS); + outputf.get(2, TimeUnit.SECONDS); + + // Close streams + errorStream.close(); + outStream.close(); + log.debug("Local process exit value: " + exitVal); + } catch (ExecutionException e) { + // Log and ignore this is not important + log.trace("Native Process output threw exception: " + e.getMessage()); + } catch (TimeoutException e) { + // Log and ignore this is not important + log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage()); + } catch (InterruptedException e) { + log.error("Native Process was interrupted aborting: " + e.getMessage()); + System.err.println("Native Process was interrupted aborting: " + e.getMessage()); proc.destroy(); - } - if (errorf != null) { errorf.cancel(true); - } - if (outputf != null) { outputf.cancel(true); + // restore interruption status + Thread.currentThread().interrupt(); + } finally { + // just to make sure that we do not left anything running + if (proc != null) { + proc.destroy(); + } + if (errorf != null) { + errorf.cancel(true); + } + if (outputf != null) { + outputf.cancel(true); + } + FileUtil.closeSilently(log, errorStream); + FileUtil.closeSilently(log, outStream); } - FileUtil.closeSilently(log, errorStream); - FileUtil.closeSilently(log, outStream); + return confExec; } - return confExec; - } - private String getOutput() { - if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable) { - return confExec.getOutput(); + private String getOutput() { + if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable) { + return confExec.getOutput(); + } + return PROC_OUT_FILE; } - return PROC_OUT_FILE; - } - private String getError() { - if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable) { - return confExec.getError(); + private String getError() { + if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable) { + return confExec.getError(); + } + return PROC_ERR_FILE; } - return PROC_ERR_FILE; - } + } diff --git a/testsrc/compbio/engine/cluster/drmaa/ClusterSessionTester.java b/testsrc/compbio/engine/cluster/drmaa/ClusterSessionTester.java index b3ec217..d72337b 100644 --- a/testsrc/compbio/engine/cluster/drmaa/ClusterSessionTester.java +++ b/testsrc/compbio/engine/cluster/drmaa/ClusterSessionTester.java @@ -46,7 +46,7 @@ public class ClusterSessionTester { try { ConfiguredExecutable clw = Configurator .configureExecutable(cl, Executable.ExecProvider.Cluster); - JobRunner jr = JobRunner.getInstance(clw); + ClusterRunner jr = ClusterRunner.getInstance(clw); String jobId = jr.submitJob(); ClusterSession cs = ClusterSession.getInstance(); // this only holds for sequential execution diff --git a/testsrc/compbio/engine/cluster/drmaa/DrmaaAsyncClusterEngineTester.java b/testsrc/compbio/engine/cluster/drmaa/DrmaaAsyncClusterEngineTester.java index 4322544..922aa90 100644 --- a/testsrc/compbio/engine/cluster/drmaa/DrmaaAsyncClusterEngineTester.java +++ b/testsrc/compbio/engine/cluster/drmaa/DrmaaAsyncClusterEngineTester.java @@ -60,7 +60,7 @@ public class DrmaaAsyncClusterEngineTester { try { ConfiguredExecutable confClustal = Configurator .configureExecutable(clustal); - AsyncExecutor runner = new AsyncJobRunner(); + 
AsyncExecutor runner = new AsyncClusterRunner(); assertNotNull("Runner is NULL", runner); String jobId = runner.submitJob(confClustal); assertEquals("Input was not set!", test_input, clustal.getInput()); @@ -93,7 +93,7 @@ public class DrmaaAsyncClusterEngineTester { try { ConfiguredExecutable confClustal = Configurator .configureExecutable(clustal); - AsyncJobRunner runner = new AsyncJobRunner(); + AsyncClusterRunner runner = new AsyncClusterRunner(); String jobId = runner.submitJob(confClustal); assertNotNull("Runner is NULL", runner); // assertNotNull("JobId is null", jobId1); @@ -125,12 +125,12 @@ public class DrmaaAsyncClusterEngineTester { clustal.setInput(test_input).setOutput(cluster_test_outfile); try { - AsyncJobRunner runner = new AsyncJobRunner(); + AsyncClusterRunner runner = new AsyncClusterRunner(); ConfiguredExecutable confClustal = Configurator .configureExecutable(clustal); String jobId = runner.submitJob(confClustal); assertNotNull("Runner is NULL", runner); - AsyncJobRunner runner2 = new AsyncJobRunner(); + AsyncClusterRunner runner2 = new AsyncClusterRunner(); boolean hasRun = false; boolean hasPended = false; @@ -162,7 +162,7 @@ public class DrmaaAsyncClusterEngineTester { // immediately // the status could be UNDEFINED! // assertFalse(hasUndefined); - AsyncJobRunner runner3 = new AsyncJobRunner(); + AsyncClusterRunner runner3 = new AsyncClusterRunner(); Executable exec = runner3.getResults(jobId); assertNotNull(exec); // Now try collecting result for the second time diff --git a/testsrc/compbio/engine/cluster/drmaa/DrmaaClusterEngineTester.java b/testsrc/compbio/engine/cluster/drmaa/DrmaaClusterEngineTester.java index c8998c7..eb5ead2 100644 --- a/testsrc/compbio/engine/cluster/drmaa/DrmaaClusterEngineTester.java +++ b/testsrc/compbio/engine/cluster/drmaa/DrmaaClusterEngineTester.java @@ -67,7 +67,7 @@ public class DrmaaClusterEngineTester { Executable.ExecProvider.Cluster); assertNotNull(confClust.getWorkDirectory()); - JobRunner runner = JobRunner.getInstance(confClust); + ClusterRunner runner = ClusterRunner.getInstance(confClust); assertEquals("Input was not set!", test_input, clustal.getInput()); assertNotNull("Runner is NULL", runner); runner.executeJob(); @@ -150,7 +150,7 @@ public class DrmaaClusterEngineTester { Executable.ExecProvider.Cluster); assertNotNull(confClust.getWorkDirectory()); - JobRunner runner = JobRunner.getInstance(confClust); + ClusterRunner runner = ClusterRunner.getInstance(confClust); assertNotNull("Runner is NULL", runner); runner.executeJob(); diff --git a/testsrc/compbio/runner/conservation/AAConTester.java b/testsrc/compbio/runner/conservation/AAConTester.java index 825de0b..37f50cd 100644 --- a/testsrc/compbio/runner/conservation/AAConTester.java +++ b/testsrc/compbio/runner/conservation/AAConTester.java @@ -49,7 +49,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; @@ -84,7 +84,7 @@ public class AAConTester { assertFalse(SysPrefs.isWindows, "Cluster execution can only be in unix environment"); try { ConfiguredExecutable confAAcon = Configurator.configureExecutable(aacon, Executable.ExecProvider.Cluster); - JobRunner runner = JobRunner.getInstance(confAAcon); 
+ ClusterRunner runner = ClusterRunner.getInstance(confAAcon); assertNotNull(runner, "Runner is NULL"); runner.executeJob(); diff --git a/testsrc/compbio/runner/disorder/DisemblTester.java b/testsrc/compbio/runner/disorder/DisemblTester.java index de206d0..e358387 100644 --- a/testsrc/compbio/runner/disorder/DisemblTester.java +++ b/testsrc/compbio/runner/disorder/DisemblTester.java @@ -52,7 +52,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; @@ -88,7 +88,7 @@ public class DisemblTester { ConfiguredExecutable confDisembl = Configurator .configureExecutable(disembl, Executable.ExecProvider.Cluster); - JobRunner runner = JobRunner.getInstance(confDisembl); + ClusterRunner runner = ClusterRunner.getInstance(confDisembl); assertNotNull(runner, "Runner is NULL"); runner.executeJob(); diff --git a/testsrc/compbio/runner/disorder/GlobPlotTester.java b/testsrc/compbio/runner/disorder/GlobPlotTester.java index aef8de2..083142a 100644 --- a/testsrc/compbio/runner/disorder/GlobPlotTester.java +++ b/testsrc/compbio/runner/disorder/GlobPlotTester.java @@ -50,7 +50,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; @@ -84,7 +84,7 @@ public class GlobPlotTester { ConfiguredExecutable confGlobPlot = Configurator .configureExecutable(globprot, Executable.ExecProvider.Cluster); - JobRunner runner = JobRunner.getInstance(confGlobPlot); + ClusterRunner runner = ClusterRunner.getInstance(confGlobPlot); assertNotNull(runner, "Runner is NULL"); runner.executeJob(); diff --git a/testsrc/compbio/runner/disorder/IUPredTester.java b/testsrc/compbio/runner/disorder/IUPredTester.java index f521154..121aba6 100644 --- a/testsrc/compbio/runner/disorder/IUPredTester.java +++ b/testsrc/compbio/runner/disorder/IUPredTester.java @@ -50,7 +50,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; @@ -83,7 +83,7 @@ public class IUPredTester { ConfiguredExecutable confIUPred = Configurator .configureExecutable(iupred, Executable.ExecProvider.Cluster); - JobRunner runner = JobRunner.getInstance(confIUPred); + ClusterRunner runner = ClusterRunner.getInstance(confIUPred); assertNotNull(runner, "Runner is NULL"); runner.executeJob(); diff --git a/testsrc/compbio/runner/disorder/JronnTester.java b/testsrc/compbio/runner/disorder/JronnTester.java index d0cadaa..f065f13 100644 --- a/testsrc/compbio/runner/disorder/JronnTester.java +++ b/testsrc/compbio/runner/disorder/JronnTester.java @@ -50,7 +50,7 @@ import 
compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; @@ -81,7 +81,7 @@ public class JronnTester { assertFalse(SysPrefs.isWindows, "Cluster execution can only be in unix environment"); try { ConfiguredExecutable confJronn = Configurator.configureExecutable(jronn, Executable.ExecProvider.Cluster); - JobRunner runner = JobRunner.getInstance(confJronn); + ClusterRunner runner = ClusterRunner.getInstance(confJronn); assertNotNull(runner, "Runner is NULL"); runner.executeJob(); diff --git a/testsrc/compbio/runner/msa/ClustalOTester.java b/testsrc/compbio/runner/msa/ClustalOTester.java index d50e0fa..281a50c 100644 --- a/testsrc/compbio/runner/msa/ClustalOTester.java +++ b/testsrc/compbio/runner/msa/ClustalOTester.java @@ -51,7 +51,7 @@ import compbio.engine.client.Executable; import compbio.engine.client.Executable.ExecProvider; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.conf.RunnerConfigMarshaller; import compbio.engine.local.AsyncLocalRunner; @@ -287,7 +287,7 @@ public class ClustalOTester { clustal.setInput(AllTestSuit.test_input).setOutput(cluster_test_outfile); try { ConfiguredExecutable confClustal = Configurator.configureExecutable(clustal); - JobRunner runner = JobRunner.getInstance(confClustal); + ClusterRunner runner = ClusterRunner.getInstance(confClustal); // ClusterSession csession = JobRunner.getSession(); assertNotNull(runner); runner.executeJob(); diff --git a/testsrc/compbio/runner/msa/ClustalWTester.java b/testsrc/compbio/runner/msa/ClustalWTester.java index e6f7ac9..d8617cb 100644 --- a/testsrc/compbio/runner/msa/ClustalWTester.java +++ b/testsrc/compbio/runner/msa/ClustalWTester.java @@ -53,7 +53,7 @@ import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.client.Executable.ExecProvider; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.conf.RunnerConfigMarshaller; import compbio.engine.local.AsyncLocalRunner; @@ -372,7 +372,7 @@ public class ClustalWTester { try { ConfiguredExecutable confClustal = Configurator.configureExecutable(clustal); - JobRunner runner = JobRunner.getInstance(confClustal); + ClusterRunner runner = ClusterRunner.getInstance(confClustal); // ClusterSession csession = JobRunner.getSession(); assertNotNull(runner); runner.executeJob(); diff --git a/testsrc/compbio/runner/msa/MafftTester.java b/testsrc/compbio/runner/msa/MafftTester.java index 4d85bd5..53b8fe3 100644 --- a/testsrc/compbio/runner/msa/MafftTester.java +++ b/testsrc/compbio/runner/msa/MafftTester.java @@ -44,7 +44,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.client.Executable.ExecProvider; -import compbio.engine.cluster.drmaa.JobRunner; +import 
compbio.engine.cluster.drmaa.ClusterRunner; import compbio.metadata.AllTestSuit; import compbio.metadata.ChunkHolder; import compbio.metadata.JobExecutionException; @@ -177,7 +177,7 @@ public class MafftTester { ConfiguredExecutable cmafft = Configurator .configureExecutable(mafft, Executable.ExecProvider.Cluster); - JobRunner sexecutor = (JobRunner) Configurator.getSyncEngine( + ClusterRunner sexecutor = (ClusterRunner) Configurator.getSyncEngine( cmafft, Executable.ExecProvider.Cluster); sexecutor.executeJob(); ConfiguredExecutable al = sexecutor.waitForResult(); diff --git a/testsrc/compbio/runner/msa/MuscleTester.java b/testsrc/compbio/runner/msa/MuscleTester.java index dc45ca5..354246c 100644 --- a/testsrc/compbio/runner/msa/MuscleTester.java +++ b/testsrc/compbio/runner/msa/MuscleTester.java @@ -48,7 +48,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; @@ -86,7 +86,7 @@ public class MuscleTester { ConfiguredExecutable confMuscle = Configurator .configureExecutable(muscle, Executable.ExecProvider.Cluster); - JobRunner runner = JobRunner.getInstance(confMuscle); + ClusterRunner runner = ClusterRunner.getInstance(confMuscle); assertNotNull(runner, "Runner is NULL"); runner.executeJob(); diff --git a/testsrc/compbio/runner/msa/ProbconsTester.java b/testsrc/compbio/runner/msa/ProbconsTester.java index afdc1b6..36fffba 100644 --- a/testsrc/compbio/runner/msa/ProbconsTester.java +++ b/testsrc/compbio/runner/msa/ProbconsTester.java @@ -42,7 +42,7 @@ import compbio.engine.client.ConfExecutable; import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.metadata.AllTestSuit; import compbio.metadata.ChunkHolder; import compbio.metadata.JobExecutionException; @@ -169,7 +169,7 @@ public class ProbconsTester { try { ConfiguredExecutable cmafft = Configurator .configureExecutable(probc, Executable.ExecProvider.Cluster); - JobRunner sexecutor = (JobRunner) Configurator.getSyncEngine( + ClusterRunner sexecutor = (ClusterRunner) Configurator.getSyncEngine( cmafft, Executable.ExecProvider.Cluster); sexecutor.executeJob(); ConfiguredExecutable al = sexecutor.waitForResult(); diff --git a/testsrc/compbio/runner/msa/TcoffeeTester.java b/testsrc/compbio/runner/msa/TcoffeeTester.java index f0f07bf..8fc2e99 100644 --- a/testsrc/compbio/runner/msa/TcoffeeTester.java +++ b/testsrc/compbio/runner/msa/TcoffeeTester.java @@ -45,7 +45,7 @@ import compbio.engine.client.ConfExecutable; import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; import compbio.metadata.ChunkHolder; @@ -249,7 +249,7 @@ public class TcoffeeTester { public void RunOnCluster() { try { ConfiguredExecutable cmafft = Configurator.configureExecutable(tcoffee, Executable.ExecProvider.Cluster); - JobRunner sexecutor = 
(JobRunner) Configurator.getSyncEngine(cmafft, Executable.ExecProvider.Cluster); + ClusterRunner sexecutor = (ClusterRunner) Configurator.getSyncEngine(cmafft, Executable.ExecProvider.Cluster); sexecutor.executeJob(); ConfiguredExecutable al = sexecutor.waitForResult(); Alignment align = al.getResults(); diff --git a/testsrc/compbio/runner/predictors/JpredTester.java b/testsrc/compbio/runner/predictors/JpredTester.java index 1bf753b..f053200 100644 --- a/testsrc/compbio/runner/predictors/JpredTester.java +++ b/testsrc/compbio/runner/predictors/JpredTester.java @@ -48,7 +48,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.LocalRunner; import compbio.metadata.AllTestSuit; @@ -90,7 +90,7 @@ public class JpredTester { ConfiguredExecutable confpred = Configurator.configureExecutable(pred, Executable.ExecProvider.Cluster); Preset conf = jpredPreset.getPresetByName("cluster configuration"); confpred.addParameters(conf.getOptions()); - JobRunner runner = JobRunner.getInstance(confpred); + ClusterRunner runner = ClusterRunner.getInstance(confpred); assertNotNull(runner, "Runner is NULL"); runner.executeJob(); diff --git a/testsrc/compbio/runner/structure/RNAalifoldTester.java b/testsrc/compbio/runner/structure/RNAalifoldTester.java index 3f0986f..370d040 100644 --- a/testsrc/compbio/runner/structure/RNAalifoldTester.java +++ b/testsrc/compbio/runner/structure/RNAalifoldTester.java @@ -35,7 +35,7 @@ import compbio.engine.client.ConfiguredExecutable; import compbio.engine.client.Executable; import compbio.engine.client.RunConfiguration; import compbio.engine.cluster.drmaa.ClusterEngineUtil; -import compbio.engine.cluster.drmaa.JobRunner; +import compbio.engine.cluster.drmaa.ClusterRunner; import compbio.engine.cluster.drmaa.StatisticManager; import compbio.engine.local.AsyncLocalRunner; import compbio.engine.local.LocalExecutorService; diff --git a/webservices/compbio/ws/client/Jws2Client.java b/webservices/compbio/ws/client/Jws2Client.java index dbe999a..4a0b459 100644 --- a/webservices/compbio/ws/client/Jws2Client.java +++ b/webservices/compbio/ws/client/Jws2Client.java @@ -56,6 +56,7 @@ import compbio.data.sequence.ScoreManager; import compbio.data.sequence.SequenceUtil; import compbio.data.sequence.UnknownFileFormatException; import compbio.metadata.JobSubmissionException; +import compbio.metadata.JobStatus; import compbio.metadata.Option; import compbio.metadata.Limit; import compbio.metadata.Preset; @@ -372,6 +373,8 @@ public class Jws2Client { } System.out.println("\n\rcalling predictor........."); Thread.sleep(100); + JobStatus status = wsproxy.getJobStatus(jobId); + System.out.println("\njob " + jobId + " status: " + status); scores = wsproxy.getAnnotation(jobId); } catch (JobSubmissionException e) { System.err.println("Exception while submitting job to a web server. 
Exception details are below:"); @@ -460,7 +463,12 @@ public class Jws2Client { jobId = msaws.align(fastalist); } System.out.println("\ncalling program........."); - Thread.sleep(100); + long startTime = System.nanoTime(); + while (JobStatus.RUNNING == msaws.getJobStatus(jobId)) { + Thread.sleep(1000); + long endTime = System.nanoTime(); + System.out.println("job " + jobId + " time executing: "+ (endTime - startTime) / 1000000 +" msec, status: " + msaws.getJobStatus(jobId)); + } alignment = msaws.getResult(jobId); } catch (IOException e) { System.err.println("Exception while reading the input file. Check that the input file is a FASTA file! " diff --git a/webservices/compbio/ws/server/MafftWS.java b/webservices/compbio/ws/server/MafftWS.java index 1d02e81..17c5c6e 100644 --- a/webservices/compbio/ws/server/MafftWS.java +++ b/webservices/compbio/ws/server/MafftWS.java @@ -58,15 +58,13 @@ public class MafftWS implements MsaWS { private static final LimitsManager limitMan = EngineUtil.getLimits(new Mafft().getType()); @Override - public String align(List sequences) - throws JobSubmissionException { + public String align(List sequences) throws JobSubmissionException { WSUtil.validateFastaInput(sequences); ConfiguredExecutable confMafft = init(sequences); return WSUtil.align(sequences, confMafft, log, "align", getLimit("")); } - ConfiguredExecutable init(List dataSet) - throws JobSubmissionException { + ConfiguredExecutable init(List dataSet) throws JobSubmissionException { Mafft mafft = new Mafft(); mafft.setInput(SkeletalExecutable.INPUT); mafft.setOutput(SkeletalExecutable.OUTPUT); @@ -75,8 +73,7 @@ public class MafftWS implements MsaWS { } @Override - public String customAlign(List sequences, - List> options) throws JobSubmissionException, + public String customAlign(List sequences, List> options) throws JobSubmissionException, WrongParameterException { WSUtil.validateFastaInput(sequences); ConfiguredExecutable confMafft = init(sequences); @@ -87,9 +84,7 @@ public class MafftWS implements MsaWS { } @Override - public String presetAlign(List sequences, - Preset preset) throws JobSubmissionException, - WrongParameterException { + public String presetAlign(List sequences, Preset preset) throws JobSubmissionException, WrongParameterException { WSUtil.validateFastaInput(sequences); if (preset == null) { throw new WrongParameterException("Preset must be provided!"); @@ -108,10 +103,9 @@ public class MafftWS implements MsaWS { public Alignment getResult(String jobId) throws ResultNotAvailableException { WSUtil.validateJobId(jobId); AsyncExecutor asyncEngine = Configurator.getAsyncEngine(jobId); - ConfiguredExecutable mafft = (ConfiguredExecutable) asyncEngine - .getResults(jobId); + ConfiguredExecutable mafft = (ConfiguredExecutable) asyncEngine.getResults(jobId); Alignment al = mafft.getResults(); - return new Alignment (al.getSequences(), Program.Mafft, '-'); + return new Alignment(al.getSequences(), Program.Mafft, '-'); } @Override @@ -131,8 +125,7 @@ public class MafftWS implements MsaWS { @Override public ChunkHolder pullExecStatistics(String jobId, long position) { WSUtil.validateJobId(jobId); - String file = Configurator.getWorkDirectory(jobId) + File.separator - + new Mafft().getError(); + String file = Configurator.getWorkDirectory(jobId) + File.separator + new Mafft().getError(); return WSUtil.pullFile(file, position); } diff --git a/webservices/compbio/ws/server/SequenceAnnotationService.java b/webservices/compbio/ws/server/SequenceAnnotationService.java index 641c745..afe0bf2 100644 --- 
a/webservices/compbio/ws/server/SequenceAnnotationService.java +++ b/webservices/compbio/ws/server/SequenceAnnotationService.java @@ -62,8 +62,7 @@ public abstract class SequenceAnnotationService extends GenericMetadataServic @SuppressWarnings("unchecked") public String analize(List sequences) - throws UnsupportedRuntimeException, LimitExceededException, - JobSubmissionException { + throws UnsupportedRuntimeException, LimitExceededException, JobSubmissionException { WSUtil.validateFastaInput(sequences); ConfiguredExecutable confIUPred = init(sequences); return WSUtil.analize(sequences, confIUPred, log, "analize", getLimit("")); @@ -86,8 +85,7 @@ public abstract class SequenceAnnotationService extends GenericMetadataServic } public String presetAnalize(List sequences, Preset preset) - throws UnsupportedRuntimeException, LimitExceededException, - JobSubmissionException, WrongParameterException { + throws UnsupportedRuntimeException, LimitExceededException, JobSubmissionException, WrongParameterException { WSUtil.validateAAConInput(sequences); if (preset == null) { throw new WrongParameterException("Preset must be provided!"); @@ -97,7 +95,6 @@ public abstract class SequenceAnnotationService extends GenericMetadataServic confAAcon.addParameters(preset.getOptions()); @SuppressWarnings("unchecked") Limit limit = getLimit(preset.getName()); - return WSUtil - .analize(sequences, confAAcon, log, "presetAnalize", limit); + return WSUtil.analize(sequences, confAAcon, log, "presetAnalize", limit); } } diff --git a/webservices/compbio/ws/server/WSUtil.java b/webservices/compbio/ws/server/WSUtil.java index 835b553..89c7375 100644 --- a/webservices/compbio/ws/server/WSUtil.java +++ b/webservices/compbio/ws/server/WSUtil.java @@ -27,6 +27,7 @@ import org.apache.log4j.Logger; import compbio.data.sequence.FastaSequence; import compbio.data.sequence.ScoreManager; +import compbio.runner.RunnerUtil; import compbio.engine.AsyncExecutor; import compbio.engine.Configurator; import compbio.engine.ProgressGetter; @@ -100,7 +101,7 @@ public final class WSUtil { if (limit != null && limit.isExceeded(sequences)) { throw LimitExceededException.newLimitExceeded(limit, sequences); } - compbio.runner.RunnerUtil.writeInput(sequences, confExec); + RunnerUtil.writeInput(sequences, confExec); AsyncExecutor engine = Configurator.getAsyncEngine(confExec); String jobId = engine.submitJob(confExec); reportUsage(confExec, logger); @@ -122,7 +123,7 @@ public final class WSUtil { if (limit != null && limit.isExceeded(sequences)) { throw LimitExceededException.newLimitExceeded(limit, sequences); } - compbio.runner.RunnerUtil.writeInput(sequences, confExec); + RunnerUtil.writeInput(sequences, confExec); AsyncExecutor engine = Configurator.getAsyncEngine(confExec); String jobId = engine.submitJob(confExec); reportUsage(confExec, log); @@ -139,7 +140,7 @@ public final class WSUtil { if (limit != null && limit.isExceeded(sequences)) { throw LimitExceededException.newLimitExceeded(limit, sequences); } - compbio.runner.RunnerUtil.writeClustalInput(sequences, confExec, '-'); + RunnerUtil.writeClustalInput(sequences, confExec, '-'); AsyncExecutor engine = Configurator.getAsyncEngine(confExec); String jobId = engine.submitJob(confExec); reportUsage(confExec, log);
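
Usage note (not part of the commit): the sketch below illustrates how calling code changes after this rename, assembled from the call sites updated above (ClustalW setup, Configurator.configureExecutable, ClusterRunner.getInstance, AsyncClusterRunner.submitJob). The generic type parameters, the compbio.runner.msa.ClustalW import and the input/output file names are assumptions, since the plain-text diff strips or does not show them; exception handling is collapsed into a single throws clause.

import compbio.engine.Configurator;
import compbio.engine.client.ConfiguredExecutable;
import compbio.engine.client.Executable;
import compbio.engine.cluster.drmaa.AsyncClusterRunner; // was AsyncJobRunner
import compbio.engine.cluster.drmaa.ClusterRunner;      // was JobRunner
import compbio.runner.msa.ClustalW;                     // assumed package, as used by ClustalWTester

public class ClusterRunnerUsageSketch {

    public static void main(String[] args) throws Exception {
        // Configure a ClustalW job for cluster execution (file names are placeholders)
        ClustalW clustal = new ClustalW();
        clustal.setInput("input.fasta").setOutput("result.aln");
        ConfiguredExecutable<ClustalW> confClustal =
                Configurator.configureExecutable(clustal, Executable.ExecProvider.Cluster);

        // Synchronous path: JobRunner.getInstance(...) is now ClusterRunner.getInstance(...)
        ClusterRunner runner = ClusterRunner.getInstance(confClustal);
        runner.executeJob();
        ConfiguredExecutable<?> result = runner.waitForResult();

        // Asynchronous path: new AsyncJobRunner() is now new AsyncClusterRunner()
        // (in practice a job is configured once and submitted through one of the two paths)
        AsyncClusterRunner asyncRunner = new AsyncClusterRunner();
        String taskId = asyncRunner.submitJob(confClustal);
        System.out.println("Task " + taskId + " status: " + asyncRunner.getJobStatus(taskId));
    }
}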