1 /* Copyright (c) 2009 Peter Troshin
\r
2 * Copyright (c) 2013 Alexander Sherstnev
\r
4 * Java Bioinformatics Analysis Web Services (JABAWS)
\r
7 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
8 * Apache License version 2 as published by the Apache Software Foundation
\r
10 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
11 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
12 * License for more details.
\r
14 * A copy of the license is in apache_license.txt. It is also available here:
\r
15 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
17 * Any republication or derived work distributed in source code form
\r
18 * must include this copyright and license notice.
\r
21 package compbio.engine.local;
\r
23 import java.io.File;
\r
24 import java.io.IOException;
\r
25 import java.io.PrintStream;
\r
26 import java.util.List;
\r
27 import java.util.Map;
\r
28 import java.util.Map.Entry;
\r
29 import java.util.concurrent.Callable;
\r
30 import java.util.concurrent.ExecutionException;
\r
31 import java.util.concurrent.ExecutorService;
\r
32 import java.util.concurrent.Executors;
\r
33 import java.util.concurrent.Future;
\r
34 import java.util.concurrent.TimeUnit;
\r
35 import java.util.concurrent.TimeoutException;
\r
37 import org.apache.log4j.Logger;
\r
39 import compbio.engine.client.ConfiguredExecutable;
\r
40 import compbio.engine.client.PathValidator;
\r
41 import compbio.engine.client.PipedExecutable;
\r
42 import compbio.engine.client.Util;
\r
43 import compbio.engine.client.Executable.ExecProvider;
\r
44 import compbio.engine.local.StreamGobbler.OutputType;
\r
45 import compbio.metadata.JobStatus;
\r
46 import compbio.metadata.JobSubmissionException;
\r
47 import compbio.util.FileUtil;
\r
48 import compbio.util.SysPrefs;
\r
49 import compbio.util.annotation.Immutable;
\r
52 public final class ExecutableWrapper implements
\r
53 Callable<ConfiguredExecutable<?>> {
\r
55 public static final String PROC_IN_FILE = "procInput.txt";
\r
56 public static final String PROC_OUT_FILE = "procOutput.txt";
\r
57 public static final String PROC_ERR_FILE = "procError.txt";
\r
59 private static ExecutorService es;
\r
60 private final ConfiguredExecutable<?> confExec;
\r
61 private final ProcessBuilder pbuilder;
\r
63 private static final Logger log = Logger.getLogger(ExecutableWrapper.class);
\r
65 public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {
\r
66 this.confExec = executable;
\r
69 cmd = executable.getCommand(ExecProvider.Local);
\r
70 PathValidator.validateExecutable(cmd);
\r
71 } catch (IllegalArgumentException e) {
\r
72 log.error(e.getMessage(), e.getCause());
\r
73 throw new JobSubmissionException(e);
\r
75 List<String> params = executable.getParameters().getCommands();
\r
78 pbuilder = new ProcessBuilder(params);
\r
79 if (executable.getEnvironment() != null) {
\r
80 log.debug("Setting command environment variables: " + pbuilder.environment());
\r
81 Util.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());
\r
82 log.debug("Process environment:" + pbuilder.environment());
\r
84 log.debug("Setting command: " + pbuilder.command());
\r
85 PathValidator.validateDirectory(workDirectory);
\r
86 pbuilder.directory(new File(workDirectory));
\r
87 log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());
\r
88 log.debug("Setting working directory: " + workDirectory);
\r
89 // Initialize private executor to dump processes output if any to the file system
\r
90 synchronized (log) {
\r
93 * Two threads are necessary for the process to write in two streams error and output
\r
94 * simultaneously and hold the stream until exit. If only one thread is used, the
\r
95 * second stream may never get access to the thread, effectively deadlocking the process!
\r
97 this.es = Executors.newCachedThreadPool();
\r
98 log.debug("Initializing executor for local processes output dump");
\r
99 // Make sure that the executors are going to be properly closed
\r
100 Runtime.getRuntime().addShutdownHook(new Thread() {
\r
102 public void run() {
\r
111 * Stops internal executor service which captures streams of native
\r
112 * executables. This method is intended for stopping service if deployed in
\r
113 * the web application content. There is NO NEED of using this method
\r
114 * otherwise as the executor service is taken care of internally.
\r
116 public static final void shutdownService() {
\r
123 * It is vital that output and error streams are captured immediately for
\r
124 * this call() to succeed. Thus each instance of ExecutableWrapper has 2 of its
\r
125 * own threads ready to capture the output. If executor could not execute
\r
126 * capture immediately this could lead to the call method to stall, as
\r
127 * execution could not proceed without output being captured. Every call to
\r
128 * call() method will use 2 threads
\r
129 * @throws IOException if the native process cannot be started or its output files cannot be opened
\r
132 public ConfiguredExecutable<?> call() throws IOException {
\r
133 Process proc = null;
\r
134 Future<?> errorf = null;
\r
135 Future<?> outputf = null;
\r
136 PrintStream errorStream = null;
\r
137 PrintStream outStream = null;
\r
138 PrintStream comStream = null;
\r
141 log.info("Calculation started at " + System.nanoTime());
\r
142 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());
\r
143 proc = pbuilder.start();
\r
145 // store input command and program environment
\r
146 comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));
\r
147 comStream.append("# program command\n");
\r
148 for (String par : pbuilder.command()) {
\r
149 comStream.append(par + " ");
\r
151 comStream.append("\n\n# program environment\n");
\r
152 for (Entry<String, String> var : pbuilder.environment().entrySet()) {
\r
153 comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");
\r
157 // any error message?
\r
158 errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));
\r
159 StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);
\r
162 outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));
\r
163 StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);
\r
166 errorf = es.submit(errorGobbler);
\r
167 outputf = es.submit(outputGobbler);
\r
170 int exitVal = proc.waitFor();
\r
171 log.info("Calculation completed at " + System.nanoTime());
\r
172 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());
\r
173 // Give the streams a little more time to finish writing
\r
174 errorf.get(2, TimeUnit.SECONDS);
\r
175 outputf.get(2, TimeUnit.SECONDS);
\r
178 errorStream.close();
\r
180 log.debug("Local process exit value: " + exitVal);
\r
181 } catch (ExecutionException e) {
\r
182 // Log and ignore; this is not important
\r
183 log.trace("Native Process output threw exception: " + e.getMessage());
\r
184 } catch (TimeoutException e) {
\r
185 // Log and ignore; this is not important
\r
186 log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());
\r
187 } catch (InterruptedException e) {
\r
188 log.error("Native Process was interrupted aborting: " + e.getMessage());
\r
190 errorf.cancel(true);
\r
191 outputf.cancel(true);
\r
192 // restore interruption status
\r
193 Thread.currentThread().interrupt();
\r
195 // just to make sure that we do not leave anything running
\r
196 if (proc != null) {
\r
199 if (errorf != null) {
\r
200 errorf.cancel(true);
\r
202 if (outputf != null) {
\r
203 outputf.cancel(true);
\r
205 FileUtil.closeSilently(log, errorStream);
\r
206 FileUtil.closeSilently(log, outStream);
\r
211 private String getOutput() {
\r
212 if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
213 return confExec.getOutput();
\r
215 return PROC_OUT_FILE;
\r
218 private String getError() {
\r
219 if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {
\r
220 return confExec.getError();
\r
222 return PROC_ERR_FILE;
\r