1 /* Copyright (c) 2009 Peter Troshin
\r
3 * JAva Bioinformatics Analysis Web Services (JABAWS) @version: 1.0
\r
5 * This library is free software; you can redistribute it and/or modify it under the terms of the
\r
6 * Apache License version 2 as published by the Apache Software Foundation
\r
8 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
\r
9 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Apache
\r
10 * License for more details.
\r
12 * A copy of the license is in apache_license.txt. It is also available here:
\r
13 * @see: http://www.apache.org/licenses/LICENSE-2.0.txt
\r
15 * Any republication or derived work distributed in source code form
\r
16 * must include this copyright and license notice.
\r
19 package compbio.engine.local;
\r
21 import java.util.concurrent.BlockingQueue;
\r
22 import java.util.concurrent.LinkedBlockingQueue;
\r
23 import java.util.concurrent.ThreadPoolExecutor;
\r
24 import java.util.concurrent.TimeUnit;
\r
25 import java.util.concurrent.atomic.AtomicLong;
\r
27 import org.apache.log4j.Logger;
\r
29 import compbio.engine.conf.PropertyHelperManager;
\r
30 import compbio.util.PropertyHelper;
\r
31 import compbio.util.Util;
\r
33 public final class LocalExecutorService extends ThreadPoolExecutor {
\r
35 private final static Logger log = Logger.getLogger(LocalExecutorService.class);
\r
36 private final static String threadNumPropName = "engine.local.thread.number";
\r
38 private static LocalExecutorService INSTANCE = null;
\r
39 private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
\r
40 private final AtomicLong numTasks = new AtomicLong();
\r
41 private final AtomicLong totalTime = new AtomicLong();
\r
/**
 * Builds the pool by passing every argument, unchanged, to the
 * ThreadPoolExecutor constructor of the same shape. The constructor is
 * private, so instances can only be created from inside this class.
 *
 * @param corePoolSize    number of threads kept in the pool
 * @param maximumPoolSize upper bound on the number of threads
 * @param keepAliveTime   idle time after which excess threads terminate
 * @param unit            time unit of keepAliveTime
 * @param workQueue       queue holding tasks that await execution
 */
private LocalExecutorService(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
\r
49 * This method returns the single instance of CachedThreadPoolExecutor which
\r
50 * it cashes internally
\r
52 * @return the instance
\r
54 public synchronized static LocalExecutorService getExecutor() {
\r
55 if (INSTANCE == null) {
\r
58 log.info("Current Active Threads Count: " + INSTANCE.getActiveCount());
\r
62 private static LocalExecutorService init() {
\r
63 int procNum = Runtime.getRuntime().availableProcessors();
\r
64 // Add safety net if this function is unavailable
\r
69 procNum = procNum - 1; // leave one processor for overhead
\r
72 PropertyHelper ph = PropertyHelperManager.getPropertyHelper();
\r
73 String threadNum = ph.getProperty(threadNumPropName);
\r
74 log.debug("Thread number for local execution from conf file is "
\r
77 if (!Util.isEmpty(threadNum)) {
\r
79 threads = Integer.parseInt(threadNum);
\r
80 if (threads > 1 && threads < procNum * 2) {
\r
83 } catch (NumberFormatException e) {
\r
84 log.error("Cannot understand " + threadNumPropName
\r
85 + " property. Expecting whole number, but given "
\r
90 log.debug("Constructing thread pool for executor with " + procNum
\r
92 LocalExecutorService executor = new LocalExecutorService(procNum,
\r
93 procNum, 0L, TimeUnit.MILLISECONDS,
\r
94 new LinkedBlockingQueue<Runnable>());
\r
95 // Make sure that the executor is going to be properly closed
\r
96 Runtime.getRuntime().addShutdownHook(new Thread() {
\r
107 * This stops all executing processes via interruption. Thus it is vital
\r
108 * that all processes that use this service respond to interruption
\r
110 * Stops internal executor service which captures streams of native
\r
111 * executables. This method is intended for stopping service if deployed in
\r
112 * the web application context. There is NO NEED of using this method
\r
113 * otherwise as the executor service is taken care of internally.
\r
115 public static void shutDown() {
\r
116 if (INSTANCE != null) {
\r
117 INSTANCE.shutdownNow();
\r
122 * If the Executor queue is empty
\r
124 * @return true is not all threads are busy, false otherwise
\r
126 public boolean canAcceptMoreWork() {
\r
127 // alternative to use: INSTANCE.getQueue().isEmpty(); - but this will
\r
128 // inevitably put the last task to the queue
\r
129 return INSTANCE.getMaximumPoolSize() > INSTANCE.getActiveCount();
\r
133 protected void beforeExecute(Thread t, Runnable r) {
\r
134 super.beforeExecute(t, r);
\r
135 // class of r is java.util.concurrent.FutureTask
\r
136 log.info(String.format("Thread %s: start %s", t, r));
\r
137 startTime.set(System.nanoTime());
\r
141 protected void afterExecute(Runnable r, Throwable t) {
\r
143 long endTime = System.nanoTime();
\r
144 long taskTime = endTime - startTime.get();
\r
145 numTasks.incrementAndGet();
\r
146 totalTime.addAndGet(taskTime);
\r
147 log.info(String.format("Throwable %s: end %s, time=%dns", t, r,
\r
150 super.afterExecute(r, t);
\r
155 protected void terminated() {
\r
157 if (numTasks.get() != 0) {
\r
158 log.info(String.format("Terminated : avg time=%dns",
\r
159 totalTime.get() / numTasks.get()));
\r
162 super.terminated();
\r