public final class ExecutableWrapper implements\r
Callable<ConfiguredExecutable<?>> {\r
\r
- public static final String PROC_IN_FILE = "procInput.txt";\r
- public static final String PROC_OUT_FILE = "procOutput.txt";\r
- public static final String PROC_ERR_FILE = "procError.txt";\r
-\r
- private static ExecutorService es;\r
- private final ConfiguredExecutable<?> confExec;\r
- private final ProcessBuilder pbuilder;\r
-\r
- private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
-\r
- public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {\r
- this.confExec = executable;\r
- String cmd = null;\r
- try {\r
- cmd = executable.getCommand(ExecProvider.Local);\r
- PathValidator.validateExecutable(cmd);\r
- } catch (IllegalArgumentException e) {\r
- log.error(e.getMessage(), e.getCause());\r
- throw new JobSubmissionException(e);\r
- }\r
- List<String> params = executable.getParameters().getCommands();\r
- params.add(0, cmd);\r
-\r
- pbuilder = new ProcessBuilder(params);\r
- if (executable.getEnvironment() != null) {\r
- log.debug("Setting command environment variables: " + pbuilder.environment());\r
- EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());\r
- log.debug("Process environment:" + pbuilder.environment());\r
- }\r
- log.debug("Setting command: " + pbuilder.command());\r
- PathValidator.validateDirectory(workDirectory);\r
- pbuilder.directory(new File(workDirectory));\r
- log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());\r
- log.debug("Setting working directory: " + workDirectory);\r
- // Initialize private executor to dump processes output if any to the file system\r
- synchronized (log) {\r
- if (es == null) {\r
- /* \r
- * Two threads are necessary for the process to write in two streams error and output\r
- * simultaneously and hold the stream until exit. If only one thread is used, the\r
- * second stream may never get access to the thread efficiently deadlocking the proccess!\r
- */\r
+ public static final String PROC_IN_FILE = "procInput.txt";\r
+ public static final String PROC_OUT_FILE = "procOutput.txt";\r
+ public static final String PROC_ERR_FILE = "procError.txt";\r
+\r
+ private static ExecutorService es;\r
+ private final ConfiguredExecutable<?> confExec;\r
+ private final ProcessBuilder pbuilder;\r
+\r
+ private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
+\r
+ public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {\r
+ this.confExec = executable;\r
+ String cmd = null;\r
+ try {\r
+ cmd = executable.getCommand(ExecProvider.Local);\r
+ PathValidator.validateExecutable(cmd);\r
+ } catch (IllegalArgumentException e) {\r
+ log.error(e.getMessage(), e.getCause());\r
+ throw new JobSubmissionException(e);\r
+ }\r
+ List<String> params = executable.getParameters().getCommands();\r
+ params.add(0, cmd);\r
+\r
+ pbuilder = new ProcessBuilder(params);\r
+ if (executable.getEnvironment() != null) {\r
+ log.debug("Setting command environment variables: " + pbuilder.environment());\r
+ EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());\r
+ log.debug("Process environment:" + pbuilder.environment());\r
+ }\r
+ log.debug("Setting command: " + pbuilder.command());\r
+ PathValidator.validateDirectory(workDirectory);\r
+ pbuilder.directory(new File(workDirectory));\r
+ log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());\r
+ log.debug("Setting working directory: " + workDirectory);\r
+ // Initialize private executor to dump processes output if any to the file system\r
+ synchronized (log) {\r
+ if (es == null) {\r
+ /* \r
+ * Two threads are necessary for the process to write in two streams error and output\r
+ * simultaneously and hold the stream until exit. If only one thread is used, the\r
+ * second stream may never get access to the thread efficiently deadlocking the proccess!\r
+ */\r
this.es = Executors.newCachedThreadPool();\r
log.debug("Initializing executor for local processes output dump");\r
// Make sure that the executors are going to be properly closed\r
shutdownService();\r
}\r
});\r
+ }\r
}\r
}\r
- }\r
-\r
- /**\r
- * Stops internal executor service which captures streams of native\r
- * executables. This method is intended for stopping service if deployed in\r
- * the web application content. There is NO NEED of using this method\r
- * otherwise as the executor service is taken care of internally.\r
- */\r
- public static final void shutdownService() {\r
- if (es != null) {\r
- es.shutdownNow();\r
+\r
+ /**\r
+ * Stops internal executor service which captures streams of native\r
+ * executables. This method is intended for stopping service if deployed in\r
+ * the web application content. There is NO NEED of using this method\r
+ * otherwise as the executor service is taken care of internally.\r
+ */\r
+ public static final void shutdownService() {\r
+ if (es != null) {\r
+ es.shutdownNow();\r
+ }\r
}\r
- }\r
-\r
- /**\r
- * It is vital that output and error streams are captured immediately for\r
- * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its\r
- * own thread ready to capture the output. If executor could not execute\r
- * capture immediately this could lead to the call method to stale, as\r
- * execution could not proceed without output being captured. Every call to\r
- * call() method will use 2 threads\r
- * @throws JobSubmissionException \r
- */\r
- @Override\r
- public ConfiguredExecutable<?> call() throws IOException {\r
- Process proc = null;\r
- Future<?> errorf = null;\r
- Future<?> outputf = null;\r
- PrintStream errorStream = null;\r
- PrintStream outStream = null;\r
- PrintStream comStream = null;\r
-\r
- try {\r
- log.info("Calculation started at " + System.nanoTime());\r
- EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());\r
- proc = pbuilder.start();\r
-\r
- // store input command and program environment\r
- comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));\r
- comStream.append("# program command\n");\r
- for (String par : pbuilder.command()) {\r
+\r
+ /**\r
+ * It is vital that output and error streams are captured immediately for\r
+ * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its\r
+ * own thread ready to capture the output. If executor could not execute\r
+ * capture immediately this could lead to the call method to stale, as\r
+ * execution could not proceed without output being captured. Every call to\r
+ * call() method will use 2 threads\r
+ * @throws JobSubmissionException \r
+ */\r
+ @Override\r
+ public ConfiguredExecutable<?> call() throws IOException {\r
+ Process proc = null;\r
+ Future<?> errorf = null;\r
+ Future<?> outputf = null;\r
+ PrintStream errorStream = null;\r
+ PrintStream outStream = null;\r
+ PrintStream comStream = null;\r
+\r
+ try {\r
+ log.info("Calculation started at " + System.nanoTime());\r
+ EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());\r
+ proc = pbuilder.start();\r
+\r
+ // store input command and program environment\r
+ comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));\r
+ comStream.append("# program command\n");\r
+ for (String par : pbuilder.command()) {\r
comStream.append(par + " ");\r
- }\r
- comStream.append("\n\n# program environment\n");\r
- for (Entry<String, String> var : pbuilder.environment().entrySet()) {\r
- comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");\r
- }\r
- comStream.close();\r
-\r
- // any error message?\r
- errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));\r
- StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);\r
-\r
- // any output?\r
- outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));\r
- StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);\r
-\r
- // kick it off\r
- errorf = es.submit(errorGobbler);\r
- outputf = es.submit(outputGobbler);\r
-\r
- // any error???\r
- int exitVal = proc.waitFor();\r
- log.info("Calculation completed at " + System.nanoTime());\r
- EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());\r
- // Let streams to write for a little more\r
- errorf.get(2, TimeUnit.SECONDS);\r
- outputf.get(2, TimeUnit.SECONDS);\r
-\r
- // Close streams\r
- errorStream.close();\r
- outStream.close();\r
- log.debug("Local process exit value: " + exitVal);\r
- } catch (ExecutionException e) {\r
- // Log and ignore this is not important\r
- log.trace("Native Process output threw exception: " + e.getMessage());\r
- } catch (TimeoutException e) {\r
- // Log and ignore this is not important\r
- log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());\r
- } catch (InterruptedException e) {\r
- log.error("Native Process was interrupted aborting: " + e.getMessage());\r
- proc.destroy();\r
- errorf.cancel(true);\r
- outputf.cancel(true);\r
- // restore interruption status\r
- Thread.currentThread().interrupt();\r
- } finally {\r
- // just to make sure that we do not left anything running\r
- if (proc != null) {\r
+ }\r
+ comStream.append("\n\n# program environment\n");\r
+ for (Entry<String, String> var : pbuilder.environment().entrySet()) {\r
+ comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");\r
+ }\r
+ comStream.close();\r
+\r
+ // any error message?\r
+ errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));\r
+ StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);\r
+\r
+ // any output?\r
+ outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));\r
+ StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);\r
+\r
+ // kick it off\r
+ errorf = es.submit(errorGobbler);\r
+ outputf = es.submit(outputGobbler);\r
+\r
+ // any error???\r
+ int exitVal = proc.waitFor();\r
+ //proc.getClass();\r
+ log.info("Calculation completed at " + System.nanoTime());\r
+ EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());\r
+\r
+ // Let streams to write for a little more\r
+ errorf.get(2, TimeUnit.SECONDS);\r
+ outputf.get(2, TimeUnit.SECONDS);\r
+\r
+ // Close streams\r
+ errorStream.close();\r
+ outStream.close();\r
+ log.debug("Local process exit value: " + exitVal);\r
+ } catch (ExecutionException e) {\r
+ // Log and ignore this is not important\r
+ log.trace("Native Process output threw exception: " + e.getMessage());\r
+ } catch (TimeoutException e) {\r
+ // Log and ignore this is not important\r
+ log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());\r
+ } catch (InterruptedException e) {\r
+ log.error("Native Process was interrupted aborting: " + e.getMessage());\r
+ System.err.println("Native Process was interrupted aborting: " + e.getMessage());\r
proc.destroy();\r
- }\r
- if (errorf != null) {\r
errorf.cancel(true);\r
- }\r
- if (outputf != null) {\r
outputf.cancel(true);\r
+ // restore interruption status\r
+ Thread.currentThread().interrupt();\r
+ } finally {\r
+ // just to make sure that we do not left anything running\r
+ if (proc != null) {\r
+ proc.destroy();\r
+ }\r
+ if (errorf != null) {\r
+ errorf.cancel(true);\r
+ }\r
+ if (outputf != null) {\r
+ outputf.cancel(true);\r
+ }\r
+ FileUtil.closeSilently(log, errorStream);\r
+ FileUtil.closeSilently(log, outStream);\r
}\r
- FileUtil.closeSilently(log, errorStream);\r
- FileUtil.closeSilently(log, outStream);\r
+ return confExec;\r
}\r
- return confExec;\r
- }\r
\r
- private String getOutput() {\r
- if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
- return confExec.getOutput();\r
+ private String getOutput() {\r
+ if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
+ return confExec.getOutput();\r
+ }\r
+ return PROC_OUT_FILE;\r
}\r
- return PROC_OUT_FILE;\r
- }\r
\r
- private String getError() {\r
- if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
- return confExec.getError();\r
+ private String getError() {\r
+ if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
+ return confExec.getError();\r
+ }\r
+ return PROC_ERR_FILE;\r
}\r
- return PROC_ERR_FILE;\r
- }\r
+\r
}\r