package compbio.stat.collector;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;

import compbio.engine.client.ConfExecutable;
import compbio.engine.client.Executable;
import compbio.metadata.JobStatus;
import compbio.runner.msa.ClustalW;
import compbio.util.FileUtil;
import compbio.ws.client.Services;
\r
/**
 * Number of runs of each WS = number of folders with that WS name
 *
 * Number of successful runs = all runs with a result file
 *
 * Per period of time = limit per file creation time. Runtime (avg/max) =
 * finished time - started time
 *
 * Task & result size = result.size
 *
 * Abandoned runs - not collected runs
 *
 * Cancelled runs - cancelled
 *
 * Cluster vs local runs
 *
 * Reasons for failure = look in the err out?
 *
 * Metadata required:
 *
 * work directory for local and cluster tasks = from Helper or cmd parameter. WS
 * names - enumeration. Status file names and content.
 */
\r
53 public class ExecutionStatCollector implements Runnable {
\r
55 static final int UNDEFINED = -1;
\r
57 private static final Logger log = Logger
\r
58 .getLogger(ExecutionStatCollector.class);
\r
60 static SimpleDateFormat DF = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");
\r
62 final private List<JobStat> stats;
\r
64 * Consider the job that has been working for longer than timeOutInHours
\r
65 * completed, whatever the outcome
\r
67 final private int timeOutInHours;
\r
70 * List subdirectories in the job directory
\r
72 * @param workDirectory
\r
73 * @param timeOutInHours
\r
75 public ExecutionStatCollector(String workDirectory, int timeOutInHours) {
\r
76 log.info("Starting stat collector for directory: " + workDirectory);
\r
77 log.info("Maximum allowed runtime(h): " + timeOutInHours);
\r
78 File[] files = FileUtil.getFiles(workDirectory, directories);
\r
79 stats = new ArrayList<JobStat>();
\r
80 assert timeOutInHours > 0;
\r
81 this.timeOutInHours = timeOutInHours;
\r
82 for (File file : files) {
\r
83 JobDirectory jd = new JobDirectory(file);
\r
84 JobStat jstat = jd.getJobStat();
\r
85 // Do not record stats on the job that has not completed yet
\r
86 if (hasCompleted(jd)) {
\r
89 log.debug("Skipping the job: " + jstat);
\r
90 log.debug("As it has not completed yet");
\r
92 // System.out.println(jd.getJobStat().getJobReportTabulated());
\r
96 boolean hasCompleted(JobDirectory jd) {
\r
97 JobStat jstat = jd.getJobStat();
\r
98 if (jstat.hasResult() || jstat.getIsCancelled()
\r
99 || jstat.getIsFinished() || hasTimedOut(jd)) {
\r
105 boolean hasTimedOut(JobDirectory jd) {
\r
106 return ((System.currentTimeMillis() - jd.jobdir.lastModified()) / (1000 * 60 * 60)) > timeOutInHours;
\r
109 public StatProcessor getStats() {
\r
110 return new StatProcessor(stats);
\r
113 public void writeStatToDB() throws SQLException {
\r
114 Set<JobStat> rjobs = new HashSet<JobStat>(stats);
\r
115 StatDB statdb = new StatDB();
\r
116 log.debug("Removing records that has already been recorded");
\r
118 statdb.removeRecordedJobs(rjobs);
\r
119 log.debug("New records left: " + rjobs.size());
\r
120 statdb.insertData(rjobs);
\r
124 * static void updateTime(File statFile) throws IOException { long lastMod =
\r
125 * statFile.lastModified(); FileWriter fw = new FileWriter(statFile);
\r
126 * fw.write(new Long(lastMod).toString()); fw.close(); }
\r
132 * @throws IOException
\r
133 * @throws SQLException
\r
135 public static void main(String[] args) throws IOException, SQLException {
\r
137 // updateTime(new File(
\r
138 // "D:\\workspace\\JABA2\\jobsout\\AACon#170462904473672\\STARTED"));
\r
140 File[] files = FileUtil.getFiles("Y:\\fc\\www-jws2\\jaba\\jobsout",
\r
142 List<JobStat> stats = new ArrayList<JobStat>();
\r
143 for (File file : files) {
\r
144 JobDirectory jd = new JobDirectory(file);
\r
145 stats.add(jd.getJobStat());
\r
146 // System.out.println(jd.getJobStat().getJobReportTabulated());
\r
148 StatProcessor sp = new StatProcessor(stats);
\r
149 System.out.println(sp.reportStat());
\r
150 System.out.println();
\r
151 System.out.println("!!!!!!!!!!!!!!!!!!");
\r
152 System.out.println();
\r
154 Set<JobStat> rjobs = new HashSet<JobStat>(sp.stats);
\r
155 StatDB statdb = new StatDB();
\r
156 statdb.removeRecordedJobs(rjobs);
\r
157 statdb.insertData(rjobs);
\r
160 static FileFilter directories = new FileFilter() {
\r
162 public boolean accept(File pathname) {
\r
163 return pathname.isDirectory()
\r
164 && !pathname.getName().startsWith(".");
\r
168 static class JobDirectory {
\r
171 Map<String, File> files = new HashMap<String, File>();
\r
173 public JobDirectory(File directory) {
\r
174 this.jobdir = directory;
\r
175 for (File f : jobdir.listFiles()) {
\r
176 files.put(f.getName(), f);
\r
180 public boolean hasStatus(JobStatus status) {
\r
181 return files.containsKey(status.toString());
\r
184 boolean isCollected() {
\r
185 return hasStatus(JobStatus.COLLECTED);
\r
188 boolean isCancelled() {
\r
189 return hasStatus(JobStatus.CANCELLED);
\r
192 long getStartTime() {
\r
193 long starttime = UNDEFINED;
\r
194 File startfile = files.get(JobStatus.STARTED.toString());
\r
195 if (startfile == null) {
\r
196 startfile = files.get(JobStatus.SUBMITTED.toString());
\r
198 if (startfile != null) {
\r
199 starttime = startfile.lastModified();
\r
201 * String start = FileUtil.readFileToString(startfile);
\r
202 * starttime = Long.parseLong(start.trim());
\r
208 String getClusterJobID() {
\r
209 String clustjobId = "";
\r
210 File jobid = files.get("JOBID");
\r
212 if (jobid != null) {
\r
213 clustjobId = FileUtil.readFileToString(jobid);
\r
215 } catch (IOException ioe) {
\r
216 ioe.printStackTrace();
\r
219 return clustjobId.trim();
\r
222 long getFinishedTime() {
\r
223 long ftime = UNDEFINED;
\r
224 File finished = files.get(JobStatus.FINISHED.toString());
\r
225 if (finished != null) {
\r
226 ftime = finished.lastModified();
\r
228 * String start = FileUtil.readFileToString(finished); ftime =
\r
229 * Long.parseLong(start.trim());
\r
231 // System.out.println("f " + ftime);
\r
234 * } catch (IOException e) { log.log(Level.WARN,
\r
235 * "Cannot parse finished time: " + e.getMessage(), e); } catch
\r
236 * (NumberFormatException e) { log.log(Level.WARN,
\r
237 * "Cannot parse finished time: " + e.getMessage(), e); }
\r
242 @SuppressWarnings("unchecked")
\r
243 Class<Executable<?>> getWSRunnerName() {
\r
244 String name = jobdir.getName().split("#")[0];
\r
246 if (name.startsWith(ConfExecutable.CLUSTER_TASK_ID_PREFIX)) {
\r
247 assert ConfExecutable.CLUSTER_TASK_ID_PREFIX.length() == 1;
\r
248 name = name.substring(1);
\r
250 name = ClustalW.class.getPackage().getName() + "." + name;
\r
251 return (Class<Executable<?>>) Class.forName(name);
\r
252 } catch (ClassNotFoundException e) {
\r
253 e.printStackTrace();
\r
254 throw new RuntimeException(
\r
255 "Cannot match the directory name to the executable! Executable name is "
\r
260 private Services getService() {
\r
261 return Services.getService(getWSRunnerName());
\r
264 // Mafft, Muscle, Tcoffee, Clustal task:fasta.in result:fasta.out
\r
265 // Probcons task:fasta.in result:alignment.out
\r
267 * TODO replace with Universal names for WS!
\r
269 long getResultSize() {
\r
270 Class<Executable<?>> name = getWSRunnerName();
\r
272 if (name.getSimpleName().equalsIgnoreCase("Probcons")) {
\r
273 f = files.get("alignment.out");
\r
274 } else if (name.getSimpleName().equalsIgnoreCase("ClustalW")) {
\r
275 f = files.get("output.txt");
\r
277 f = files.get("fasta.out");
\r
285 long getInputSize() {
\r
286 File input = files.get("fasta.in");
\r
287 if (input != null) {
\r
288 return input.length();
\r
293 JobStat getJobStat() {
\r
294 return JobStat.newInstance(getService(), getClusterJobID(),
\r
295 jobdir.getName(), getStartTime(), getFinishedTime(),
\r
296 getInputSize(), getResultSize(), isCancelled(),
\r
301 public int hashCode() {
\r
302 final int prime = 31;
\r
304 result = prime * result
\r
305 + ((jobdir == null) ? 0 : jobdir.hashCode());
\r
310 public boolean equals(Object obj) {
\r
315 if (getClass() != obj.getClass())
\r
317 JobDirectory other = (JobDirectory) obj;
\r
318 if (jobdir == null) {
\r
319 if (other.jobdir != null)
\r
321 } else if (!jobdir.equals(other.jobdir))
\r
329 public void run() {
\r
330 log.info("Started updating statistics at " + new Date());
\r
332 StatProcessor local_stats = getStats();
\r
333 log.info("Found " + local_stats.getJobNumber() + " jobs!");
\r
336 } catch (SQLException e) {
\r
337 log.error("Fails to update jobs statistics database!");
\r
338 log.error(e.getLocalizedMessage(), e);
\r
340 log.info("Finished updating statistics at " + new Date());
\r