From 52edb573d0491d34b3951c18bced4c3e00a58d13 Mon Sep 17 00:00:00 2001 From: Sasha Sherstnev Date: Tue, 12 Nov 2013 18:30:45 +0000 Subject: [PATCH 1/1] Introduce Bean for Jpred job --- datadb/compbio/cassandra/CassandraReader.java | 3 - datadb/compbio/cassandra/CassandraWriter.java | 62 +++++++------ datadb/compbio/cassandra/JpredParserHTTP.java | 98 ++++++-------------- datadb/compbio/cassandra/JpredParserLocalFile.java | 2 +- 4 files changed, 61 insertions(+), 104 deletions(-) diff --git a/datadb/compbio/cassandra/CassandraReader.java b/datadb/compbio/cassandra/CassandraReader.java index c7d08bf..af697a0 100644 --- a/datadb/compbio/cassandra/CassandraReader.java +++ b/datadb/compbio/cassandra/CassandraReader.java @@ -12,9 +12,6 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; -import compbio.engine.ProteoCachePropertyHelperManager; -import compbio.util.PropertyHelper; - public class CassandraReader { private Session session; private static Logger log = Logger.getLogger(CassandraNativeConnector.class); diff --git a/datadb/compbio/cassandra/CassandraWriter.java b/datadb/compbio/cassandra/CassandraWriter.java index c478d68..48bbda7 100644 --- a/datadb/compbio/cassandra/CassandraWriter.java +++ b/datadb/compbio/cassandra/CassandraWriter.java @@ -1,6 +1,5 @@ package compbio.cassandra; -import java.io.IOException; import java.util.List; import org.apache.log4j.Logger; @@ -8,9 +7,8 @@ import org.apache.log4j.Logger; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.BoundStatement; +import compbio.engine.JpredJob; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; @@ -48,20 +46,25 @@ public class CassandraWriter { /* * inserting data into the tables for queries */ - public int FormQueryTables(long 
jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, - String statusFinal, String protein, List predictions) { - if (JobisNotInsterted(jobid)) { + public int FormQueryTables(JpredJob job) { + if (JobisNotInsterted(job.getJobID())) { + String id = job.getJobID(); + String ip = job.getIP(); + String protein = job.getProtein(); + String finalstatus = job.getFinalStatus(); + String execstatus = job.getExecutionStatus(); String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" - + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx - + "','" + protein + "');"; + + " VALUES ('" + id + "','" + ip + "','" + job.getStartingTimeStr() + "','" + job.getEndTimeStr() + "','" + finalstatus + + "','" + execstatus + "','" + protein + "');"; session.execute(com1); - String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein - + "');"; + String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + job.getStartingDate() + ",'" + id + + "','" + protein + "');"; session.execute(com2); String allpredictions = ""; - for (FastaSequence pred : predictions) { + List pr = job.getPredictions(); + for (FastaSequence pred : pr) { String predictionname = pred.getId(); String prediction = pred.getSequence().replaceAll("\n", ""); allpredictions += "'" + predictionname + "':'" + prediction + "',"; @@ -71,10 +74,10 @@ public class CassandraWriter { final_prediction = allpredictions.substring(0, allpredictions.length() - 1); } - String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';"; + String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + job.getJobID() + "';"; ResultSet results2 = session.execute(check2); if (results2.isExhausted()) { - String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein 
+ "','" + jobid + "',{" + String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + id + "',{" + final_prediction + "});"; session.execute(com3); } @@ -85,15 +88,15 @@ public class CassandraWriter { boolean updateparameter = true; if (!results3.isExhausted()) { Row r = results3.one(); - if (jobtime >= Long.parseLong(r.getString("Value"))) + if (job.getStartingDate() >= Long.parseLong(r.getString("Value"))) updateparameter = false; } if (updateparameter) { - String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(jobtime) + String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + job.getStartingDateStr() + "');"; session.execute(com); } - String check4 = "SELECT * FROM JobDateInfo WHERE jobday = " + jobtime + ";"; + String check4 = "SELECT * FROM JobDateInfo WHERE jobday = " + job.getStartingDate() + ";"; ResultSet results4 = session.execute(check4); updateparameter = true; int njobs = 1; @@ -101,7 +104,7 @@ public class CassandraWriter { Row r = results4.one(); njobs += r.getLong("Total"); } - String com = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + jobtime + "," + njobs + ");"; + String com = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + job.getStartingDate() + "," + njobs + ");"; session.execute(com); return 1; @@ -113,26 +116,25 @@ public class CassandraWriter { * insert data from a real Jpred job: timing+IP, Execution Status, Final * status, protein sequence, predictions, alignment, LOG and tar.gz files */ - public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein, - List predictions, List seqs, String LogFile, String archivepath) { - if (JobisNotArchived(jobid)) { - String log = LogFile.replaceAll("'", ""); - session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + 
jobid + "','" + protein - + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');"); - if (false) { - PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);"); - BoundStatement boundStatement = new BoundStatement(statement); - session.execute(boundStatement.bind(jobid, archivepath)); - } + public int ArchiveData(JpredJob job, String archivepath) { + if (JobisNotArchived(job.getJobID())) { + String id = job.getJobID(); + String log = job.getLog().replaceAll("'", ""); + String com = "INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG, ArchiveLink) VALUES ('" + id + "','" + + job.getProtein() + "','" + job.getIP() + "'," + job.getStartingTime() + "," + job.getExecutionTime() + ",'" + log + + "','" + archivepath + "');"; + session.execute(com); + List predictions = job.getPredictions(); for (FastaSequence p : predictions) { session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'" - + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); + + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';"); } + List seqs = job.getAlignment(); for (FastaSequence s : seqs) { session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'" - + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); + + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';"); } return 1; } diff --git a/datadb/compbio/cassandra/JpredParserHTTP.java b/datadb/compbio/cassandra/JpredParserHTTP.java index bf4c460..e53ddd8 100644 --- a/datadb/compbio/cassandra/JpredParserHTTP.java +++ b/datadb/compbio/cassandra/JpredParserHTTP.java @@ -1,8 +1,6 @@ package compbio.cassandra; import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -11,14 +9,13 @@ import 
java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.List; import compbio.cassandra.JpredParser; +import compbio.engine.JpredJob; public class JpredParserHTTP implements JpredParser { private CassandraWriter cw = new CassandraWriter(); @@ -44,10 +41,7 @@ public class JpredParserHTTP implements JpredParser { cal.add(Calendar.DATE, -nDays); for (int i = 0; i < nDays; ++i) { cal.add(Calendar.DATE, 1); - int month = cal.get(Calendar.MONTH) + 1; - int year = cal.get(Calendar.YEAR); - int day = cal.get(Calendar.DATE); - String date = year + "/" + month + "/" + day; + String date = cal.get(Calendar.YEAR) + "/" + (cal.get(Calendar.MONTH) + 1) + "/" + cal.get(Calendar.DATE); ParsingForDate(source, date); } } @@ -89,59 +83,25 @@ public class JpredParserHTTP implements JpredParser { return out; } - private List parseArchiveFile(final InputStream stream) throws IOException { - DataInputStream data_in = new DataInputStream(stream); - List out = new ArrayList(); - while (true) { - try { - out.add(data_in.readByte()); - } catch (EOFException eof) { - break; - } - } - return out; - } - - private int analyseJob(String[] job) throws IOException { + private int analyseJob(String[] jobinfo) throws IOException { boolean running = true; boolean ConcisefileExists = false; boolean LogfileExists = false; - String id = job[job.length - 1]; - String startdatestring = job[0].substring(0, job[0].indexOf(":")); - Date startdate = new Date(0); - Date starttime = new Date(0); - Date endtime = new Date(0); + JpredJob job = new JpredJob (jobinfo[jobinfo.length - 1], jobinfo[0], jobinfo[1]); + job.setIP(jobinfo[2]); Date currDate = new Date(); - String ip = job[2]; - String execstatus = "OK"; - String finalstatus = "OK"; - String protein = ""; - long exectime = 0; - String 
log = ""; - String maindir = dirprefix + "/" + id + "/"; - String concisefile = dirprefix + "/" + id + "/" + id + ".concise.fasta"; - String archivefile = dirprefix + "/" + id + "/" + id + ".tar.gz"; - String logfile = dirprefix + "/" + id + "/LOG"; - SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd"); - SimpleDateFormat timeformatter = new SimpleDateFormat("yyyy/MM/dd:H:m:s"); - try { - startdate = dateformatter.parse(startdatestring); - starttime = timeformatter.parse(job[0]); - endtime = timeformatter.parse(job[1]); - exectime = (endtime.getTime() - starttime.getTime()) / 1000; - } catch (ParseException e) { - e.printStackTrace(); - } + String maindir = dirprefix + "/" + job.getJobID() + "/"; + //System.out.println("analyzing job " + job.getJobID()); try { URL dirurl = new URL(maindir); HttpURLConnection httpConnection_dirurl = (HttpURLConnection) dirurl.openConnection(); if (httpConnection_dirurl.getResponseCode() < 199 || 300 <= httpConnection_dirurl.getResponseCode()) { return 0; } - URL conciseurl = new URL(concisefile); - URL archiveurl = new URL(archivefile); - URL logurl = new URL(logfile); + URL conciseurl = new URL(maindir + job.getJobID() + ".concise.fasta"); + URL archiveurl = new URL(maindir + job.getJobID() + ".tar.gz"); + URL logurl = new URL(maindir + "LOG"); HttpURLConnection httpConnection_conciseurl = (HttpURLConnection) conciseurl.openConnection(); HttpURLConnection httpConnection_logurl = (HttpURLConnection) logurl.openConnection(); HttpURLConnection httpConnection_archiveurl = (HttpURLConnection) archiveurl.openConnection(); @@ -149,39 +109,37 @@ public class JpredParserHTTP implements JpredParser { ConcisefileExists = true; running = false; try { - protein = parsePredictions(conciseurl.openStream(), id); + job.setProtein(parsePredictions(conciseurl.openStream(), job.getJobID())); } catch (IOException e) { e.printStackTrace(); } } else { // The job still can be running of failed... 
++countNoData; - alignment = new ArrayList(); - predictions = new ArrayList(); } if (199 < httpConnection_logurl.getResponseCode() && httpConnection_logurl.getResponseCode() < 300) { LogfileExists = true; - log = parseLogFile(logurl.openStream()); + job.setLog(parseLogFile(logurl.openStream())); } else { // The job has not been started at all... - execstatus = "FAIL"; - finalstatus = "STOPPED"; + job.setExecutionStatus("FAIL"); + job.setFinalStatus("STOPPED"); running = false; } - if (log.matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) { + if (job.getLog().matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) { // blast job was too long (more than 3600 secs by default)... - execstatus = "FAIL"; - finalstatus = "TIMEDOUT"; + job.setExecutionStatus("FAIL"); + job.setFinalStatus("TIMEDOUT"); running = false; - } else if (log.matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) { + } else if (job.getLog().matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) { // an internal Jpred error... - execstatus = "FAIL"; - finalstatus = "JPREDERROR"; + job.setExecutionStatus("FAIL"); + job.setFinalStatus("JPREDERROR"); running = false; - } else if ((currDate.getTime() - endtime.getTime()) / 1000 > 3601 && LogfileExists && !ConcisefileExists) { + } else if ((currDate.getTime() - job.getEndTime()) / 1000 > 3601 && LogfileExists && !ConcisefileExists) { // the job was stopped with unknown reason... 
- execstatus = "FAIL"; - finalstatus = "STOPPED"; + job.setExecutionStatus("FAIL"); + job.setFinalStatus("STOPPED"); running = false; } @@ -193,12 +151,12 @@ public class JpredParserHTTP implements JpredParser { } if (!running) { - long t = startdate.getTime(); - cw.FormQueryTables(t, job[0], job[1], ip, id, execstatus, finalstatus, protein, predictions); - cw.ArchiveData(t, exectime, ip, id, execstatus, finalstatus, protein, predictions, alignment, log, archivefile); + job.setAlignment(alignment); + job.setPredictions(predictions); + cw.FormQueryTables(job); + cw.ArchiveData(job, "undefined"); return 1; - } else - System.out.println("job " + id + " is running"); + } return 0; } diff --git a/datadb/compbio/cassandra/JpredParserLocalFile.java b/datadb/compbio/cassandra/JpredParserLocalFile.java index a379d4e..d4a22e6 100644 --- a/datadb/compbio/cassandra/JpredParserLocalFile.java +++ b/datadb/compbio/cassandra/JpredParserLocalFile.java @@ -103,7 +103,7 @@ public class JpredParserLocalFile implements JpredParser { } catch (ParseException e) { e.printStackTrace(); } - countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs); + //countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs); } fr.close(); } catch (IOException e) { -- 1.7.10.2