+ String clusterWorkDir = getClusterJobDir();\r
+ int clusterMaxRuntime = getClusterJobTimeOut();\r
+\r
+ int localMaxRuntime = getLocalJobTimeOut();\r
+ String localWorkDir = getLocalJobDir();\r
+\r
+ log.info("Initializing statistics collector");\r
+ executor = Executors.newScheduledThreadPool(1);\r
+\r
+ if (collectClusterStats()) {\r
+ ExecutionStatCollector clusterCollector = new ExecutionStatCollector(\r
+ clusterWorkDir, clusterMaxRuntime);\r
+ clustercf = executor.scheduleAtFixedRate(clusterCollector, 60,\r
+ 24 * 60, TimeUnit.MINUTES);\r
+ log.info("Collecting cluster statistics ");\r
+ }\r
+ if (collectLocalStats()) {\r
+ ExecutionStatCollector localCollector = new ExecutionStatCollector(\r
+ localWorkDir, localMaxRuntime);\r
+ localcf = executor.scheduleAtFixedRate(localCollector, 10, 24 * 60,\r
+ TimeUnit.MINUTES);\r
+ log.info("Collecting local statistics ");\r
+ }\r
+\r
+ }\r
+\r
+ static String getClusterJobDir() {\r
+ return getStringProperty(ph.getProperty("cluster.tmp.directory"));\r
+ }\r
+\r
+ static int getClusterJobTimeOut() {\r
+ int maxRunTime = 24 * 7;\r
+ String clusterMaxRuntime = ph.getProperty("cluster.stat.maxruntime");\r
+ if (clusterMaxRuntime != null) {\r
+ clusterMaxRuntime = clusterMaxRuntime.trim();\r
+ maxRunTime = Integer.parseInt(clusterMaxRuntime);\r
+ }\r
+ return maxRunTime;\r
+ }\r
+\r
+ static int getLocalJobTimeOut() {\r
+ int maxRunTime = 24;\r
+ String localMaxRuntime = ph.getProperty("local.stat.maxruntime");\r
+ if (localMaxRuntime != null) {\r
+ localMaxRuntime = localMaxRuntime.trim();\r
+ maxRunTime = Integer.parseInt(localMaxRuntime);\r
+ }\r
+\r
+ return maxRunTime;\r
+ }\r
+\r
+ static String getLocalJobDir() {\r
+ return getStringProperty(ph.getProperty("local.tmp.directory"));\r
+ }\r
+\r
+ private static String getStringProperty(String propName) {\r
+ String locdir = ph.getProperty(propName);\r
+ if (locdir != null) {\r
+ locdir = locdir.trim();\r
+ }\r
+ return locdir;\r
+ }\r
+\r
+ static boolean collectClusterStats() {\r
+ return getBooleanProperty(ph\r
+ .getProperty("cluster.stat.collector.enable"));\r
+\r
+ }\r
+\r
+ static boolean collectLocalStats() {\r
+ return getBooleanProperty(ph.getProperty("local.stat.collector.enable"));\r
+ }\r