Next version of JABA
[jabaws.git] / engine / compbio / engine / local / LocalExecutorService.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.local;\r
20 \r
21 import java.util.concurrent.BlockingQueue;\r
22 import java.util.concurrent.LinkedBlockingQueue;\r
23 import java.util.concurrent.ThreadPoolExecutor;\r
24 import java.util.concurrent.TimeUnit;\r
25 import java.util.concurrent.atomic.AtomicLong;\r
26 \r
27 import org.apache.log4j.Logger;\r
28 \r
29 import compbio.engine.conf.PropertyHelperManager;\r
30 import compbio.util.PropertyHelper;\r
31 import compbio.util.Util;\r
32 \r
33 public final class LocalExecutorService extends ThreadPoolExecutor {\r
34 \r
35     private final static Logger log = Logger\r
36             .getLogger(LocalExecutorService.class);\r
37     private final static String threadNumPropName = "engine.local.thread.number";\r
38 \r
39     private static LocalExecutorService INSTANCE = null;\r
40     private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();\r
41     private final AtomicLong numTasks = new AtomicLong();\r
42     private final AtomicLong totalTime = new AtomicLong();\r
43 \r
44     private LocalExecutorService(int corePoolSize, int maximumPoolSize,\r
45             long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {\r
46         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);\r
47     }\r
48 \r
49     /**\r
50      * This method returns the single instance of CachedThreadPoolExecutor which\r
51      * it cashes internally\r
52      * \r
53      * @return\r
54      */\r
55     public synchronized static LocalExecutorService getExecutor() {\r
56         if (INSTANCE == null) {\r
57             INSTANCE = init();\r
58         }\r
59         log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());\r
60         return INSTANCE;\r
61     }\r
62 \r
63     private static LocalExecutorService init() {\r
64         int procNum = Runtime.getRuntime().availableProcessors();\r
65         // Add safety net if this function is unavailable\r
66         if (procNum < 1) {\r
67             procNum = 1;\r
68         }\r
69         if (procNum > 4) {\r
70             procNum = procNum - 1; // leave one processor for overhead\r
71             // management\r
72         }\r
73         PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
74         String threadNum = ph.getProperty(threadNumPropName);\r
75         log.debug("Thread number for local execution from conf file is "\r
76                 + threadNum);\r
77         int threads = 0;\r
78         if (!Util.isEmpty(threadNum)) {\r
79             try {\r
80                 threads = Integer.parseInt(threadNum);\r
81                 if (threads > 1 && threads < procNum * 2) {\r
82                     procNum = threads;\r
83                 }\r
84             } catch (NumberFormatException e) {\r
85                 log.error("Cannot understand " + threadNumPropName\r
86                         + " property. Expecting whole number, but given "\r
87                         + threadNum);\r
88             }\r
89         }\r
90 \r
91         log.debug("Constructing thread pool for executor with " + procNum\r
92                 + " thread(s)");\r
93         LocalExecutorService executor = new LocalExecutorService(procNum,\r
94                 procNum, 0L, TimeUnit.MILLISECONDS,\r
95                 new LinkedBlockingQueue<Runnable>());\r
96         // Make sure that the executor is going to be properly closed\r
97         Runtime.getRuntime().addShutdownHook(new Thread() {\r
98             @Override\r
99             public void run() {\r
100                 shutDown();\r
101             }\r
102         });\r
103         return executor;\r
104     }\r
105 \r
106     /**\r
107      * This stops all executing processes via interruption. Thus it is vital\r
108      * that all processes that use this service respond to interruption\r
109      * \r
110      * Stops internal executor service which captures streams of native\r
111      * executables. This method is intended for stopping service if deployed in\r
112      * the web application context. There is NO NEED of using this method\r
113      * otherwise as the executor service is taken care of internally.\r
114      */\r
115     public static void shutDown() {\r
116         if (INSTANCE != null) {\r
117             INSTANCE.shutdownNow();\r
118         }\r
119     }\r
120 \r
121     /**\r
122      * If the Executor queue is empty\r
123      * \r
124      * @return true is not all threads are busy, false overwise\r
125      */\r
126     public boolean canAcceptMoreWork() {\r
127         // alternative to use: executor.getMaximumPoolSize() <\r
128         // executor.getActiveCount()\r
129         return INSTANCE.getQueue().isEmpty();\r
130     }\r
131 \r
132     @Override\r
133     protected void beforeExecute(Thread t, Runnable r) {\r
134         super.beforeExecute(t, r);\r
135         // class of r is java.util.concurrent.FutureTask\r
136         log.info(String.format("Thread %s: start %s", t, r));\r
137         startTime.set(System.nanoTime());\r
138     }\r
139 \r
140     @Override\r
141     protected void afterExecute(Runnable r, Throwable t) {\r
142         try {\r
143             long endTime = System.nanoTime();\r
144             long taskTime = endTime - startTime.get();\r
145             numTasks.incrementAndGet();\r
146             totalTime.addAndGet(taskTime);\r
147             log.info(String.format("Throwable %s: end %s, time=%dns", t, r,\r
148                     taskTime));\r
149         } finally {\r
150             super.afterExecute(r, t);\r
151         }\r
152     }\r
153 \r
154     @Override\r
155     protected void terminated() {\r
156         try {\r
157             if (numTasks.get() != 0) {\r
158                 log.info(String.format("Terminated : avg time=%dns", totalTime\r
159                         .get()\r
160                         / numTasks.get()));\r
161             }\r
162         } finally {\r
163             super.terminated();\r
164         }\r
165     }\r
166 }\r