1 /* Copyright (c) 2009 Peter Troshin
\r
3 * JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
\r
5 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
6 * Apache License version 2 as published by the Apache Software Foundation
\r
8 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
9 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
10 * License for more details.
\r
12 * A copy of the license is in apache_license.txt. It is also available here:
\r
13 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
15 * Any republication or derived work distributed in source code form
\r
16 * must include this copyright and license notice.
\r
19 package compbio.engine;
\r
21 import java.io.File;
\r
22 import java.security.InvalidParameterException;
\r
23 import java.util.List;
\r
25 import org.apache.log4j.Logger;
\r
27 import compbio.data.sequence.FastaSequence;
\r
28 import compbio.engine.client.ConfExecutable;
\r
29 import compbio.engine.client.ConfiguredExecutable;
\r
30 import compbio.engine.client.Executable;
\r
31 import compbio.engine.client.PathValidator;
\r
32 import compbio.engine.client.EngineUtil;
\r
33 import compbio.engine.cluster.drmaa.AsyncJobRunner;
\r
34 import compbio.engine.cluster.drmaa.JobRunner;
\r
35 import compbio.engine.conf.DirectoryManager;
\r
36 import compbio.engine.conf.PropertyHelperManager;
\r
37 import compbio.engine.local.AsyncLocalRunner;
\r
38 import compbio.engine.local.LocalRunner;
\r
39 import compbio.metadata.JobSubmissionException;
\r
40 import compbio.util.PropertyHelper;
\r
41 import compbio.util.SysPrefs;
\r
42 import compbio.util.Util;
\r
44 public class Configurator {
\r
46 private static Logger log = Logger.getLogger(Configurator.class);
\r
47 private static final PropertyHelper ph = PropertyHelperManager.getPropertyHelper();
\r
49 public static final boolean IS_LOCAL_ENGINE_ENABLED = initBooleanValue("engine.local.enable");
\r
50 public static final boolean IS_CLUSTER_ENGINE_ENABLED = initBooleanValue("engine.cluster.enable");
\r
51 public final static String LOCAL_WORK_DIRECTORY = initLocalDirectory();
\r
52 public final static String CLUSTER_WORK_DIRECTORY = initClusterWorkDirectory();
\r
56 private static boolean initBooleanValue(String key) {
\r
58 String status = ph.getProperty(key);
\r
59 log.debug("Loading property: " + key + " with value: " + status);
\r
60 if (Util.isEmpty(status)) {
\r
63 return new Boolean(status.trim()).booleanValue();
\r
66 private static String initClusterWorkDirectory() {
\r
67 String tmpDir = null;
\r
68 if (IS_CLUSTER_ENGINE_ENABLED) {
\r
69 tmpDir = ph.getProperty("cluster.tmp.directory");
\r
70 if (!Util.isEmpty(tmpDir)) {
\r
71 tmpDir = tmpDir.trim();
\r
73 throw new RuntimeException(
\r
74 "Cluster work directory must be provided! ");
\r
76 if (LOCAL_WORK_DIRECTORY != null
\r
77 && LOCAL_WORK_DIRECTORY.equals(CLUSTER_WORK_DIRECTORY)) {
\r
78 throw new InvalidParameterException(
\r
79 "Cluster engine output directory must be different of that for local engine!");
\r
85 private static String initLocalDirectory() {
\r
86 String tmp_dir = ph.getProperty("local.tmp.directory");
\r
87 // Use system temporary directory if local.tmp.directory is not defined
\r
88 if (Util.isEmpty(tmp_dir)) {
\r
89 tmp_dir = SysPrefs.getSystemTmpDir();
\r
90 log.debug("local.tmp.directory is not defined using system tmp: " + tmp_dir);
\r
92 if (!PathValidator.isAbsolutePath(tmp_dir)) {
\r
93 log.debug("local.tmp.directory path is relative! " + tmp_dir);
\r
94 tmp_dir = EngineUtil.convertToAbsolute(tmp_dir);
\r
95 log.debug("local.tmp.directory path changed to absolute: " + tmp_dir);
\r
97 return tmp_dir.trim();
\r
101 * Depending on the values defined in the properties
\r
102 * (engine.cluster.enable=true and engine.local.enable=true) return either
\r
103 * Cluster job submission engine {@link #JobRunner} or local job submission
\r
104 * engine {@link #LocalRunner} If both engines enabled than ask
\r
105 * {@link LoadBalancer} for an engine. This method will fall back and return
\r
108 * 1) No engines are defined in the properties or they have been defined
\r
111 * 2) Execution environment is Windows as the system cannot really run
\r
112 * cluster submission from windows
\r
114 * @param executable
\r
115 * @return SyncExecutor backed up by either cluster or local engines
\r
116 * @throws JobSubmissionException
\r
118 static Executable.ExecProvider getExecProvider(
\r
119 ConfiguredExecutable<?> executable, List<FastaSequence> dataSet)
\r
120 throws JobSubmissionException {
\r
121 // Look where executable claims to be executed
\r
122 Executable.ExecProvider provider = executable.getSupportedRuntimes();
\r
123 if (!IS_CLUSTER_ENGINE_ENABLED && !IS_LOCAL_ENGINE_ENABLED) {
\r
124 // Both engines disabled!
\r
125 throw new RuntimeException(
\r
126 "Both engines are disabled! "
\r
127 + "Check conf/Engine.cluster.properties and conf/Engine.local.properties. "
\r
128 + "At least one engine must be enabled!");
\r
130 if (provider == Executable.ExecProvider.Local) {
\r
131 if (IS_LOCAL_ENGINE_ENABLED) {
\r
132 return Executable.ExecProvider.Local;
\r
134 throw new JobSubmissionException(
\r
135 "Executable can be executed only on locally, but local engine is disabled!");
\r
138 if (provider == Executable.ExecProvider.Cluster) {
\r
139 if (IS_CLUSTER_ENGINE_ENABLED) {
\r
140 return Executable.ExecProvider.Cluster;
\r
142 throw new JobSubmissionException(
\r
143 "Executable can be executed only on the cluster, but cluster engine is disabled!");
\r
146 // We are here if executable can be executed on both Cluster and Local
\r
147 // engines i.e. provider = Any
\r
148 // If we still here executable supports All exec environments
\r
149 // Check whether we work on windows
\r
150 if (SysPrefs.isWindows) {
\r
151 // no matter what the settings are, we cannot send tasks to the
\r
152 // cluster from windows env
\r
153 return Executable.ExecProvider.Local;
\r
155 // Now if both engines are configured that load balance them
\r
156 if (IS_CLUSTER_ENGINE_ENABLED && IS_LOCAL_ENGINE_ENABLED) {
\r
157 // If the dataset is NULL than base a decision on local engine load
\r
159 if (dataSet == null) {
\r
160 return LoadBalancer.getEngine(executable);
\r
162 // If the dataset is provided, consider it
\r
163 // This should be the main root for any load balancing
\r
165 return LoadBalancer.getEngine(executable, dataSet);
\r
166 } else if (IS_CLUSTER_ENGINE_ENABLED) {
\r
167 return Executable.ExecProvider.Cluster;
\r
169 // If we are here, than local engine is enabled or one of the two will
\r
170 // happen (1) exception is thrown if both engines are disabled
\r
171 // or (2) previous statement will return the cluster engine
\r
172 return Executable.ExecProvider.Local;
\r
175 public static <T> ConfiguredExecutable<T> configureExecutable(
\r
176 Executable<T> executable) throws JobSubmissionException {
\r
178 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,
\r
179 DirectoryManager.getTaskDirectory(executable.getClass()));
\r
180 Executable.ExecProvider provider = getExecProvider(confExec, null);
\r
181 confExec.setExecProvider(provider);
\r
182 setupWorkDirectory(confExec, provider);
\r
186 public static <T> ConfiguredExecutable<T> configureExecutable(
\r
187 Executable<T> executable, List<FastaSequence> dataSet)
\r
188 throws JobSubmissionException {
\r
190 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,
\r
191 DirectoryManager.getTaskDirectory(executable.getClass()));
\r
192 Executable.ExecProvider provider = getExecProvider(confExec, dataSet);
\r
193 confExec.setExecProvider(provider);
\r
194 setupWorkDirectory(confExec, provider);
\r
198 static <T> void setupWorkDirectory(ConfExecutable<T> confExec,
\r
199 Executable.ExecProvider provider) {
\r
200 assert provider != null && provider != Executable.ExecProvider.Any;
\r
201 String workDir = "";
\r
202 if (provider == Executable.ExecProvider.Local) {
\r
203 workDir = Configurator.LOCAL_WORK_DIRECTORY + File.separator
\r
204 + confExec.getTaskId();
\r
206 workDir = Configurator.CLUSTER_WORK_DIRECTORY + File.separator
\r
207 + confExec.getTaskId();
\r
209 // Create working directory for the task
\r
210 File wdir = new File(workDir);
\r
212 log.info("Creating working directory for the task in: "
\r
213 + wdir.getAbsolutePath());
\r
214 // Tell the executable where to get the results
\r
215 confExec.setWorkDirectory(workDir);
\r
218 public static <T> ConfiguredExecutable<T> configureExecutable(
\r
219 Executable<T> executable, Executable.ExecProvider provider)
\r
220 throws JobSubmissionException {
\r
221 if (executable == null) {
\r
222 throw new InvalidParameterException("Executable must be provided!");
\r
224 ConfExecutable<T> confExec = new ConfExecutable<T>(executable,
\r
225 DirectoryManager.getTaskDirectory(executable.getClass()));
\r
226 if (provider == Executable.ExecProvider.Cluster
\r
227 && !IS_CLUSTER_ENGINE_ENABLED) {
\r
228 throw new JobSubmissionException(
\r
229 "Cluster engine is disabled or not configured!");
\r
231 if (provider == Executable.ExecProvider.Local
\r
232 && !IS_LOCAL_ENGINE_ENABLED) {
\r
233 throw new JobSubmissionException(
\r
234 "Local engine is disabled or not configured!");
\r
236 confExec.setExecProvider(provider);
\r
237 setupWorkDirectory(confExec, provider);
\r
241 public static AsyncExecutor getAsyncEngine(
\r
242 ConfiguredExecutable<?> executable, Executable.ExecProvider provider) {
\r
244 assert provider != Executable.ExecProvider.Any && provider != null;
\r
245 if (provider == Executable.ExecProvider.Cluster) {
\r
246 return new AsyncJobRunner();
\r
248 return new AsyncLocalRunner();
\r
251 public static SyncExecutor getSyncEngine(
\r
252 ConfiguredExecutable<?> executable, Executable.ExecProvider provider)
\r
253 throws JobSubmissionException {
\r
255 assert provider != Executable.ExecProvider.Any && provider != null;
\r
256 if (provider == Executable.ExecProvider.Cluster) {
\r
257 return JobRunner.getInstance(executable);
\r
259 return new LocalRunner(executable);
\r
262 public static AsyncExecutor getAsyncEngine(
\r
263 ConfiguredExecutable<?> executable) {
\r
264 if (isTargetedForLocalExecution(executable)) {
\r
265 return new AsyncLocalRunner();
\r
267 return new AsyncJobRunner();
\r
270 public static AsyncExecutor getAsyncEngine(String taskId) {
\r
271 if (isLocal(taskId)) {
\r
272 return new AsyncLocalRunner();
\r
274 return new AsyncJobRunner();
\r
277 public static SyncExecutor getSyncEngine(ConfiguredExecutable<?> executable)
\r
278 throws JobSubmissionException {
\r
279 if (isTargetedForLocalExecution(executable)) {
\r
280 return new LocalRunner(executable);
\r
282 return JobRunner.getInstance(executable);
\r
285 static boolean isTargetedForLocalExecution(
\r
286 ConfiguredExecutable<?> executable) {
\r
287 // In the uncommon case that the cluster and local execution temporary
\r
288 // directories are the same, in this case the method return true anyway
\r
291 * Could have done this String taskDir = executable.getWorkDirectory();
\r
292 * int idx = taskDir.lastIndexOf(File.separator); String workDir =
\r
293 * taskDir.substring(0, idx); assert
\r
294 * !(workDir.equals(CLUSTER_WORK_DIRECTORY) && workDir
\r
295 * .equals(LOCAL_WORK_DIRECTORY)) :
\r
296 * "Could not determine executable target!"; if
\r
297 * (workDir.equals(LOCAL_WORK_DIRECTORY)) { return true; }
\r
299 String taskDir = executable.getTaskId();
\r
300 return isLocal(taskDir);
\r
303 static boolean isLocal(String taskId) {
\r
304 if (Util.isEmpty(taskId)) {
\r
305 throw new NullPointerException("TaskId must be provided!");
\r
307 if (!EngineUtil.isValidJobId(taskId)) {
\r
308 throw new InvalidParameterException("TaskId is not valid!");
\r
310 return !taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX);
\r
313 public static String getWorkDirectory(String taskId) {
\r
314 assert !compbio.util.Util.isEmpty(taskId);
\r
315 assert EngineUtil.isValidJobId(taskId);
\r
316 log.info("Getting workdirectory for TaskID: " + taskId);
\r
317 if (taskId.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX)) {
\r
318 return CLUSTER_WORK_DIRECTORY + File.separator + taskId;
\r
320 return LOCAL_WORK_DIRECTORY + File.separator + taskId;
\r