Next version of JABA
[jabaws.git] / engine / compbio / engine / Configurator.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine;\r
20 \r
21 import java.io.File;\r
22 import java.security.InvalidParameterException;\r
23 import java.util.List;\r
24 \r
25 import org.apache.log4j.Logger;\r
26 \r
27 import compbio.data.sequence.FastaSequence;\r
28 import compbio.engine.client.ConfExecutable;\r
29 import compbio.engine.client.ConfiguredExecutable;\r
30 import compbio.engine.client.Executable;\r
31 import compbio.engine.client.PathValidator;\r
32 import compbio.engine.cluster.drmaa.AsyncJobRunner;\r
33 import compbio.engine.cluster.drmaa.JobRunner;\r
34 import compbio.engine.conf.DirectoryManager;\r
35 import compbio.engine.conf.PropertyHelperManager;\r
36 import compbio.engine.local.AsyncLocalRunner;\r
37 import compbio.engine.local.LocalRunner;\r
38 import compbio.metadata.JobSubmissionException;\r
39 import compbio.util.PropertyHelper;\r
40 import compbio.util.SysPrefs;\r
41 import compbio.util.Util;\r
42 \r
43 public class Configurator {\r
44 \r
45         private static Logger log = Logger.getLogger(Configurator.class);\r
46 \r
47         private static final PropertyHelper ph = PropertyHelperManager\r
48                         .getPropertyHelper();\r
49 \r
50         public static final boolean IS_LOCAL_ENGINE_ENABLED = initBooleanValue("engine.local.enable");\r
51         public static final boolean IS_CLUSTER_ENGINE_ENABLED = initBooleanValue("engine.cluster.enable");\r
52 \r
53         public final static String LOCAL_WORK_DIRECTORY = initLocalDirectory();\r
54         public final static String CLUSTER_WORK_DIRECTORY = initClusterWorkDirectory();\r
55 \r
56         private static boolean initBooleanValue(String key) {\r
57                 assert key != null;\r
58                 String status = ph.getProperty(key);\r
59                 log.debug("Loading property: " + key + " with value: " + status);\r
60                 if (Util.isEmpty(status)) {\r
61                         return false;\r
62                 }\r
63                 return new Boolean(status.trim()).booleanValue();\r
64         }\r
65 \r
66         private static String initClusterWorkDirectory() {\r
67                 String tmpDir = null;\r
68                 if (IS_CLUSTER_ENGINE_ENABLED) {\r
69                         tmpDir = ph.getProperty("cluster.tmp.directory");\r
70                         if (!Util.isEmpty(tmpDir)) {\r
71                                 tmpDir = tmpDir.trim();\r
72                         } else {\r
73                                 throw new RuntimeException(\r
74                                                 "Cluster work directory must be provided! ");\r
75                         }\r
76                         if (LOCAL_WORK_DIRECTORY != null\r
77                                         && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {\r
78                                 throw new InvalidParameterException(\r
79                                                 "Cluster engine output directory must be different of that for local engine!");\r
80                         }\r
81                 }\r
82                 return tmpDir;\r
83         }\r
84 \r
85         private static String initLocalDirectory() {\r
86                 String tmp_dir = ph.getProperty("local.tmp.directory");\r
87                 // Use system temp directory is local.tmp.directory is not defined\r
88                 if (Util.isEmpty(tmp_dir)) {\r
89                         tmp_dir = SysPrefs.getSystemTmpDir();\r
90                         log.debug("local.tmp.directory is not defined using system tmp: "\r
91                                         + tmp_dir);\r
92                 }\r
93                 if (!PathValidator.isAbsolutePath(tmp_dir)) {\r
94                         log.debug("local.tmp.directory path is relative! " + tmp_dir);\r
95                         tmp_dir = compbio.engine.client.Util.convertToAbsolute(tmp_dir);\r
96                         log.debug("local.tmp.directory path changed to absolute: "\r
97                                         + tmp_dir);\r
98                 }\r
99                 return tmp_dir.trim();\r
100         }\r
101 \r
102         /**\r
103          * Depending on the values defined in the properties\r
104          * (engine.cluster.enable=true and engine.local.enable=true) return either\r
105          * Cluster job submission engine {@link #JobRunner} or local job submission\r
106          * engine {@link #LocalRunner} If both engines enabled than ask\r
107          * {@link LoadBalancer} for an engine. This method will fall back and return\r
108          * local engine if\r
109          * \r
110          * 1) No engines are defined in the properties or they have been defined\r
111          * incorrectly\r
112          * \r
113          * 2) Execution environment is Windows as the system cannot really run\r
114          * cluster submission from windows\r
115          * \r
116          * @param executable\r
117          * @return SyncExecutor backed up by either cluster or local engines\r
118          * @throws JobSubmissionException\r
119          */\r
120         static Executable.ExecProvider getExecProvider(\r
121                         ConfiguredExecutable<?> executable, List<FastaSequence> dataSet)\r
122                         throws JobSubmissionException {\r
123                 // Look where executable claims to be executed\r
124                 Executable.ExecProvider provider = executable.getSupportedRuntimes();\r
125                 if (!IS_CLUSTER_ENGINE_ENABLED && !IS_LOCAL_ENGINE_ENABLED) {\r
126                         // Both engines disabled!\r
127                         throw new RuntimeException(\r
128                                         "Both engines are disabled! "\r
129                                                         + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. "\r
130                                                         + "At least one engine must be enabled!");\r
131                 }\r
132                 if (provider == Executable.ExecProvider.Local) {\r
133                         if (IS_LOCAL_ENGINE_ENABLED) {\r
134                                 return Executable.ExecProvider.Local;\r
135                         } else {\r
136                                 throw new JobSubmissionException(\r
137                                                 "Executable can be executed only on locally, but local engine is disabled!");\r
138                         }\r
139                 }\r
140                 if (provider == Executable.ExecProvider.Cluster) {\r
141                         if (IS_CLUSTER_ENGINE_ENABLED) {\r
142                                 return Executable.ExecProvider.Cluster;\r
143                         } else {\r
144                                 throw new JobSubmissionException(\r
145                                                 "Executable can be executed only on the cluster, but cluster engine is disabled!");\r
146                         }\r
147                 }\r
148                 // We are here if executable can be executed on both Cluster and Local\r
149                 // engines\r
150                 // i.e. provider = Any\r
151                 // If we still here executable supports All exec environments\r
152                 // Check whether we work on windows\r
153                 if (SysPrefs.isWindows) {\r
154                         // no matter what the settings are, we cannot send tasks to the\r
155                         // cluster from windows env\r
156                         return Executable.ExecProvider.Local;\r
157                 }\r
158                 // Now if both engines are configured that load balance them\r
159                 if (IS_CLUSTER_ENGINE_ENABLED && IS_LOCAL_ENGINE_ENABLED) {\r
160                         // If the dataset is NULL than base a decision on local engine load\r
161                         // only\r
162                         if (dataSet == null) {\r
163                                 return LoadBalancer.getEngine(executable);\r
164                         }\r
165                         // If the dataset is provided, consider it\r
166                         // This should be the main root for any load balancing\r
167                         // configurations\r
168                         return LoadBalancer.getEngine(executable, dataSet);\r
169                 } else if (IS_CLUSTER_ENGINE_ENABLED) {\r
170                         return Executable.ExecProvider.Cluster;\r
171                 }\r
172                 // If we are here, than local engine is enabled or one of the two will\r
173                 // happen (1) exception is thrown if both engines are disabled\r
174                 // or (2) previous statement will return the cluster engine\r
175                 return Executable.ExecProvider.Local;\r
176         }\r
177 \r
178         public static <T> ConfiguredExecutable<T> configureExecutable(\r
179                         Executable<T> executable) throws JobSubmissionException {\r
180 \r
181                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
182                                 DirectoryManager.getTaskDirectory(executable.getClass()));\r
183                 Executable.ExecProvider provider = getExecProvider(confExec, null);\r
184                 confExec.setExecProvider(provider);\r
185                 setupWorkDirectory(confExec, provider);\r
186                 return confExec;\r
187         }\r
188 \r
189         public static <T> ConfiguredExecutable<T> configureExecutable(\r
190                         Executable<T> executable, List<FastaSequence> dataSet)\r
191                         throws JobSubmissionException {\r
192 \r
193                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
194                                 DirectoryManager.getTaskDirectory(executable.getClass()));\r
195                 Executable.ExecProvider provider = getExecProvider(confExec, dataSet);\r
196                 confExec.setExecProvider(provider);\r
197                 setupWorkDirectory(confExec, provider);\r
198                 return confExec;\r
199         }\r
200 \r
201         static <T> void setupWorkDirectory(ConfExecutable<T> confExec,\r
202                         Executable.ExecProvider provider) {\r
203                 assert provider != null && provider != Executable.ExecProvider.Any;\r
204                 String workDir = "";\r
205                 if (provider == Executable.ExecProvider.Local) {\r
206                         workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator\r
207                                         + confExec.getTaskId();\r
208                 } else {\r
209                         workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator\r
210                                         + confExec.getTaskId();\r
211                 }\r
212                 // Create working directory for the task\r
213                 File wdir = new File(workDir);\r
214                 wdir.mkdir();\r
215                 log.info("Creating working directory for the task in: "\r
216                                 + wdir.getAbsolutePath());\r
217                 // Tell the executable where to get the results\r
218                 confExec.setWorkDirectory(workDir);\r
219         }\r
220 \r
221         public static <T> ConfiguredExecutable<T> configureExecutable(\r
222                         Executable<T> executable, Executable.ExecProvider provider)\r
223                         throws JobSubmissionException {\r
224                 if (executable == null) {\r
225                         throw new InvalidParameterException("Executable must be provided!");\r
226                 }\r
227                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
228                                 DirectoryManager.getTaskDirectory(executable.getClass()));\r
229                 if (provider == Executable.ExecProvider.Cluster\r
230                                 && !IS_CLUSTER_ENGINE_ENABLED) {\r
231                         throw new JobSubmissionException(\r
232                                         "Cluster engine is disabled or not configured!");\r
233                 }\r
234                 if (provider == Executable.ExecProvider.Local\r
235                                 && !IS_LOCAL_ENGINE_ENABLED) {\r
236                         throw new JobSubmissionException(\r
237                                         "Local engine is disabled or not configured!");\r
238                 }\r
239                 confExec.setExecProvider(provider);\r
240                 setupWorkDirectory(confExec, provider);\r
241                 return confExec;\r
242         }\r
243 \r
244         public static AsyncExecutor getAsyncEngine(\r
245                         ConfiguredExecutable<?> executable, Executable.ExecProvider provider) {\r
246 \r
247                 assert provider != Executable.ExecProvider.Any && provider != null;\r
248                 if (provider == Executable.ExecProvider.Cluster) {\r
249                         return new AsyncJobRunner();\r
250                 }\r
251                 return new AsyncLocalRunner();\r
252         }\r
253 \r
254         public static SyncExecutor getSyncEngine(\r
255                         ConfiguredExecutable<?> executable, Executable.ExecProvider provider)\r
256                         throws JobSubmissionException {\r
257 \r
258                 assert provider != Executable.ExecProvider.Any && provider != null;\r
259                 if (provider == Executable.ExecProvider.Cluster) {\r
260                         return JobRunner.getInstance(executable);\r
261                 }\r
262                 return new LocalRunner(executable);\r
263         }\r
264 \r
265         public static AsyncExecutor getAsyncEngine(\r
266                         ConfiguredExecutable<?> executable) {\r
267                 if (isTargetedForLocalExecution(executable)) {\r
268                         return new AsyncLocalRunner();\r
269                 }\r
270                 return new AsyncJobRunner();\r
271         }\r
272 \r
273         public static AsyncExecutor getAsyncEngine(String taskId) {\r
274                 if (isLocal(taskId)) {\r
275                         return new AsyncLocalRunner();\r
276                 }\r
277                 return new AsyncJobRunner();\r
278         }\r
279 \r
280         public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable)\r
281                         throws JobSubmissionException {\r
282                 if (isTargetedForLocalExecution(executable)) {\r
283                         return new LocalRunner(executable);\r
284                 }\r
285                 return JobRunner.getInstance(executable);\r
286         }\r
287 \r
288         static boolean isTargetedForLocalExecution(\r
289                         ConfiguredExecutable<?> executable) {\r
290                 // In the uncommon case that the cluster and local execution temp\r
291                 // directories are the same,\r
292                 // in this case the method return true anyway\r
293 \r
294                 /*\r
295                  * Could have done this String taskDir = executable.getWorkDirectory();\r
296                  * int idx = taskDir.lastIndexOf(File.separator); String workDir =\r
297                  * taskDir.substring(0, idx); assert\r
298                  * !(workDir.equals(CLUSTER_WORK_DIRECTORY) && workDir\r
299                  * .equals(LOCAL_WORK_DIRECTORY)) :\r
300                  * "Could not determine executable target!"; if\r
301                  * (workDir.equals(LOCAL_WORK_DIRECTORY)) { return true; }\r
302                  */\r
303                 String taskDir = executable.getTaskId();\r
304                 return isLocal(taskDir);\r
305         }\r
306 \r
307         static boolean isLocal(String taskId) {\r
308                 if (Util.isEmpty(taskId)) {\r
309                         throw new NullPointerException("TaskId must be provided!");\r
310                 }\r
311                 if (!compbio.engine.client.Util.isValidJobId(taskId)) {\r
312                         throw new InvalidParameterException("TaskId is not valid!");\r
313                 }\r
314                 return !taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX);\r
315         }\r
316 \r
317         public static String getWorkDirectory(String taskId) {\r
318                 assert !compbio.util.Util.isEmpty(taskId);\r
319                 assert compbio.engine.client.Util.isValidJobId(taskId);\r
320                 log.info("Getting workdirectory for TaskID: " + taskId);\r
321                 if (taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX)) {\r
322                         return CLUSTER_WORK_DIRECTORY + File.separator + taskId;\r
323                 }\r
324                 return LOCAL_WORK_DIRECTORY + File.separator + taskId;\r
325         }\r
326 }\r