/* Copyright (c) 2009 Peter Troshin
 *
 * JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
 *
 * This library is free software; you can redistribute it and/or modify it under the terms of the
 * Apache License version 2 as published by the Apache Software Foundation
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
 * License for more details.
 *
 * A copy of the license is in apache_license.txt. It is also available here:
 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
 *
 * Any republication or derived work distributed in source code form
 * must include this copyright and license notice.
 */
19 package compbio.engine.local;
\r
21 import java.util.concurrent.BlockingQueue;
\r
22 import java.util.concurrent.LinkedBlockingQueue;
\r
23 import java.util.concurrent.ThreadPoolExecutor;
\r
24 import java.util.concurrent.TimeUnit;
\r
25 import java.util.concurrent.atomic.AtomicLong;
\r
27 import org.apache.log4j.Logger;
\r
29 import compbio.engine.conf.PropertyHelperManager;
\r
30 import compbio.util.PropertyHelper;
\r
31 import compbio.util.Util;
\r
33 public final class LocalExecutorService extends ThreadPoolExecutor {
\r
35 private final static Logger log = Logger
\r
36 .getLogger(LocalExecutorService.class);
\r
37 private final static String threadNumPropName = "engine.local.thread.number";
\r
39 private static LocalExecutorService INSTANCE = null;
\r
40 private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
\r
41 private final AtomicLong numTasks = new AtomicLong();
\r
42 private final AtomicLong totalTime = new AtomicLong();
\r
44 private LocalExecutorService(int corePoolSize, int maximumPoolSize,
\r
45 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
\r
46 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
\r
50 * This method returns the single instance of CachedThreadPoolExecutor which
\r
51 * it cashes internally
\r
53 * @return the instance
\r
55 public synchronized static LocalExecutorService getExecutor() {
\r
56 if (INSTANCE == null) {
\r
59 log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());
\r
63 private static LocalExecutorService init() {
\r
64 int procNum = Runtime.getRuntime().availableProcessors();
\r
65 // Add safety net if this function is unavailable
\r
70 procNum = procNum - 1; // leave one processor for overhead
\r
73 PropertyHelper ph = PropertyHelperManager.getPropertyHelper();
\r
74 String threadNum = ph.getProperty(threadNumPropName);
\r
75 log.debug("Thread number for local execution from conf file is "
\r
78 if (!Util.isEmpty(threadNum)) {
\r
80 threads = Integer.parseInt(threadNum);
\r
81 if (threads > 1 && threads < procNum * 2) {
\r
84 } catch (NumberFormatException e) {
\r
85 log.error("Cannot understand " + threadNumPropName
\r
86 + " property. Expecting whole number, but given "
\r
91 log.debug("Constructing thread pool for executor with " + procNum
\r
93 LocalExecutorService executor = new LocalExecutorService(procNum,
\r
94 procNum, 0L, TimeUnit.MILLISECONDS,
\r
95 new LinkedBlockingQueue<Runnable>());
\r
96 // Make sure that the executor is going to be properly closed
\r
97 Runtime.getRuntime().addShutdownHook(new Thread() {
\r
100 public void run() {
\r
108 * This stops all executing processes via interruption. Thus it is vital
\r
109 * that all processes that use this service respond to interruption
\r
111 * Stops internal executor service which captures streams of native
\r
112 * executables. This method is intended for stopping service if deployed in
\r
113 * the web application context. There is NO NEED of using this method
\r
114 * otherwise as the executor service is taken care of internally.
\r
116 public static void shutDown() {
\r
117 if (INSTANCE != null) {
\r
118 INSTANCE.shutdownNow();
\r
123 * If the Executor queue is empty
\r
125 * @return true is not all threads are busy, false otherwise
\r
127 public boolean canAcceptMoreWork() {
\r
128 // alternative to use: INSTANCE.getQueue().isEmpty(); - but this will
\r
129 // inevitably put the last task to the queue
\r
130 return INSTANCE.getMaximumPoolSize() > INSTANCE.getActiveCount();
\r
134 protected void beforeExecute(Thread t, Runnable r) {
\r
135 super.beforeExecute(t, r);
\r
136 // class of r is java.util.concurrent.FutureTask
\r
137 log.info(String.format("Thread %s: start %s", t, r));
\r
138 startTime.set(System.nanoTime());
\r
142 protected void afterExecute(Runnable r, Throwable t) {
\r
144 long endTime = System.nanoTime();
\r
145 long taskTime = endTime - startTime.get();
\r
146 numTasks.incrementAndGet();
\r
147 totalTime.addAndGet(taskTime);
\r
148 log.info(String.format("Throwable %s: end %s, time=%dns", t, r,
\r
151 super.afterExecute(r, t);
\r
156 protected void terminated() {
\r
158 if (numTasks.get() != 0) {
\r
159 log.info(String.format("Terminated : avg time=%dns",
\r
160 totalTime.get() / numTasks.get()));
\r
163 super.terminated();
\r