RIntroduce Bean for Jpred job
author Sasha Sherstnev <a.sherstnev@dundee.ac.uk>
Tue, 12 Nov 2013 18:30:45 +0000 (18:30 +0000)
committer Sasha Sherstnev <a.sherstnev@dundee.ac.uk>
Tue, 12 Nov 2013 18:30:45 +0000 (18:30 +0000)
datadb/compbio/cassandra/CassandraReader.java
datadb/compbio/cassandra/CassandraWriter.java
datadb/compbio/cassandra/JpredParserHTTP.java
datadb/compbio/cassandra/JpredParserLocalFile.java

index c7d08bf..af697a0 100644 (file)
@@ -12,9 +12,6 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ResultSet;
 
-import compbio.engine.ProteoCachePropertyHelperManager;
-import compbio.util.PropertyHelper;
-
 public class CassandraReader {
        private Session session;
        private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
index c478d68..48bbda7 100644 (file)
@@ -1,6 +1,5 @@
 package compbio.cassandra;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.log4j.Logger;
@@ -8,9 +7,8 @@ import org.apache.log4j.Logger;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.BoundStatement;
 
+import compbio.engine.JpredJob;
 import compbio.engine.ProteoCachePropertyHelperManager;
 import compbio.util.PropertyHelper;
 
@@ -48,20 +46,25 @@ public class CassandraWriter {
        /*
         * inserting data into the tables for queries
         */
-       public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
-                       String statusFinal, String protein, List<FastaSequence> predictions) {
-               if (JobisNotInsterted(jobid)) {
+       public int FormQueryTables(JpredJob job) {
+               if (JobisNotInsterted(job.getJobID())) {
+                       String id = job.getJobID();
+                       String ip = job.getIP();
+                       String protein = job.getProtein();
+                       String finalstatus = job.getFinalStatus();
+                       String execstatus = job.getExecutionStatus();
                        String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
-                                       + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
-                                       + "','" + protein + "');";
+                                       + " VALUES ('" + id + "','" + ip + "','" + job.getStartingTimeStr() + "','" + job.getEndTimeStr() + "','" + finalstatus
+                                       + "','" + execstatus + "','" + protein + "');";
                        session.execute(com1);
 
-                       String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
-                                       + "');";
+                       String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + job.getStartingDate() + ",'" + id
+                                       + "','" + protein + "');";
                        session.execute(com2);
 
                        String allpredictions = "";
-                       for (FastaSequence pred : predictions) {
+                       List<FastaSequence> pr = job.getPredictions();
+                       for (FastaSequence pred : pr) {
                                String predictionname = pred.getId();
                                String prediction = pred.getSequence().replaceAll("\n", "");
                                allpredictions += "'" + predictionname + "':'" + prediction + "',";
@@ -71,10 +74,10 @@ public class CassandraWriter {
                                final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
                        }
 
-                       String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
+                       String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + job.getJobID() + "';";
                        ResultSet results2 = session.execute(check2);
                        if (results2.isExhausted()) {
-                               String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
+                               String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + id + "',{"
                                                + final_prediction + "});";
                                session.execute(com3);
                        }
@@ -85,15 +88,15 @@ public class CassandraWriter {
                        boolean updateparameter = true;
                        if (!results3.isExhausted()) {
                                Row r = results3.one();
-                               if (jobtime >= Long.parseLong(r.getString("Value")))
+                               if (job.getStartingDate() >= Long.parseLong(r.getString("Value")))
                                        updateparameter = false;
                        }
                        if (updateparameter) {
-                               String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(jobtime)
+                               String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + job.getStartingDateStr()
                                                + "');";
                                session.execute(com);
                        }
-                       String check4 = "SELECT * FROM JobDateInfo WHERE jobday = " + jobtime + ";";
+                       String check4 = "SELECT * FROM JobDateInfo WHERE jobday = " + job.getStartingDate() + ";";
                        ResultSet results4 = session.execute(check4);
                        updateparameter = true;
                        int njobs = 1;
@@ -101,7 +104,7 @@ public class CassandraWriter {
                                Row r = results4.one();
                                njobs += r.getLong("Total");
                        }
-                       String com = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + jobtime + "," + njobs + ");";
+                       String com = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + job.getStartingDate() + "," + njobs + ");";
                        session.execute(com);
 
                        return 1;
@@ -113,26 +116,25 @@ public class CassandraWriter {
         * insert data from a real Jpred job: timing+IP, Execution Status, Final
         * status, protein sequence, predictions, alignment, LOG and tar.gz files
         */
-       public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
-                       List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
-               if (JobisNotArchived(jobid)) {
-                       String log = LogFile.replaceAll("'", "");
-                       session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
-                                       + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
-                       if (false) {
-                               PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
-                               BoundStatement boundStatement = new BoundStatement(statement);
-                               session.execute(boundStatement.bind(jobid, archivepath));
-                       }
+       public int ArchiveData(JpredJob job, String archivepath) {
+               if (JobisNotArchived(job.getJobID())) {
+                       String id = job.getJobID();
+                       String log = job.getLog().replaceAll("'", "");
+                       String com = "INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG, ArchiveLink) VALUES ('" + id + "','"
+                                       + job.getProtein() + "','" + job.getIP() + "'," + job.getStartingTime() + "," + job.getExecutionTime() + ",'" + log
+                                       + "','" + archivepath + "');";
+                       session.execute(com);
 
+                       List<FastaSequence> predictions = job.getPredictions();
                        for (FastaSequence p : predictions) {
                                session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
-                                               + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+                                               + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';");
                        }
 
+                       List<FastaSequence> seqs = job.getAlignment();
                        for (FastaSequence s : seqs) {
                                session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
-                                               + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+                                               + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';");
                        }
                        return 1;
                }
index bf4c460..e53ddd8 100644 (file)
@@ -1,8 +1,6 @@
 package compbio.cassandra;
 
 import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -11,14 +9,13 @@ import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
 
 import compbio.cassandra.JpredParser;
+import compbio.engine.JpredJob;
 
 public class JpredParserHTTP implements JpredParser {
        private CassandraWriter cw = new CassandraWriter();
@@ -44,10 +41,7 @@ public class JpredParserHTTP implements JpredParser {
                cal.add(Calendar.DATE, -nDays);
                for (int i = 0; i < nDays; ++i) {
                        cal.add(Calendar.DATE, 1);
-                       int month = cal.get(Calendar.MONTH) + 1;
-                       int year = cal.get(Calendar.YEAR);
-                       int day = cal.get(Calendar.DATE);
-                       String date = year + "/" + month + "/" + day;
+                       String date = cal.get(Calendar.YEAR) + "/" + (cal.get(Calendar.MONTH) + 1) + "/" + cal.get(Calendar.DATE);
                        ParsingForDate(source, date);
                }
        }
@@ -89,59 +83,25 @@ public class JpredParserHTTP implements JpredParser {
                return out;
        }
 
-       private List<Byte> parseArchiveFile(final InputStream stream) throws IOException {
-               DataInputStream data_in = new DataInputStream(stream);
-               List<Byte> out = new ArrayList<Byte>();
-               while (true) {
-                       try {
-                               out.add(data_in.readByte());
-                       } catch (EOFException eof) {
-                               break;
-                       }
-               }
-               return out;
-       }
-
-       private int analyseJob(String[] job) throws IOException {
+       private int analyseJob(String[] jobinfo) throws IOException {
                boolean running = true;
                boolean ConcisefileExists = false;
                boolean LogfileExists = false;
-               String id = job[job.length - 1];
-               String startdatestring = job[0].substring(0, job[0].indexOf(":"));
-               Date startdate = new Date(0);
-               Date starttime = new Date(0);
-               Date endtime = new Date(0);
+               JpredJob job = new JpredJob (jobinfo[jobinfo.length - 1], jobinfo[0], jobinfo[1]);
+               job.setIP(jobinfo[2]);
                Date currDate = new Date();
-               String ip = job[2];
-               String execstatus = "OK";
-               String finalstatus = "OK";
-               String protein = "";
-               long exectime = 0;
-               String log = "";
-               String maindir = dirprefix + "/" + id + "/";
-               String concisefile = dirprefix + "/" + id + "/" + id + ".concise.fasta";
-               String archivefile = dirprefix + "/" + id + "/" + id + ".tar.gz";
-               String logfile = dirprefix + "/" + id + "/LOG";
-               SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");
-               SimpleDateFormat timeformatter = new SimpleDateFormat("yyyy/MM/dd:H:m:s");
-               try {
-                       startdate = dateformatter.parse(startdatestring);
-                       starttime = timeformatter.parse(job[0]);
-                       endtime = timeformatter.parse(job[1]);
-                       exectime = (endtime.getTime() - starttime.getTime()) / 1000;
-               } catch (ParseException e) {
-                       e.printStackTrace();
-               }
+               String maindir = dirprefix + "/" + job.getJobID() + "/";
 
+               //System.out.println("analyzing job " + job.getJobID());
                try {
                        URL dirurl = new URL(maindir);
                        HttpURLConnection httpConnection_dirurl = (HttpURLConnection) dirurl.openConnection();
                        if (httpConnection_dirurl.getResponseCode() < 199 || 300 <= httpConnection_dirurl.getResponseCode()) {
                                return 0;
                        }
-                       URL conciseurl = new URL(concisefile);
-                       URL archiveurl = new URL(archivefile);
-                       URL logurl = new URL(logfile);
+                       URL conciseurl = new URL(maindir + job.getJobID() + ".concise.fasta");
+                       URL archiveurl = new URL(maindir + job.getJobID() + ".tar.gz");
+                       URL logurl = new URL(maindir + "LOG");
                        HttpURLConnection httpConnection_conciseurl = (HttpURLConnection) conciseurl.openConnection();
                        HttpURLConnection httpConnection_logurl = (HttpURLConnection) logurl.openConnection();
                        HttpURLConnection httpConnection_archiveurl = (HttpURLConnection) archiveurl.openConnection();
@@ -149,39 +109,37 @@ public class JpredParserHTTP implements JpredParser {
                                ConcisefileExists = true;
                                running = false;
                                try {
-                                       protein = parsePredictions(conciseurl.openStream(), id);
+                                       job.setProtein(parsePredictions(conciseurl.openStream(), job.getJobID()));
                                } catch (IOException e) {
                                        e.printStackTrace();
                                }
                        } else {
                                // The job still can be running of failed...
                                ++countNoData;
-                               alignment = new ArrayList<FastaSequence>();
-                               predictions = new ArrayList<FastaSequence>();
                        }
                        if (199 < httpConnection_logurl.getResponseCode() && httpConnection_logurl.getResponseCode() < 300) {
                                LogfileExists = true;
-                               log = parseLogFile(logurl.openStream());
+                               job.setLog(parseLogFile(logurl.openStream()));
                        } else {
                                // The job has not been started at all...
-                               execstatus = "FAIL";
-                               finalstatus = "STOPPED";
+                               job.setExecutionStatus("FAIL");
+                               job.setFinalStatus("STOPPED");
                                running = false;
                        }
-                       if (log.matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) {
+                       if (job.getLog().matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) {
                                // blast job was too long (more than 3600 secs by default)...
-                               execstatus = "FAIL";
-                               finalstatus = "TIMEDOUT";
+                               job.setExecutionStatus("FAIL");
+                               job.setFinalStatus("TIMEDOUT");
                                running = false;
-                       } else if (log.matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) {
+                       } else if (job.getLog().matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) {
                                // an internal Jpred error...
-                               execstatus = "FAIL";
-                               finalstatus = "JPREDERROR";
+                               job.setExecutionStatus("FAIL");
+                               job.setFinalStatus("JPREDERROR");
                                running = false;
-                       } else if ((currDate.getTime() - endtime.getTime()) / 1000 > 3601 && LogfileExists && !ConcisefileExists) {
+                       } else if ((currDate.getTime() - job.getEndTime()) / 1000 > 3601 && LogfileExists && !ConcisefileExists) {
                                // the job was stopped with unknown reason...
-                               execstatus = "FAIL";
-                               finalstatus = "STOPPED";
+                               job.setExecutionStatus("FAIL");
+                               job.setFinalStatus("STOPPED");
                                running = false;
                        }
 
@@ -193,12 +151,12 @@ public class JpredParserHTTP implements JpredParser {
                }
 
                if (!running) {
-                       long t = startdate.getTime();
-                       cw.FormQueryTables(t, job[0], job[1], ip, id, execstatus, finalstatus, protein, predictions);
-                       cw.ArchiveData(t, exectime, ip, id, execstatus, finalstatus, protein, predictions, alignment, log, archivefile);
+                       job.setAlignment(alignment);
+                       job.setPredictions(predictions);
+                       cw.FormQueryTables(job);
+                       cw.ArchiveData(job, "undefined");
                        return 1;
-               } else
-                       System.out.println("job " + id + " is running");
+               }
 
                return 0;
        }
index a379d4e..d4a22e6 100644 (file)
@@ -103,7 +103,7 @@ public class JpredParserLocalFile implements JpredParser {
                                                        } catch (ParseException e) {
                                                                e.printStackTrace();
                                                        }
-                                                       countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
+                                                       //countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
                                                }
                                                fr.close();
                                        } catch (IOException e) {