04a0bd5e4f905c5676f1b48a9dd15f9c87371d4e
[jabaws.git] / engine / compbio / engine / local / LocalExecutorService.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.local;\r
20 \r
21 import java.util.concurrent.BlockingQueue;\r
22 import java.util.concurrent.LinkedBlockingQueue;\r
23 import java.util.concurrent.ThreadPoolExecutor;\r
24 import java.util.concurrent.TimeUnit;\r
25 import java.util.concurrent.atomic.AtomicLong;\r
26 \r
27 import org.apache.log4j.Logger;\r
28 \r
29 import compbio.engine.conf.PropertyHelperManager;\r
30 import compbio.util.PropertyHelper;\r
31 import compbio.util.Util;\r
32 \r
33 public final class LocalExecutorService extends ThreadPoolExecutor {\r
34 \r
35         private final static Logger log = Logger\r
36                         .getLogger(LocalExecutorService.class);\r
37         private final static String threadNumPropName = "engine.local.thread.number";\r
38 \r
39         private static LocalExecutorService INSTANCE = null;\r
40         private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();\r
41         private final AtomicLong numTasks = new AtomicLong();\r
42         private final AtomicLong totalTime = new AtomicLong();\r
43 \r
44         private LocalExecutorService(int corePoolSize, int maximumPoolSize,\r
45                         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {\r
46                 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);\r
47         }\r
48 \r
49         /**\r
50          * This method returns the single instance of CachedThreadPoolExecutor which\r
51          * it cashes internally\r
52          * \r
53          * @return\r
54          */\r
55         public synchronized static LocalExecutorService getExecutor() {\r
56                 if (INSTANCE == null) {\r
57                         INSTANCE = init();\r
58                 }\r
59                 log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());\r
60                 return INSTANCE;\r
61         }\r
62 \r
63         private static LocalExecutorService init() {\r
64                 int procNum = Runtime.getRuntime().availableProcessors();\r
65                 // Add safety net if this function is unavailable\r
66                 if (procNum < 1) {\r
67                         procNum = 1;\r
68                 }\r
69                 if (procNum > 4) {\r
70                         procNum = procNum - 1; // leave one processor for overhead\r
71                         // management\r
72                 }\r
73                 PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
74                 String threadNum = ph.getProperty(threadNumPropName);\r
75                 log.debug("Thread number for local execution from conf file is "\r
76                                 + threadNum);\r
77                 int threads = 0;\r
78                 if (!Util.isEmpty(threadNum)) {\r
79                         try {\r
80                                 threads = Integer.parseInt(threadNum);\r
81                                 if (threads > 1 && threads < procNum * 2) {\r
82                                         procNum = threads;\r
83                                 }\r
84                         } catch (NumberFormatException e) {\r
85                                 log.error("Cannot understand " + threadNumPropName\r
86                                                 + " property. Expecting whole number, but given "\r
87                                                 + threadNum);\r
88                         }\r
89                 }\r
90 \r
91                 log.debug("Constructing thread pool for executor with " + procNum\r
92                                 + " thread(s)");\r
93                 LocalExecutorService executor = new LocalExecutorService(procNum,\r
94                                 procNum, 0L, TimeUnit.MILLISECONDS,\r
95                                 new LinkedBlockingQueue<Runnable>());\r
96                 // Make sure that the executor is going to be properly closed\r
97                 Runtime.getRuntime().addShutdownHook(new Thread() {\r
98 \r
99                         @Override\r
100                         public void run() {\r
101                                 shutDown();\r
102                         }\r
103                 });\r
104                 return executor;\r
105         }\r
106 \r
107         /**\r
108          * This stops all executing processes via interruption. Thus it is vital\r
109          * that all processes that use this service respond to interruption\r
110          * \r
111          * Stops internal executor service which captures streams of native\r
112          * executables. This method is intended for stopping service if deployed in\r
113          * the web application context. There is NO NEED of using this method\r
114          * otherwise as the executor service is taken care of internally.\r
115          */\r
116         public static void shutDown() {\r
117                 if (INSTANCE != null) {\r
118                         INSTANCE.shutdownNow();\r
119                 }\r
120         }\r
121 \r
122         /**\r
123          * If the Executor queue is empty\r
124          * \r
125          * @return true is not all threads are busy, false otherwise\r
126          */\r
127         public boolean canAcceptMoreWork() {\r
128                 // alternative to use: INSTANCE.getQueue().isEmpty(); - but this will\r
129                 // inevitably put the last task to the queue\r
130                 return INSTANCE.getMaximumPoolSize() > INSTANCE.getActiveCount();\r
131         }\r
132 \r
133         @Override\r
134         protected void beforeExecute(Thread t, Runnable r) {\r
135                 super.beforeExecute(t, r);\r
136                 // class of r is java.util.concurrent.FutureTask\r
137                 log.info(String.format("Thread %s: start %s", t, r));\r
138                 startTime.set(System.nanoTime());\r
139         }\r
140 \r
141         @Override\r
142         protected void afterExecute(Runnable r, Throwable t) {\r
143                 try {\r
144                         long endTime = System.nanoTime();\r
145                         long taskTime = endTime - startTime.get();\r
146                         numTasks.incrementAndGet();\r
147                         totalTime.addAndGet(taskTime);\r
148                         log.info(String.format("Throwable %s: end %s, time=%dns", t, r,\r
149                                         taskTime));\r
150                 } finally {\r
151                         super.afterExecute(r, t);\r
152                 }\r
153         }\r
154 \r
155         @Override\r
156         protected void terminated() {\r
157                 try {\r
158                         if (numTasks.get() != 0) {\r
159                                 log.info(String.format("Terminated : avg time=%dns",\r
160                                                 totalTime.get() / numTasks.get()));\r
161                         }\r
162                 } finally {\r
163                         super.terminated();\r
164                 }\r
165         }\r
166 }\r