1 /* Copyright (c) 2009 Peter Troshin
\r
2 * Copyright (c) 2013 Alexander Sherstnev
\r
4 * Java Bioinformatics Analysis Web Services (JABAWS)
\r
7 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
8 * Apache License version 2 as published by the Apache Software Foundation
\r
10 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
11 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
12 * License for more details.
\r
14 * A copy of the license is in apache_license.txt. It is also available here:
\r
15 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
17 * Any republication or derived work distributed in source code form
\r
18 * must include this copyright and license notice.
\r
21 package compbio.engine.local;
\r
23 import java.io.File;
\r
24 import java.io.IOException;
\r
25 import java.io.PrintStream;
\r
26 import java.util.List;
\r
27 import java.util.Map.Entry;
\r
28 import java.util.concurrent.Callable;
\r
29 import java.util.concurrent.ExecutionException;
\r
30 import java.util.concurrent.ExecutorService;
\r
31 import java.util.concurrent.Executors;
\r
32 import java.util.concurrent.Future;
\r
33 import java.util.concurrent.TimeUnit;
\r
34 import java.util.concurrent.TimeoutException;
\r
36 import org.apache.log4j.Logger;
\r
38 import compbio.engine.client.ConfiguredExecutable;
\r
39 import compbio.engine.client.PathValidator;
\r
40 import compbio.engine.client.PipedExecutable;
\r
41 import compbio.engine.client.EngineUtil;
\r
42 import compbio.engine.client.Executable.ExecProvider;
\r
43 import compbio.engine.local.StreamGobbler.OutputType;
\r
44 import compbio.metadata.JobStatus;
\r
45 import compbio.metadata.JobSubmissionException;
\r
46 import compbio.util.FileUtil;
\r
47 import compbio.util.SysPrefs;
\r
48 import compbio.util.annotation.Immutable;
\r
51 public final class ExecutableWrapper implements
\r
52 Callable<ConfiguredExecutable<?>> {
\r
54 public static final String PROC_IN_FILE = "procInput.txt";
\r
55 public static final String PROC_OUT_FILE = "procOutput.txt";
\r
56 public static final String PROC_ERR_FILE = "procError.txt";
\r
58 private static ExecutorService es;
\r
59 private final ConfiguredExecutable<?> confExec;
\r
60 private final ProcessBuilder pbuilder;
\r
62 private static final Logger log = Logger.getLogger(ExecutableWrapper.class);
\r
64 public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {
\r
65 this.confExec = executable;
\r
68 cmd = executable.getCommand(ExecProvider.Local);
\r
69 PathValidator.validateExecutable(cmd);
\r
70 } catch (IllegalArgumentException e) {
\r
71 log.error(e.getMessage(), e.getCause());
\r
72 throw new JobSubmissionException(e);
\r
74 List<String> params = executable.getParameters().getCommands();
\r
77 pbuilder = new ProcessBuilder(params);
\r
78 if (executable.getEnvironment() != null) {
\r
79 log.debug("Setting command environment variables: " + pbuilder.environment());
\r
80 EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());
\r
81 log.debug("Process environment:" + pbuilder.environment());
\r
83 log.debug("Setting command: " + pbuilder.command());
\r
84 PathValidator.validateDirectory(workDirectory);
\r
85 pbuilder.directory(new File(workDirectory));
\r
86 log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());
\r
87 log.debug("Setting working directory: " + workDirectory);
\r
88 // Initialize private executor to dump processes output if any to the file system
\r
89 synchronized (log) {
\r
92 * Two threads are necessary for the process to write in two streams error and output
\r
93 * simultaneously and hold the stream until exit. If only one thread is used, the
\r
94 * second stream may never get access to the thread efficiently deadlocking the proccess!
\r
96 this.es = Executors.newCachedThreadPool();
\r
97 log.debug("Initializing executor for local processes output dump");
\r
98 // Make sure that the executors are going to be properly closed
\r
99 Runtime.getRuntime().addShutdownHook(new Thread() {
\r
101 public void run() {
\r
110 * Stops internal executor service which captures streams of native
\r
111 * executables. This method is intended for stopping service if deployed in
\r
112 * the web application content. There is NO NEED of using this method
\r
113 * otherwise as the executor service is taken care of internally.
\r
115 public static final void shutdownService() {
\r
122 * It is vital that output and error streams are captured immediately for
\r
123 * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its
\r
124 * own thread ready to capture the output. If executor could not execute
\r
125 * capture immediately this could lead to the call method to stale, as
\r
126 * execution could not proceed without output being captured. Every call to
\r
127 * call() method will use 2 threads
\r
128 * @throws JobSubmissionException
\r
131 public ConfiguredExecutable<?> call() throws IOException {
\r
132 Process proc = null;
\r
133 Future<?> errorf = null;
\r
134 Future<?> outputf = null;
\r
135 PrintStream errorStream = null;
\r
136 PrintStream outStream = null;
\r
137 PrintStream comStream = null;
\r
140 log.info("Calculation started at " + System.nanoTime());
\r
141 EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());
\r
142 proc = pbuilder.start();
\r
144 // store input command and program environment
\r
145 comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));
\r
146 comStream.append("# program command\n");
\r
147 for (String par : pbuilder.command()) {
\r
148 comStream.append(par + " ");
\r
150 comStream.append("\n\n# program environment\n");
\r
151 for (Entry<String, String> var : pbuilder.environment().entrySet()) {
\r
152 comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");
\r
156 // any error message?
\r
157 errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));
\r
158 StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);
\r
161 outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));
\r
162 StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);
\r
165 errorf = es.submit(errorGobbler);
\r
166 outputf = es.submit(outputGobbler);
\r
169 int exitVal = proc.waitFor();
\r
170 log.info("Calculation completed at " + System.nanoTime());
\r
171 EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());
\r
172 // Let streams to write for a little more
\r
173 errorf.get(2, TimeUnit.SECONDS);
\r
174 outputf.get(2, TimeUnit.SECONDS);
\r
177 errorStream.close();
\r
179 log.debug("Local process exit value: " + exitVal);
\r
180 } catch (ExecutionException e) {
\r
181 // Log and ignore this is not important
\r
182 log.trace("Native Process output threw exception: " + e.getMessage());
\r
183 } catch (TimeoutException e) {
\r
184 // Log and ignore this is not important
\r
185 log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());
\r
186 } catch (InterruptedException e) {
\r
187 log.error("Native Process was interrupted aborting: " + e.getMessage());
\r
189 errorf.cancel(true);
\r
190 outputf.cancel(true);
\r
191 // restore interruption status
\r
192 Thread.currentThread().interrupt();
\r
194 // just to make sure that we do not left anything running
\r
195 if (proc != null) {
\r
198 if (errorf != null) {
\r
199 errorf.cancel(true);
\r
201 if (outputf != null) {
\r
202 outputf.cancel(true);
\r
204 FileUtil.closeSilently(log, errorStream);
\r
205 FileUtil.closeSilently(log, outStream);
\r
210 private String getOutput() {
\r
211 if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
212 return confExec.getOutput();
\r
214 return PROC_OUT_FILE;
\r
217 private String getError() {
\r
218 if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
219 return confExec.getError();
\r
221 return PROC_ERR_FILE;
\r