\r
import java.io.File;\r
import java.io.FileFilter;\r
-import java.io.FileWriter;\r
import java.io.IOException;\r
import java.sql.SQLException;\r
import java.text.SimpleDateFormat;\r
import java.util.ArrayList;\r
+import java.util.Date;\r
import java.util.HashMap;\r
import java.util.HashSet;\r
import java.util.List;\r
\r
import compbio.engine.client.ConfExecutable;\r
import compbio.engine.client.Executable;\r
-import compbio.engine.conf.PropertyHelperManager;\r
import compbio.metadata.JobStatus;\r
import compbio.runner.msa.ClustalW;\r
import compbio.util.FileUtil;\r
-import compbio.util.PropertyHelper;\r
import compbio.ws.client.Services;\r
\r
/**\r
* @author pvtroshin\r
* \r
*/\r
-public class ExecutionStatCollector {\r
+public class ExecutionStatCollector implements Runnable {\r
\r
static final int UNDEFINED = -1;\r
\r
\r
static SimpleDateFormat DF = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");\r
\r
- static PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
-\r
final private List<JobStat> stats;\r
+ /**\r
+ * Consider the job that has been working for longer than timeOutInHours\r
+ * completed, whatever the outcome\r
+ */\r
+ final private int timeOutInHours;\r
\r
- public ExecutionStatCollector(String workDirectory) {\r
+ /**\r
+ * List subdirectories in the job directory\r
+ * \r
+ * @param workDirectory\r
+ * @param timeOutInHours\r
+ */\r
+ public ExecutionStatCollector(String workDirectory, int timeOutInHours) {\r
+ log.info("Starting stat collector for directory: " + workDirectory);\r
+ log.info("Maximum allowed runtime(h): " + timeOutInHours);\r
File[] files = FileUtil.getFiles(workDirectory, directories);\r
stats = new ArrayList<JobStat>();\r
+ assert timeOutInHours > 0;\r
+ this.timeOutInHours = timeOutInHours;\r
for (File file : files) {\r
JobDirectory jd = new JobDirectory(file);\r
- stats.add(jd.getJobStat());\r
+ JobStat jstat = jd.getJobStat();\r
+ // Do not record stats on the job that has not completed yet\r
+ if (hasCompleted(jd)) {\r
+ stats.add(jstat);\r
+ } else {\r
+ log.debug("Skipping the job: " + jstat);\r
+ log.debug("As it has not completed yet");\r
+ }\r
// System.out.println(jd.getJobStat().getJobReportTabulated());\r
}\r
}\r
\r
+ boolean hasCompleted(JobDirectory jd) {\r
+ JobStat jstat = jd.getJobStat();\r
+ if (jstat.hasResult() || jstat.getIsCancelled()\r
+ || jstat.getIsFinished() || hasTimedOut(jd)) {\r
+ return true;\r
+ }\r
+ return false;\r
+ }\r
+\r
+ boolean hasTimedOut(JobDirectory jd) {\r
+ return ((System.currentTimeMillis() - jd.jobdir.lastModified()) / (1000 * 60 * 60)) > timeOutInHours;\r
+ }\r
+\r
public StatProcessor getStats() {\r
return new StatProcessor(stats);\r
}\r
public void writeStatToDB() throws SQLException {\r
Set<JobStat> rjobs = new HashSet<JobStat>(stats);\r
StatDB statdb = new StatDB();\r
+ log.debug("Removing records that has already been recorded");\r
+\r
statdb.removeRecordedJobs(rjobs);\r
+ log.debug("New records left: " + rjobs.size());\r
statdb.insertData(rjobs);\r
- statdb.conn.close();\r
- }\r
-\r
- static String getClusterJobDir() {\r
- String clusterdir = ph.getProperty("cluster.tmp.directory");\r
- if (clusterdir != null) {\r
- clusterdir.trim();\r
- }\r
- return clusterdir;\r
- }\r
-\r
- static void updateTime(File statFile) throws IOException {\r
- long lastMod = statFile.lastModified();\r
- FileWriter fw = new FileWriter(statFile);\r
- fw.write(new Long(lastMod).toString());\r
- fw.close();\r
}\r
\r
- static String getLocalJobDir() {\r
- String locdir = ph.getProperty("local.tmp.directory");\r
- if (locdir != null) {\r
- locdir.trim();\r
- }\r
- return locdir;\r
- }\r
+ /*\r
+ * static void updateTime(File statFile) throws IOException { long lastMod =\r
+ * statFile.lastModified(); FileWriter fw = new FileWriter(statFile);\r
+ * fw.write(new Long(lastMod).toString()); fw.close(); }\r
+ */\r
\r
/**\r
* \r
// updateTime(new File(\r
// "D:\\workspace\\JABA2\\jobsout\\AACon#170462904473672\\STARTED"));\r
\r
- String workDir = PropertyHelperManager.getLocalPath()\r
- + getLocalJobDir().trim();\r
- System.out.println(workDir);\r
File[] files = FileUtil.getFiles("Y:\\fc\\www-jws2\\jaba\\jobsout",\r
directories);\r
List<JobStat> stats = new ArrayList<JobStat>();\r
static FileFilter directories = new FileFilter() {\r
@Override\r
public boolean accept(File pathname) {\r
- return pathname.isDirectory();\r
+ return pathname.isDirectory()\r
+ && !pathname.getName().startsWith(".");\r
}\r
};\r
\r
}\r
\r
}\r
+\r
+ @Override\r
+ public void run() {\r
+ log.info("Started updating statistics at " + new Date());\r
+\r
+ StatProcessor local_stats = getStats();\r
+ log.info("Found " + local_stats.getJobNumber() + " jobs!");\r
+ try {\r
+ writeStatToDB();\r
+ } catch (SQLException e) {\r
+ log.error("Fails to update jobs statistics database!");\r
+ log.error(e.getLocalizedMessage(), e);\r
+ }\r
+ log.info("Finished updating statistics at " + new Date());\r
+ }\r
+\r
}\r
package compbio.stat.servlet;\r
\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.ScheduledExecutorService;\r
+import java.util.concurrent.ScheduledFuture;\r
+import java.util.concurrent.TimeUnit;\r
+\r
import javax.servlet.ServletContextEvent;\r
import javax.servlet.ServletContextListener;\r
\r
+import org.apache.log4j.Logger;\r
+\r
+import compbio.engine.conf.PropertyHelperManager;\r
+import compbio.stat.collector.ExecutionStatCollector;\r
+import compbio.stat.collector.StatDB;\r
+import compbio.util.PropertyHelper;\r
+\r
public class StatisticCollector implements ServletContextListener {\r
\r
+ static PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
+\r
+ private final Logger log = Logger.getLogger(StatisticCollector.class);\r
+\r
+ private ScheduledFuture<?> localcf;\r
+ private ScheduledFuture<?> clustercf;\r
+ private ScheduledExecutorService executor;\r
+\r
@Override\r
public void contextDestroyed(ServletContextEvent arg0) {\r
- // TODO Auto-generated method stub\r
-\r
+ try {\r
+ if (localcf != null) {\r
+ localcf.cancel(true);\r
+ }\r
+ if (clustercf != null) {\r
+ clustercf.cancel(true);\r
+ }\r
+ executor.shutdown();\r
+ executor.awaitTermination(3, TimeUnit.SECONDS);\r
+ } catch (InterruptedException e) {\r
+ log.warn(e.getMessage(), e);\r
+ } finally {\r
+ StatDB.shutdownDBServer();\r
+ executor.shutdownNow();\r
+ }\r
}\r
\r
@Override\r
public void contextInitialized(ServletContextEvent arg0) {\r
- // TODO Auto-generated method stub\r
+ String clusterWorkDir = getClusterJobDir();\r
+ int clusterMaxRuntime = getClusterJobTimeOut();\r
+\r
+ int localMaxRuntime = getLocalJobTimeOut();\r
+ String localWorkDir = getLocalJobDir();\r
+\r
+ log.info("Initializing statistics collector");\r
+ executor = Executors.newScheduledThreadPool(1);\r
+\r
+ if (collectClusterStats()) {\r
+ ExecutionStatCollector clusterCollector = new ExecutionStatCollector(\r
+ clusterWorkDir, clusterMaxRuntime);\r
+ clustercf = executor.scheduleAtFixedRate(clusterCollector, 60,\r
+ 24 * 60, TimeUnit.MINUTES);\r
+ log.info("Collecting cluster statistics ");\r
+ }\r
+ if (collectLocalStats()) {\r
+ ExecutionStatCollector localCollector = new ExecutionStatCollector(\r
+ localWorkDir, localMaxRuntime);\r
+ localcf = executor.scheduleAtFixedRate(localCollector, 10, 24 * 60,\r
+ TimeUnit.MINUTES);\r
+ log.info("Collecting local statistics ");\r
+ }\r
+\r
+ }\r
+\r
+ static String getClusterJobDir() {\r
+ return getStringProperty(ph.getProperty("cluster.tmp.directory"));\r
+ }\r
+\r
+ static int getClusterJobTimeOut() {\r
+ int maxRunTime = 24 * 7;\r
+ String clusterMaxRuntime = ph.getProperty("cluster.stat.maxruntime");\r
+ if (clusterMaxRuntime != null) {\r
+ clusterMaxRuntime = clusterMaxRuntime.trim();\r
+ maxRunTime = Integer.parseInt(clusterMaxRuntime);\r
+ }\r
+ return maxRunTime;\r
+ }\r
+\r
+ static int getLocalJobTimeOut() {\r
+ int maxRunTime = 24;\r
+ String localMaxRuntime = ph.getProperty("local.stat.maxruntime");\r
+ if (localMaxRuntime != null) {\r
+ localMaxRuntime = localMaxRuntime.trim();\r
+ maxRunTime = Integer.parseInt(localMaxRuntime);\r
+ }\r
+\r
+ return maxRunTime;\r
+ }\r
+\r
+ static String getLocalJobDir() {\r
+ return getStringProperty(ph.getProperty("local.tmp.directory"));\r
+ }\r
+\r
+ private static String getStringProperty(String propName) {\r
+ String locdir = ph.getProperty(propName);\r
+ if (locdir != null) {\r
+ locdir = locdir.trim();\r
+ }\r
+ return locdir;\r
+ }\r
+\r
+ static boolean collectClusterStats() {\r
+ return getBooleanProperty(ph\r
+ .getProperty("cluster.stat.collector.enable"));\r
+\r
+ }\r
+\r
+ static boolean collectLocalStats() {\r
+ return getBooleanProperty(ph.getProperty("local.stat.collector.enable"));\r
+ }\r
\r
+ private static boolean getBooleanProperty(String propName) {\r
+ boolean enabled = false;\r
+ if (propName != null) {\r
+ propName = propName.trim();\r
+ enabled = Boolean.parseBoolean(propName);\r
+ }\r
+ return enabled;\r
}\r
\r
}\r