import compbio.engine.client.Executable;\r
import compbio.engine.client.PathValidator;\r
import compbio.engine.client.EngineUtil;\r
-import compbio.engine.cluster.drmaa.AsyncJobRunner;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.AsyncClusterRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.conf.DirectoryManager;\r
import compbio.engine.conf.PropertyHelperManager;\r
import compbio.engine.local.AsyncLocalRunner;\r
public final static String LOCAL_WORK_DIRECTORY = initLocalDirectory();\r
public final static String CLUSTER_WORK_DIRECTORY = initClusterWorkDirectory();\r
\r
-\r
-\r
private static boolean initBooleanValue(String key) {\r
assert key != null;\r
String status = ph.getProperty(key);\r
if (!Util.isEmpty(tmpDir)) {\r
tmpDir = tmpDir.trim();\r
} else {\r
- throw new RuntimeException(\r
- "Cluster work directory must be provided! ");\r
+ throw new RuntimeException("Cluster work directory must be provided! ");\r
}\r
- if (LOCAL_WORK_DIRECTORY != null\r
- && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {\r
- throw new InvalidParameterException(\r
- "Cluster engine output directory must be different of that for local engine!");\r
+ if (LOCAL_WORK_DIRECTORY != null && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {\r
+ throw new InvalidParameterException("Cluster engine output directory must be different from that of the local engine!");\r
}\r
}\r
return tmpDir;\r
* {@link LoadBalancer} for an engine. This method will fall back and return\r
* the local engine if:\r
* \r
- * 1) No engines are defined in the properties or they have been defined\r
- * incorrectly\r
+ * 1) No engines are defined in the properties or they have been defined incorrectly\r
* \r
* 2) The execution environment is Windows, as the system cannot run\r
* cluster submissions from Windows\r
* @return SyncExecutor backed up by either cluster or local engines\r
* @throws JobSubmissionException\r
*/\r
- static Executable.ExecProvider getExecProvider(\r
- ConfiguredExecutable<?> executable, List<FastaSequence> dataSet)\r
+ static Executable.ExecProvider getExecProvider(ConfiguredExecutable<?> executable, List<FastaSequence> dataSet)\r
throws JobSubmissionException {\r
// Look where executable claims to be executed\r
Executable.ExecProvider provider = executable.getSupportedRuntimes();\r
if (!IS_CLUSTER_ENGINE_ENABLED && !IS_LOCAL_ENGINE_ENABLED) {\r
// Both engines disabled!\r
- throw new RuntimeException(\r
- "Both engines are disabled! "\r
- + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. "\r
- + "At least one engine must be enabled!");\r
+ throw new RuntimeException("Both engines are disabled! "\r
+ + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. At least one engine must be enabled!");\r
}\r
if (provider == Executable.ExecProvider.Local) {\r
if (IS_LOCAL_ENGINE_ENABLED) {\r
return Executable.ExecProvider.Local;\r
} else {\r
- throw new JobSubmissionException(\r
- "Executable can be executed only on locally, but local engine is disabled!");\r
+ throw new JobSubmissionException("Executable can be executed only on locally, but local engine is disabled!");\r
}\r
}\r
if (provider == Executable.ExecProvider.Cluster) {\r
if (IS_CLUSTER_ENGINE_ENABLED) {\r
return Executable.ExecProvider.Cluster;\r
} else {\r
- throw new JobSubmissionException(\r
- "Executable can be executed only on the cluster, but cluster engine is disabled!");\r
+ throw new JobSubmissionException("Executable can be executed only on the cluster, but cluster engine is disabled!");\r
}\r
}\r
// We are here if executable can be executed on both Cluster and Local\r
return Executable.ExecProvider.Local;\r
}\r
\r
- public static <T> ConfiguredExecutable<T> configureExecutable(\r
- Executable<T> executable) throws JobSubmissionException {\r
+ public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable) throws JobSubmissionException {\r
\r
- ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
- DirectoryManager.getTaskDirectory(executable.getClass()));\r
+ ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));\r
Executable.ExecProvider provider = getExecProvider(confExec, null);\r
confExec.setExecProvider(provider);\r
setupWorkDirectory(confExec, provider);\r
return confExec;\r
}\r
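/*\r
 * Typical usage (an illustration only, mirroring the engine tests and the WSUtil\r
 * code further below; clustal here stands for any pre-configured Executable):\r
 *\r
 *   ConfiguredExecutable<ClustalW> conf = Configurator.configureExecutable(clustal);\r
 *   AsyncExecutor engine = Configurator.getAsyncEngine(conf);\r
 *   String jobId = engine.submitJob(conf);\r
 */\r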
\r
- public static <T> ConfiguredExecutable<T> configureExecutable(\r
- Executable<T> executable, List<FastaSequence> dataSet)\r
+ public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable, List<FastaSequence> dataSet)\r
throws JobSubmissionException {\r
\r
- ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
- DirectoryManager.getTaskDirectory(executable.getClass()));\r
+ ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));\r
Executable.ExecProvider provider = getExecProvider(confExec, dataSet);\r
confExec.setExecProvider(provider);\r
setupWorkDirectory(confExec, provider);\r
return confExec;\r
}\r
\r
- static <T> void setupWorkDirectory(ConfExecutable<T> confExec,\r
- Executable.ExecProvider provider) {\r
+ static <T> void setupWorkDirectory(ConfExecutable<T> confExec, Executable.ExecProvider provider) {\r
assert provider != null && provider != Executable.ExecProvider.Any;\r
String workDir = "";\r
if (provider == Executable.ExecProvider.Local) {\r
- workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator\r
- + confExec.getTaskId();\r
+ workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator + confExec.getTaskId();\r
} else {\r
- workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator\r
- + confExec.getTaskId();\r
+ workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator + confExec.getTaskId();\r
}\r
// Create working directory for the task\r
File wdir = new File(workDir);\r
wdir.mkdir();\r
- log.info("Creating working directory for the task in: "\r
- + wdir.getAbsolutePath());\r
+ log.info("Creating working directory for the task in: " + wdir.getAbsolutePath());\r
// Tell the executable where to get the results\r
confExec.setWorkDirectory(workDir);\r
}\r
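/*\r
 * Resulting layout, for illustration: every task gets its own sub-directory,\r
 * i.e. <LOCAL_WORK_DIRECTORY>/<taskId> for local jobs and\r
 * <CLUSTER_WORK_DIRECTORY>/<taskId> for cluster jobs.\r
 */\r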
\r
- public static <T> ConfiguredExecutable<T> configureExecutable(\r
- Executable<T> executable, Executable.ExecProvider provider)\r
+ public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable, Executable.ExecProvider provider)\r
throws JobSubmissionException {\r
if (executable == null) {\r
throw new InvalidParameterException("Executable must be provided!");\r
}\r
- ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
- DirectoryManager.getTaskDirectory(executable.getClass()));\r
- if (provider == Executable.ExecProvider.Cluster\r
- && !IS_CLUSTER_ENGINE_ENABLED) {\r
- throw new JobSubmissionException(\r
- "Cluster engine is disabled or not configured!");\r
+ ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));\r
+ if (provider == Executable.ExecProvider.Cluster && !IS_CLUSTER_ENGINE_ENABLED) {\r
+ throw new JobSubmissionException("Cluster engine is disabled or not configured!");\r
}\r
- if (provider == Executable.ExecProvider.Local\r
- && !IS_LOCAL_ENGINE_ENABLED) {\r
- throw new JobSubmissionException(\r
- "Local engine is disabled or not configured!");\r
+ if (provider == Executable.ExecProvider.Local && !IS_LOCAL_ENGINE_ENABLED) {\r
+ throw new JobSubmissionException("Local engine is disabled or not configured!");\r
}\r
confExec.setExecProvider(provider);\r
setupWorkDirectory(confExec, provider);\r
return confExec;\r
}\r
\r
- public static AsyncExecutor getAsyncEngine(\r
- ConfiguredExecutable<?> executable, Executable.ExecProvider provider) {\r
+ public static AsyncExecutor getAsyncEngine(ConfiguredExecutable<?> executable, Executable.ExecProvider provider) {\r
\r
assert provider != Executable.ExecProvider.Any && provider != null;\r
if (provider == Executable.ExecProvider.Cluster) {\r
- return new AsyncJobRunner();\r
+ return new AsyncClusterRunner();\r
}\r
return new AsyncLocalRunner();\r
}\r
\r
- public static SyncExecutor getSyncEngine(\r
- ConfiguredExecutable<?> executable, Executable.ExecProvider provider)\r
+ public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable, Executable.ExecProvider provider)\r
throws JobSubmissionException {\r
\r
assert provider != Executable.ExecProvider.Any && provider != null;\r
if (provider == Executable.ExecProvider.Cluster) {\r
- return JobRunner.getInstance(executable);\r
+ return ClusterRunner.getInstance(executable);\r
}\r
return new LocalRunner(executable);\r
}\r
\r
- public static AsyncExecutor getAsyncEngine(\r
- ConfiguredExecutable<?> executable) {\r
+ public static AsyncExecutor getAsyncEngine(ConfiguredExecutable<?> executable) {\r
if (isTargetedForLocalExecution(executable)) {\r
return new AsyncLocalRunner();\r
}\r
- return new AsyncJobRunner();\r
+ return new AsyncClusterRunner();\r
}\r
\r
public static AsyncExecutor getAsyncEngine(String taskId) {\r
if (isLocal(taskId)) {\r
return new AsyncLocalRunner();\r
}\r
- return new AsyncJobRunner();\r
+ return new AsyncClusterRunner();\r
}\r
\r
- public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable)\r
- throws JobSubmissionException {\r
+ public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable) throws JobSubmissionException {\r
if (isTargetedForLocalExecution(executable)) {\r
return new LocalRunner(executable);\r
}\r
- return JobRunner.getInstance(executable);\r
+ return ClusterRunner.getInstance(executable);\r
}\r
\r
- static boolean isTargetedForLocalExecution(\r
- ConfiguredExecutable<?> executable) {\r
+ static boolean isTargetedForLocalExecution(ConfiguredExecutable<?> executable) {\r
// In the uncommon case that the cluster and local execution temporary\r
// directories are the same, this method returns true anyway\r
\r
* template gets deleted, this needs to be taken into account in this\r
* class design!\r
*/\r
-public class AsyncJobRunner implements AsyncExecutor {\r
+public class AsyncClusterRunner implements AsyncExecutor {\r
\r
- private static Logger log = Logger.getLogger(AsyncJobRunner.class);\r
+ private static Logger log = Logger.getLogger(AsyncClusterRunner.class);\r
\r
@Override\r
- public String submitJob(ConfiguredExecutable<?> executable)\r
- throws JobSubmissionException {\r
- JobRunner jr = new JobRunner(executable);\r
- jr.submitJob(); // ignore cluster job id as it could be retrieved from fs\r
+ public String submitJob(ConfiguredExecutable<?> executable) throws JobSubmissionException {\r
+ ClusterRunner jr = new ClusterRunner(executable);\r
+ jr.submitJob();\r
+ // ignore cluster job id as it could be retrieved from fs\r
return executable.getTaskId();\r
}\r
\r
@Override\r
public boolean cancelJob(String jobId) {\r
ClusterSession clustSession = ClusterSession.getInstance();\r
- return compbio.engine.cluster.drmaa.ClusterEngineUtil.cancelJob(jobId,\r
- clustSession);\r
+ return compbio.engine.cluster.drmaa.ClusterEngineUtil.cancelJob(jobId, clustSession);\r
}\r
\r
/*\r
- * This will never return clust.engine.JobStatus.CANCELLED as for sun grid engine \r
- * cancelled job is the same as failed. Cancelled jobs needs to be tracked manually!\r
+ * This will never return clust.engine.JobStatus.CANCELLED as for Sun Grid\r
+ * Engine a cancelled job is the same as a failed one. Cancelled jobs need to\r
+ * be tracked manually!\r
*/\r
@Override\r
public compbio.metadata.JobStatus getJobStatus(String jobId) {\r
- return JobRunner.getJobStatus(jobId);\r
+ return ClusterRunner.getJobStatus(jobId);\r
}\r
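/*\r
 * A minimal sketch of the manual tracking mentioned above (an assumption, not\r
 * part of this class): record a CANCELLED marker at cancellation time and\r
 * consult it before trusting the grid engine status, assuming JobStatus\r
 * defines a CANCELLED value:\r
 *\r
 *   if (cancelJob(jobId)) {\r
 *       EngineUtil.writeMarker(Configurator.getWorkDirectory(jobId), JobStatus.CANCELLED);\r
 *   }\r
 *   // later: report CANCELLED if the marker file exists, FAILED otherwise\r
 */\r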
\r
@Override\r
}\r
\r
@Override\r
- public ConfiguredExecutable<?> getResults(String jobId)\r
- throws ResultNotAvailableException {\r
-\r
+ public ConfiguredExecutable<?> getResults(String jobId) throws ResultNotAvailableException {\r
assert EngineUtil.isValidJobId(jobId);\r
-\r
ClusterSession csession = ClusterSession.getInstance();\r
ConfiguredExecutable<?> exec;\r
try {\r
* template gets deleted, this needs to be taken into account in this\r
* class design!\r
*/\r
-public class JobRunner implements SyncExecutor {\r
+public class ClusterRunner implements SyncExecutor {\r
\r
final JobTemplate jobtempl;\r
static ClusterSession clustSession = ClusterSession.getInstance();\r
static Session session = clustSession.getSession();\r
- static final Logger log = Logger.getLogger(JobRunner.class);\r
+ static final Logger log = Logger.getLogger(ClusterRunner.class);\r
final ConfiguredExecutable<?> confExecutable;\r
private final String workDirectory;\r
String jobId;\r
\r
- public JobRunner(ConfiguredExecutable<?> confExec)\r
+ public ClusterRunner(ConfiguredExecutable<?> confExec)\r
throws JobSubmissionException {\r
try {\r
String command = confExec.getCommand(ExecProvider.Cluster);\r
// Tell the job where to get/put things\r
jobtempl.setWorkingDirectory(this.workDirectory);\r
\r
- /*\r
- * Set environment variables for the process if any\r
- */\r
+ // Set environment variables for the process if any\r
Map<String, String> jobEnv = confExec.getEnvironment();\r
if (jobEnv != null && !jobEnv.isEmpty()) {\r
setJobEnvironmentVariables(jobEnv);\r
jobtempl.setArgs(args);\r
}\r
\r
- /*\r
- * If executable need in/out data to be piped into it\r
- */\r
+ // If the executable needs in/out data to be piped into it\r
if (confExec.getExecutable() instanceof PipedExecutable<?>) {\r
setPipes(confExec);\r
}\r
\r
- /*\r
- * If executable require special cluster configuration parameters to\r
- * be set e.g. queue, ram, time etc\r
- */\r
+ // If the executable requires special cluster configuration parameters\r
+ // to be set, e.g. queue, RAM, time etc.\r
setNativeSpecs(confExec.getExecutable());\r
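+ // For illustration only (an assumption about what setNativeSpecs does, not\r
+ // its actual body): DRMAA exposes such parameters through the job template's\r
+ // native specification, e.g.\r
+ //   jobtempl.setNativeSpecification("-q bigmem.q -l h_vmem=4G,h_rt=24:00:00");\r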
\r
-\r
log.trace("using arguments: " + jobtempl.getArgs());\r
this.confExecutable = confExec;\r
// Save run configuration\r
public ConfiguredExecutable<?> waitForResult() throws JobExecutionException {\r
ConfiguredExecutable<?> confExec;\r
try {\r
- confExec = new AsyncJobRunner().getResults(this.jobId);\r
+ confExec = new AsyncClusterRunner().getResults(this.jobId);\r
if (confExec == null) {\r
log.warn("Could not find results of job " + this.jobId);\r
}\r
return getJobStatus(this.jobId);\r
}\r
\r
- public static JobRunner getInstance(ConfiguredExecutable<?> executable)\r
+ public static ClusterRunner getInstance(ConfiguredExecutable<?> executable)\r
throws JobSubmissionException {\r
- return new JobRunner(executable);\r
+ return new ClusterRunner(executable);\r
}\r
\r
} // class end\r
\r
private static final Logger log = Logger.getLogger(ClusterSession.class);\r
\r
- private static final PropertyHelper ph = PropertyHelperManager\r
- .getPropertyHelper();\r
+ private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
\r
public static final String JOBID = "JOBID";\r
// TaskId (getTaskDirectory()) -> ConfiguredExecutable<?> map\r
// private static BufferedWriter tasks;\r
\r
private ClusterSession() {\r
- log.debug("Initializing session "\r
- + Util.datef.format(Calendar.getInstance().getTime()));\r
+ log.debug("Initializing session " + Util.datef.format(Calendar.getInstance().getTime()));\r
SessionFactory factory = SessionFactory.getFactory();\r
session = factory.getSession();\r
sContact = session.getContact();\r
if (open) {\r
session.exit();\r
open = false;\r
- log.debug("Closing the session at: "\r
- + Util.datef.format(Calendar.getInstance().getTime()));\r
+ log.debug("Closing the session at: " + Util.datef.format(Calendar.getInstance().getTime()));\r
}\r
} catch (DrmaaException dre) {\r
// Cannot recover at this point, just log\r
return waitForJob(taskId, Session.TIMEOUT_WAIT_FOREVER);\r
}\r
\r
- public static ClusterJobId getClusterJobId(String taskId)\r
- throws IOException {\r
+ public static ClusterJobId getClusterJobId(String taskId) throws IOException {\r
Job job = Job.getByTaskId(taskId, jobs);\r
if (job != null) {\r
return job.getJobId();\r
String workDir = compbio.engine.Configurator.getWorkDirectory(taskId);\r
assert !Util.isEmpty(workDir);\r
File file = new File(workDir, JOBID);\r
- log.debug("Looking up cluster jobid by the task id " + taskId\r
- + " File path is " + file.getAbsolutePath());\r
+ log.debug("Looking up cluster jobid by the task id " + taskId + " File path is " + file.getAbsolutePath());\r
assert file.exists();\r
return new ClusterJobId(FileUtil.readFileToString(file));\r
}\r
\r
- public JobInfo waitForJob(String jobId, long waitingTime)\r
- throws DrmaaException, IOException {\r
+ public JobInfo waitForJob(String jobId, long waitingTime) throws DrmaaException, IOException {\r
ClusterJobId cjobId = getClusterJobId(jobId);\r
JobInfo status = session.wait(cjobId.getJobId(), waitingTime);\r
// Once the job has been waited for it will be finished\r
}\r
}\r
\r
- public ConfiguredExecutable<?> getResults(String taskId)\r
- throws DrmaaException, ResultNotAvailableException {\r
+ public ConfiguredExecutable<?> getResults(String taskId) throws DrmaaException, ResultNotAvailableException {\r
\r
EngineUtil.isValidJobId(taskId);\r
try {\r
JobInfo status = waitForJob(taskId);\r
} catch (InvalidJobException e) {\r
// It's OK to continue, the job may have already completed normally\r
- log.warn("Could not find the cluster job with id " + taskId\r
- + " perhaps it has completed", e.getCause());\r
+ log.warn("Could not find the cluster job with id " + taskId + " perhaps it has completed", e.getCause());\r
} catch (IOException e) {\r
- log.error("Could not read JOBID file for the job " + taskId\r
- + " Message " + e.getLocalizedMessage(), e.getCause());\r
+ log.error("Could not read JOBID file for the job " + taskId + " Message " + e.getLocalizedMessage(),\r
+ e.getCause());\r
}\r
// Once the job has been waited for it will be finished\r
// Next time it will not be found in the session, so removed from the\r
exec = EngineUtil.loadExecutable(taskId);\r
}\r
if (exec != null) {\r
- EngineUtil.writeMarker(exec.getWorkDirectory(),\r
- JobStatus.COLLECTED);\r
+ EngineUtil.writeMarker(exec.getWorkDirectory(), JobStatus.COLLECTED);\r
}\r
return exec;\r
}\r
\r
- public static StatisticManager getStatistics(JobInfo status)\r
- throws DrmaaException {\r
+ public static StatisticManager getStatistics(JobInfo status) throws DrmaaException {\r
return new StatisticManager(status);\r
}\r
\r
* if the job is no longer in the queue or running. basically it\r
* will throw this exception for all finished or cancelled jobs\r
*/\r
- public int getJobStatus(ClusterJobId jobId) throws DrmaaException,\r
- InvalidJobException {\r
+ public int getJobStatus(ClusterJobId jobId) throws DrmaaException, InvalidJobException {\r
return session.getJobProgramStatus(jobId.getJobId());\r
}\r
\r
public static String getJobStatus(final int status) throws DrmaaException {\r
String statusString = null;\r
switch (status) {\r
- case Session.UNDETERMINED :\r
- statusString = "Job status cannot be determined\n";\r
- break;\r
- case Session.QUEUED_ACTIVE :\r
- statusString = "Job is queued and active\n";\r
- break;\r
- case Session.SYSTEM_ON_HOLD :\r
- statusString = "Job is queued and in system hold\n";\r
- break;\r
- case Session.USER_ON_HOLD :\r
- statusString = "Job is queued and in user hold\n";\r
- break;\r
- case Session.USER_SYSTEM_ON_HOLD :\r
- statusString = "Job is queued and in user and system hold\n";\r
- break;\r
- case Session.RUNNING :\r
- statusString = "Job is running\n";\r
- break;\r
- case Session.SYSTEM_SUSPENDED :\r
- statusString = "Job is system suspended\n";\r
- break;\r
- case Session.USER_SUSPENDED :\r
- statusString = "Job is user suspended\n";\r
- break;\r
- case Session.USER_SYSTEM_SUSPENDED :\r
- statusString = "Job is user and system suspended\n";\r
- break;\r
- case Session.DONE :\r
- statusString = "Job finished normally\n";\r
- break;\r
- case Session.FAILED :\r
- statusString = "Job finished, but failed\n";\r
- break;\r
+ case Session.UNDETERMINED:\r
+ statusString = "Job status cannot be determined\n";\r
+ break;\r
+ case Session.QUEUED_ACTIVE:\r
+ statusString = "Job is queued and active\n";\r
+ break;\r
+ case Session.SYSTEM_ON_HOLD:\r
+ statusString = "Job is queued and in system hold\n";\r
+ break;\r
+ case Session.USER_ON_HOLD:\r
+ statusString = "Job is queued and in user hold\n";\r
+ break;\r
+ case Session.USER_SYSTEM_ON_HOLD:\r
+ statusString = "Job is queued and in user and system hold\n";\r
+ break;\r
+ case Session.RUNNING:\r
+ statusString = "Job is running\n";\r
+ break;\r
+ case Session.SYSTEM_SUSPENDED:\r
+ statusString = "Job is system suspended\n";\r
+ break;\r
+ case Session.USER_SUSPENDED:\r
+ statusString = "Job is user suspended\n";\r
+ break;\r
+ case Session.USER_SYSTEM_SUSPENDED:\r
+ statusString = "Job is user and system suspended\n";\r
+ break;\r
+ case Session.DONE:\r
+ statusString = "Job finished normally\n";\r
+ break;\r
+ case Session.FAILED:\r
+ statusString = "Job finished, but failed\n";\r
+ break;\r
}\r
return statusString;\r
}\r
public final class ExecutableWrapper implements\r
Callable<ConfiguredExecutable<?>> {\r
\r
- public static final String PROC_IN_FILE = "procInput.txt";\r
- public static final String PROC_OUT_FILE = "procOutput.txt";\r
- public static final String PROC_ERR_FILE = "procError.txt";\r
-\r
- private static ExecutorService es;\r
- private final ConfiguredExecutable<?> confExec;\r
- private final ProcessBuilder pbuilder;\r
-\r
- private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
-\r
- public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {\r
- this.confExec = executable;\r
- String cmd = null;\r
- try {\r
- cmd = executable.getCommand(ExecProvider.Local);\r
- PathValidator.validateExecutable(cmd);\r
- } catch (IllegalArgumentException e) {\r
- log.error(e.getMessage(), e.getCause());\r
- throw new JobSubmissionException(e);\r
- }\r
- List<String> params = executable.getParameters().getCommands();\r
- params.add(0, cmd);\r
-\r
- pbuilder = new ProcessBuilder(params);\r
- if (executable.getEnvironment() != null) {\r
- log.debug("Setting command environment variables: " + pbuilder.environment());\r
- EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());\r
- log.debug("Process environment:" + pbuilder.environment());\r
- }\r
- log.debug("Setting command: " + pbuilder.command());\r
- PathValidator.validateDirectory(workDirectory);\r
- pbuilder.directory(new File(workDirectory));\r
- log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());\r
- log.debug("Setting working directory: " + workDirectory);\r
- // Initialize private executor to dump processes output if any to the file system\r
- synchronized (log) {\r
- if (es == null) {\r
- /* \r
- * Two threads are necessary for the process to write in two streams error and output\r
- * simultaneously and hold the stream until exit. If only one thread is used, the\r
- * second stream may never get access to the thread efficiently deadlocking the proccess!\r
- */\r
+ public static final String PROC_IN_FILE = "procInput.txt";\r
+ public static final String PROC_OUT_FILE = "procOutput.txt";\r
+ public static final String PROC_ERR_FILE = "procError.txt";\r
+\r
+ private static ExecutorService es;\r
+ private final ConfiguredExecutable<?> confExec;\r
+ private final ProcessBuilder pbuilder;\r
+\r
+ private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
+\r
+ public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {\r
+ this.confExec = executable;\r
+ String cmd = null;\r
+ try {\r
+ cmd = executable.getCommand(ExecProvider.Local);\r
+ PathValidator.validateExecutable(cmd);\r
+ } catch (IllegalArgumentException e) {\r
+ log.error(e.getMessage(), e.getCause());\r
+ throw new JobSubmissionException(e);\r
+ }\r
+ List<String> params = executable.getParameters().getCommands();\r
+ params.add(0, cmd);\r
+\r
+ pbuilder = new ProcessBuilder(params);\r
+ if (executable.getEnvironment() != null) {\r
+ log.debug("Setting command environment variables: " + pbuilder.environment());\r
+ EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());\r
+ log.debug("Process environment:" + pbuilder.environment());\r
+ }\r
+ log.debug("Setting command: " + pbuilder.command());\r
+ PathValidator.validateDirectory(workDirectory);\r
+ pbuilder.directory(new File(workDirectory));\r
+ log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());\r
+ log.debug("Setting working directory: " + workDirectory);\r
+ // Initialize private executor to dump process output, if any, to the file system\r
+ synchronized (log) {\r
+ if (es == null) {\r
+ /*\r
+ * Two threads are necessary for the process to write to two streams (error and output)\r
+ * simultaneously and hold them until exit. If only one thread is used, the\r
+ * second stream may never get access to a thread, effectively deadlocking the process!\r
+ */\r
this.es = Executors.newCachedThreadPool();\r
log.debug("Initializing executor for local processes output dump");\r
// Make sure that the executors are going to be properly closed\r
shutdownService();\r
}\r
});\r
+ }\r
}\r
}\r
- }\r
-\r
- /**\r
- * Stops internal executor service which captures streams of native\r
- * executables. This method is intended for stopping service if deployed in\r
- * the web application content. There is NO NEED of using this method\r
- * otherwise as the executor service is taken care of internally.\r
- */\r
- public static final void shutdownService() {\r
- if (es != null) {\r
- es.shutdownNow();\r
+\r
+ /**\r
+ * Stops the internal executor service which captures the streams of native\r
+ * executables. This method is intended for stopping the service when deployed in\r
+ * a web application context. There is NO NEED to use this method\r
+ * otherwise, as the executor service is taken care of internally.\r
+ */\r
+ public static final void shutdownService() {\r
+ if (es != null) {\r
+ es.shutdownNow();\r
+ }\r
}\r
- }\r
-\r
- /**\r
- * It is vital that output and error streams are captured immediately for\r
- * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its\r
- * own thread ready to capture the output. If executor could not execute\r
- * capture immediately this could lead to the call method to stale, as\r
- * execution could not proceed without output being captured. Every call to\r
- * call() method will use 2 threads\r
- * @throws JobSubmissionException \r
- */\r
- @Override\r
- public ConfiguredExecutable<?> call() throws IOException {\r
- Process proc = null;\r
- Future<?> errorf = null;\r
- Future<?> outputf = null;\r
- PrintStream errorStream = null;\r
- PrintStream outStream = null;\r
- PrintStream comStream = null;\r
-\r
- try {\r
- log.info("Calculation started at " + System.nanoTime());\r
- EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());\r
- proc = pbuilder.start();\r
-\r
- // store input command and program environment\r
- comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));\r
- comStream.append("# program command\n");\r
- for (String par : pbuilder.command()) {\r
+\r
+ /**\r
+ * It is vital that the output and error streams are captured immediately for\r
+ * this call() to succeed. Thus each instance of ExecutableWrapper has 2 of its\r
+ * own threads ready to capture the output. If the executor cannot start\r
+ * capturing immediately, the call method may stall, as\r
+ * execution cannot proceed without the output being captured. Every call to\r
+ * the call() method will use 2 threads.\r
+ * @throws JobSubmissionException \r
+ */\r
+ @Override\r
+ public ConfiguredExecutable<?> call() throws IOException {\r
+ Process proc = null;\r
+ Future<?> errorf = null;\r
+ Future<?> outputf = null;\r
+ PrintStream errorStream = null;\r
+ PrintStream outStream = null;\r
+ PrintStream comStream = null;\r
+\r
+ try {\r
+ log.info("Calculation started at " + System.nanoTime());\r
+ EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());\r
+ proc = pbuilder.start();\r
+\r
+ // store input command and program environment\r
+ comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));\r
+ comStream.append("# program command\n");\r
+ for (String par : pbuilder.command()) {\r
comStream.append(par + " ");\r
- }\r
- comStream.append("\n\n# program environment\n");\r
- for (Entry<String, String> var : pbuilder.environment().entrySet()) {\r
- comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");\r
- }\r
- comStream.close();\r
-\r
- // any error message?\r
- errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));\r
- StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);\r
-\r
- // any output?\r
- outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));\r
- StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);\r
-\r
- // kick it off\r
- errorf = es.submit(errorGobbler);\r
- outputf = es.submit(outputGobbler);\r
-\r
- // any error???\r
- int exitVal = proc.waitFor();\r
- log.info("Calculation completed at " + System.nanoTime());\r
- EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());\r
- // Let streams to write for a little more\r
- errorf.get(2, TimeUnit.SECONDS);\r
- outputf.get(2, TimeUnit.SECONDS);\r
-\r
- // Close streams\r
- errorStream.close();\r
- outStream.close();\r
- log.debug("Local process exit value: " + exitVal);\r
- } catch (ExecutionException e) {\r
- // Log and ignore this is not important\r
- log.trace("Native Process output threw exception: " + e.getMessage());\r
- } catch (TimeoutException e) {\r
- // Log and ignore this is not important\r
- log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());\r
- } catch (InterruptedException e) {\r
- log.error("Native Process was interrupted aborting: " + e.getMessage());\r
- proc.destroy();\r
- errorf.cancel(true);\r
- outputf.cancel(true);\r
- // restore interruption status\r
- Thread.currentThread().interrupt();\r
- } finally {\r
- // just to make sure that we do not left anything running\r
- if (proc != null) {\r
+ }\r
+ comStream.append("\n\n# program environment\n");\r
+ for (Entry<String, String> var : pbuilder.environment().entrySet()) {\r
+ comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");\r
+ }\r
+ comStream.close();\r
+\r
+ // any error message?\r
+ errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));\r
+ StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);\r
+\r
+ // any output?\r
+ outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));\r
+ StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);\r
+\r
+ // kick it off\r
+ errorf = es.submit(errorGobbler);\r
+ outputf = es.submit(outputGobbler);\r
+\r
+ // wait for the process to finish and collect its exit value\r
+ int exitVal = proc.waitFor();\r
+ //proc.getClass();\r
+ log.info("Calculation completed at " + System.nanoTime());\r
+ EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());\r
+\r
+ // Let the streams write for a little longer\r
+ errorf.get(2, TimeUnit.SECONDS);\r
+ outputf.get(2, TimeUnit.SECONDS);\r
+\r
+ // Close streams\r
+ errorStream.close();\r
+ outStream.close();\r
+ log.debug("Local process exit value: " + exitVal);\r
+ } catch (ExecutionException e) {\r
+ // Log and ignore this is not important\r
+ log.trace("Native Process output threw exception: " + e.getMessage());\r
+ } catch (TimeoutException e) {\r
+ // Log and ignore this is not important\r
+ log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());\r
+ } catch (InterruptedException e) {\r
+ log.error("Native Process was interrupted aborting: " + e.getMessage());\r
+ System.err.println("Native Process was interrupted aborting: " + e.getMessage());\r
proc.destroy();\r
- }\r
- if (errorf != null) {\r
errorf.cancel(true);\r
- }\r
- if (outputf != null) {\r
outputf.cancel(true);\r
+ // restore interruption status\r
+ Thread.currentThread().interrupt();\r
+ } finally {\r
+ // just to make sure that we do not leave anything running\r
+ if (proc != null) {\r
+ proc.destroy();\r
+ }\r
+ if (errorf != null) {\r
+ errorf.cancel(true);\r
+ }\r
+ if (outputf != null) {\r
+ outputf.cancel(true);\r
+ }\r
+ FileUtil.closeSilently(log, errorStream);\r
+ FileUtil.closeSilently(log, outStream);\r
}\r
- FileUtil.closeSilently(log, errorStream);\r
- FileUtil.closeSilently(log, outStream);\r
+ return confExec;\r
}\r
- return confExec;\r
- }\r
\r
- private String getOutput() {\r
- if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
- return confExec.getOutput();\r
+ private String getOutput() {\r
+ if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
+ return confExec.getOutput();\r
+ }\r
+ return PROC_OUT_FILE;\r
}\r
- return PROC_OUT_FILE;\r
- }\r
\r
- private String getError() {\r
- if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
- return confExec.getError();\r
+ private String getError() {\r
+ if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
+ return confExec.getError();\r
+ }\r
+ return PROC_ERR_FILE;\r
}\r
- return PROC_ERR_FILE;\r
- }\r
+\r
}\r
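/*\r
 * Usage sketch (an illustration of how a local engine might drive this\r
 * wrapper; not taken from this file). Any ExecutorService will do:\r
 *\r
 *   ExecutorService backend = Executors.newSingleThreadExecutor();\r
 *   Future<ConfiguredExecutable<?>> future =\r
 *           backend.submit(new ExecutableWrapper(confExec, confExec.getWorkDirectory()));\r
 *   ConfiguredExecutable<?> completed = future.get();\r
 */\r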
try {\r
ConfiguredExecutable<ClustalW> clw = Configurator\r
.configureExecutable(cl, Executable.ExecProvider.Cluster);\r
- JobRunner jr = JobRunner.getInstance(clw);\r
+ ClusterRunner jr = ClusterRunner.getInstance(clw);\r
String jobId = jr.submitJob();\r
ClusterSession cs = ClusterSession.getInstance();\r
// this only holds for sequential execution\r
try {\r
ConfiguredExecutable<ClustalW> confClustal = Configurator\r
.configureExecutable(clustal);\r
- AsyncExecutor runner = new AsyncJobRunner();\r
+ AsyncExecutor runner = new AsyncClusterRunner();\r
assertNotNull("Runner is NULL", runner);\r
String jobId = runner.submitJob(confClustal);\r
assertEquals("Input was not set!", test_input, clustal.getInput());\r
try {\r
ConfiguredExecutable<ClustalW> confClustal = Configurator\r
.configureExecutable(clustal);\r
- AsyncJobRunner runner = new AsyncJobRunner();\r
+ AsyncClusterRunner runner = new AsyncClusterRunner();\r
String jobId = runner.submitJob(confClustal);\r
assertNotNull("Runner is NULL", runner);\r
// assertNotNull("JobId is null", jobId1);\r
clustal.setInput(test_input).setOutput(cluster_test_outfile);\r
\r
try {\r
- AsyncJobRunner runner = new AsyncJobRunner();\r
+ AsyncClusterRunner runner = new AsyncClusterRunner();\r
ConfiguredExecutable<ClustalW> confClustal = Configurator\r
.configureExecutable(clustal);\r
String jobId = runner.submitJob(confClustal);\r
assertNotNull("Runner is NULL", runner);\r
- AsyncJobRunner runner2 = new AsyncJobRunner();\r
+ AsyncClusterRunner runner2 = new AsyncClusterRunner();\r
\r
boolean hasRun = false;\r
boolean hasPended = false;\r
// immediately\r
// the status could be UNDEFINED!\r
// assertFalse(hasUndefined);\r
- AsyncJobRunner runner3 = new AsyncJobRunner();\r
+ AsyncClusterRunner runner3 = new AsyncClusterRunner();\r
Executable<?> exec = runner3.getResults(jobId);\r
assertNotNull(exec);\r
// Now try collecting result for the second time\r
Executable.ExecProvider.Cluster);\r
assertNotNull(confClust.getWorkDirectory());\r
\r
- JobRunner runner = JobRunner.getInstance(confClust);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confClust);\r
assertEquals("Input was not set!", test_input, clustal.getInput());\r
assertNotNull("Runner is NULL", runner);\r
runner.executeJob();\r
Executable.ExecProvider.Cluster);\r
assertNotNull(confClust.getWorkDirectory());\r
\r
- JobRunner runner = JobRunner.getInstance(confClust);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confClust);\r
assertNotNull("Runner is NULL", runner);\r
\r
runner.executeJob();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
assertFalse(SysPrefs.isWindows, "Cluster execution can only be in unix environment");\r
try {\r
ConfiguredExecutable<AACon> confAAcon = Configurator.configureExecutable(aacon, Executable.ExecProvider.Cluster);\r
- JobRunner runner = JobRunner.getInstance(confAAcon);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confAAcon);\r
\r
assertNotNull(runner, "Runner is NULL");\r
runner.executeJob();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
ConfiguredExecutable<Disembl> confDisembl = Configurator\r
.configureExecutable(disembl,\r
Executable.ExecProvider.Cluster);\r
- JobRunner runner = JobRunner.getInstance(confDisembl);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confDisembl);\r
\r
assertNotNull(runner, "Runner is NULL");\r
runner.executeJob();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
ConfiguredExecutable<GlobPlot> confGlobPlot = Configurator\r
.configureExecutable(globprot,\r
Executable.ExecProvider.Cluster);\r
- JobRunner runner = JobRunner.getInstance(confGlobPlot);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confGlobPlot);\r
\r
assertNotNull(runner, "Runner is NULL");\r
runner.executeJob();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
ConfiguredExecutable<IUPred> confIUPred = Configurator\r
.configureExecutable(iupred,\r
Executable.ExecProvider.Cluster);\r
- JobRunner runner = JobRunner.getInstance(confIUPred);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confIUPred);\r
\r
assertNotNull(runner, "Runner is NULL");\r
runner.executeJob();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
assertFalse(SysPrefs.isWindows, "Cluster execution can only be in unix environment");\r
try {\r
ConfiguredExecutable<Jronn> confJronn = Configurator.configureExecutable(jronn, Executable.ExecProvider.Cluster);\r
- JobRunner runner = JobRunner.getInstance(confJronn);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confJronn);\r
\r
assertNotNull(runner, "Runner is NULL");\r
runner.executeJob();\r
import compbio.engine.client.Executable.ExecProvider;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.conf.RunnerConfigMarshaller;\r
import compbio.engine.local.AsyncLocalRunner;\r
clustal.setInput(AllTestSuit.test_input).setOutput(cluster_test_outfile);\r
try {\r
ConfiguredExecutable<ClustalO> confClustal = Configurator.configureExecutable(clustal);\r
- JobRunner runner = JobRunner.getInstance(confClustal);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confClustal);\r
// ClusterSession csession = JobRunner.getSession();\r
assertNotNull(runner);\r
runner.executeJob();\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.client.Executable.ExecProvider;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.conf.RunnerConfigMarshaller;\r
import compbio.engine.local.AsyncLocalRunner;\r
\r
try {\r
ConfiguredExecutable<ClustalW> confClustal = Configurator.configureExecutable(clustal);\r
- JobRunner runner = JobRunner.getInstance(confClustal);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confClustal);\r
// ClusterSession csession = JobRunner.getSession();\r
assertNotNull(runner);\r
runner.executeJob();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.client.Executable.ExecProvider;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.metadata.AllTestSuit;\r
import compbio.metadata.ChunkHolder;\r
import compbio.metadata.JobExecutionException;\r
\r
ConfiguredExecutable<Mafft> cmafft = Configurator\r
.configureExecutable(mafft, Executable.ExecProvider.Cluster);\r
- JobRunner sexecutor = (JobRunner) Configurator.getSyncEngine(\r
+ ClusterRunner sexecutor = (ClusterRunner) Configurator.getSyncEngine(\r
cmafft, Executable.ExecProvider.Cluster);\r
sexecutor.executeJob();\r
ConfiguredExecutable<?> al = sexecutor.waitForResult();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
ConfiguredExecutable<Muscle> confMuscle = Configurator\r
.configureExecutable(muscle,\r
Executable.ExecProvider.Cluster);\r
- JobRunner runner = JobRunner.getInstance(confMuscle);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confMuscle);\r
\r
assertNotNull(runner, "Runner is NULL");\r
runner.executeJob();\r
import compbio.engine.client.ConfiguredExecutable;\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.metadata.AllTestSuit;\r
import compbio.metadata.ChunkHolder;\r
import compbio.metadata.JobExecutionException;\r
try {\r
ConfiguredExecutable<Probcons> cmafft = Configurator\r
.configureExecutable(probc, Executable.ExecProvider.Cluster);\r
- JobRunner sexecutor = (JobRunner) Configurator.getSyncEngine(\r
+ ClusterRunner sexecutor = (ClusterRunner) Configurator.getSyncEngine(\r
cmafft, Executable.ExecProvider.Cluster);\r
sexecutor.executeJob();\r
ConfiguredExecutable<?> al = sexecutor.waitForResult();\r
import compbio.engine.client.ConfiguredExecutable;\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
import compbio.metadata.ChunkHolder;\r
public void RunOnCluster() {\r
try {\r
ConfiguredExecutable<Tcoffee> cmafft = Configurator.configureExecutable(tcoffee, Executable.ExecProvider.Cluster);\r
- JobRunner sexecutor = (JobRunner) Configurator.getSyncEngine(cmafft, Executable.ExecProvider.Cluster);\r
+ ClusterRunner sexecutor = (ClusterRunner) Configurator.getSyncEngine(cmafft, Executable.ExecProvider.Cluster);\r
sexecutor.executeJob();\r
ConfiguredExecutable<?> al = sexecutor.waitForResult();\r
Alignment align = al.getResults();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.LocalRunner;\r
import compbio.metadata.AllTestSuit;\r
ConfiguredExecutable<Jpred> confpred = Configurator.configureExecutable(pred, Executable.ExecProvider.Cluster);\r
Preset<Jpred> conf = jpredPreset.getPresetByName("cluster configuration");\r
confpred.addParameters(conf.getOptions());\r
- JobRunner runner = JobRunner.getInstance(confpred);\r
+ ClusterRunner runner = ClusterRunner.getInstance(confpred);\r
assertNotNull(runner, "Runner is NULL");\r
\r
runner.executeJob();\r
import compbio.engine.client.Executable;\r
import compbio.engine.client.RunConfiguration;\r
import compbio.engine.cluster.drmaa.ClusterEngineUtil;\r
-import compbio.engine.cluster.drmaa.JobRunner;\r
+import compbio.engine.cluster.drmaa.ClusterRunner;\r
import compbio.engine.cluster.drmaa.StatisticManager;\r
import compbio.engine.local.AsyncLocalRunner;\r
import compbio.engine.local.LocalExecutorService;\r
import compbio.data.sequence.SequenceUtil;\r
import compbio.data.sequence.UnknownFileFormatException;\r
import compbio.metadata.JobSubmissionException;\r
+import compbio.metadata.JobStatus;\r
import compbio.metadata.Option;\r
import compbio.metadata.Limit;\r
import compbio.metadata.Preset;\r
}\r
System.out.println("\n\rcalling predictor.........");\r
Thread.sleep(100);\r
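+ // ask the web service for the job status before collecting the annotation\r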
+ JobStatus status = wsproxy.getJobStatus(jobId);\r
+ System.out.println("\njob " + jobId + " status: " + status);\r
scores = wsproxy.getAnnotation(jobId);\r
} catch (JobSubmissionException e) {\r
System.err.println("Exception while submitting job to a web server. Exception details are below:");\r
jobId = msaws.align(fastalist);\r
}\r
System.out.println("\ncalling program.........");\r
- Thread.sleep(100);\r
+ long startTime = System.nanoTime();\r
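+ // poll the web service every second until the job leaves the RUNNING state\r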
+ while (JobStatus.RUNNING == msaws.getJobStatus(jobId)) {\r
+ Thread.sleep(1000);\r
+ long endTime = System.nanoTime();\r
+ System.out.println("job " + jobId + " time executing: "+ (endTime - startTime) / 1000000 +" msec, status: " + msaws.getJobStatus(jobId));\r
+ }\r
alignment = msaws.getResult(jobId);\r
} catch (IOException e) {\r
System.err.println("Exception while reading the input file. Check that the input file is a FASTA file! "\r
private static final LimitsManager<Mafft> limitMan = EngineUtil.getLimits(new Mafft().getType());\r
\r
@Override\r
- public String align(List<FastaSequence> sequences)\r
- throws JobSubmissionException {\r
+ public String align(List<FastaSequence> sequences) throws JobSubmissionException {\r
WSUtil.validateFastaInput(sequences);\r
ConfiguredExecutable<Mafft> confMafft = init(sequences);\r
return WSUtil.align(sequences, confMafft, log, "align", getLimit(""));\r
}\r
\r
- ConfiguredExecutable<Mafft> init(List<FastaSequence> dataSet)\r
- throws JobSubmissionException {\r
+ ConfiguredExecutable<Mafft> init(List<FastaSequence> dataSet) throws JobSubmissionException {\r
Mafft mafft = new Mafft();\r
mafft.setInput(SkeletalExecutable.INPUT);\r
mafft.setOutput(SkeletalExecutable.OUTPUT);\r
}\r
\r
@Override\r
- public String customAlign(List<FastaSequence> sequences,\r
- List<Option<Mafft>> options) throws JobSubmissionException,\r
+ public String customAlign(List<FastaSequence> sequences, List<Option<Mafft>> options) throws JobSubmissionException,\r
WrongParameterException {\r
WSUtil.validateFastaInput(sequences);\r
ConfiguredExecutable<Mafft> confMafft = init(sequences);\r
}\r
\r
@Override\r
- public String presetAlign(List<FastaSequence> sequences,\r
- Preset<Mafft> preset) throws JobSubmissionException,\r
- WrongParameterException {\r
+ public String presetAlign(List<FastaSequence> sequences, Preset<Mafft> preset) throws JobSubmissionException, WrongParameterException {\r
WSUtil.validateFastaInput(sequences);\r
if (preset == null) {\r
throw new WrongParameterException("Preset must be provided!");\r
public Alignment getResult(String jobId) throws ResultNotAvailableException {\r
WSUtil.validateJobId(jobId);\r
AsyncExecutor asyncEngine = Configurator.getAsyncEngine(jobId);\r
- ConfiguredExecutable<Mafft> mafft = (ConfiguredExecutable<Mafft>) asyncEngine\r
- .getResults(jobId);\r
+ ConfiguredExecutable<Mafft> mafft = (ConfiguredExecutable<Mafft>) asyncEngine.getResults(jobId);\r
Alignment al = mafft.getResults();\r
- return new Alignment (al.getSequences(), Program.Mafft, '-');\r
+ return new Alignment(al.getSequences(), Program.Mafft, '-');\r
}\r
\r
@Override\r
@Override\r
public ChunkHolder pullExecStatistics(String jobId, long position) {\r
WSUtil.validateJobId(jobId);\r
- String file = Configurator.getWorkDirectory(jobId) + File.separator\r
- + new Mafft().getError();\r
+ String file = Configurator.getWorkDirectory(jobId) + File.separator + new Mafft().getError();\r
return WSUtil.pullFile(file, position);\r
}\r
\r
\r
@SuppressWarnings("unchecked")\r
public String analize(List<FastaSequence> sequences)\r
- throws UnsupportedRuntimeException, LimitExceededException,\r
- JobSubmissionException {\r
+ throws UnsupportedRuntimeException, LimitExceededException, JobSubmissionException {\r
WSUtil.validateFastaInput(sequences);\r
ConfiguredExecutable<T> confIUPred = init(sequences);\r
return WSUtil.analize(sequences, confIUPred, log, "analize", getLimit(""));\r
}\r
\r
public String presetAnalize(List<FastaSequence> sequences, Preset<T> preset)\r
- throws UnsupportedRuntimeException, LimitExceededException,\r
- JobSubmissionException, WrongParameterException {\r
+ throws UnsupportedRuntimeException, LimitExceededException, JobSubmissionException, WrongParameterException {\r
WSUtil.validateAAConInput(sequences);\r
if (preset == null) {\r
throw new WrongParameterException("Preset must be provided!");\r
confAAcon.addParameters(preset.getOptions());\r
@SuppressWarnings("unchecked")\r
Limit<T> limit = getLimit(preset.getName());\r
- return WSUtil\r
- .analize(sequences, confAAcon, log, "presetAnalize", limit);\r
+ return WSUtil.analize(sequences, confAAcon, log, "presetAnalize", limit);\r
}\r
}\r
\r
import compbio.data.sequence.FastaSequence;\r
import compbio.data.sequence.ScoreManager;\r
+import compbio.runner.RunnerUtil;\r
import compbio.engine.AsyncExecutor;\r
import compbio.engine.Configurator;\r
import compbio.engine.ProgressGetter;\r
if (limit != null && limit.isExceeded(sequences)) {\r
throw LimitExceededException.newLimitExceeded(limit, sequences);\r
}\r
- compbio.runner.RunnerUtil.writeInput(sequences, confExec);\r
+ RunnerUtil.writeInput(sequences, confExec);\r
AsyncExecutor engine = Configurator.getAsyncEngine(confExec);\r
String jobId = engine.submitJob(confExec);\r
reportUsage(confExec, logger);\r
if (limit != null && limit.isExceeded(sequences)) {\r
throw LimitExceededException.newLimitExceeded(limit, sequences);\r
}\r
- compbio.runner.RunnerUtil.writeInput(sequences, confExec);\r
+ RunnerUtil.writeInput(sequences, confExec);\r
AsyncExecutor engine = Configurator.getAsyncEngine(confExec);\r
String jobId = engine.submitJob(confExec);\r
reportUsage(confExec, log);\r
if (limit != null && limit.isExceeded(sequences)) {\r
throw LimitExceededException.newLimitExceeded(limit, sequences);\r
}\r
- compbio.runner.RunnerUtil.writeClustalInput(sequences, confExec, '-');\r
+ RunnerUtil.writeClustalInput(sequences, confExec, '-');\r
AsyncExecutor engine = Configurator.getAsyncEngine(confExec);\r
String jobId = engine.submitJob(confExec);\r
reportUsage(confExec, log);\r