re-formatting code
[jabaws.git] / engine / compbio / engine / Configurator.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine;\r
20 \r
21 import java.io.File;\r
22 import java.security.InvalidParameterException;\r
23 import java.util.List;\r
24 \r
25 import org.apache.log4j.Logger;\r
26 \r
27 import compbio.data.sequence.FastaSequence;\r
28 import compbio.engine.client.ConfExecutable;\r
29 import compbio.engine.client.ConfiguredExecutable;\r
30 import compbio.engine.client.Executable;\r
31 import compbio.engine.client.PathValidator;\r
32 import compbio.engine.cluster.drmaa.AsyncJobRunner;\r
33 import compbio.engine.cluster.drmaa.JobRunner;\r
34 import compbio.engine.conf.DirectoryManager;\r
35 import compbio.engine.conf.PropertyHelperManager;\r
36 import compbio.engine.local.AsyncLocalRunner;\r
37 import compbio.engine.local.LocalRunner;\r
38 import compbio.metadata.JobSubmissionException;\r
39 import compbio.util.PropertyHelper;\r
40 import compbio.util.SysPrefs;\r
41 import compbio.util.Util;\r
42 \r
43 public class Configurator {\r
44 \r
45         private static Logger log = Logger.getLogger(Configurator.class);\r
46         private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
47 \r
48         public static final boolean IS_LOCAL_ENGINE_ENABLED = initBooleanValue("engine.local.enable");\r
49         public static final boolean IS_CLUSTER_ENGINE_ENABLED = initBooleanValue("engine.cluster.enable");\r
50         public final static String LOCAL_WORK_DIRECTORY = initLocalDirectory();\r
51         public final static String CLUSTER_WORK_DIRECTORY = initClusterWorkDirectory();\r
52 \r
53 \r
54 \r
55         private static boolean initBooleanValue(String key) {\r
56                 assert key != null;\r
57                 String status = ph.getProperty(key);\r
58                 log.debug("Loading property: " + key + " with value: " + status);\r
59                 if (Util.isEmpty(status)) {\r
60                         return false;\r
61                 }\r
62                 return new Boolean(status.trim()).booleanValue();\r
63         }\r
64 \r
65         private static String initClusterWorkDirectory() {\r
66                 String tmpDir = null;\r
67                 if (IS_CLUSTER_ENGINE_ENABLED) {\r
68                         tmpDir = ph.getProperty("cluster.tmp.directory");\r
69                         if (!Util.isEmpty(tmpDir)) {\r
70                                 tmpDir = tmpDir.trim();\r
71                         } else {\r
72                                 throw new RuntimeException(\r
73                                                 "Cluster work directory must be provided! ");\r
74                         }\r
75                         if (LOCAL_WORK_DIRECTORY != null\r
76                                         && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {\r
77                                 throw new InvalidParameterException(\r
78                                                 "Cluster engine output directory must be different of that for local engine!");\r
79                         }\r
80                 }\r
81                 return tmpDir;\r
82         }\r
83 \r
84         private static String initLocalDirectory() {\r
85                 String tmp_dir = ph.getProperty("local.tmp.directory");\r
86                 // Use system temporary directory if local.tmp.directory is not defined\r
87                 if (Util.isEmpty(tmp_dir)) {\r
88                         tmp_dir = SysPrefs.getSystemTmpDir();\r
89                         log.debug("local.tmp.directory is not defined using system tmp: " + tmp_dir);\r
90                 }\r
91                 if (!PathValidator.isAbsolutePath(tmp_dir)) {\r
92                         log.debug("local.tmp.directory path is relative! " + tmp_dir);\r
93                         tmp_dir = compbio.engine.client.Util.convertToAbsolute(tmp_dir);\r
94                         log.debug("local.tmp.directory path changed to absolute: " + tmp_dir);\r
95                 }\r
96                 return tmp_dir.trim();\r
97         }\r
98 \r
99         /**\r
100          * Depending on the values defined in the properties\r
101          * (engine.cluster.enable=true and engine.local.enable=true) return either\r
102          * Cluster job submission engine {@link #JobRunner} or local job submission\r
103          * engine {@link #LocalRunner} If both engines enabled than ask\r
104          * {@link LoadBalancer} for an engine. This method will fall back and return\r
105          * local engine if\r
106          * \r
107          * 1) No engines are defined in the properties or they have been defined\r
108          * incorrectly\r
109          * \r
110          * 2) Execution environment is Windows as the system cannot really run\r
111          * cluster submission from windows\r
112          * \r
113          * @param executable\r
114          * @return SyncExecutor backed up by either cluster or local engines\r
115          * @throws JobSubmissionException\r
116          */\r
117         static Executable.ExecProvider getExecProvider(\r
118                         ConfiguredExecutable<?> executable, List<FastaSequence> dataSet)\r
119                         throws JobSubmissionException {\r
120                 // Look where executable claims to be executed\r
121                 Executable.ExecProvider provider = executable.getSupportedRuntimes();\r
122                 if (!IS_CLUSTER_ENGINE_ENABLED && !IS_LOCAL_ENGINE_ENABLED) {\r
123                         // Both engines disabled!\r
124                         throw new RuntimeException(\r
125                                         "Both engines are disabled! "\r
126                                                         + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. "\r
127                                                         + "At least one engine must be enabled!");\r
128                 }\r
129                 if (provider == Executable.ExecProvider.Local) {\r
130                         if (IS_LOCAL_ENGINE_ENABLED) {\r
131                                 return Executable.ExecProvider.Local;\r
132                         } else {\r
133                                 throw new JobSubmissionException(\r
134                                                 "Executable can be executed only on locally, but local engine is disabled!");\r
135                         }\r
136                 }\r
137                 if (provider == Executable.ExecProvider.Cluster) {\r
138                         if (IS_CLUSTER_ENGINE_ENABLED) {\r
139                                 return Executable.ExecProvider.Cluster;\r
140                         } else {\r
141                                 throw new JobSubmissionException(\r
142                                                 "Executable can be executed only on the cluster, but cluster engine is disabled!");\r
143                         }\r
144                 }\r
145                 // We are here if executable can be executed on both Cluster and Local\r
146                 // engines i.e. provider = Any\r
147                 // If we still here executable supports All exec environments\r
148                 // Check whether we work on windows\r
149                 if (SysPrefs.isWindows) {\r
150                         // no matter what the settings are, we cannot send tasks to the\r
151                         // cluster from windows env\r
152                         return Executable.ExecProvider.Local;\r
153                 }\r
154                 // Now if both engines are configured that load balance them\r
155                 if (IS_CLUSTER_ENGINE_ENABLED && IS_LOCAL_ENGINE_ENABLED) {\r
156                         // If the dataset is NULL than base a decision on local engine load\r
157                         // only\r
158                         if (dataSet == null) {\r
159                                 return LoadBalancer.getEngine(executable);\r
160                         }\r
161                         // If the dataset is provided, consider it\r
162                         // This should be the main root for any load balancing\r
163                         // configurations\r
164                         return LoadBalancer.getEngine(executable, dataSet);\r
165                 } else if (IS_CLUSTER_ENGINE_ENABLED) {\r
166                         return Executable.ExecProvider.Cluster;\r
167                 }\r
168                 // If we are here, than local engine is enabled or one of the two will\r
169                 // happen (1) exception is thrown if both engines are disabled\r
170                 // or (2) previous statement will return the cluster engine\r
171                 return Executable.ExecProvider.Local;\r
172         }\r
173 \r
174         public static <T> ConfiguredExecutable<T> configureExecutable(\r
175                         Executable<T> executable) throws JobSubmissionException {\r
176 \r
177                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
178                                 DirectoryManager.getTaskDirectory(executable.getClass()));\r
179                 Executable.ExecProvider provider = getExecProvider(confExec, null);\r
180                 confExec.setExecProvider(provider);\r
181                 setupWorkDirectory(confExec, provider);\r
182                 return confExec;\r
183         }\r
184 \r
185         public static <T> ConfiguredExecutable<T> configureExecutable(\r
186                         Executable<T> executable, List<FastaSequence> dataSet)\r
187                         throws JobSubmissionException {\r
188 \r
189                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
190                                 DirectoryManager.getTaskDirectory(executable.getClass()));\r
191                 Executable.ExecProvider provider = getExecProvider(confExec, dataSet);\r
192                 confExec.setExecProvider(provider);\r
193                 setupWorkDirectory(confExec, provider);\r
194                 return confExec;\r
195         }\r
196 \r
197         static <T> void setupWorkDirectory(ConfExecutable<T> confExec,\r
198                         Executable.ExecProvider provider) {\r
199                 assert provider != null && provider != Executable.ExecProvider.Any;\r
200                 String workDir = "";\r
201                 if (provider == Executable.ExecProvider.Local) {\r
202                         workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator\r
203                                         + confExec.getTaskId();\r
204                 } else {\r
205                         workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator\r
206                                         + confExec.getTaskId();\r
207                 }\r
208                 // Create working directory for the task\r
209                 File wdir = new File(workDir);\r
210                 wdir.mkdir();\r
211                 log.info("Creating working directory for the task in: "\r
212                                 + wdir.getAbsolutePath());\r
213                 // Tell the executable where to get the results\r
214                 confExec.setWorkDirectory(workDir);\r
215         }\r
216 \r
217         public static <T> ConfiguredExecutable<T> configureExecutable(\r
218                         Executable<T> executable, Executable.ExecProvider provider)\r
219                         throws JobSubmissionException {\r
220                 if (executable == null) {\r
221                         throw new InvalidParameterException("Executable must be provided!");\r
222                 }\r
223                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,\r
224                                 DirectoryManager.getTaskDirectory(executable.getClass()));\r
225                 if (provider == Executable.ExecProvider.Cluster\r
226                                 && !IS_CLUSTER_ENGINE_ENABLED) {\r
227                         throw new JobSubmissionException(\r
228                                         "Cluster engine is disabled or not configured!");\r
229                 }\r
230                 if (provider == Executable.ExecProvider.Local\r
231                                 && !IS_LOCAL_ENGINE_ENABLED) {\r
232                         throw new JobSubmissionException(\r
233                                         "Local engine is disabled or not configured!");\r
234                 }\r
235                 confExec.setExecProvider(provider);\r
236                 setupWorkDirectory(confExec, provider);\r
237                 return confExec;\r
238         }\r
239 \r
240         public static AsyncExecutor getAsyncEngine(\r
241                         ConfiguredExecutable<?> executable, Executable.ExecProvider provider) {\r
242 \r
243                 assert provider != Executable.ExecProvider.Any && provider != null;\r
244                 if (provider == Executable.ExecProvider.Cluster) {\r
245                         return new AsyncJobRunner();\r
246                 }\r
247                 return new AsyncLocalRunner();\r
248         }\r
249 \r
250         public static SyncExecutor getSyncEngine(\r
251                         ConfiguredExecutable<?> executable, Executable.ExecProvider provider)\r
252                         throws JobSubmissionException {\r
253 \r
254                 assert provider != Executable.ExecProvider.Any && provider != null;\r
255                 if (provider == Executable.ExecProvider.Cluster) {\r
256                         return JobRunner.getInstance(executable);\r
257                 }\r
258                 return new LocalRunner(executable);\r
259         }\r
260 \r
261         public static AsyncExecutor getAsyncEngine(\r
262                         ConfiguredExecutable<?> executable) {\r
263                 if (isTargetedForLocalExecution(executable)) {\r
264                         return new AsyncLocalRunner();\r
265                 }\r
266                 return new AsyncJobRunner();\r
267         }\r
268 \r
269         public static AsyncExecutor getAsyncEngine(String taskId) {\r
270                 if (isLocal(taskId)) {\r
271                         return new AsyncLocalRunner();\r
272                 }\r
273                 return new AsyncJobRunner();\r
274         }\r
275 \r
276         public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable)\r
277                         throws JobSubmissionException {\r
278                 if (isTargetedForLocalExecution(executable)) {\r
279                         return new LocalRunner(executable);\r
280                 }\r
281                 return JobRunner.getInstance(executable);\r
282         }\r
283 \r
284         static boolean isTargetedForLocalExecution(\r
285                         ConfiguredExecutable<?> executable) {\r
286                 // In the uncommon case that the cluster and local execution temporary\r
287                 // directories are the same, in this case the method return true anyway\r
288 \r
289                 /*\r
290                  * Could have done this String taskDir = executable.getWorkDirectory();\r
291                  * int idx = taskDir.lastIndexOf(File.separator); String workDir =\r
292                  * taskDir.substring(0, idx); assert\r
293                  * !(workDir.equals(CLUSTER_WORK_DIRECTORY) && workDir\r
294                  * .equals(LOCAL_WORK_DIRECTORY)) :\r
295                  * "Could not determine executable target!"; if\r
296                  * (workDir.equals(LOCAL_WORK_DIRECTORY)) { return true; }\r
297                  */\r
298                 String taskDir = executable.getTaskId();\r
299                 return isLocal(taskDir);\r
300         }\r
301 \r
302         static boolean isLocal(String taskId) {\r
303                 if (Util.isEmpty(taskId)) {\r
304                         throw new NullPointerException("TaskId must be provided!");\r
305                 }\r
306                 if (!compbio.engine.client.Util.isValidJobId(taskId)) {\r
307                         throw new InvalidParameterException("TaskId is not valid!");\r
308                 }\r
309                 return !taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX);\r
310         }\r
311 \r
312         public static String getWorkDirectory(String taskId) {\r
313                 assert !compbio.util.Util.isEmpty(taskId);\r
314                 assert compbio.engine.client.Util.isValidJobId(taskId);\r
315                 log.info("Getting workdirectory for TaskID: " + taskId);\r
316                 if (taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX)) {\r
317                         return CLUSTER_WORK_DIRECTORY + File.separator + taskId;\r
318                 }\r
319                 return LOCAL_WORK_DIRECTORY + File.separator + taskId;\r
320         }\r
321 }\r