Refactoring (renaming) 2 classes: AsyncJobRunner.java -> AsyncClusterRunner.java...
[jabaws.git] / engine / compbio / engine / local / ExecutableWrapper.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  * Copyright (c) 2013 Alexander Sherstnev\r
3  * \r
4  *  Java Bioinformatics Analysis Web Services (JABAWS)\r
5  * @version: 2.5\r
6  * \r
7  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
8  *  Apache License version 2 as published by the Apache Software Foundation\r
9  * \r
10  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
11  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
12  *  License for more details.\r
13  * \r
14  *  A copy of the license is in apache_license.txt. It is also available here:\r
15  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
16  * \r
17  * Any republication or derived work distributed in source code form\r
18  * must include this copyright and license notice.\r
19  */\r
20 \r
21 package compbio.engine.local;\r
22 \r
23 import java.io.File;\r
24 import java.io.IOException;\r
25 import java.io.PrintStream;\r
26 import java.util.List;\r
27 import java.util.Map.Entry;\r
28 import java.util.concurrent.Callable;\r
29 import java.util.concurrent.ExecutionException;\r
30 import java.util.concurrent.ExecutorService;\r
31 import java.util.concurrent.Executors;\r
32 import java.util.concurrent.Future;\r
33 import java.util.concurrent.TimeUnit;\r
34 import java.util.concurrent.TimeoutException;\r
35 \r
36 import org.apache.log4j.Logger;\r
37 \r
38 import compbio.engine.client.ConfiguredExecutable;\r
39 import compbio.engine.client.PathValidator;\r
40 import compbio.engine.client.PipedExecutable;\r
41 import compbio.engine.client.EngineUtil;\r
42 import compbio.engine.client.Executable.ExecProvider;\r
43 import compbio.engine.local.StreamGobbler.OutputType;\r
44 import compbio.metadata.JobStatus;\r
45 import compbio.metadata.JobSubmissionException;\r
46 import compbio.util.FileUtil;\r
47 import compbio.util.SysPrefs;\r
48 import compbio.util.annotation.Immutable;\r
49 \r
50 @Immutable\r
51 public final class ExecutableWrapper implements\r
52         Callable<ConfiguredExecutable<?>> {\r
53 \r
54         public static final String PROC_IN_FILE = "procInput.txt";\r
55         public static final String PROC_OUT_FILE = "procOutput.txt";\r
56         public static final String PROC_ERR_FILE = "procError.txt";\r
57 \r
58         private static ExecutorService es;\r
59         private final ConfiguredExecutable<?> confExec;\r
60         private final ProcessBuilder pbuilder;\r
61 \r
62         private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
63 \r
64         public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {\r
65                 this.confExec = executable;\r
66                 String cmd = null;\r
67                 try {\r
68                         cmd = executable.getCommand(ExecProvider.Local);\r
69                         PathValidator.validateExecutable(cmd);\r
70                 } catch (IllegalArgumentException e) {\r
71                         log.error(e.getMessage(), e.getCause());\r
72                         throw new JobSubmissionException(e);\r
73                 }\r
74                 List<String> params = executable.getParameters().getCommands();\r
75                 params.add(0, cmd);\r
76 \r
77                 pbuilder = new ProcessBuilder(params);\r
78                 if (executable.getEnvironment() != null) {\r
79                         log.debug("Setting command environment variables: " + pbuilder.environment());\r
80                         EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());\r
81                         log.debug("Process environment:" + pbuilder.environment());\r
82                 }\r
83                 log.debug("Setting command: " + pbuilder.command());\r
84                 PathValidator.validateDirectory(workDirectory);\r
85                 pbuilder.directory(new File(workDirectory));\r
86                 log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());\r
87                 log.debug("Setting working directory: " + workDirectory);\r
88                 // Initialize private executor to dump processes output if any to the file system\r
89                 synchronized (log) {\r
90                         if (es == null) {\r
91                                 /* \r
92                                  * Two threads are necessary for the process to write in two streams error and output\r
93                                  * simultaneously and hold the stream until exit. If only one thread is used, the\r
94                                  * second stream may never get access to the thread efficiently deadlocking the proccess!\r
95                                  */\r
96                         this.es = Executors.newCachedThreadPool();\r
97                                 log.debug("Initializing executor for local processes output dump");\r
98                                 // Make sure that the executors are going to be properly closed\r
99                                 Runtime.getRuntime().addShutdownHook(new Thread() {\r
100                                 @Override\r
101                                 public void run() {\r
102                                         shutdownService();\r
103                                 }\r
104                                 });\r
105                         }\r
106                 }\r
107         }\r
108 \r
109         /**\r
110          * Stops internal executor service which captures streams of native\r
111          * executables. This method is intended for stopping service if deployed in\r
112          * the web application content. There is NO NEED of using this method\r
113          * otherwise as the executor service is taken care of internally.\r
114         */\r
115         public static final void shutdownService() {\r
116                 if (es != null) {\r
117                         es.shutdownNow();\r
118                 }\r
119         }\r
120 \r
121         /**\r
122          * It is vital that output and error streams are captured immediately for\r
123          * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its\r
124          * own thread ready to capture the output. If executor could not execute\r
125          * capture immediately this could lead to the call method to stale, as\r
126          * execution could not proceed without output being captured. Every call to\r
127          * call() method will use 2 threads\r
128          * @throws JobSubmissionException \r
129         */\r
130         @Override\r
131         public ConfiguredExecutable<?> call() throws IOException {\r
132                 Process proc = null;\r
133                 Future<?> errorf = null;\r
134                 Future<?> outputf = null;\r
135                 PrintStream errorStream = null;\r
136                 PrintStream outStream = null;\r
137                 PrintStream comStream = null;\r
138 \r
139                 try {\r
140                         log.info("Calculation started at " + System.nanoTime());\r
141                         EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());\r
142                         proc = pbuilder.start();\r
143 \r
144                         // store input command and program environment\r
145                         comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));\r
146                         comStream.append("# program command\n");\r
147                         for (String par : pbuilder.command()) {\r
148                         comStream.append(par + " ");\r
149                         }\r
150                         comStream.append("\n\n# program environment\n");\r
151                         for (Entry<String, String> var : pbuilder.environment().entrySet()) {\r
152                                 comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");\r
153                         }\r
154                         comStream.close();\r
155 \r
156                         // any error message?\r
157                         errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));\r
158                         StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);\r
159 \r
160                         // any output?\r
161                         outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));\r
162                         StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);\r
163 \r
164                         // kick it off\r
165                         errorf = es.submit(errorGobbler);\r
166                         outputf = es.submit(outputGobbler);\r
167 \r
168                         // any error???\r
169                         int exitVal = proc.waitFor();\r
170                         //proc.getClass();\r
171                         log.info("Calculation completed at " + System.nanoTime());\r
172                         EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());\r
173 \r
174                         // Let streams to write for a little more\r
175                         errorf.get(2, TimeUnit.SECONDS);\r
176                         outputf.get(2, TimeUnit.SECONDS);\r
177 \r
178                         // Close streams\r
179                         errorStream.close();\r
180                         outStream.close();\r
181                         log.debug("Local process exit value: " + exitVal);\r
182                 } catch (ExecutionException e) {\r
183                         // Log and ignore this is not important\r
184                         log.trace("Native Process output threw exception: " + e.getMessage());\r
185                 } catch (TimeoutException e) {\r
186                         // Log and ignore this is not important\r
187                         log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());\r
188                 } catch (InterruptedException e) {\r
189                         log.error("Native Process was interrupted aborting: " + e.getMessage());\r
190                         System.err.println("Native Process was interrupted aborting: " + e.getMessage());\r
191                         proc.destroy();\r
192                         errorf.cancel(true);\r
193                         outputf.cancel(true);\r
194                         // restore interruption status\r
195                         Thread.currentThread().interrupt();\r
196                 } finally {\r
197                         // just to make sure that we do not left anything running\r
198                         if (proc != null) {\r
199                                 proc.destroy();\r
200                         }\r
201                         if (errorf != null) {\r
202                                 errorf.cancel(true);\r
203                         }\r
204                         if (outputf != null) {\r
205                                 outputf.cancel(true);\r
206                         }\r
207                         FileUtil.closeSilently(log, errorStream);\r
208                         FileUtil.closeSilently(log, outStream);\r
209                 }\r
210                 return confExec;\r
211         }\r
212 \r
213         private String getOutput() {\r
214                 if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
215                         return confExec.getOutput();\r
216                 }\r
217                 return PROC_OUT_FILE;\r
218         }\r
219 \r
220         private String getError() {\r
221                 if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
222                         return confExec.getError();\r
223                 }\r
224                 return PROC_ERR_FILE;\r
225         }\r
226 \r
227 }\r