Switch off GA by default
[jabaws.git] / engine / compbio / engine / local / LocalExecutorService.java
1 /* Copyright (c) 2009 Peter Troshin\r
2  *  \r
3  *  JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0     \r
4  * \r
5  *  This library is free software; you can redistribute it and/or modify it under the terms of the\r
6  *  Apache License version 2 as published by the Apache Software Foundation\r
7  * \r
8  *  This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without\r
9  *  even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache \r
10  *  License for more details.\r
11  * \r
12  *  A copy of the license is in apache_license.txt. It is also available here:\r
13  * @see: http://www.apache.org/licenses/LICENSE-2.0.txt\r
14  * \r
15  * Any republication or derived work distributed in source code form\r
16  * must include this copyright and license notice.\r
17  */\r
18 \r
19 package compbio.engine.local;\r
20 \r
21 import java.util.concurrent.BlockingQueue;\r
22 import java.util.concurrent.LinkedBlockingQueue;\r
23 import java.util.concurrent.ThreadPoolExecutor;\r
24 import java.util.concurrent.TimeUnit;\r
25 import java.util.concurrent.atomic.AtomicLong;\r
26 \r
27 import org.apache.log4j.Logger;\r
28 \r
29 import compbio.engine.conf.PropertyHelperManager;\r
30 import compbio.util.PropertyHelper;\r
31 import compbio.util.Util;\r
32 \r
33 public final class LocalExecutorService extends ThreadPoolExecutor {\r
34 \r
35         private final static Logger log = Logger.getLogger(LocalExecutorService.class);\r
36         private final static String threadNumPropName = "engine.local.thread.number";\r
37 \r
38         private static LocalExecutorService INSTANCE = null;\r
39         private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();\r
40         private final AtomicLong numTasks = new AtomicLong();\r
41         private final AtomicLong totalTime = new AtomicLong();\r
42 \r
43         private LocalExecutorService(int corePoolSize, int maximumPoolSize,\r
44                         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {\r
45                 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);\r
46         }\r
47 \r
48         /**\r
49          * This method returns the single instance of CachedThreadPoolExecutor which\r
50          * it cashes internally\r
51          * \r
52          * @return the instance\r
53          */\r
54         public synchronized static LocalExecutorService getExecutor() {\r
55                 if (INSTANCE == null) {\r
56                         INSTANCE = init();\r
57                 }\r
58                 log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());\r
59                 return INSTANCE;\r
60         }\r
61 \r
62         private static LocalExecutorService init() {\r
63                 int procNum = Runtime.getRuntime().availableProcessors();\r
64                 // Add safety net if this function is unavailable\r
65                 if (procNum < 1) {\r
66                         procNum = 1;\r
67                 }\r
68                 if (procNum > 4) {\r
69                         procNum = procNum - 1; // leave one processor for overhead\r
70                         // management\r
71                 }\r
72                 PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
73                 String threadNum = ph.getProperty(threadNumPropName);\r
74                 log.debug("Thread number for local execution from conf file is "\r
75                                 + threadNum);\r
76                 int threads = 0;\r
77                 if (!Util.isEmpty(threadNum)) {\r
78                         try {\r
79                                 threads = Integer.parseInt(threadNum);\r
80                                 if (threads > 1 && threads < procNum * 2) {\r
81                                         procNum = threads;\r
82                                 }\r
83                         } catch (NumberFormatException e) {\r
84                                 log.error("Cannot understand " + threadNumPropName\r
85                                                 + " property. Expecting whole number, but given "\r
86                                                 + threadNum);\r
87                         }\r
88                 }\r
89 \r
90                 log.debug("Constructing thread pool for executor with " + procNum\r
91                                 + " thread(s)");\r
92                 LocalExecutorService executor = new LocalExecutorService(procNum,\r
93                                 procNum, 0L, TimeUnit.MILLISECONDS,\r
94                                 new LinkedBlockingQueue<Runnable>());\r
95                 // Make sure that the executor is going to be properly closed\r
96                 Runtime.getRuntime().addShutdownHook(new Thread() {\r
97 \r
98                         @Override\r
99                         public void run() {\r
100                                 shutDown();\r
101                         }\r
102                 });\r
103                 return executor;\r
104         }\r
105 \r
106         /**\r
107          * This stops all executing processes via interruption. Thus it is vital\r
108          * that all processes that use this service respond to interruption\r
109          * \r
110          * Stops internal executor service which captures streams of native\r
111          * executables. This method is intended for stopping service if deployed in\r
112          * the web application context. There is NO NEED of using this method\r
113          * otherwise as the executor service is taken care of internally.\r
114          */\r
115         public static void shutDown() {\r
116                 if (INSTANCE != null) {\r
117                         INSTANCE.shutdownNow();\r
118                 }\r
119         }\r
120 \r
121         /**\r
122          * If the Executor queue is empty\r
123          * \r
124          * @return true is not all threads are busy, false otherwise\r
125          */\r
126         public boolean canAcceptMoreWork() {\r
127                 // alternative to use: INSTANCE.getQueue().isEmpty(); - but this will\r
128                 // inevitably put the last task to the queue\r
129                 return INSTANCE.getMaximumPoolSize() > INSTANCE.getActiveCount();\r
130         }\r
131 \r
132         @Override\r
133         protected void beforeExecute(Thread t, Runnable r) {\r
134                 super.beforeExecute(t, r);\r
135                 // class of r is java.util.concurrent.FutureTask\r
136                 log.info(String.format("Thread %s: start %s", t, r));\r
137                 startTime.set(System.nanoTime());\r
138         }\r
139 \r
140         @Override\r
141         protected void afterExecute(Runnable r, Throwable t) {\r
142                 try {\r
143                         long endTime = System.nanoTime();\r
144                         long taskTime = endTime - startTime.get();\r
145                         numTasks.incrementAndGet();\r
146                         totalTime.addAndGet(taskTime);\r
147                         log.info(String.format("Throwable %s: end %s, time=%dns", t, r,\r
148                                         taskTime));\r
149                 } finally {\r
150                         super.afterExecute(r, t);\r
151                 }\r
152         }\r
153 \r
154         @Override\r
155         protected void terminated() {\r
156                 try {\r
157                         if (numTasks.get() != 0) {\r
158                                 log.info(String.format("Terminated : avg time=%dns",\r
159                                                 totalTime.get() / numTasks.get()));\r
160                         }\r
161                 } finally {\r
162                         super.terminated();\r
163                 }\r
164         }\r
165 }\r