package compbio.stat.collector;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;

import compbio.engine.client.Executable;
import compbio.engine.client.PathValidator;
import compbio.metadata.JobStatus;
import compbio.util.FileUtil;
import compbio.ws.client.Services;
\r
/**
 * Collects execution statistics from job output directories.
 * 
 * Number of runs of each WS = number of folders with name
 * 
 * Number of successful runs = all runs with no result file
 * 
 * Per period of time = limit per file creating time Runtime (avg/max) =
 * started time - finished time
 * 
 * Task &amp; result size = result.size
 * 
 * Abandoned runs - not collected runs
 * 
 * Cancelled runs - cancelled
 * 
 * Cluster vs local runs
 * 
 * Reasons for failure = look in the err out?
 * 
 * Metadata required:
 * 
 * work directory for local and cluster tasks = from Helper or cmd parameter. WS
 * names - enumeration. Status file names and content.
 */
\r
52 public class ExecutionStatCollector implements Runnable {
\r
54 static final int UNDEFINED = -1;
\r
56 private static final Logger log = Logger
\r
57 .getLogger(ExecutionStatCollector.class);
\r
59 static SimpleDateFormat DF = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");
\r
61 final private File workDirectory;
\r
62 final private List<JobStat> stats;
\r
64 * Consider the job that has been working for longer than timeOutInHours
\r
65 * completed, whatever the outcome
\r
67 final private int timeOutInHours;
\r
70 * List subdirectories in the job directory
\r
72 * @param workDirectory
\r
73 * @param timeOutInHours
\r
75 public ExecutionStatCollector(String workDirectory, int timeOutInHours) {
\r
76 log.info("Starting stat collector for directory: " + workDirectory);
\r
77 log.info("Maximum allowed runtime(h): " + timeOutInHours);
\r
78 if (!PathValidator.isValidDirectory(workDirectory)) {
\r
79 throw new IllegalArgumentException("workDirectory '"
\r
80 + workDirectory + "' does not exist!");
\r
82 this.workDirectory = new File(workDirectory);
\r
83 stats = new ArrayList<JobStat>();
\r
84 if (timeOutInHours <= 0) {
\r
85 throw new IllegalArgumentException(
\r
86 "Timeout value must be greater than 0! Given value: "
\r
89 this.timeOutInHours = timeOutInHours;
\r
92 boolean hasCompleted(JobDirectory jd) {
\r
93 JobStat jstat = jd.getJobStat();
\r
94 if (jstat.hasResult() || jstat.getIsCancelled()
\r
95 || jstat.getIsFinished() || hasTimedOut(jd)) {
\r
101 boolean hasTimedOut(JobDirectory jd) {
\r
102 return ((System.currentTimeMillis() - jd.jobdir.lastModified()) / (1000 * 60 * 60)) > timeOutInHours;
\r
105 /* Make sure that collectStatistics methods was called prior to calling this!
\r
106 * TODO consider running collectStatistics from here on the first call
\r
108 StatProcessor getStats() {
\r
109 if(stats.isEmpty()) {
\r
110 log.info("Please make sure collectStatistics method was called prior to calling getStats()!");
\r
112 return new StatProcessor(stats);
\r
115 void writeStatToDB() throws SQLException {
\r
116 Set<JobStat> rjobs = new HashSet<JobStat>(stats);
\r
117 StatDB statdb = new StatDB();
\r
118 log.debug("Removing records that has already been recorded");
\r
120 statdb.removeRecordedJobs(rjobs);
\r
121 log.debug("New records left: " + rjobs.size());
\r
122 statdb.insertData(rjobs);
\r
    /*
     * static void updateTime(File statFile) throws IOException { long lastMod =
     * statFile.lastModified(); FileWriter fw = new FileWriter(statFile);
     * fw.write(new Long(lastMod).toString()); fw.close(); }
     */
\r
134 public static void main(String[] args) throws IOException, SQLException {
\r
136 // updateTime(new File(
\r
137 // "D:\\workspace\\JABA2\\jobsout\\AACon#170462904473672\\STARTED"));
\r
139 File[] files = FileUtil.getFiles("Y:\\fc\\www-jws2\\jaba\\jobsout",
\r
141 List<JobStat> stats = new ArrayList<JobStat>();
\r
142 for (File file : files) {
\r
143 JobDirectory jd = new JobDirectory(file);
\r
144 stats.add(jd.getJobStat());
\r
145 // System.out.println(jd.getJobStat().getJobReportTabulated());
\r
147 StatProcessor sp = new StatProcessor(stats);
\r
148 System.out.println(sp.reportStat());
\r
149 System.out.println();
\r
150 System.out.println("!!!!!!!!!!!!!!!!!!");
\r
151 System.out.println();
\r
153 Set<JobStat> rjobs = new HashSet<JobStat>(sp.stats);
\r
154 StatDB statdb = new StatDB();
\r
155 statdb.removeRecordedJobs(rjobs);
\r
156 statdb.insertData(rjobs);
\r
159 static FileFilter directories = new FileFilter() {
\r
161 public boolean accept(File pathname) {
\r
162 return pathname.isDirectory()
\r
163 && !pathname.getName().startsWith(".");
\r
167 static class JobDirectory {
\r
170 Map<String, File> files = new HashMap<String, File>();
\r
172 JobDirectory(File directory) {
\r
173 this.jobdir = directory;
\r
174 for (File f : jobdir.listFiles()) {
\r
175 files.put(f.getName(), f);
\r
179 boolean hasStatus(JobStatus status) {
\r
180 return files.containsKey(status.toString());
\r
183 boolean isCollected() {
\r
184 return hasStatus(JobStatus.COLLECTED);
\r
187 boolean isCancelled() {
\r
188 return hasStatus(JobStatus.CANCELLED);
\r
191 long getStartTime() {
\r
192 long starttime = UNDEFINED;
\r
193 File startfile = files.get(JobStatus.STARTED.toString());
\r
194 if (startfile == null) {
\r
195 startfile = files.get(JobStatus.SUBMITTED.toString());
\r
197 if (startfile != null) {
\r
198 starttime = startfile.lastModified();
\r
200 * String start = FileUtil.readFileToString(startfile);
\r
201 * starttime = Long.parseLong(start.trim());
\r
207 String getClusterJobID() {
\r
208 String clustjobId = "";
\r
209 File jobid = files.get("JOBID");
\r
211 if (jobid != null) {
\r
212 clustjobId = FileUtil.readFileToString(jobid);
\r
214 } catch (IOException ioe) {
\r
216 "IO Exception while reading the content of JOBID file for job "
\r
219 return clustjobId.trim();
\r
222 long getFinishedTime() {
\r
223 long ftime = UNDEFINED;
\r
224 File finished = files.get(JobStatus.FINISHED.toString());
\r
225 if (finished != null) {
\r
226 ftime = finished.lastModified();
\r
228 * String start = FileUtil.readFileToString(finished); ftime =
\r
229 * Long.parseLong(start.trim());
\r
231 // System.out.println("f " + ftime);
\r
234 * } catch (IOException e) { log.log(Level.WARN,
\r
235 * "Cannot parse finished time: " + e.getMessage(), e); } catch
\r
236 * (NumberFormatException e) { log.log(Level.WARN,
\r
237 * "Cannot parse finished time: " + e.getMessage(), e); }
\r
242 private Services getService() {
\r
243 return Services.getServiceByJobDirectory(jobdir);
\r
246 // Mafft, Muscle, Tcoffee, Clustal task:fasta.in result:fasta.out
\r
247 // Probcons task:fasta.in result:alignment.out
\r
249 * TODO replace with Universal names for WS!
\r
251 long getResultSize() {
\r
252 Class<? extends Executable<?>> name = Services
\r
253 .getRunnerByJobDirectory(jobdir);
\r
256 if (name.getSimpleName().equalsIgnoreCase("Probcons")) {
\r
257 f = files.get("alignment.out");
\r
258 } else if (name.getSimpleName().equalsIgnoreCase("ClustalW")) {
\r
259 f = files.get("output.txt");
\r
261 f = files.get("fasta.out");
\r
270 * TODO unify input!
\r
272 long getInputSize() {
\r
273 Class<? extends Executable<?>> name = Services
\r
274 .getRunnerByJobDirectory(jobdir);
\r
277 if (name.getSimpleName().equalsIgnoreCase("ClustalW")) {
\r
278 input = files.get("input.txt");
\r
280 input = files.get("fasta.in");
\r
283 if (input != null) {
\r
284 return input.length();
\r
289 JobStat getJobStat() {
\r
290 return JobStat.newInstance(getService(), getClusterJobID(),
\r
291 jobdir.getName(), getStartTime(), getFinishedTime(),
\r
292 getInputSize(), getResultSize(), isCancelled(),
\r
297 public int hashCode() {
\r
298 final int prime = 31;
\r
300 result = prime * result
\r
301 + ((jobdir == null) ? 0 : jobdir.hashCode());
\r
306 public boolean equals(Object obj) {
\r
311 if (getClass() != obj.getClass())
\r
313 JobDirectory other = (JobDirectory) obj;
\r
314 if (jobdir == null) {
\r
315 if (other.jobdir != null)
\r
317 } else if (!jobdir.equals(other.jobdir))
\r
323 void collectStatistics() {
\r
324 File[] files = workDirectory.listFiles(directories);
\r
325 for (File file : files) {
\r
326 JobDirectory jd = new JobDirectory(file);
\r
327 JobStat jstat = jd.getJobStat();
\r
328 // Do not record stats on the job that has not completed yet
\r
329 if (hasCompleted(jd)) {
\r
332 log.debug("Skipping the job: " + jstat);
\r
333 log.debug("As it has not completed yet");
\r
335 // System.out.println(jd.getJobStat().getJobReportTabulated());
\r
340 public void run() {
\r
341 log.info("Started updating statistics at " + new Date());
\r
342 log.info("For directory: " + workDirectory.getAbsolutePath());
\r
344 collectStatistics();
\r
346 StatProcessor local_stats = getStats();
\r
347 log.info("Found " + local_stats.getJobNumber() + " jobs!");
\r
350 } catch (SQLException e) {
\r
351 log.error("Fails to update jobs statistics database!");
\r
352 log.error(e.getLocalizedMessage(), e);
\r
354 log.info("Finished updating statistics at " + new Date());
\r