20 \r
21 package compbio.engine.local;\r
22 \r
23 import java.io.File;\r
24 import java.io.IOException;\r
25 import java.io.PrintStream;\r
26 import java.util.List;\r
27 import java.util.Map.Entry;\r
28 import java.util.concurrent.Callable;\r
29 import java.util.concurrent.ExecutionException;\r
30 import java.util.concurrent.ExecutorService;\r
31 import java.util.concurrent.Executors;\r
32 import java.util.concurrent.Future;\r
33 import java.util.concurrent.TimeUnit;\r
34 import java.util.concurrent.TimeoutException;\r
35 \r
36 import org.apache.log4j.Logger;\r
37 \r
38 import compbio.engine.client.ConfiguredExecutable;\r
39 import compbio.engine.client.PathValidator;\r
40 import compbio.engine.client.PipedExecutable;\r
41 import compbio.engine.client.EngineUtil;\r
42 import compbio.engine.client.Executable.ExecProvider;\r
43 import compbio.engine.local.StreamGobbler.OutputType;\r
44 import compbio.metadata.JobStatus;\r
45 import compbio.metadata.JobSubmissionException;\r
46 import compbio.util.FileUtil;\r
47 import compbio.util.SysPrefs;\r
48 import compbio.util.annotation.Immutable;\r
49 \r
50 @Immutable\r
51 public final class ExecutableWrapper implements\r
52         Callable<ConfiguredExecutable<?>> {\r
53 \r
54     public static final String PROC_IN_FILE = "procInput.txt";\r
55     public static final String PROC_OUT_FILE = "procOutput.txt";\r
56     public static final String PROC_ERR_FILE = "procError.txt";\r
57 \r
58     private static ExecutorService es;\r
59     private final ConfiguredExecutable<?> confExec;\r
60     private final ProcessBuilder pbuilder;\r
61 \r
62     private static final Logger log = Logger.getLogger(ExecutableWrapper.class);\r
63 \r
64     public ExecutableWrapper(ConfiguredExecutable<?> executable, String workDirectory) throws JobSubmissionException {\r
65         this.confExec = executable;\r
66         String cmd = null;\r
67         try {\r
68                 cmd = executable.getCommand(ExecProvider.Local);\r
69                 PathValidator.validateExecutable(cmd);\r
70         } catch (IllegalArgumentException e) {\r
71                 log.error(e.getMessage(), e.getCause());\r
72                 throw new JobSubmissionException(e);\r
73         }\r
74         List<String> params = executable.getParameters().getCommands();\r
75         params.add(0, cmd);\r
76 \r
77         pbuilder = new ProcessBuilder(params);\r
78         if (executable.getEnvironment() != null) {\r
79                 log.debug("Setting command environment variables: " + pbuilder.environment());\r
80                 EngineUtil.mergeEnvVariables(pbuilder.environment(), executable.getEnvironment());\r
81                 log.debug("Process environment:" + pbuilder.environment());\r
82         }\r
83         log.debug("Setting command: " + pbuilder.command());\r
84         PathValidator.validateDirectory(workDirectory);\r
85         pbuilder.directory(new File(workDirectory));\r
86         log.debug("Current working directory is " + SysPrefs.getCurrentDirectory());\r
87         log.debug("Setting working directory: " + workDirectory);\r
88         // Initialize private executor to dump processes output if any to the file system\r
89         synchronized (log) {\r
90                 if (es == null) {\r
91                         /* \r
92                          * Two threads are necessary for the process to write in two streams error and output\r
93                          * simultaneously and hold the stream until exit. If only one thread is used, the\r
94                          * second stream may never get access to the thread efficiently deadlocking the proccess!\r
95                          */\r
96                         this.es = Executors.newCachedThreadPool();\r
97                                 log.debug("Initializing executor for local processes output dump");\r
98                                 // Make sure that the executors are going to be properly closed\r
99                                 Runtime.getRuntime().addShutdownHook(new Thread() {\r
100                                 @Override\r
101                                 public void run() {\r
102                                         shutdownService();\r
103                                 }\r
104                                 });\r
105                 }\r
106         }\r
107     }\r
108 \r
109     /**\r
110      * Stops internal executor service which captures streams of native\r
111      * executables. This method is intended for stopping service if deployed in\r
112      * the web application content. There is NO NEED of using this method\r
113      * otherwise as the executor service is taken care of internally.\r
114      */\r
115     public static final void shutdownService() {\r
116         if (es != null) {\r
117                 es.shutdownNow();\r
118         }\r
119     }\r
120 \r
121     /**\r
122      * It is vital that output and error streams are captured immediately for\r
123      * this call() to succeed. Thus each instance if ExecutableWrapper has 2 its\r
124      * own thread ready to capture the output. If executor could not execute\r
125      * capture immediately this could lead to the call method to stale, as\r
126      * execution could not proceed without output being captured. Every call to\r
127      * call() method will use 2 threads\r
128      * @throws JobSubmissionException \r
129      */\r
130     @Override\r
131     public ConfiguredExecutable<?> call() throws IOException {\r
132         Process proc = null;\r
133         Future<?> errorf = null;\r
134         Future<?> outputf = null;\r
135         PrintStream errorStream = null;\r
136         PrintStream outStream = null;\r
137         PrintStream comStream = null;\r
138 \r
139     try {\r
140                 log.info("Calculation started at " + System.nanoTime());\r
141                 EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.STARTED.toString());\r
142                 proc = pbuilder.start();\r
143 \r
144                 // store input command and program environment\r
145                 comStream = new PrintStream(new File(pbuilder.directory() + File.separator + PROC_IN_FILE));\r
146                 comStream.append("# program command\n");\r
147                 for (String par : pbuilder.command()) {\r
148                         comStream.append(par + " ");\r
149                 }\r
150                 comStream.append("\n\n# program environment\n");\r
151                 for (Entry<String, String> var : pbuilder.environment().entrySet()) {\r
152                         comStream.append(var.getKey() + " =\t" + var.getValue() + "\n");\r
153                 }\r
154                 comStream.close();\r
155 \r
156                 // any error message?\r
157                 errorStream = new PrintStream(new File(pbuilder.directory() + File.separator + getError()));\r
158                 StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), errorStream, OutputType.ERROR);\r
159 \r
160                 // any output?\r
161                 outStream = new PrintStream(new File(pbuilder.directory() + File.separator + getOutput()));\r
162                 StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), outStream, OutputType.OUTPUT);\r
163 \r
164                 // kick it off\r
165                 errorf = es.submit(errorGobbler);\r
166                 outputf = es.submit(outputGobbler);\r
167 \r
168                 // any error???\r
169                 int exitVal = proc.waitFor();\r
170                 log.info("Calculation completed at " + System.nanoTime());\r
171                 EngineUtil.writeStatFile(confExec.getWorkDirectory(), JobStatus.FINISHED.toString());\r
172                 // Let streams to write for a little more\r
173                 errorf.get(2, TimeUnit.SECONDS);\r
174                 outputf.get(2, TimeUnit.SECONDS);\r
175 \r
176                 // Close streams\r
177                 errorStream.close();\r
178                 outStream.close();\r
179                 log.debug("Local process exit value: " + exitVal);\r
180         } catch (ExecutionException e) {\r
181                 // Log and ignore this is not important\r
182                 log.trace("Native Process output threw exception: " + e.getMessage());\r
183         } catch (TimeoutException e) {\r
184                 // Log and ignore this is not important\r
185                 log.trace("Native Process output took longer then 2s to write, aborting: " + e.getMessage());\r
186         } catch (InterruptedException e) {\r
187                 log.error("Native Process was interrupted aborting: " + e.getMessage());\r
188                 proc.destroy();\r
189                 errorf.cancel(true);\r
190                 outputf.cancel(true);\r
191                 // restore interruption status\r
192                 Thread.currentThread().interrupt();\r
193         } finally {\r
194                 // just to make sure that we do not left anything running\r
195                 if (proc != null) {\r
196                         proc.destroy();\r
197                 }\r
198                 if (errorf != null) {\r
199                         errorf.cancel(true);\r
200                 }\r
201                 if (outputf != null) {\r
202                         outputf.cancel(true);\r
203                 }\r
204                 FileUtil.closeSilently(log, errorStream);\r
205                 FileUtil.closeSilently(log, outStream);\r
206         }\r
207         return confExec;\r
208     }\r
209 \r
210     private String getOutput() {\r
211         if (confExec.getOutput() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
212                 return confExec.getOutput();\r
213         }\r
214         return PROC_OUT_FILE;\r
215     }\r
216 \r
217     private String getError() {\r
218         if (confExec.getError() != null && confExec.getExecutable() instanceof PipedExecutable<?>) {\r
219                 return confExec.getError();\r
220         }\r
221         return PROC_ERR_FILE;\r
222     }\r
223 }\r