Further work on statistics display - improvements to stat collector
author    pvtroshin <pvtroshin@e3abac25-378b-4346-85de-24260fe3988d>
          Fri, 27 May 2011 18:11:08 +0000 (18:11 +0000)
committer pvtroshin <pvtroshin@e3abac25-378b-4346-85de-24260fe3988d>
          Fri, 27 May 2011 18:11:08 +0000 (18:11 +0000)
git-svn-id: svn.lifesci.dundee.ac.uk/svn/barton/ptroshin/JABA2@4184 e3abac25-378b-4346-85de-24260fe3988d

webservices/compbio/stat/collector/ExecutionStatCollector.java
webservices/compbio/stat/collector/StatDB.java
webservices/compbio/stat/servlet/StatisticCollector.java

diff --git a/webservices/compbio/stat/collector/ExecutionStatCollector.java b/webservices/compbio/stat/collector/ExecutionStatCollector.java
index cef65f9..140de7c 100644
--- a/webservices/compbio/stat/collector/ExecutionStatCollector.java
+++ b/webservices/compbio/stat/collector/ExecutionStatCollector.java
@@ -2,11 +2,11 @@ package compbio.stat.collector;
 \r
 import java.io.File;\r
 import java.io.FileFilter;\r
-import java.io.FileWriter;\r
 import java.io.IOException;\r
 import java.sql.SQLException;\r
 import java.text.SimpleDateFormat;\r
 import java.util.ArrayList;\r
+import java.util.Date;\r
 import java.util.HashMap;\r
 import java.util.HashSet;\r
 import java.util.List;\r
@@ -17,11 +17,9 @@ import org.apache.log4j.Logger;
 \r
 import compbio.engine.client.ConfExecutable;\r
 import compbio.engine.client.Executable;\r
-import compbio.engine.conf.PropertyHelperManager;\r
 import compbio.metadata.JobStatus;\r
 import compbio.runner.msa.ClustalW;\r
 import compbio.util.FileUtil;\r
-import compbio.util.PropertyHelper;\r
 import compbio.ws.client.Services;\r
 \r
 /**\r
@@ -52,7 +50,7 @@ import compbio.ws.client.Services;
  * @author pvtroshin\r
  * \r
  */\r
-public class ExecutionStatCollector {\r
+public class ExecutionStatCollector implements Runnable {\r
 \r
        static final int UNDEFINED = -1;\r
 \r
@@ -61,20 +59,53 @@ public class ExecutionStatCollector {
 \r
        static SimpleDateFormat DF = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");\r
 \r
-       static PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
-\r
        final private List<JobStat> stats;\r
+       /**\r
+        * Consider a job that has been running for longer than timeOutInHours\r
+        * to be completed, whatever its outcome\r
+        */\r
+       final private int timeOutInHours;\r
 \r
-       public ExecutionStatCollector(String workDirectory) {\r
+       /**\r
+        * Collect job statistics from subdirectories of the work directory\r
+        * \r
+        * @param workDirectory\r
+        * @param timeOutInHours\r
+        */\r
+       public ExecutionStatCollector(String workDirectory, int timeOutInHours) {\r
+               log.info("Starting stat collector for directory: " + workDirectory);\r
+               log.info("Maximum allowed runtime (h): " + timeOutInHours);\r
                File[] files = FileUtil.getFiles(workDirectory, directories);\r
                stats = new ArrayList<JobStat>();\r
+               assert timeOutInHours > 0;\r
+               this.timeOutInHours = timeOutInHours;\r
                for (File file : files) {\r
                        JobDirectory jd = new JobDirectory(file);\r
-                       stats.add(jd.getJobStat());\r
+                       JobStat jstat = jd.getJobStat();\r
+                       // Do not record stats for jobs that have not completed yet\r
+                       if (hasCompleted(jd)) {\r
+                               stats.add(jstat);\r
+                       } else {\r
+                               log.debug("Skipping the job: " + jstat);\r
+                               log.debug("As it has not completed yet");\r
+                       }\r
                        // System.out.println(jd.getJobStat().getJobReportTabulated());\r
                }\r
        }\r
 \r
+       boolean hasCompleted(JobDirectory jd) {\r
+               JobStat jstat = jd.getJobStat();\r
+               if (jstat.hasResult() || jstat.getIsCancelled()\r
+                               || jstat.getIsFinished() || hasTimedOut(jd)) {\r
+                       return true;\r
+               }\r
+               return false;\r
+       }\r
+\r
+       boolean hasTimedOut(JobDirectory jd) {\r
+               return ((System.currentTimeMillis() - jd.jobdir.lastModified()) / (1000 * 60 * 60)) > timeOutInHours;\r
+       }\r
+\r
        public StatProcessor getStats() {\r
                return new StatProcessor(stats);\r
        }\r
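
Under the new completion rules a job directory counts as finished once it has a result, was cancelled or marked finished, or has simply not been modified for longer than the timeout. A small worked example of the hasTimedOut() arithmetic, assuming a 24-hour limit and a directory last touched 30 hours ago (the class name below is illustrative only):

public class TimeoutCheckSketch {
    public static void main(String[] args) {
        int timeOutInHours = 24;
        // Pretend the job directory was last modified 30 hours ago
        long lastModified = System.currentTimeMillis() - 30L * 60 * 60 * 1000;
        // Same integer arithmetic as hasTimedOut(): elapsed time in whole hours
        long elapsedHours = (System.currentTimeMillis() - lastModified)
                / (1000 * 60 * 60);
        // Prints roughly "30h elapsed, timed out: true"
        System.out.println(elapsedHours + "h elapsed, timed out: "
                + (elapsedHours > timeOutInHours));
    }
}
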
@@ -82,33 +113,18 @@ public class ExecutionStatCollector {
        public void writeStatToDB() throws SQLException {\r
                Set<JobStat> rjobs = new HashSet<JobStat>(stats);\r
                StatDB statdb = new StatDB();\r
+               log.debug("Removing jobs that have already been recorded");\r
+\r
                statdb.removeRecordedJobs(rjobs);\r
+               log.debug("New records left: " + rjobs.size());\r
                statdb.insertData(rjobs);\r
-               statdb.conn.close();\r
-       }\r
-\r
-       static String getClusterJobDir() {\r
-               String clusterdir = ph.getProperty("cluster.tmp.directory");\r
-               if (clusterdir != null) {\r
-                       clusterdir.trim();\r
-               }\r
-               return clusterdir;\r
-       }\r
-\r
-       static void updateTime(File statFile) throws IOException {\r
-               long lastMod = statFile.lastModified();\r
-               FileWriter fw = new FileWriter(statFile);\r
-               fw.write(new Long(lastMod).toString());\r
-               fw.close();\r
        }\r
 \r
-       static String getLocalJobDir() {\r
-               String locdir = ph.getProperty("local.tmp.directory");\r
-               if (locdir != null) {\r
-                       locdir.trim();\r
-               }\r
-               return locdir;\r
-       }\r
+       /*\r
+        * static void updateTime(File statFile) throws IOException { long lastMod =\r
+        * statFile.lastModified(); FileWriter fw = new FileWriter(statFile);\r
+        * fw.write(new Long(lastMod).toString()); fw.close(); }\r
+        */\r
 \r
        /**\r
         * \r
@@ -121,9 +137,6 @@ public class ExecutionStatCollector {
                // updateTime(new File(\r
                // "D:\\workspace\\JABA2\\jobsout\\AACon#170462904473672\\STARTED"));\r
 \r
-               String workDir = PropertyHelperManager.getLocalPath()\r
-                               + getLocalJobDir().trim();\r
-               System.out.println(workDir);\r
                File[] files = FileUtil.getFiles("Y:\\fc\\www-jws2\\jaba\\jobsout",\r
                                directories);\r
                List<JobStat> stats = new ArrayList<JobStat>();\r
@@ -147,7 +160,8 @@ public class ExecutionStatCollector {
        static FileFilter directories = new FileFilter() {\r
                @Override\r
                public boolean accept(File pathname) {\r
-                       return pathname.isDirectory();\r
+                       return pathname.isDirectory()\r
+                                       && !pathname.getName().startsWith(".");\r
                }\r
        };\r
 \r
@@ -310,4 +324,20 @@ public class ExecutionStatCollector {
                }\r
 \r
        }\r
+\r
+       @Override\r
+       public void run() {\r
+               log.info("Started updating statistics at " + new Date());\r
+\r
+               StatProcessor local_stats = getStats();\r
+               log.info("Found " + local_stats.getJobNumber() + " jobs!");\r
+               try {\r
+                       writeStatToDB();\r
+               } catch (SQLException e) {\r
+                       log.error("Failed to update the job statistics database!");\r
+                       log.error(e.getLocalizedMessage(), e);\r
+               }\r
+               log.info("Finished updating statistics at " + new Date());\r
+       }\r
+\r
 }\r
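
For illustration, a minimal sketch of how the reworked collector could be exercised on its own, outside the servlet scheduler. The class name, work directory and 24-hour timeout are placeholders rather than values from this change; getStats(), getJobNumber() and writeStatToDB() are the calls shown above, and StatProcessor is assumed to live alongside the collector in compbio.stat.collector.

import java.sql.SQLException;

import compbio.stat.collector.ExecutionStatCollector;
import compbio.stat.collector.StatProcessor;

public class CollectOnce {
    public static void main(String[] args) throws SQLException {
        // Jobs idle for more than 24 hours are treated as completed,
        // whatever their outcome
        ExecutionStatCollector collector = new ExecutionStatCollector(
                "/var/jaba/jobsout", 24);
        StatProcessor stats = collector.getStats();
        System.out.println("Completed jobs found: " + stats.getJobNumber());
        // Persists only the jobs not already recorded in the stat database
        collector.writeStatToDB();
    }
}

Calling collector.run() would perform the same getStats()/writeStatToDB() sequence, with logging, which is exactly what the scheduled executor does below.
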
diff --git a/webservices/compbio/stat/collector/StatDB.java b/webservices/compbio/stat/collector/StatDB.java
index 55bf71a..0f851ce 100644
--- a/webservices/compbio/stat/collector/StatDB.java
+++ b/webservices/compbio/stat/collector/StatDB.java
@@ -270,6 +270,18 @@ public class StatDB {
 \r
                return stats;\r
        }\r
+\r
+       /**\r
+        * Removes a job from the set if\r
+        * \r
+        * 1) It has already been recorded, or\r
+        * \r
+        * 2) It has not completed and has not timed out - this prevents\r
+        * recording information on incomplete jobs.\r
+        * \r
+        * @param fsJobs\r
+        * @throws SQLException\r
+        */\r
        public void removeRecordedJobs(Set<JobStat> fsJobs) throws SQLException {\r
 \r
                String query = "select job_id from exec_stat";\r
@@ -288,7 +300,7 @@ public class StatDB {
                st.close();\r
        }\r
 \r
-       private static synchronized final void shutdownDBServer() {\r
+       public static synchronized final void shutdownDBServer() {\r
                // ## DATABASE SHUTDOWN SECTION ##\r
                /***\r
                 * In embedded mode, an application should shut down Derby. Shutdown\r
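
shutdownDBServer() is made public so that the servlet listener can close the embedded database when the web application shuts down. Its body is not part of this hunk; the comment above refers to the conventional Apache Derby embedded shutdown idiom, sketched here under that assumption (class and method names are illustrative only):

import java.sql.DriverManager;
import java.sql.SQLException;

public class DerbyShutdownSketch {
    // Derby signals a clean system shutdown with an SQLException
    // whose SQLState is "XJ015"
    public static synchronized void shutdownDerby() {
        try {
            DriverManager.getConnection("jdbc:derby:;shutdown=true");
        } catch (SQLException e) {
            if (!"XJ015".equals(e.getSQLState())) {
                // Any other SQLState means the shutdown did not complete cleanly
                e.printStackTrace();
            }
        }
    }
}
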
diff --git a/webservices/compbio/stat/servlet/StatisticCollector.java b/webservices/compbio/stat/servlet/StatisticCollector.java
index 3ababa3..0ffe77d 100644
--- a/webservices/compbio/stat/servlet/StatisticCollector.java
+++ b/webservices/compbio/stat/servlet/StatisticCollector.java
 package compbio.stat.servlet;\r
 \r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.ScheduledExecutorService;\r
+import java.util.concurrent.ScheduledFuture;\r
+import java.util.concurrent.TimeUnit;\r
+\r
 import javax.servlet.ServletContextEvent;\r
 import javax.servlet.ServletContextListener;\r
 \r
+import org.apache.log4j.Logger;\r
+\r
+import compbio.engine.conf.PropertyHelperManager;\r
+import compbio.stat.collector.ExecutionStatCollector;\r
+import compbio.stat.collector.StatDB;\r
+import compbio.util.PropertyHelper;\r
+\r
 public class StatisticCollector implements ServletContextListener {\r
 \r
+       static PropertyHelper ph = PropertyHelperManager.getPropertyHelper();\r
+\r
+       private final Logger log = Logger.getLogger(StatisticCollector.class);\r
+\r
+       private ScheduledFuture<?> localcf;\r
+       private ScheduledFuture<?> clustercf;\r
+       private ScheduledExecutorService executor;\r
+\r
        @Override\r
        public void contextDestroyed(ServletContextEvent arg0) {\r
-               // TODO Auto-generated method stub\r
-\r
+               try {\r
+                       if (localcf != null) {\r
+                               localcf.cancel(true);\r
+                       }\r
+                       if (clustercf != null) {\r
+                               clustercf.cancel(true);\r
+                       }\r
+                       executor.shutdown();\r
+                       executor.awaitTermination(3, TimeUnit.SECONDS);\r
+               } catch (InterruptedException e) {\r
+                       log.warn(e.getMessage(), e);\r
+               } finally {\r
+                       StatDB.shutdownDBServer();\r
+                       executor.shutdownNow();\r
+               }\r
        }\r
 \r
        @Override\r
        public void contextInitialized(ServletContextEvent arg0) {\r
-               // TODO Auto-generated method stub\r
+               String clusterWorkDir = getClusterJobDir();\r
+               int clusterMaxRuntime = getClusterJobTimeOut();\r
+\r
+               int localMaxRuntime = getLocalJobTimeOut();\r
+               String localWorkDir = getLocalJobDir();\r
+\r
+               log.info("Initializing statistics collector");\r
+               executor = Executors.newScheduledThreadPool(1);\r
+\r
+               if (collectClusterStats()) {\r
+                       ExecutionStatCollector clusterCollector = new ExecutionStatCollector(\r
+                                       clusterWorkDir, clusterMaxRuntime);\r
+                       clustercf = executor.scheduleAtFixedRate(clusterCollector, 60,\r
+                                       24 * 60, TimeUnit.MINUTES);\r
+                       log.info("Collecting cluster statistics ");\r
+               }\r
+               if (collectLocalStats()) {\r
+                       ExecutionStatCollector localCollector = new ExecutionStatCollector(\r
+                                       localWorkDir, localMaxRuntime);\r
+                       localcf = executor.scheduleAtFixedRate(localCollector, 10, 24 * 60,\r
+                                       TimeUnit.MINUTES);\r
+                       log.info("Collecting local statistics ");\r
+               }\r
+\r
+       }\r
+\r
+       static String getClusterJobDir() {\r
+               return getStringProperty(ph.getProperty("cluster.tmp.directory"));\r
+       }\r
+\r
+       static int getClusterJobTimeOut() {\r
+               int maxRunTime = 24 * 7;\r
+               String clusterMaxRuntime = ph.getProperty("cluster.stat.maxruntime");\r
+               if (clusterMaxRuntime != null) {\r
+                       clusterMaxRuntime = clusterMaxRuntime.trim();\r
+                       maxRunTime = Integer.parseInt(clusterMaxRuntime);\r
+               }\r
+               return maxRunTime;\r
+       }\r
+\r
+       static int getLocalJobTimeOut() {\r
+               int maxRunTime = 24;\r
+               String localMaxRuntime = ph.getProperty("local.stat.maxruntime");\r
+               if (localMaxRuntime != null) {\r
+                       localMaxRuntime = localMaxRuntime.trim();\r
+                       maxRunTime = Integer.parseInt(localMaxRuntime);\r
+               }\r
+\r
+               return maxRunTime;\r
+       }\r
+\r
+       static String getLocalJobDir() {\r
+               return getStringProperty(ph.getProperty("local.tmp.directory"));\r
+       }\r
+\r
+       private static String getStringProperty(String propValue) {\r
+               String locdir = propValue;\r
+               if (locdir != null) {\r
+                       locdir = locdir.trim();\r
+               }\r
+               return locdir;\r
+       }\r
+\r
+       static boolean collectClusterStats() {\r
+               return getBooleanProperty(ph\r
+                               .getProperty("cluster.stat.collector.enable"));\r
+\r
+       }\r
+\r
+       static boolean collectLocalStats() {\r
+               return getBooleanProperty(ph.getProperty("local.stat.collector.enable"));\r
+       }\r
 \r
+       private static boolean getBooleanProperty(String propValue) {\r
+               boolean enabled = false;\r
+               if (propValue != null) {\r
+                       propValue = propValue.trim();\r
+                       enabled = Boolean.parseBoolean(propValue);\r
+               }\r
+               return enabled;\r
        }\r
 \r
 }\r
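
The listener reads cluster.tmp.directory / local.tmp.directory, cluster.stat.maxruntime / local.stat.maxruntime and the cluster.stat.collector.enable / local.stat.collector.enable flags, then schedules a collector per enabled source on a single-threaded ScheduledExecutorService. A standalone sketch of that scheduling and shutdown pattern (the class name and task body are illustrative only):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class FixedRateSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        // As in contextInitialized(): first run after 10 minutes,
        // then once every 24 hours
        ScheduledFuture<?> task = executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("collecting statistics...");
            }
        }, 10, 24 * 60, TimeUnit.MINUTES);

        // As in contextDestroyed(): cancel the task, then drain the executor
        task.cancel(true);
        executor.shutdown();
        executor.awaitTermination(3, TimeUnit.SECONDS);
        executor.shutdownNow();
    }
}

Note that ExecutionStatCollector scans the work directory in its constructor, so each scheduled run() operates on the snapshot of job directories taken when the context was initialised.
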