Changes from JWS2 branch concerning local execution and logging
authorpvtroshin <pvtroshin@e3abac25-378b-4346-85de-24260fe3988d>
Wed, 24 Nov 2010 14:27:11 +0000 (14:27 +0000)
committerpvtroshin <pvtroshin@e3abac25-378b-4346-85de-24260fe3988d>
Wed, 24 Nov 2010 14:27:11 +0000 (14:27 +0000)
git-svn-id: link to svn.lifesci.dundee.ac.uk/svn/barton/ptroshin/JABA2@3389 e3abac25-378b-4346-85de-24260fe3988d

engine/compbio/engine/LoadBalancer.java
engine/compbio/engine/local/LocalExecutorService.java

index b23b57c..6185d42 100644 (file)
@@ -20,21 +20,35 @@ package compbio.engine;
 \r
 import java.util.List;\r
 \r
+import org.apache.log4j.Logger;\r
+\r
 import compbio.data.sequence.FastaSequence;\r
 import compbio.engine.client.Executable;\r
 import compbio.engine.local.LocalExecutorService;\r
 import compbio.metadata.Limit;\r
 import compbio.metadata.PresetManager;\r
 \r
+/**\r
+ * This class decides where to execute the job. If the local engine is enabled\r
+ * in the configuration file and it has free threads and the size of the tasks\r
+ * permits the local execution, then the local execution will be favoured.\r
+ * \r
+ * @author pvtroshin\r
+ * @version 1.0 March 2009\r
+ */\r
 public class LoadBalancer {\r
 \r
+       private static Logger log = Logger.getLogger(LoadBalancer.class);\r
+\r
        private LoadBalancer() {\r
        }\r
 \r
        public static Executable.ExecProvider getEngine(Executable<?> executable) {\r
                if (LocalExecutorService.getExecutor().canAcceptMoreWork()) {\r
+                       log.debug("LOCAL engine HAS FREE threads will execute ... ");\r
                        return Executable.ExecProvider.Local;\r
                }\r
+               log.debug("NO free threads on the LOCAL engine! Targeting for CLUSTER execution... ");\r
                return Executable.ExecProvider.Cluster;\r
        }\r
 \r
@@ -46,10 +60,13 @@ public class LoadBalancer {
                // If limit is not defined then defaults to executing on the cluster\r
                Limit<V> limit = executable\r
                                .getLimit(PresetManager.LOCAL_ENGINE_LIMIT_PRESET);\r
-\r
+               log.trace("Inspecting whether the job can be executed locally using limit: "\r
+                               + limit);\r
                if (limit == null || limit.isExceeded(dataSet)) {\r
+                       log.debug("Job EXCEEDS LOCAL execution LIMIT targeting for cluster execution! ");\r
                        return Executable.ExecProvider.Cluster;\r
                }\r
+               log.debug("Job FITS into the LOCAL execution limit consulting load balancer... ");\r
                // Even if the data satisfies criteria for local execution it may still\r
                // be executed on the cluster as the maximum capacity for local engine\r
                // may be reached.\r
index 2a78a5a..04a0bd5 100644 (file)
@@ -32,135 +32,135 @@ import compbio.util.Util;
 \r
 public final class LocalExecutorService extends ThreadPoolExecutor {\r
 \r
-    private final static Logger log = Logger\r
-           .getLogger(LocalExecutorService.class);\r
-    private final static String threadNumPropName = "engine.local.thread.number";\r
-\r
-    private static LocalExecutorService INSTANCE = null;\r
-    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();\r
-    private final AtomicLong numTasks = new AtomicLong();\r
-    private final AtomicLong totalTime = new AtomicLong();\r
-\r
-    private LocalExecutorService(int corePoolSize, int maximumPoolSize,\r
-           long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {\r
-       super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);\r
-    }\r
-\r
-    /**\r
-     * This method returns the single instance of CachedThreadPoolExecutor which\r
-     * it cashes internally\r
-     * \r
-     * @return\r
-     */\r
-    public synchronized static LocalExecutorService getExecutor() {\r
-       if (INSTANCE == null) {\r
-           INSTANCE = init();\r
+       private final static Logger log = Logger\r
+                       .getLogger(LocalExecutorService.class);\r
+       private final static String threadNumPropName = "engine.local.thread.number";\r
+\r
+       private static LocalExecutorService INSTANCE = null;\r
+       private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();\r
+       private final AtomicLong numTasks = new AtomicLong();\r
+       private final AtomicLong totalTime = new AtomicLong();\r
+\r
+       private LocalExecutorService(int corePoolSize, int maximumPoolSize,\r
+                       long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {\r
+               super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);\r
        }\r
-       log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());\r
-       return INSTANCE;\r
-    }\r
-\r
-    private static LocalExecutorService init() {\r
-       int procNum = Runtime.getRuntime().availableProcessors();\r
-       // Add safety net if this function is unavailable\r
-       if (procNum < 1) {\r
-           procNum = 1;\r
+\r
+       /**\r
+        * This method returns the single instance of CachedThreadPoolExecutor which\r
+        * it cashes internally\r
+        * \r
+        * @return\r
+        */\r
+       public synchronized static LocalExecutorService getExecutor() {\r
+               if (INSTANCE == null) {\r
+                       INSTANCE = init();\r
+               }\r
+               log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());\r
+               return INSTANCE;\r
        }\r
-       if (procNum > 4) {\r
-           procNum = procNum - 1; // leave one processor for overhead\r
-           // management\r
+\r
+       private static LocalExecutorService init() {\r
+               int procNum = Runtime.getRuntime().availableProcessors();\r
+               // Add safety net if this function is unavailable\r
+               if (procNum < 1) {\r
+                       procNum = 1;\r
+               }\r
+               if (procNum > 4) {\r
+                       procNum = procNum - 1; // leave one processor for overhead\r
+                       // management\r
+               }\r
+               PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
+               String threadNum = ph.getProperty(threadNumPropName);\r
+               log.debug("Thread number for local execution from conf file is "\r
+                               + threadNum);\r
+               int threads = 0;\r
+               if (!Util.isEmpty(threadNum)) {\r
+                       try {\r
+                               threads = Integer.parseInt(threadNum);\r
+                               if (threads > 1 && threads < procNum * 2) {\r
+                                       procNum = threads;\r
+                               }\r
+                       } catch (NumberFormatException e) {\r
+                               log.error("Cannot understand " + threadNumPropName\r
+                                               + " property. Expecting whole number, but given "\r
+                                               + threadNum);\r
+                       }\r
+               }\r
+\r
+               log.debug("Constructing thread pool for executor with " + procNum\r
+                               + " thread(s)");\r
+               LocalExecutorService executor = new LocalExecutorService(procNum,\r
+                               procNum, 0L, TimeUnit.MILLISECONDS,\r
+                               new LinkedBlockingQueue<Runnable>());\r
+               // Make sure that the executor is going to be properly closed\r
+               Runtime.getRuntime().addShutdownHook(new Thread() {\r
+\r
+                       @Override\r
+                       public void run() {\r
+                               shutDown();\r
+                       }\r
+               });\r
+               return executor;\r
        }\r
-       PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
-       String threadNum = ph.getProperty(threadNumPropName);\r
-       log.debug("Thread number for local execution from conf file is "\r
-               + threadNum);\r
-       int threads = 0;\r
-       if (!Util.isEmpty(threadNum)) {\r
-           try {\r
-               threads = Integer.parseInt(threadNum);\r
-               if (threads > 1 && threads < procNum * 2) {\r
-                   procNum = threads;\r
+\r
+       /**\r
+        * This stops all executing processes via interruption. Thus it is vital\r
+        * that all processes that use this service respond to interruption\r
+        * \r
+        * Stops internal executor service which captures streams of native\r
+        * executables. This method is intended for stopping service if deployed in\r
+        * the web application context. There is NO NEED of using this method\r
+        * otherwise as the executor service is taken care of internally.\r
+        */\r
+       public static void shutDown() {\r
+               if (INSTANCE != null) {\r
+                       INSTANCE.shutdownNow();\r
                }\r
-           } catch (NumberFormatException e) {\r
-               log.error("Cannot understand " + threadNumPropName\r
-                       + " property. Expecting whole number, but given "\r
-                       + threadNum);\r
-           }\r
        }\r
 \r
-       log.debug("Constructing thread pool for executor with " + procNum\r
-               + " thread(s)");\r
-       LocalExecutorService executor = new LocalExecutorService(procNum,\r
-               procNum, 0L, TimeUnit.MILLISECONDS,\r
-               new LinkedBlockingQueue<Runnable>());\r
-       // Make sure that the executor is going to be properly closed\r
-       Runtime.getRuntime().addShutdownHook(new Thread() {\r
-           @Override\r
-           public void run() {\r
-               shutDown();\r
-           }\r
-       });\r
-       return executor;\r
-    }\r
-\r
-    /**\r
-     * This stops all executing processes via interruption. Thus it is vital\r
-     * that all processes that use this service respond to interruption\r
-     * \r
-     * Stops internal executor service which captures streams of native\r
-     * executables. This method is intended for stopping service if deployed in\r
-     * the web application context. There is NO NEED of using this method\r
-     * otherwise as the executor service is taken care of internally.\r
-     */\r
-    public static void shutDown() {\r
-       if (INSTANCE != null) {\r
-           INSTANCE.shutdownNow();\r
+       /**\r
+        * If the Executor queue is empty\r
+        * \r
+        * @return true is not all threads are busy, false otherwise\r
+        */\r
+       public boolean canAcceptMoreWork() {\r
+               // alternative to use: INSTANCE.getQueue().isEmpty(); - but this will\r
+               // inevitably put the last task to the queue\r
+               return INSTANCE.getMaximumPoolSize() > INSTANCE.getActiveCount();\r
        }\r
-    }\r
-\r
-    /**\r
-     * If the Executor queue is empty\r
-     * \r
-     * @return true is not all threads are busy, false overwise\r
-     */\r
-    public boolean canAcceptMoreWork() {\r
-       // alternative to use: executor.getMaximumPoolSize() <\r
-       // executor.getActiveCount()\r
-       return INSTANCE.getQueue().isEmpty();\r
-    }\r
-\r
-    @Override\r
-    protected void beforeExecute(Thread t, Runnable r) {\r
-       super.beforeExecute(t, r);\r
-       // class of r is java.util.concurrent.FutureTask\r
-       log.info(String.format("Thread %s: start %s", t, r));\r
-       startTime.set(System.nanoTime());\r
-    }\r
-\r
-    @Override\r
-    protected void afterExecute(Runnable r, Throwable t) {\r
-       try {\r
-           long endTime = System.nanoTime();\r
-           long taskTime = endTime - startTime.get();\r
-           numTasks.incrementAndGet();\r
-           totalTime.addAndGet(taskTime);\r
-           log.info(String.format("Throwable %s: end %s, time=%dns", t, r,\r
-                   taskTime));\r
-       } finally {\r
-           super.afterExecute(r, t);\r
+\r
+       @Override\r
+       protected void beforeExecute(Thread t, Runnable r) {\r
+               super.beforeExecute(t, r);\r
+               // class of r is java.util.concurrent.FutureTask\r
+               log.info(String.format("Thread %s: start %s", t, r));\r
+               startTime.set(System.nanoTime());\r
        }\r
-    }\r
-\r
-    @Override\r
-    protected void terminated() {\r
-       try {\r
-           if (numTasks.get() != 0) {\r
-               log.info(String.format("Terminated : avg time=%dns", totalTime\r
-                       .get()\r
-                       / numTasks.get()));\r
-           }\r
-       } finally {\r
-           super.terminated();\r
+\r
+       @Override\r
+       protected void afterExecute(Runnable r, Throwable t) {\r
+               try {\r
+                       long endTime = System.nanoTime();\r
+                       long taskTime = endTime - startTime.get();\r
+                       numTasks.incrementAndGet();\r
+                       totalTime.addAndGet(taskTime);\r
+                       log.info(String.format("Throwable %s: end %s, time=%dns", t, r,\r
+                                       taskTime));\r
+               } finally {\r
+                       super.afterExecute(r, t);\r
+               }\r
+       }\r
+\r
+       @Override\r
+       protected void terminated() {\r
+               try {\r
+                       if (numTasks.get() != 0) {\r
+                               log.info(String.format("Terminated : avg time=%dns",\r
+                                               totalTime.get() / numTasks.get()));\r
+                       }\r
+               } finally {\r
+                       super.terminated();\r
+               }\r
        }\r
-    }\r
 }\r