New Linux binaries for ViennaRNA
[jabaws.git] / engine / compbio / engine / local / ExecutableWrapper.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  * Copyright (c) 2013 Alexander Sherstnev\r
3  * \r
4  *  Java Bioinformatics Analysis Web Services (JABAWS)\r
5  * @version: 2.5\r
6  * \r
7  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
8  *  Apache License version 2 as published by the Apache Software Foundation\r
9  * \r
10  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
11  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
12  *  License for more details.\r
13  * \r
14  *  A copy of the license is in apache_license.txt. It is also available here:\r
15  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
16  * \r
17  * Any republication or derived work distributed in source code form\r
18  * must include this copyright and license notice.\r
19  */\r
20 \r
21 package compbio.engine.local;\r
22 \r
23 import java.io.File;\r
24 import java.io.IOException;\r
25 import java.io.PrintStream;\r
26 import java.util.List;\r
27 import java.util.Map;\r
28 import java.util.Map.Entry;\r
29 import java.util.concurrent.Callable;\r
30 import java.util.concurrent.ExecutionException;\r
31 import java.util.concurrent.ExecutorService;\r
32 import java.util.concurrent.Executors;\r
33 import java.util.concurrent.Future;\r
34 import java.util.concurrent.TimeUnit;\r
35 import java.util.concurrent.TimeoutException;\r
36 \r
37 import org.apache.log4j.Logger;\r
38 \r
39 import compbio.engine.client.ConfiguredExecutable;\r
40 import compbio.engine.client.PathValidator;\r
41 import compbio.engine.client.PipedExecutable;\r
42 import compbio.engine.client.Util;\r
43 import compbio.engine.client.Executable.ExecProvider;\r
44 import compbio.engine.local.StreamGobbler.OutputType;\r
45 import compbio.metadata.JobStatus;\r
46 import compbio.metadata.JobSubmissionException;\r
47 import compbio.util.FileUtil;\r
48 import compbio.util.SysPrefs;\r
49 import compbio.util.annotation.Immutable;\r
50 \r
51 @Immutable\r
52 public final class ExecutableWrapper implements\r
53         Callable<ConfiguredExecutable<?>> {\r
54 \r
55     public static final String PROC_IN_FILE = "procInput.txt";\r
56     public static final String PROC_OUT_FILE = "procOutput.txt";\r
57     public static final String PROC_ERR_FILE = "procError.txt";\r
58 \r
59     private static ExecutorService es;\r
60     private final ConfiguredExecutable<?> confExec;\r
61     private final ProcessBuilder pbuilder;\r
62 \r
63     private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
64 \r
65     public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {\r
66         this.confExec = executable;\r
67         String cmd = null;\r
68         try {\r
69                 cmd = executable.getCommand(ExecProvider.Local);\r
70                 PathValidator.validateExecutable(cmd);\r
71         } catch (IllegalArgumentException e) {\r
72                 log.error(e.getMessage(), e.getCause());\r
73                 throw new JobSubmissionException(e);\r
74         }\r
75         List<String> params = executable.getParameters().getCommands();\r
76         params.add(0, cmd);\r
77 \r
78         pbuilder = new ProcessBuilder(params);\r
79         if (executable.getEnvironment() != null) {\r
80                 log.debug("Setting command environment variables: " + pbuilder.environment());\r
81                 Util.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());\r
82                 log.debug("Process environment:" + pbuilder.environment());\r
83         }\r
84         log.debug("Setting command: " + pbuilder.command());\r
85         PathValidator.validateDirectory(workDirectory);\r
86         pbuilder.directory(new File(workDirectory));\r
87         log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());\r
88         log.debug("Setting working directory: " + workDirectory);\r
89         // Initialize private executor to dump processes output if any to the file system\r
90         synchronized (log) {\r
91                 if (es == null) {\r
92                         /* \r
93                          * Two threads are necessary for the process to write in two streams error and output\r
94                          * simultaneously and hold the stream until exit. If only one thread is used, the\r
95                          * second stream may never get access to the thread efficiently deadlocking the proccess!\r
96                          */\r
97                         this.es = Executors.newCachedThreadPool();\r
98                                 log.debug("Initializing executor for local processes output dump");\r
99                                 // Make sure that the executors are going to be properly closed\r
100                                 Runtime.getRuntime().addShutdownHook(new Thread() {\r
101                                 @Override\r
102                                 public void run() {\r
103                                         shutdownService();\r
104                                 }\r
105                                 });\r
106                 }\r
107         }\r
108     }\r
109 \r
110     /**\r
111      * Stops internal executor service which captures streams of native\r
112      * executables. This method is intended for stopping service if deployed in\r
113      * the web application content. There is NO NEED of using this method\r
114      * otherwise as the executor service is taken care of internally.\r
115      */\r
116     public static final void shutdownService() {\r
117         if (es != null) {\r
118                 es.shutdownNow();\r
119         }\r
120     }\r
121 \r
122     /**\r
123      * It is vital that output and error streams are captured immediately for\r
124      * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its\r
125      * own thread ready to capture the output. If executor could not execute\r
126      * capture immediately this could lead to the call method to stale, as\r
127      * execution could not proceed without output being captured. Every call to\r
128      * call() method will use 2 threads\r
129      * @throws JobSubmissionException \r
130      */\r
131     @Override\r
132     public ConfiguredExecutable<?> call() throws IOException {\r
133         Process proc = null;\r
134         Future<?> errorf = null;\r
135         Future<?> outputf = null;\r
136         PrintStream errorStream = null;\r
137         PrintStream outStream = null;\r
138         PrintStream comStream = null;\r
139 \r
140     try {\r
141                 log.info("Calculation started at " + System.nanoTime());\r
142                 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());\r
143                 proc = pbuilder.start();\r
144 \r
145                 // store input command and program environment\r
146                 comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));\r
147                 comStream.append("# program command\n");\r
148                 for (String par : pbuilder.command()) {\r
149                         comStream.append(par + " ");\r
150                 }\r
151                 comStream.append("\n\n# program environment\n");\r
152                 for (Entry<String, String> var : pbuilder.environment().entrySet()) {\r
153                         comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");\r
154                 }\r
155                 comStream.close();\r
156 \r
157                 // any error message?\r
158                 errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));\r
159                 StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);\r
160 \r
161                 // any output?\r
162                 outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));\r
163                 StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);\r
164 \r
165                 // kick it off\r
166                 errorf = es.submit(errorGobbler);\r
167                 outputf = es.submit(outputGobbler);\r
168 \r
169                 // any error???\r
170                 int exitVal = proc.waitFor();\r
171                 log.info("Calculation completed at " + System.nanoTime());\r
172                 Util.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());\r
173                 // Let streams to write for a little more\r
174                 errorf.get(2, TimeUnit.SECONDS);\r
175                 outputf.get(2, TimeUnit.SECONDS);\r
176 \r
177                 // Close streams\r
178                 errorStream.close();\r
179                 outStream.close();\r
180                 log.debug("Local process exit value: " + exitVal);\r
181         } catch (ExecutionException e) {\r
182                 // Log and ignore this is not important\r
183                 log.trace("Native Process output threw exception: " + e.getMessage());\r
184         } catch (TimeoutException e) {\r
185                 // Log and ignore this is not important\r
186                 log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());\r
187         } catch (InterruptedException e) {\r
188                 log.error("Native Process was interrupted aborting: " + e.getMessage());\r
189                 proc.destroy();\r
190                 errorf.cancel(true);\r
191                 outputf.cancel(true);\r
192                 // restore interruption status\r
193                 Thread.currentThread().interrupt();\r
194         } finally {\r
195                 // just to make sure that we do not left anything running\r
196                 if (proc != null) {\r
197                         proc.destroy();\r
198                 }\r
199                 if (errorf != null) {\r
200                         errorf.cancel(true);\r
201                 }\r
202                 if (outputf != null) {\r
203                         outputf.cancel(true);\r
204                 }\r
205                 FileUtil.closeSilently(log, errorStream);\r
206                 FileUtil.closeSilently(log, outStream);\r
207         }\r
208         return confExec;\r
209     }\r
210 \r
211     private String getOutput() {\r
212         if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
213                 return confExec.getOutput();\r
214         }\r
215         return PROC_OUT_FILE;\r
216     }\r
217 \r
218     private String getError() {\r
219         if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
220                 return confExec.getError();\r
221         }\r
222         return PROC_ERR_FILE;\r
223     }\r
224 }\r