\r
import java.util.List;\r
\r
+import org.apache.log4j.Logger;\r
+\r
import compbio.data.sequence.FastaSequence;\r
import compbio.engine.client.Executable;\r
import compbio.engine.local.LocalExecutorService;\r
import compbio.metadata.Limit;\r
import compbio.metadata.PresetManager;\r
\r
+/**\r
+ * This class decides where to execute the job. If the local engine is enabled\r
+ * in the configuration file and it has free threads and the size of the tasks\r
+ * permits the local execution, then the local execution will be favoured.\r
+ * \r
+ * @author pvtroshin\r
+ * @version 1.0 March 2009\r
+ */\r
public class LoadBalancer {\r
\r
+ private static Logger log = Logger.getLogger(LoadBalancer.class);\r
+\r
private LoadBalancer() {\r
}\r
\r
public static Executable.ExecProvider getEngine(Executable<?> executable) {\r
if (LocalExecutorService.getExecutor().canAcceptMoreWork()) {\r
+ log.debug("LOCAL engine HAS FREE threads will execute ... ");\r
return Executable.ExecProvider.Local;\r
}\r
+ log.debug("NO free threads on the LOCAL engine! Targeting for CLUSTER execution... ");\r
return Executable.ExecProvider.Cluster;\r
}\r
\r
// If limit is not defined then defaults to executing on the cluster\r
Limit<V> limit = executable\r
.getLimit(PresetManager.LOCAL_ENGINE_LIMIT_PRESET);\r
-\r
+ log.trace("Inspecting whether the job can be executed locally using limit: "\r
+ + limit);\r
if (limit == null || limit.isExceeded(dataSet)) {\r
+ log.debug("Job EXCEEDS LOCAL execution LIMIT targeting for cluster execution! ");\r
return Executable.ExecProvider.Cluster;\r
}\r
+ log.debug("Job FITS into the LOCAL execution limit consulting load balancer... ");\r
// Even if the data satisfies criteria for local execution it may still\r
// be executed on the cluster as the maximum capacity for local engine\r
// may be reached.\r
\r
public final class LocalExecutorService extends ThreadPoolExecutor {\r
\r
- private final static Logger log = Logger\r
- .getLogger(LocalExecutorService.class);\r
- private final static String threadNumPropName = "engine.local.thread.number";\r
-\r
- private static LocalExecutorService INSTANCE = null;\r
- private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();\r
- private final AtomicLong numTasks = new AtomicLong();\r
- private final AtomicLong totalTime = new AtomicLong();\r
-\r
- private LocalExecutorService(int corePoolSize, int maximumPoolSize,\r
- long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {\r
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);\r
- }\r
-\r
- /**\r
- * This method returns the single instance of CachedThreadPoolExecutor which\r
- * it cashes internally\r
- * \r
- * @return\r
- */\r
- public synchronized static LocalExecutorService getExecutor() {\r
- if (INSTANCE == null) {\r
- INSTANCE = init();\r
+ private final static Logger log = Logger\r
+ .getLogger(LocalExecutorService.class);\r
+ private final static String threadNumPropName = "engine.local.thread.number";\r
+\r
+ private static LocalExecutorService INSTANCE = null;\r
+ private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();\r
+ private final AtomicLong numTasks = new AtomicLong();\r
+ private final AtomicLong totalTime = new AtomicLong();\r
+\r
+ private LocalExecutorService(int corePoolSize, int maximumPoolSize,\r
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {\r
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);\r
}\r
- log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());\r
- return INSTANCE;\r
- }\r
-\r
- private static LocalExecutorService init() {\r
- int procNum = Runtime.getRuntime().availableProcessors();\r
- // Add safety net if this function is unavailable\r
- if (procNum < 1) {\r
- procNum = 1;\r
+\r
+ /**\r
+ * This method returns the single instance of CachedThreadPoolExecutor which\r
+ * it cashes internally\r
+ * \r
+ * @return\r
+ */\r
+ public synchronized static LocalExecutorService getExecutor() {\r
+ if (INSTANCE == null) {\r
+ INSTANCE = init();\r
+ }\r
+ log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());\r
+ return INSTANCE;\r
}\r
- if (procNum > 4) {\r
- procNum = procNum - 1; // leave one processor for overhead\r
- // management\r
+\r
+ private static LocalExecutorService init() {\r
+ int procNum = Runtime.getRuntime().availableProcessors();\r
+ // Add safety net if this function is unavailable\r
+ if (procNum < 1) {\r
+ procNum = 1;\r
+ }\r
+ if (procNum > 4) {\r
+ procNum = procNum - 1; // leave one processor for overhead\r
+ // management\r
+ }\r
+ PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
+ String threadNum = ph.getProperty(threadNumPropName);\r
+ log.debug("Thread number for local execution from conf file is "\r
+ + threadNum);\r
+ int threads = 0;\r
+ if (!Util.isEmpty(threadNum)) {\r
+ try {\r
+ threads = Integer.parseInt(threadNum);\r
+ if (threads > 1 && threads < procNum * 2) {\r
+ procNum = threads;\r
+ }\r
+ } catch (NumberFormatException e) {\r
+ log.error("Cannot understand " + threadNumPropName\r
+ + " property. Expecting whole number, but given "\r
+ + threadNum);\r
+ }\r
+ }\r
+\r
+ log.debug("Constructing thread pool for executor with " + procNum\r
+ + " thread(s)");\r
+ LocalExecutorService executor = new LocalExecutorService(procNum,\r
+ procNum, 0L, TimeUnit.MILLISECONDS,\r
+ new LinkedBlockingQueue<Runnable>());\r
+ // Make sure that the executor is going to be properly closed\r
+ Runtime.getRuntime().addShutdownHook(new Thread() {\r
+\r
+ @Override\r
+ public void run() {\r
+ shutDown();\r
+ }\r
+ });\r
+ return executor;\r
}\r
- PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
- String threadNum = ph.getProperty(threadNumPropName);\r
- log.debug("Thread number for local execution from conf file is "\r
- + threadNum);\r
- int threads = 0;\r
- if (!Util.isEmpty(threadNum)) {\r
- try {\r
- threads = Integer.parseInt(threadNum);\r
- if (threads > 1 && threads < procNum * 2) {\r
- procNum = threads;\r
+\r
+ /**\r
+ * This stops all executing processes via interruption. Thus it is vital\r
+ * that all processes that use this service respond to interruption\r
+ * \r
+ * Stops internal executor service which captures streams of native\r
+ * executables. This method is intended for stopping service if deployed in\r
+ * the web application context. There is NO NEED of using this method\r
+ * otherwise as the executor service is taken care of internally.\r
+ */\r
+ public static void shutDown() {\r
+ if (INSTANCE != null) {\r
+ INSTANCE.shutdownNow();\r
}\r
- } catch (NumberFormatException e) {\r
- log.error("Cannot understand " + threadNumPropName\r
- + " property. Expecting whole number, but given "\r
- + threadNum);\r
- }\r
}\r
\r
- log.debug("Constructing thread pool for executor with " + procNum\r
- + " thread(s)");\r
- LocalExecutorService executor = new LocalExecutorService(procNum,\r
- procNum, 0L, TimeUnit.MILLISECONDS,\r
- new LinkedBlockingQueue<Runnable>());\r
- // Make sure that the executor is going to be properly closed\r
- Runtime.getRuntime().addShutdownHook(new Thread() {\r
- @Override\r
- public void run() {\r
- shutDown();\r
- }\r
- });\r
- return executor;\r
- }\r
-\r
- /**\r
- * This stops all executing processes via interruption. Thus it is vital\r
- * that all processes that use this service respond to interruption\r
- * \r
- * Stops internal executor service which captures streams of native\r
- * executables. This method is intended for stopping service if deployed in\r
- * the web application context. There is NO NEED of using this method\r
- * otherwise as the executor service is taken care of internally.\r
- */\r
- public static void shutDown() {\r
- if (INSTANCE != null) {\r
- INSTANCE.shutdownNow();\r
+ /**\r
+ * If the Executor queue is empty\r
+ * \r
+ * @return true is not all threads are busy, false otherwise\r
+ */\r
+ public boolean canAcceptMoreWork() {\r
+ // alternative to use: INSTANCE.getQueue().isEmpty(); - but this will\r
+ // inevitably put the last task to the queue\r
+ return INSTANCE.getMaximumPoolSize() > INSTANCE.getActiveCount();\r
}\r
- }\r
-\r
- /**\r
- * If the Executor queue is empty\r
- * \r
- * @return true is not all threads are busy, false overwise\r
- */\r
- public boolean canAcceptMoreWork() {\r
- // alternative to use: executor.getMaximumPoolSize() <\r
- // executor.getActiveCount()\r
- return INSTANCE.getQueue().isEmpty();\r
- }\r
-\r
- @Override\r
- protected void beforeExecute(Thread t, Runnable r) {\r
- super.beforeExecute(t, r);\r
- // class of r is java.util.concurrent.FutureTask\r
- log.info(String.format("Thread %s: start %s", t, r));\r
- startTime.set(System.nanoTime());\r
- }\r
-\r
- @Override\r
- protected void afterExecute(Runnable r, Throwable t) {\r
- try {\r
- long endTime = System.nanoTime();\r
- long taskTime = endTime - startTime.get();\r
- numTasks.incrementAndGet();\r
- totalTime.addAndGet(taskTime);\r
- log.info(String.format("Throwable %s: end %s, time=%dns", t, r,\r
- taskTime));\r
- } finally {\r
- super.afterExecute(r, t);\r
+\r
+ @Override\r
+ protected void beforeExecute(Thread t, Runnable r) {\r
+ super.beforeExecute(t, r);\r
+ // class of r is java.util.concurrent.FutureTask\r
+ log.info(String.format("Thread %s: start %s", t, r));\r
+ startTime.set(System.nanoTime());\r
}\r
- }\r
-\r
- @Override\r
- protected void terminated() {\r
- try {\r
- if (numTasks.get() != 0) {\r
- log.info(String.format("Terminated : avg time=%dns", totalTime\r
- .get()\r
- / numTasks.get()));\r
- }\r
- } finally {\r
- super.terminated();\r
+\r
+ @Override\r
+ protected void afterExecute(Runnable r, Throwable t) {\r
+ try {\r
+ long endTime = System.nanoTime();\r
+ long taskTime = endTime - startTime.get();\r
+ numTasks.incrementAndGet();\r
+ totalTime.addAndGet(taskTime);\r
+ log.info(String.format("Throwable %s: end %s, time=%dns", t, r,\r
+ taskTime));\r
+ } finally {\r
+ super.afterExecute(r, t);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ protected void terminated() {\r
+ try {\r
+ if (numTasks.get() != 0) {\r
+ log.info(String.format("Terminated : avg time=%dns",\r
+ totalTime.get() / numTasks.get()));\r
+ }\r
+ } finally {\r
+ super.terminated();\r
+ }\r
}\r
- }\r
}\r