From 137b5daa9ff4b57f0ebf203cb6243c7b529209f8 Mon Sep 17 00:00:00 2001 From: pvtroshin Date: Wed, 24 Nov 2010 14:27:11 +0000 Subject: [PATCH] Changes from JWS2 branch concerning local execution and logging git-svn-id: link to svn.lifesci.dundee.ac.uk/svn/barton/ptroshin/JABA2@3389 e3abac25-378b-4346-85de-24260fe3988d --- engine/compbio/engine/LoadBalancer.java | 19 +- .../compbio/engine/local/LocalExecutorService.java | 244 ++++++++++---------- 2 files changed, 140 insertions(+), 123 deletions(-) diff --git a/engine/compbio/engine/LoadBalancer.java b/engine/compbio/engine/LoadBalancer.java index b23b57c..6185d42 100644 --- a/engine/compbio/engine/LoadBalancer.java +++ b/engine/compbio/engine/LoadBalancer.java @@ -20,21 +20,35 @@ package compbio.engine; import java.util.List; +import org.apache.log4j.Logger; + import compbio.data.sequence.FastaSequence; import compbio.engine.client.Executable; import compbio.engine.local.LocalExecutorService; import compbio.metadata.Limit; import compbio.metadata.PresetManager; +/** + * This class decides where to execute the job. If the local engine is enabled + * in the configuration file and it has free threads and the size of the tasks + * permits the local execution, then the local execution will be favoured. + * + * @author pvtroshin + * @version 1.0 March 2009 + */ public class LoadBalancer { + private static Logger log = Logger.getLogger(LoadBalancer.class); + private LoadBalancer() { } public static Executable.ExecProvider getEngine(Executable executable) { if (LocalExecutorService.getExecutor().canAcceptMoreWork()) { + log.debug("LOCAL engine HAS FREE threads will execute ... "); return Executable.ExecProvider.Local; } + log.debug("NO free threads on the LOCAL engine! Targeting for CLUSTER execution... "); return Executable.ExecProvider.Cluster; } @@ -46,10 +60,13 @@ public class LoadBalancer { // If limit is not defined then defaults to executing on the cluster Limit limit = executable .getLimit(PresetManager.LOCAL_ENGINE_LIMIT_PRESET); - + log.trace("Inspecting whether the job can be executed locally using limit: " + + limit); if (limit == null || limit.isExceeded(dataSet)) { + log.debug("Job EXCEEDS LOCAL execution LIMIT targeting for cluster execution! "); return Executable.ExecProvider.Cluster; } + log.debug("Job FITS into the LOCAL execution limit consulting load balancer... "); // Even if the data satisfies criteria for local execution it may still // be executed on the cluster as the maximum capacity for local engine // may be reached. diff --git a/engine/compbio/engine/local/LocalExecutorService.java b/engine/compbio/engine/local/LocalExecutorService.java index 2a78a5a..04a0bd5 100644 --- a/engine/compbio/engine/local/LocalExecutorService.java +++ b/engine/compbio/engine/local/LocalExecutorService.java @@ -32,135 +32,135 @@ import compbio.util.Util; public final class LocalExecutorService extends ThreadPoolExecutor { - private final static Logger log = Logger - .getLogger(LocalExecutorService.class); - private final static String threadNumPropName = "engine.local.thread.number"; - - private static LocalExecutorService INSTANCE = null; - private final ThreadLocal startTime = new ThreadLocal(); - private final AtomicLong numTasks = new AtomicLong(); - private final AtomicLong totalTime = new AtomicLong(); - - private LocalExecutorService(int corePoolSize, int maximumPoolSize, - long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); - } - - /** - * This method returns the single instance of CachedThreadPoolExecutor which - * it cashes internally - * - * @return - */ - public synchronized static LocalExecutorService getExecutor() { - if (INSTANCE == null) { - INSTANCE = init(); + private final static Logger log = Logger + .getLogger(LocalExecutorService.class); + private final static String threadNumPropName = "engine.local.thread.number"; + + private static LocalExecutorService INSTANCE = null; + private final ThreadLocal startTime = new ThreadLocal(); + private final AtomicLong numTasks = new AtomicLong(); + private final AtomicLong totalTime = new AtomicLong(); + + private LocalExecutorService(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } - log.info("Current Active Threads Count: " + INSTANCE.getActiveCount()); - return INSTANCE; - } - - private static LocalExecutorService init() { - int procNum = Runtime.getRuntime().availableProcessors(); - // Add safety net if this function is unavailable - if (procNum < 1) { - procNum = 1; + + /** + * This method returns the single instance of CachedThreadPoolExecutor which + * it cashes internally + * + * @return + */ + public synchronized static LocalExecutorService getExecutor() { + if (INSTANCE == null) { + INSTANCE = init(); + } + log.info("Current Active Threads Count: " + INSTANCE.getActiveCount()); + return INSTANCE; } - if (procNum > 4) { - procNum = procNum - 1; // leave one processor for overhead - // management + + private static LocalExecutorService init() { + int procNum = Runtime.getRuntime().availableProcessors(); + // Add safety net if this function is unavailable + if (procNum < 1) { + procNum = 1; + } + if (procNum > 4) { + procNum = procNum - 1; // leave one processor for overhead + // management + } + PropertyHelper ph = PropertyHelperManager.getPropertyHelper(); + String threadNum = ph.getProperty(threadNumPropName); + log.debug("Thread number for local execution from conf file is " + + threadNum); + int threads = 0; + if (!Util.isEmpty(threadNum)) { + try { + threads = Integer.parseInt(threadNum); + if (threads > 1 && threads < procNum * 2) { + procNum = threads; + } + } catch (NumberFormatException e) { + log.error("Cannot understand " + threadNumPropName + + " property. Expecting whole number, but given " + + threadNum); + } + } + + log.debug("Constructing thread pool for executor with " + procNum + + " thread(s)"); + LocalExecutorService executor = new LocalExecutorService(procNum, + procNum, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + // Make sure that the executor is going to be properly closed + Runtime.getRuntime().addShutdownHook(new Thread() { + + @Override + public void run() { + shutDown(); + } + }); + return executor; } - PropertyHelper ph = PropertyHelperManager.getPropertyHelper(); - String threadNum = ph.getProperty(threadNumPropName); - log.debug("Thread number for local execution from conf file is " - + threadNum); - int threads = 0; - if (!Util.isEmpty(threadNum)) { - try { - threads = Integer.parseInt(threadNum); - if (threads > 1 && threads < procNum * 2) { - procNum = threads; + + /** + * This stops all executing processes via interruption. Thus it is vital + * that all processes that use this service respond to interruption + * + * Stops internal executor service which captures streams of native + * executables. This method is intended for stopping service if deployed in + * the web application context. There is NO NEED of using this method + * otherwise as the executor service is taken care of internally. + */ + public static void shutDown() { + if (INSTANCE != null) { + INSTANCE.shutdownNow(); } - } catch (NumberFormatException e) { - log.error("Cannot understand " + threadNumPropName - + " property. Expecting whole number, but given " - + threadNum); - } } - log.debug("Constructing thread pool for executor with " + procNum - + " thread(s)"); - LocalExecutorService executor = new LocalExecutorService(procNum, - procNum, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue()); - // Make sure that the executor is going to be properly closed - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - shutDown(); - } - }); - return executor; - } - - /** - * This stops all executing processes via interruption. Thus it is vital - * that all processes that use this service respond to interruption - * - * Stops internal executor service which captures streams of native - * executables. This method is intended for stopping service if deployed in - * the web application context. There is NO NEED of using this method - * otherwise as the executor service is taken care of internally. - */ - public static void shutDown() { - if (INSTANCE != null) { - INSTANCE.shutdownNow(); + /** + * If the Executor queue is empty + * + * @return true is not all threads are busy, false otherwise + */ + public boolean canAcceptMoreWork() { + // alternative to use: INSTANCE.getQueue().isEmpty(); - but this will + // inevitably put the last task to the queue + return INSTANCE.getMaximumPoolSize() > INSTANCE.getActiveCount(); } - } - - /** - * If the Executor queue is empty - * - * @return true is not all threads are busy, false overwise - */ - public boolean canAcceptMoreWork() { - // alternative to use: executor.getMaximumPoolSize() < - // executor.getActiveCount() - return INSTANCE.getQueue().isEmpty(); - } - - @Override - protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); - // class of r is java.util.concurrent.FutureTask - log.info(String.format("Thread %s: start %s", t, r)); - startTime.set(System.nanoTime()); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - try { - long endTime = System.nanoTime(); - long taskTime = endTime - startTime.get(); - numTasks.incrementAndGet(); - totalTime.addAndGet(taskTime); - log.info(String.format("Throwable %s: end %s, time=%dns", t, r, - taskTime)); - } finally { - super.afterExecute(r, t); + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + // class of r is java.util.concurrent.FutureTask + log.info(String.format("Thread %s: start %s", t, r)); + startTime.set(System.nanoTime()); } - } - - @Override - protected void terminated() { - try { - if (numTasks.get() != 0) { - log.info(String.format("Terminated : avg time=%dns", totalTime - .get() - / numTasks.get())); - } - } finally { - super.terminated(); + + @Override + protected void afterExecute(Runnable r, Throwable t) { + try { + long endTime = System.nanoTime(); + long taskTime = endTime - startTime.get(); + numTasks.incrementAndGet(); + totalTime.addAndGet(taskTime); + log.info(String.format("Throwable %s: end %s, time=%dns", t, r, + taskTime)); + } finally { + super.afterExecute(r, t); + } + } + + @Override + protected void terminated() { + try { + if (numTasks.get() != 0) { + log.info(String.format("Terminated : avg time=%dns", + totalTime.get() / numTasks.get())); + } + } finally { + super.terminated(); + } } - } } -- 1.7.10.2