1 /* Copyright (c) 2009 Peter Troshin
\r
3 * JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
\r
5 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
6 * Apache License version 2 as published by the Apache Software Foundation
\r
8 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
9 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
10 * License for more details.
\r
12 * A copy of the license is in apache_license.txt. It is also available here:
\r
13 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
15 * Any republication or derived work distributed in source code form
\r
16 * must include this copyright and license notice.
\r
19 package compbio.engine.local;
\r
21 import java.io.File;
\r
22 import java.io.IOException;
\r
23 import java.io.PrintStream;
\r
24 import java.util.List;
\r
25 import java.util.concurrent.Callable;
\r
26 import java.util.concurrent.ExecutionException;
\r
27 import java.util.concurrent.ExecutorService;
\r
28 import java.util.concurrent.Executors;
\r
29 import java.util.concurrent.Future;
\r
30 import java.util.concurrent.TimeUnit;
\r
31 import java.util.concurrent.TimeoutException;
\r
33 import org.apache.log4j.Logger;
\r
35 import compbio.engine.client.ConfiguredExecutable;
\r
36 import compbio.engine.client.PathValidator;
\r
37 import compbio.engine.client.PipedExecutable;
\r
38 import compbio.engine.client.Util;
\r
39 import compbio.engine.client.Executable.ExecProvider;
\r
40 import compbio.engine.local.StreamGobbler.OutputType;
\r
41 import compbio.metadata.JobStatus;
\r
42 import compbio.metadata.JobSubmissionException;
\r
43 import compbio.util.FileUtil;
\r
44 import compbio.util.SysPrefs;
\r
45 import compbio.util.annotation.Immutable;
\r
// Callable that wraps a single ConfiguredExecutable and a ProcessBuilder prepared for it.
// NOTE(review): this listing carries embedded line numbers and omits some of the
// original lines, so the comments below describe only what is visible here.
48 public final class ExecutableWrapper implements
\r
49 Callable<ConfiguredExecutable<?>> {
\r
// Fallback file names returned by getOutput() / getError().
51 public static final String PROC_OUT_FILE = "procOutput.txt";
\r
52 public static final String PROC_ERR_FILE = "procError.txt";
\r
// Static (class-wide) executor; call() submits the stream gobblers to it.
54 private static ExecutorService es;
\r
// Class logger; the constructor also uses it as the monitor of its synchronized block.
56 private static final Logger log = Logger.getLogger(ExecutableWrapper.class);
\r
// The executable handed to the constructor.
58 private final ConfiguredExecutable<?> confExec;
\r
// Process builder assembled in the constructor and started in call().
60 private final ProcessBuilder pbuilder;
\r
/**
 * Stores the executable, validates its local command and the work directory,
 * builds the ProcessBuilder from the executable's parameter commands, merges the
 * executable's environment (when one is set) and initialises the static executor.
 *
 * NOTE(review): several original lines are absent from this listing (the embedded
 * numbers skip, e.g., 65-66, 72, 74-75, 81, 83, 91, 93, 99, 101, 105), so the
 * declaration of cmd, the opening try and some closing braces are not visible.
 *
 * @param executable the configured executable to run
 * @param workDirectory directory validated and then set as the process working directory
 * @throws JobSubmissionException wrapping the IllegalArgumentException caught while
 *         obtaining / validating the local command
 */
62 public ExecutableWrapper(ConfiguredExecutable<?> executable,
\r
63 String workDirectory) throws JobSubmissionException {
\r
64 this.confExec = executable;
\r
67 cmd = executable.getCommand(ExecProvider.Local);
\r
68 PathValidator.validateExecutable(cmd);
\r
69 } catch (IllegalArgumentException e) {
\r
// NOTE(review): the throwable passed to the logger is e.getCause(), not e itself.
70 log.error(e.getMessage(), e.getCause());
\r
71 throw new JobSubmissionException(e);
\r
73 List<String> params = executable.getParameters().getCommands();
\r
// NOTE(review): no visible line adds cmd to params before the builder is created;
// presumably that happens on one of the lines missing from this listing — confirm.
76 pbuilder = new ProcessBuilder(params);
\r
77 if (executable.getEnvironment() != null) {
\r
// This message prints the builder's environment as it is before the merge below.
78 log.debug("Setting command environment variables: "
\r
79 + pbuilder.environment());
\r
80 Util.mergeEnvVariables(pbuilder.environment(), executable
\r
82 log.debug("Process environment:" + pbuilder.environment());
\r
84 log.debug("Setting command: " + pbuilder.command());
\r
85 PathValidator.validateDirectory(workDirectory);
\r
86 pbuilder.directory(new File(workDirectory));
\r
87 log.debug("Current working directory is "
\r
88 + SysPrefs.getCurrentDirectory());
\r
89 log.debug("Setting working directory: " + workDirectory);
\r
90 // Initialize private executor to dump processes output if any to the
\r
// The static logger object serves as the lock guarding the executor set-up.
92 synchronized (log) {
\r
94 // Two threads are necessary for the process to write in two
\r
95 // streams error and output
\r
96 // simultaneously and hold the stream until exit. If only one
\r
97 // thread is used, the second stream may never
\r
98 // get access to the thread efficiently deadlocking the
\r
// es is a static field, although it is assigned through "this" here.
100 this.es = Executors.newCachedThreadPool();
\r
102 .debug("Initializing executor for local processes output dump");
\r
103 // Make sure that the executors are going to be properly closed
\r
104 Runtime.getRuntime().addShutdownHook(new Thread() {
\r
106 public void run() {
\r
115 * Stops internal executor service which captures streams of native
\r
116 * executables. This method is intended for stopping the service when deployed in
\r
117 * a web application context. There is NO NEED to use this method
\r
118 * otherwise as the executor service is taken care of internally.
\r
120 public static final void shutdownService() {
\r
127 * It is vital that output and error streams are captured immediately for
\r
128 * this call() to succeed. Thus each instance of ExecutableWrapper has its
\r
129 * own 2 threads ready to capture the output. If the executor could not execute
\r
130 * capture immediately this could cause the call method to stall, as
\r
131 * execution could not proceed without output being captured. Every call to
\r
132 * call() method will use 2 threads
\r
// Starts the process from pbuilder, copies its error and output streams into files
// inside pbuilder.directory() through two StreamGobbler tasks submitted to es, waits
// for the process to exit and then waits up to 2 seconds for each gobbler task.
// NOTE(review): the try/finally skeleton, the return statement and several
// continuation lines are missing from this listing (embedded numbers skip).
135 public ConfiguredExecutable<?> call() throws IOException {
\r
136 Process proc = null;
\r
137 Future<?> errorf = null;
\r
138 Future<?> outputf = null;
\r
139 PrintStream errorStream = null;
\r
140 PrintStream outStream = null;
\r
// NOTE(review): System.nanoTime() is an elapsed-time counter with an arbitrary
// origin, so the logged start/completion values are not wall-clock timestamps.
142 log.info("Calculation started at " + System.nanoTime());
\r
143 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED
\r
145 // pb.redirectErrorStream(false);
\r
146 proc = pbuilder.start();
\r
149 * any error message?
\r
// Error file path: <working directory> + File.separator + getError().
151 errorStream = new PrintStream(new File(pbuilder.directory()
\r
152 + File.separator + getError()));
\r
153 StreamGobbler errorGobbler = new StreamGobbler(proc
\r
154 .getErrorStream(), errorStream, OutputType.ERROR);
\r
// Output file path: <working directory> + File.separator + getOutput().
157 outStream = new PrintStream(new File(pbuilder.directory()
\r
158 + File.separator + getOutput()));
\r
159 StreamGobbler outputGobbler = new StreamGobbler(proc
\r
160 .getInputStream(), outStream, OutputType.OUTPUT);
\r
// Both gobblers are submitted before waiting on the process.
163 errorf = es.submit(errorGobbler);
\r
164 outputf = es.submit(outputGobbler);
\r
167 int exitVal = proc.waitFor();
\r
168 log.info("Calculation completed at " + System.nanoTime());
\r
169 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED
\r
171 // Give the stream gobblers a little more time to finish writing
\r
172 errorf.get(2, TimeUnit.SECONDS);
\r
173 outputf.get(2, TimeUnit.SECONDS);
\r
175 errorStream.close();
\r
177 log.debug("Local process exit value: " + exitVal);
\r
178 } catch (ExecutionException e) {
\r
179 // Log and ignore this is not important
\r
180 log.trace("Native Process output threw exception: "
\r
182 } catch (TimeoutException e) {
\r
183 // Log and ignore this is not important
\r
185 .trace("Native Process output took longer then 2s to write, aborting: "
\r
187 } catch (InterruptedException e) {
\r
188 log.error("Native Process was interrupted aborting: "
\r
// The gobbler tasks are cancelled here without the null checks used further below.
191 errorf.cancel(true);
\r
192 outputf.cancel(true);
\r
193 // restore interruption status
\r
194 Thread.currentThread().interrupt();
\r
// NOTE(review): presumably the statements below sit in a finally block; the
// keyword line is not present in this listing — confirm against the full source.
196 if (proc != null) {
\r
197 // just to make sure that we do not leave anything running
\r
200 if (errorf != null) {
\r
201 errorf.cancel(true);
\r
203 if (outputf != null) {
\r
204 outputf.cancel(true);
\r
206 FileUtil.closeSilently(log, errorStream);
\r
207 FileUtil.closeSilently(log, outStream);
\r
/**
 * Picks the name of the file that receives the process output.
 *
 * @return confExec.getOutput() when it is non-null and the wrapped executable is
 *         a PipedExecutable; the remaining visible return yields PROC_OUT_FILE
 */
212 private String getOutput() {
\r
213 if (confExec.getOutput() != null
\r
214 && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
215 return confExec.getOutput();
\r
217 return PROC_OUT_FILE;
\r
/**
 * Picks the name of the file that receives the process error stream.
 *
 * @return confExec.getError() when it is non-null and the wrapped executable is
 *         a PipedExecutable; the remaining visible return yields PROC_ERR_FILE
 */
220 private String getError() {
\r
221 if (confExec.getError() != null
\r
222 && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
223 return confExec.getError();
\r
225 return PROC_ERR_FILE;
\r