Fix core WST file
[jabaws.git] / engine / compbio / engine / local / ExecutableWrapper.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.local;\r
20 \r
21 import java.io.File;\r
22 import java.io.IOException;\r
23 import java.io.PrintStream;\r
24 import java.util.List;\r
25 import java.util.concurrent.Callable;\r
26 import java.util.concurrent.ExecutionException;\r
27 import java.util.concurrent.ExecutorService;\r
28 import java.util.concurrent.Executors;\r
29 import java.util.concurrent.Future;\r
30 import java.util.concurrent.TimeUnit;\r
31 import java.util.concurrent.TimeoutException;\r
32 \r
33 import org.apache.log4j.Logger;\r
34 \r
35 import compbio.engine.client.ConfiguredExecutable;\r
36 import compbio.engine.client.PathValidator;\r
37 import compbio.engine.client.PipedExecutable;\r
38 import compbio.engine.client.Util;\r
39 import compbio.engine.client.Executable.ExecProvider;\r
40 import compbio.engine.local.StreamGobbler.OutputType;\r
41 import compbio.metadata.JobStatus;\r
42 import compbio.metadata.JobSubmissionException;\r
43 import compbio.util.FileUtil;\r
44 import compbio.util.SysPrefs;\r
45 import compbio.util.annotation.Immutable;\r
46 \r
47 @Immutable\r
48 public final class ExecutableWrapper implements\r
49         Callable<ConfiguredExecutable<?>> {\r
50 \r
51     public static final String PROC_OUT_FILE = "procOutput.txt";\r
52     public static final String PROC_ERR_FILE = "procError.txt";\r
53 \r
54     private static ExecutorService es;\r
55 \r
56     private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
57 \r
58     private final ConfiguredExecutable<?> confExec;\r
59 \r
60     private final ProcessBuilder pbuilder;\r
61 \r
62     public ExecutableWrapper(ConfiguredExecutable<?> executable,\r
63             String workDirectory) throws JobSubmissionException {\r
64         this.confExec = executable;\r
65         String cmd = null;\r
66         try {\r
67             cmd = executable.getCommand(ExecProvider.Local);\r
68             PathValidator.validateExecutable(cmd);\r
69         } catch (IllegalArgumentException e) {\r
70             log.error(e.getMessage(), e.getCause());\r
71             throw new JobSubmissionException(e);\r
72         }\r
73         List<String> params = executable.getParameters().getCommands();\r
74         params.add(0, cmd);\r
75 \r
76         pbuilder = new ProcessBuilder(params);\r
77         if (executable.getEnvironment() != null) {\r
78             log.debug("Setting command environment variables: "\r
79                     + pbuilder.environment());\r
80             Util.mergeEnvVariables(pbuilder.environment(), executable\r
81                     .getEnvironment());\r
82             log.debug("Process environment:" + pbuilder.environment());\r
83         }\r
84         log.debug("Setting command: " + pbuilder.command());\r
85         PathValidator.validateDirectory(workDirectory);\r
86         pbuilder.directory(new File(workDirectory));\r
87         log.debug("Current working directory is "\r
88                 + SysPrefs.getCurrentDirectory());\r
89         log.debug("Setting working directory: " + workDirectory);\r
90         // Initialize private executor to dump processes output if any to the\r
91         // file system\r
92         synchronized (log) {\r
93             if (es == null) {\r
94                 // Two threads are necessary for the process to write in two\r
95                 // streams error and output\r
96                 // simultaneously and hold the stream until exit. If only one\r
97                 // thread is used, the second stream may never\r
98                 // get access to the thread efficiently deadlocking the\r
99                 // proccess!\r
100                 this.es = Executors.newCachedThreadPool();\r
101                 log\r
102                         .debug("Initializing executor for local processes output dump");\r
103                 // Make sure that the executors are going to be properly closed\r
104                 Runtime.getRuntime().addShutdownHook(new Thread() {\r
105                     @Override\r
106                     public void run() {\r
107                         shutdownService();\r
108                     }\r
109                 });\r
110             }\r
111         }\r
112     }\r
113 \r
114     /**\r
115      * Stops internal executor service which captures streams of native\r
116      * executables. This method is intended for stopping service if deployed in\r
117      * the web application content. There is NO NEED of using this method\r
118      * otherwise as the executor service is taken care of internally.\r
119      */\r
120     public static final void shutdownService() {\r
121         if (es != null) {\r
122             es.shutdownNow();\r
123         }\r
124     }\r
125 \r
126     /**\r
127      * It is vital that output and error streams are captured immediately for\r
128      * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its\r
129      * own thread ready to capture the output. If executor could not execute\r
130      * capture immediately this could lead to the call method to stale, as\r
131      * execution could not proceed without output being captured. Every call to\r
132      * call() method will use 2 threads\r
133      */\r
134     @Override\r
135     public ConfiguredExecutable<?> call() throws IOException {\r
136         Process proc = null;\r
137         Future<?> errorf = null;\r
138         Future<?> outputf = null;\r
139         PrintStream errorStream = null;\r
140         PrintStream outStream = null;\r
141         try {\r
142             log.info("Calculation started at " + System.nanoTime());\r
143             Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED\r
144                     .toString());\r
145             // pb.redirectErrorStream(false);\r
146             proc = pbuilder.start();\r
147 \r
148             /*\r
149              * any error message?\r
150              */\r
151             errorStream = new PrintStream(new File(pbuilder.directory()\r
152                     + File.separator + getError()));\r
153             StreamGobbler errorGobbler = new StreamGobbler(proc\r
154                     .getErrorStream(), errorStream, OutputType.ERROR);\r
155 \r
156             // any output?\r
157             outStream = new PrintStream(new File(pbuilder.directory()\r
158                     + File.separator + getOutput()));\r
159             StreamGobbler outputGobbler = new StreamGobbler(proc\r
160                     .getInputStream(), outStream, OutputType.OUTPUT);\r
161 \r
162             // kick them off\r
163             errorf = es.submit(errorGobbler);\r
164             outputf = es.submit(outputGobbler);\r
165 \r
166             // any error???\r
167             int exitVal = proc.waitFor();\r
168             log.info("Calculation completed at " + System.nanoTime());\r
169             Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED\r
170                     .toString());\r
171             // Let streams to write for a little more\r
172             errorf.get(2, TimeUnit.SECONDS);\r
173             outputf.get(2, TimeUnit.SECONDS);\r
174             // Close streams\r
175             errorStream.close();\r
176             outStream.close();\r
177             log.debug("Local process exit value: " + exitVal);\r
178         } catch (ExecutionException e) {\r
179             // Log and ignore this is not important\r
180             log.trace("Native Process output threw exception: "\r
181                     + e.getMessage());\r
182         } catch (TimeoutException e) {\r
183             // Log and ignore this is not important\r
184             log\r
185                     .trace("Native Process output took longer then 2s to write, aborting: "\r
186                             + e.getMessage());\r
187         } catch (InterruptedException e) {\r
188             log.error("Native Process was interrupted aborting: "\r
189                     + e.getMessage());\r
190             proc.destroy();\r
191             errorf.cancel(true);\r
192             outputf.cancel(true);\r
193             // restore interruption status\r
194             Thread.currentThread().interrupt();\r
195         } finally {\r
196             if (proc != null) {\r
197                 // just to make sure that we do not left anything running\r
198                 proc.destroy();\r
199             }\r
200             if (errorf != null) {\r
201                 errorf.cancel(true);\r
202             }\r
203             if (outputf != null) {\r
204                 outputf.cancel(true);\r
205             }\r
206             FileUtil.closeSilently(log, errorStream);\r
207             FileUtil.closeSilently(log, outStream);\r
208         }\r
209         return confExec;\r
210     }\r
211 \r
212     private String getOutput() {\r
213         if (confExec.getOutput() != null\r
214                 && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
215             return confExec.getOutput();\r
216         }\r
217         return PROC_OUT_FILE;\r
218     }\r
219 \r
220     private String getError() {\r
221         if (confExec.getError() != null\r
222                 && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
223             return confExec.getError();\r
224         }\r
225         return PROC_ERR_FILE;\r
226     }\r
227 \r
228 }\r