1 /* Copyright (c) 2009 Peter Troshin
\r
3 * JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
\r
5 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
6 * Apache License version 2 as published by the Apache Software Foundation
\r
8 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
9 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
10 * License for more details.
\r
12 * A copy of the license is in apache_license.txt. It is also available here:
\r
13 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
15 * Any republication or derived work distributed in source code form
\r
16 * must include this copyright and license notice.
\r
19 package compbio.engine.local;
\r
21 import java.io.File;
\r
22 import java.io.IOException;
\r
23 import java.io.PrintStream;
\r
24 import java.util.List;
\r
25 import java.util.concurrent.Callable;
\r
26 import java.util.concurrent.ExecutionException;
\r
27 import java.util.concurrent.ExecutorService;
\r
28 import java.util.concurrent.Executors;
\r
29 import java.util.concurrent.Future;
\r
30 import java.util.concurrent.TimeUnit;
\r
31 import java.util.concurrent.TimeoutException;
\r
33 import org.apache.log4j.Logger;
\r
35 import compbio.engine.client.ConfiguredExecutable;
\r
36 import compbio.engine.client.PathValidator;
\r
37 import compbio.engine.client.PipedExecutable;
\r
38 import compbio.engine.client.Util;
\r
39 import compbio.engine.client.Executable.ExecProvider;
\r
40 import compbio.engine.local.StreamGobbler.OutputType;
\r
41 import compbio.metadata.JobStatus;
\r
42 import compbio.metadata.JobSubmissionException;
\r
43 import compbio.util.FileUtil;
\r
44 import compbio.util.SysPrefs;
\r
45 import compbio.util.annotation.Immutable;
\r
48 public final class ExecutableWrapper implements
\r
49 Callable<ConfiguredExecutable<?>> {
\r
51 public static final String PROC_OUT_FILE = "procOutput.txt";
\r
52 public static final String PROC_ERR_FILE = "procError.txt";
\r
54 private static ExecutorService es;
\r
56 private static final Logger log = Logger.getLogger(ExecutableWrapper.class);
\r
58 private final ConfiguredExecutable<?> confExec;
\r
60 private final ProcessBuilder pbuilder;
\r
62 public ExecutableWrapper(ConfiguredExecutable<?> executable,
\r
63 String workDirectory) throws JobSubmissionException {
\r
64 this.confExec = executable;
\r
67 cmd = executable.getCommand(ExecProvider.Local);
\r
68 PathValidator.validateExecutable(cmd);
\r
69 } catch (IllegalArgumentException e) {
\r
70 log.error(e.getMessage(), e.getCause());
\r
71 throw new JobSubmissionException(e);
\r
73 List<String> params = executable.getParameters().getCommands();
\r
76 pbuilder = new ProcessBuilder(params);
\r
77 if (executable.getEnvironment() != null) {
\r
78 log.debug("Setting command environment variables: " + pbuilder.environment());
\r
79 Util.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());
\r
80 log.debug("Process environment:" + pbuilder.environment());
\r
82 log.debug("Setting command: " + pbuilder.command());
\r
83 PathValidator.validateDirectory(workDirectory);
\r
84 pbuilder.directory(new File(workDirectory));
\r
85 log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());
\r
86 log.debug("Setting working directory: " + workDirectory);
\r
87 // Initialize private executor to dump processes output if any to the
\r
89 synchronized (log) {
\r
91 // Two threads are necessary for the process to write in two
\r
92 // streams error and output
\r
93 // simultaneously and hold the stream until exit. If only one
\r
94 // thread is used, the second stream may never
\r
95 // get access to the thread efficiently deadlocking the
\r
97 this.es = Executors.newCachedThreadPool();
\r
98 log.debug("Initializing executor for local processes output dump");
\r
99 // Make sure that the executors are going to be properly closed
\r
100 Runtime.getRuntime().addShutdownHook(new Thread() {
\r
102 public void run() {
\r
111 * Stops internal executor service which captures streams of native
\r
112 * executables. This method is intended for stopping service if deployed in
\r
113 * the web application content. There is NO NEED of using this method
\r
114 * otherwise as the executor service is taken care of internally.
\r
116 public static final void shutdownService() {
\r
123 * It is vital that output and error streams are captured immediately for
\r
124 * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its
\r
125 * own thread ready to capture the output. If executor could not execute
\r
126 * capture immediately this could lead to the call method to stale, as
\r
127 * execution could not proceed without output being captured. Every call to
\r
128 * call() method will use 2 threads
\r
131 public ConfiguredExecutable<?> call() throws IOException {
\r
132 Process proc = null;
\r
133 Future<?> errorf = null;
\r
134 Future<?> outputf = null;
\r
135 PrintStream errorStream = null;
\r
136 PrintStream outStream = null;
\r
138 log.info("Calculation started at " + System.nanoTime());
\r
139 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED
\r
141 // pb.redirectErrorStream(false);
\r
142 proc = pbuilder.start();
\r
145 * any error message?
\r
147 errorStream = new PrintStream(new File(pbuilder.directory()
\r
148 + File.separator + getError()));
\r
149 StreamGobbler errorGobbler = new StreamGobbler(proc
\r
150 .getErrorStream(), errorStream, OutputType.ERROR);
\r
153 outStream = new PrintStream(new File(pbuilder.directory()
\r
154 + File.separator + getOutput()));
\r
155 StreamGobbler outputGobbler = new StreamGobbler(proc
\r
156 .getInputStream(), outStream, OutputType.OUTPUT);
\r
159 errorf = es.submit(errorGobbler);
\r
160 outputf = es.submit(outputGobbler);
\r
163 int exitVal = proc.waitFor();
\r
164 log.info("Calculation completed at " + System.nanoTime());
\r
165 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED
\r
167 // Let streams to write for a little more
\r
168 errorf.get(2, TimeUnit.SECONDS);
\r
169 outputf.get(2, TimeUnit.SECONDS);
\r
171 errorStream.close();
\r
173 log.debug("Local process exit value: " + exitVal);
\r
174 } catch (ExecutionException e) {
\r
175 // Log and ignore this is not important
\r
176 log.trace("Native Process output threw exception: "
\r
178 } catch (TimeoutException e) {
\r
179 // Log and ignore this is not important
\r
181 .trace("Native Process output took longer then 2s to write, aborting: "
\r
183 } catch (InterruptedException e) {
\r
184 log.error("Native Process was interrupted aborting: "
\r
187 errorf.cancel(true);
\r
188 outputf.cancel(true);
\r
189 // restore interruption status
\r
190 Thread.currentThread().interrupt();
\r
192 if (proc != null) {
\r
193 // just to make sure that we do not left anything running
\r
196 if (errorf != null) {
\r
197 errorf.cancel(true);
\r
199 if (outputf != null) {
\r
200 outputf.cancel(true);
\r
202 FileUtil.closeSilently(log, errorStream);
\r
203 FileUtil.closeSilently(log, outStream);
\r
208 private String getOutput() {
\r
209 if (confExec.getOutput() != null
\r
210 && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
211 return confExec.getOutput();
\r
213 return PROC_OUT_FILE;
\r
216 private String getError() {
\r
217 if (confExec.getError() != null
\r
218 && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
219 return confExec.getError();
\r
221 return PROC_ERR_FILE;
\r