Refactoring (renaming) 2 classes: AsyncJobRunner.java -> AsyncClusterRunner.java...
[jabaws.git] / engine / compbio / engine / Configurator.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine;\r
20 \r
21 import java.io.File;\r
22 import java.security.InvalidParameterException;\r
23 import java.util.List;\r
24 \r
25 import org.apache.log4j.Logger;\r
26 \r
27 import compbio.data.sequence.FastaSequence;\r
28 import compbio.engine.client.ConfExecutable;\r
29 import compbio.engine.client.ConfiguredExecutable;\r
30 import compbio.engine.client.Executable;\r
31 import compbio.engine.client.PathValidator;\r
32 import compbio.engine.client.EngineUtil;\r
33 import compbio.engine.cluster.drmaa.AsyncClusterRunner;\r
34 import compbio.engine.cluster.drmaa.ClusterRunner;\r
35 import compbio.engine.conf.DirectoryManager;\r
36 import compbio.engine.conf.PropertyHelperManager;\r
37 import compbio.engine.local.AsyncLocalRunner;\r
38 import compbio.engine.local.LocalRunner;\r
39 import compbio.metadata.JobSubmissionException;\r
40 import compbio.util.PropertyHelper;\r
41 import compbio.util.SysPrefs;\r
42 import compbio.util.Util;\r
43 \r
44 public class Configurator {\r
45 \r
46         private static Logger log = Logger.getLogger(Configurator.class);\r
47         private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
48 \r
49         public static final boolean IS_LOCAL_ENGINE_ENABLED = initBooleanValue("engine.local.enable");\r
50         public static final boolean IS_CLUSTER_ENGINE_ENABLED = initBooleanValue("engine.cluster.enable");\r
51         public final static String LOCAL_WORK_DIRECTORY = initLocalDirectory();\r
52         public final static String CLUSTER_WORK_DIRECTORY = initClusterWorkDirectory();\r
53 \r
54         private static boolean initBooleanValue(String key) {\r
55                 assert key != null;\r
56                 String status = ph.getProperty(key);\r
57                 log.debug("Loading property: " + key + " with value: " + status);\r
58                 if (Util.isEmpty(status)) {\r
59                         return false;\r
60                 }\r
61                 return new Boolean(status.trim()).booleanValue();\r
62         }\r
63 \r
64         private static String initClusterWorkDirectory() {\r
65                 String tmpDir = null;\r
66                 if (IS_CLUSTER_ENGINE_ENABLED) {\r
67                         tmpDir = ph.getProperty("cluster.tmp.directory");\r
68                         if (!Util.isEmpty(tmpDir)) {\r
69                                 tmpDir = tmpDir.trim();\r
70                         } else {\r
71                                 throw new RuntimeException("Cluster work directory must be provided! ");\r
72                         }\r
73                         if (LOCAL_WORK_DIRECTORY != null && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {\r
74                                 throw new InvalidParameterException("Cluster engine output directory must be different of that for local engine!");\r
75                         }\r
76                 }\r
77                 return tmpDir;\r
78         }\r
79 \r
80         private static String initLocalDirectory() {\r
81                 String tmp_dir = ph.getProperty("local.tmp.directory");\r
82                 // Use system temporary directory if local.tmp.directory is not defined\r
83                 if (Util.isEmpty(tmp_dir)) {\r
84                         tmp_dir = SysPrefs.getSystemTmpDir();\r
85                         log.debug("local.tmp.directory is not defined using system tmp: " + tmp_dir);\r
86                 }\r
87                 if (!PathValidator.isAbsolutePath(tmp_dir)) {\r
88                         log.debug("local.tmp.directory path is relative! " + tmp_dir);\r
89                         tmp_dir = EngineUtil.convertToAbsolute(tmp_dir);\r
90                         log.debug("local.tmp.directory path changed to absolute: " + tmp_dir);\r
91                 }\r
92                 return tmp_dir.trim();\r
93         }\r
94 \r
95         /**\r
96          * Depending on the values defined in the properties\r
97          * (engine.cluster.enable=true and engine.local.enable=true) return either\r
98          * Cluster job submission engine {@link #JobRunner} or local job submission\r
99          * engine {@link #LocalRunner} If both engines enabled than ask\r
100          * {@link LoadBalancer} for an engine. This method will fall back and return\r
101          * local engine if\r
102          * \r
103          * 1) No engines are defined in the properties or they have been defined incorrectly\r
104          * \r
105          * 2) Execution environment is Windows as the system cannot really run\r
106          * cluster submission from windows\r
107          * \r
108          * @param executable\r
109          * @return SyncExecutor backed up by either cluster or local engines\r
110          * @throws JobSubmissionException\r
111          */\r
112         static Executable.ExecProvider getExecProvider(ConfiguredExecutable<?> executable, List<FastaSequence> dataSet)\r
113                         throws JobSubmissionException {\r
114                 // Look where executable claims to be executed\r
115                 Executable.ExecProvider provider = executable.getSupportedRuntimes();\r
116                 if (!IS_CLUSTER_ENGINE_ENABLED && !IS_LOCAL_ENGINE_ENABLED) {\r
117                         // Both engines disabled!\r
118                         throw new RuntimeException("Both engines are disabled! "\r
119                                         + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. At least one engine must be enabled!");\r
120                 }\r
121                 if (provider == Executable.ExecProvider.Local) {\r
122                         if (IS_LOCAL_ENGINE_ENABLED) {\r
123                                 return Executable.ExecProvider.Local;\r
124                         } else {\r
125                                 throw new JobSubmissionException("Executable can be executed only on locally, but local engine is disabled!");\r
126                         }\r
127                 }\r
128                 if (provider == Executable.ExecProvider.Cluster) {\r
129                         if (IS_CLUSTER_ENGINE_ENABLED) {\r
130                                 return Executable.ExecProvider.Cluster;\r
131                         } else {\r
132                                 throw new JobSubmissionException("Executable can be executed only on the cluster, but cluster engine is disabled!");\r
133                         }\r
134                 }\r
135                 // We are here if executable can be executed on both Cluster and Local\r
136                 // engines i.e. provider = Any\r
137                 // If we still here executable supports All exec environments\r
138                 // Check whether we work on windows\r
139                 if (SysPrefs.isWindows) {\r
140                         // no matter what the settings are, we cannot send tasks to the\r
141                         // cluster from windows env\r
142                         return Executable.ExecProvider.Local;\r
143                 }\r
144                 // Now if both engines are configured that load balance them\r
145                 if (IS_CLUSTER_ENGINE_ENABLED && IS_LOCAL_ENGINE_ENABLED) {\r
146                         // If the dataset is NULL than base a decision on local engine load\r
147                         // only\r
148                         if (dataSet == null) {\r
149                                 return LoadBalancer.getEngine(executable);\r
150                         }\r
151                         // If the dataset is provided, consider it\r
152                         // This should be the main root for any load balancing\r
153                         // configurations\r
154                         return LoadBalancer.getEngine(executable, dataSet);\r
155                 } else if (IS_CLUSTER_ENGINE_ENABLED) {\r
156                         return Executable.ExecProvider.Cluster;\r
157                 }\r
158                 // If we are here, than local engine is enabled or one of the two will\r
159                 // happen (1) exception is thrown if both engines are disabled\r
160                 // or (2) previous statement will return the cluster engine\r
161                 return Executable.ExecProvider.Local;\r
162         }\r
163 \r
164         public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable) throws JobSubmissionException {\r
165 \r
166                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));\r
167                 Executable.ExecProvider provider = getExecProvider(confExec, null);\r
168                 confExec.setExecProvider(provider);\r
169                 setupWorkDirectory(confExec, provider);\r
170                 return confExec;\r
171         }\r
172 \r
173         public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable, List<FastaSequence> dataSet)\r
174                         throws JobSubmissionException {\r
175 \r
176                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));\r
177                 Executable.ExecProvider provider = getExecProvider(confExec, dataSet);\r
178                 confExec.setExecProvider(provider);\r
179                 setupWorkDirectory(confExec, provider);\r
180                 return confExec;\r
181         }\r
182 \r
183         static <T> void setupWorkDirectory(ConfExecutable<T> confExec, Executable.ExecProvider provider) {\r
184                 assert provider != null && provider != Executable.ExecProvider.Any;\r
185                 String workDir = "";\r
186                 if (provider == Executable.ExecProvider.Local) {\r
187                         workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator + confExec.getTaskId();\r
188                 } else {\r
189                         workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator + confExec.getTaskId();\r
190                 }\r
191                 // Create working directory for the task\r
192                 File wdir = new File(workDir);\r
193                 wdir.mkdir();\r
194                 log.info("Creating working directory for the task in: " + wdir.getAbsolutePath());\r
195                 // Tell the executable where to get the results\r
196                 confExec.setWorkDirectory(workDir);\r
197         }\r
198 \r
199         public static <T> ConfiguredExecutable<T> configureExecutable(Executable<T> executable, Executable.ExecProvider provider)\r
200                         throws JobSubmissionException {\r
201                 if (executable == null) {\r
202                         throw new InvalidParameterException("Executable must be provided!");\r
203                 }\r
204                 ConfExecutable<T> confExec = new ConfExecutable<T>(executable, DirectoryManager.getTaskDirectory(executable.getClass()));\r
205                 if (provider == Executable.ExecProvider.Cluster && !IS_CLUSTER_ENGINE_ENABLED) {\r
206                         throw new JobSubmissionException("Cluster engine is disabled or not configured!");\r
207                 }\r
208                 if (provider == Executable.ExecProvider.Local && !IS_LOCAL_ENGINE_ENABLED) {\r
209                         throw new JobSubmissionException("Local engine is disabled or not configured!");\r
210                 }\r
211                 confExec.setExecProvider(provider);\r
212                 setupWorkDirectory(confExec, provider);\r
213                 return confExec;\r
214         }\r
215 \r
216         public static AsyncExecutor getAsyncEngine(ConfiguredExecutable<?> executable, Executable.ExecProvider provider) {\r
217 \r
218                 assert provider != Executable.ExecProvider.Any && provider != null;\r
219                 if (provider == Executable.ExecProvider.Cluster) {\r
220                         return new AsyncClusterRunner();\r
221                 }\r
222                 return new AsyncLocalRunner();\r
223         }\r
224 \r
225         public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable, Executable.ExecProvider provider)\r
226                         throws JobSubmissionException {\r
227 \r
228                 assert provider != Executable.ExecProvider.Any && provider != null;\r
229                 if (provider == Executable.ExecProvider.Cluster) {\r
230                         return ClusterRunner.getInstance(executable);\r
231                 }\r
232                 return new LocalRunner(executable);\r
233         }\r
234 \r
235         public static AsyncExecutor getAsyncEngine(ConfiguredExecutable<?> executable) {\r
236                 if (isTargetedForLocalExecution(executable)) {\r
237                         return new AsyncLocalRunner();\r
238                 }\r
239                 return new AsyncClusterRunner();\r
240         }\r
241 \r
242         public static AsyncExecutor getAsyncEngine(String taskId) {\r
243                 if (isLocal(taskId)) {\r
244                         return new AsyncLocalRunner();\r
245                 }\r
246                 return new AsyncClusterRunner();\r
247         }\r
248 \r
249         public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable) throws JobSubmissionException {\r
250                 if (isTargetedForLocalExecution(executable)) {\r
251                         return new LocalRunner(executable);\r
252                 }\r
253                 return ClusterRunner.getInstance(executable);\r
254         }\r
255 \r
256         static boolean isTargetedForLocalExecution(ConfiguredExecutable<?> executable) {\r
257                 // In the uncommon case that the cluster and local execution temporary\r
258                 // directories are the same, in this case the method return true anyway\r
259 \r
260                 /*\r
261                  * Could have done this String taskDir = executable.getWorkDirectory();\r
262                  * int idx = taskDir.lastIndexOf(File.separator); String workDir =\r
263                  * taskDir.substring(0, idx); assert\r
264                  * !(workDir.equals(CLUSTER_WORK_DIRECTORY) && workDir\r
265                  * .equals(LOCAL_WORK_DIRECTORY)) :\r
266                  * "Could not determine executable target!"; if\r
267                  * (workDir.equals(LOCAL_WORK_DIRECTORY)) { return true; }\r
268                  */\r
269                 String taskDir = executable.getTaskId();\r
270                 return isLocal(taskDir);\r
271         }\r
272 \r
273         static boolean isLocal(String taskId) {\r
274                 if (Util.isEmpty(taskId)) {\r
275                         throw new NullPointerException("TaskId must be provided!");\r
276                 }\r
277                 if (!EngineUtil.isValidJobId(taskId)) {\r
278                         throw new InvalidParameterException("TaskId is not valid!");\r
279                 }\r
280                 return !taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX);\r
281         }\r
282 \r
283         public static String getWorkDirectory(String taskId) {\r
284                 assert !compbio.util.Util.isEmpty(taskId);\r
285                 assert EngineUtil.isValidJobId(taskId);\r
286                 log.info("Getting workdirectory for TaskID: " + taskId);\r
287                 if (taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX)) {\r
288                         return CLUSTER_WORK_DIRECTORY + File.separator + taskId;\r
289                 }\r
290                 return LOCAL_WORK_DIRECTORY + File.separator + taskId;\r
291         }\r
292 }\r