Re-engineering of database I/O
authorSasha Sherstnev <a.sherstnev@dundee.ac.uk>
Sat, 9 Nov 2013 11:43:30 +0000 (11:43 +0000)
committerSasha Sherstnev <a.sherstnev@dundee.ac.uk>
Sat, 9 Nov 2013 11:43:30 +0000 (11:43 +0000)
conf/Proteocache.properties
datadb/compbio/cassandra/CassandraNativeConnector.java
datadb/compbio/cassandra/CassandraReader.java
datadb/compbio/cassandra/CassandraWriter.java [new file with mode: 0644]
datadb/compbio/cassandra/JpredParserHTTP.java
datadb/compbio/cassandra/JpredParserLocalFile.java

index 1f1c5cf..35cfe56 100644 (file)
@@ -5,5 +5,22 @@ cassandra.host=localhost
 
 #################################################################################
 # Jpred sources
-cassandra.jpred.web=true
-cassandra.jpred.local=false
\ No newline at end of file
+# real Jpred web-server
+cassandra.jpred.web.update=true
+cassandra.jpred.web.inidelay=0
+cassandra.jpred.web.updaterate=200
+
+# update time period (in days)
+# by defauls for 100 past days
+cassandra.jpred.web.period=5
+
+
+#################################################################################
+# local test job source
+cassandra.jpred.local.update=false
+cassandra.jpred.local.inidelay=10
+cassandra.jpred.local.updaterate=200
+
+# update time period (in days)
+# by defauls for 100 past days
+cassandra.jpred.local.period=300
index 6e6992d..acb50de 100644 (file)
@@ -1,26 +1,18 @@
 package compbio.cassandra;
 
-import java.io.IOException;
 import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
 
 import org.apache.log4j.Logger;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Row;
 
+import com.datastax.driver.core.Session;
 import compbio.engine.ProteoCachePropertyHelperManager;
 import compbio.util.PropertyHelper;
-import compbio.util.Util;
 
 public class CassandraNativeConnector {
        private static Cluster cluster;
@@ -29,17 +21,9 @@ public class CassandraNativeConnector {
        private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
 
        public static String CASSANDRA_HOSTNAME = "localhost";
-       public static boolean READ_WEB_JPRED = false;
-       public static boolean READ_LOCALFILE_JPRED = false;
 
-       private static boolean initBooleanValue(String key) {
-               assert key != null;
-               String status = ph.getProperty(key);
-               log.debug("Loading property: " + key + " with value: " + status);
-               if (Util.isEmpty(status)) {
-                       return false;
-               }
-               return new Boolean(status.trim()).booleanValue();
+       public static Session getSession () {
+               return session;
        }
 
        /*
@@ -51,8 +35,6 @@ public class CassandraNativeConnector {
                if (null != cassandrahostname) {
                        CASSANDRA_HOSTNAME = cassandrahostname;
                }
-               READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web");
-               READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local");
 
                cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build();
 
@@ -62,321 +44,58 @@ public class CassandraNativeConnector {
                        System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
                }
                session = cluster.connect();
-               CreateTables();
+               CreateMainTables();
                System.out.println("Cassandra connected");
        }
 
-       private void CreateTables() {
+       private void CreateMainTables() {
                session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
                session.execute("USE ProteinKeyspace");
 
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow "
+               session.execute("CREATE TABLE IF NOT EXISTS MainParameters "
+                               + "(Name ascii, Value ascii, PRIMARY KEY(Name));");
+               
+               session.execute("CREATE TABLE IF NOT EXISTS ProteinRow "
                                + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog "
+
+               session.execute("CREATE TABLE IF NOT EXISTS ProteinLog "
                                + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
                                + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData "
+
+               session.execute("CREATE TABLE IF NOT EXISTS ProteinData "
                                + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive "
+
+               session.execute("CREATE TABLE IF NOT EXISTS JpredArchive "
                                + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, "
                                + "predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
+               
+               session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo "
+                               + "(jobday bigint, Total bigint, PRIMARY KEY(jobday));");
 
                session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
                session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
        }
 
-       /*
-        * parsing data source and filling the database
-        */
-       public void Parsing() throws IOException {
-               if (READ_WEB_JPRED) {
-                       // if (source.equals("http")) {
-                       // get data from real Jpred production server
-                       System.out.println("Parsing web data source......");
-                       String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
-                       String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
-                       JpredParserHTTP parser = new JpredParserHTTP(prefix);
-                       parser.Parsing(datasrc, 5);
-               }
-               if (READ_LOCALFILE_JPRED) {
-                       // if (source.equals("file")) {
-                       // get irtifical data generated for the DB stress tests
-                       System.out.println("Parsing local file data source......");
-                       String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
-                       String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
-                       JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
-                       parser.Parsing(datasrc, 190);
-               }
-       }
-
        public void Closing() {
                session.shutdown();
                cluster.shutdown();
                System.out.println("Cassandra has been shut down");
        }
 
-       public boolean JobisNotInsterted(String jobid) {
-               ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
-               if (results1.isExhausted()) {
-                       return true;
-               }
-               return false;
-       }
-
-       public boolean JobisNotArchived(String jobid) {
-               ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
-               if (results1.isExhausted()) {
-                       return true;
-               }
-               return false;
-       }
-
-       /*
-        * inserting data into the tables for queries
-        */
-       public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
-                       String statusFinal, String protein, List<FastaSequence> predictions) {
-               if (JobisNotInsterted(jobid)) {
-                       String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
-                                       + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
-                                       + "','" + protein + "');";
-                       session.execute(com1);
-
-                       String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
-                                       + "');";
-                       session.execute(com2);
-
-                       String allpredictions = "";
-                       for (FastaSequence pred : predictions) {
-                               String predictionname = pred.getId();
-                               String prediction = pred.getSequence().replaceAll("\n", "");
-                               allpredictions += "'" + predictionname + "':'" + prediction + "',";
-                       }
-                       String final_prediction = "";
-                       if (null != allpredictions) {
-                               final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
-                       }
-
-                       String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
-                       ResultSet results2 = session.execute(check2);
-                       if (results2.isExhausted()) {
-                               String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
-                                               + final_prediction + "});";
-                               session.execute(com3);
-                       }
-                       return 1;
-               }
-               return 0;
-       }
-
-       /*
-        * insert data from a real Jpred job: timing+IP, Execution Status, Final
-        * status, protein sequence, predictions, alignment, LOG and tar.gz files
-        */
-       public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
-                       List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
-               if (JobisNotArchived(jobid)) {
-                       String log = LogFile.replaceAll("'", "");
-                       session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
-                                       + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
-                       if (false) {
-                               PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
-                               BoundStatement boundStatement = new BoundStatement(statement);
-                               session.execute(boundStatement.bind(jobid, archivepath));
-                       }
-
-                       for (FastaSequence p : predictions) {
-                               session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
-                                               + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
-                       }
-
-                       for (FastaSequence s : seqs) {
-                               session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
-                                               + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
-                       }
-                       return 1;
-               }
-               return 0;
-       }
-
-       /*
-        * getting data from the db
-        */
-       public List<Pair<String, String>> ReadProteinDataTable() {
-               final long startTime = System.currentTimeMillis();
-               String com = "SELECT DataBegin,DataEnd FROM ProteinLog;";
-               System.out.println("Command: " + com);
-               ResultSet results = session.execute(com);
-               final long queryTime = System.currentTimeMillis();
-               List<Row> rows = results.all();
-               System.out.println("Query time is " + (queryTime - startTime) + " msec");
-
-               List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
-               int c = 0;
-               for (Row r : rows) {
-                       Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
-                       res.add(pair);
-                       ++c;
-               }
-               final long endTime = System.currentTimeMillis();
-               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
-               return res;
-       }
-
-       /*
-        * getting data from the db ProteinData
-        */
-       public Integer ReadDateTable(long queryDate) {
-               final long startTime = System.currentTimeMillis();
-               String com = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";";
-               System.out.println("Command: " + com);
-               ResultSet results = session.execute(com);
-               final long queryTime = System.currentTimeMillis();
-               System.out.println("Query time is " + (queryTime - startTime) + " msec");
-               if (results.isExhausted())
-                       return 0;
-               List<Row> rows = results.all();
-               final long endTime = System.currentTimeMillis();
-               System.out.println("Processing time is " + (endTime - queryTime) + " msec");
-               return rows.size();
-       }
-
-       /*
-        * getting whole protein sequence from the db ProteinRow
-        */
-       public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
-               final long startTime = System.currentTimeMillis();
-               String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';";
-               System.out.println("Command: " + com);
-               ResultSet results = session.execute(com);
-               if (results.isExhausted())
-                       return null;
-               final long queryTime = System.currentTimeMillis();
-               List<Row> rows = results.all();
-               System.out.println("Query time is " + (queryTime - startTime) + " msec");
-               System.out.println(" rows analysed,  " + rows.size());
-               List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
-               int c = 0;
-               for (Row r : rows) {
-                       StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap(
-                                       "Predictions", String.class, String.class));
-                       res.add(structure);
-                       ++c;
-               }
-               final long endTime = System.currentTimeMillis();
-               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
-               return res;
-       }
-
-       /*
-        * getting part of protein sequence from the db ProteinRow
-        */
-       public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
-               final long startTime = System.currentTimeMillis();
-               String com = "SELECT * FROM ProteinRow;";
-               System.out.println("Command: " + com);
-               ResultSet results = session.execute(com);
-               if (results.isExhausted())
-                       return null;
-               final long queryTime = System.currentTimeMillis();
-               List<Row> rows = results.all();
-               System.out.println("Query time is " + (queryTime - startTime) + " msec");
-               System.out.println(" rows analysed,  " + rows.size());
-               List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
-               int c = 0;
-               for (Row r : rows) {
-                       String prot = r.getString("Protein");
-                       if (prot.matches("(.*)" + queryProtein + "(.*)")) {
-                               StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions",
-                                               String.class, String.class));
-                               res.add(structure);
-                               ++c;
-                       }
-               }
-               final long endTime = System.currentTimeMillis();
-               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
-               return res;
-       }
-
-       /*
-        * getting protein sequences by counter
-        */
-       public Map<String, Integer> ReadProteinDataByCounter() {
-               final long startTime = System.currentTimeMillis();
-               String com = "SELECT Protein FROM ProteinRow;";
-               System.out.println("Command: " + com);
-               ResultSet results = session.execute(com);
-               if (results.isExhausted())
-                       return null;
-               final long queryTime = System.currentTimeMillis();
-               List<Row> rows = results.all();
-               System.out.println("Query time is " + (queryTime - startTime) + " msec");
-               System.out.println(" rows analysed,  " + rows.size());
-               Map<String, Integer> res = new HashMap<String, Integer>();
-               int c = 0;
-               for (Row r : rows) {
-                       String protein = r.getString("Protein");
-                       if (res.containsKey(protein))
-                               res.put(protein, res.get(protein) + 1);
-                       else
-                               res.put(protein, 1);
-               }
-               final long endTime = System.currentTimeMillis();
-               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
-               return res;
-       }
-
-       /*
-        * getting protein sequences by counter
-        */
-       public StructureJobLog ReadJobLog(String jobid) {
-               final long startTime = System.currentTimeMillis();
-               String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
-               System.out.println("Command: " + com);
-               ResultSet results = session.execute(com);
-               if (results.isExhausted())
-                       return null;
-               final long queryTime = System.currentTimeMillis();
-               Row row = results.one();
-               String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
-               System.out.println("Command: " + com1);
-               ResultSet results1 = session.execute(com1);
-               if (results1.isExhausted())
-                       return null;
-               Row row1 = results1.one();
-               StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"),
-                               row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
-               System.out.println("Query time is " + (queryTime - startTime) + " msec");
-               final long endTime = System.currentTimeMillis();
-               System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec");
-               return res;
-       }
-
        /*
         * getting earlest date of jobs from the db
         */
-       public long getEarliestDateInDB() {
-               final long startTime = System.currentTimeMillis();
-               String com = "SELECT jobtime,JobID FROM ProteinData;";
+       public static long getEarliestDateInDB() {
+               String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
                System.out.println("Command: " + com);
                ResultSet results = session.execute(com);
-               final long queryTime = System.currentTimeMillis();
-               System.out.println("Query time is  " + (queryTime - startTime) + " msec");
 
-               Calendar cal = Calendar.getInstance();
-               long res = cal.getTimeInMillis();
-               int c = 0;
-               while (!results.isExhausted()) {
+               if (!results.isExhausted()) {
                        Row r = results.one();
-                       long d1 = r.getLong("jobtime");
-                       if (res > d1) {
-                               res = d1;
-                       }
-                       ++c;
+                       return Long.parseLong(r.getString("Value"));
                }
-               final long endTime = System.currentTimeMillis();
-               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
-               return res;
+               Calendar cal = Calendar.getInstance();
+               return cal.getTimeInMillis();
        }
-
+       
 }
index d2d2e1b..c71243b 100644 (file)
 package compbio.cassandra;
 
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.ResultSet;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
+
+public class CassandraReader {
+       private Session session;
+       private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+       public CassandraReader() {
+               Session inis = CassandraNativeConnector.getSession();
+               setSession (inis);
+       }
+
+       public void setSession(Session s) {
+               assert s != null;
+               session = s;
+       }
+
+       /*
+        * getting data from the db
+        */
+       public List<Pair<String, String>> ReadProteinDataTable() {
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT DataBegin,DataEnd FROM ProteinLog;";
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               final long queryTime = System.currentTimeMillis();
+               List<Row> rows = results.all();
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
 
-public interface CassandraReader {
+               List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
+               int c = 0;
+               for (Row r : rows) {
+                       Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
+                       res.add(pair);
+                       ++c;
+               }
+               final long endTime = System.currentTimeMillis();
+               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+               return res;
+       }
 
        /*
-        * Defines a source file with metainformation of Jpred Jobs
-        **/
-       void setSession (Session s);
-       
+        * getting data from the db ProteinData
+        */
+       public Integer ReadDateTable(long queryDate) {
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";";
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               final long queryTime = System.currentTimeMillis();
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               if (results.isExhausted())
+                       return 0;
+               List<Row> rows = results.all();
+               final long endTime = System.currentTimeMillis();
+               System.out.println("Processing time is " + (endTime - queryTime) + " msec");
+               return rows.size();
+       }
+
+       /*
+        * getting whole protein sequence from the db ProteinRow
+        */
+       public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';";
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               if (results.isExhausted())
+                       return null;
+               final long queryTime = System.currentTimeMillis();
+               List<Row> rows = results.all();
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               System.out.println(" rows analysed,  " + rows.size());
+               List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
+               int c = 0;
+               for (Row r : rows) {
+                       StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap(
+                                       "Predictions", String.class, String.class));
+                       res.add(structure);
+                       ++c;
+               }
+               final long endTime = System.currentTimeMillis();
+               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+               return res;
+       }
+
+       /*
+        * getting part of protein sequence from the db ProteinRow
+        */
+       public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT * FROM ProteinRow;";
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               if (results.isExhausted())
+                       return null;
+               final long queryTime = System.currentTimeMillis();
+               List<Row> rows = results.all();
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               System.out.println(" rows analysed,  " + rows.size());
+               List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
+               int c = 0;
+               for (Row r : rows) {
+                       String prot = r.getString("Protein");
+                       if (prot.matches("(.*)" + queryProtein + "(.*)")) {
+                               StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions",
+                                               String.class, String.class));
+                               res.add(structure);
+                               ++c;
+                       }
+               }
+               final long endTime = System.currentTimeMillis();
+               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+               return res;
+       }
+
+       /*
+        * getting protein sequences by counter
+        */
+       public Map<String, Integer> ReadProteinDataByCounter() {
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT Protein FROM ProteinRow;";
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               if (results.isExhausted())
+                       return null;
+               final long queryTime = System.currentTimeMillis();
+               List<Row> rows = results.all();
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               System.out.println(" rows analysed,  " + rows.size());
+               Map<String, Integer> res = new HashMap<String, Integer>();
+               int c = 0;
+               for (Row r : rows) {
+                       String protein = r.getString("Protein");
+                       if (res.containsKey(protein))
+                               res.put(protein, res.get(protein) + 1);
+                       else
+                               res.put(protein, 1);
+               }
+               final long endTime = System.currentTimeMillis();
+               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+               return res;
+       }
+
        /*
-        * Makes real parsing of the source file
-        **/
-       /*void getResults();*/
+        * getting protein sequences by counter
+        */
+       public StructureJobLog ReadJobLog(String jobid) {
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';";
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               if (results.isExhausted())
+                       return null;
+               final long queryTime = System.currentTimeMillis();
+               Row row = results.one();
+               String com1 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
+               System.out.println("Command: " + com1);
+               ResultSet results1 = session.execute(com1);
+               if (results1.isExhausted())
+                       return null;
+               Row row1 = results1.one();
+               StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"),
+                               row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               final long endTime = System.currentTimeMillis();
+               System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec");
+               return res;
+       }
 }
diff --git a/datadb/compbio/cassandra/CassandraWriter.java b/datadb/compbio/cassandra/CassandraWriter.java
new file mode 100644 (file)
index 0000000..6ee62fd
--- /dev/null
@@ -0,0 +1,142 @@
+package compbio.cassandra;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.BoundStatement;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
+
+public class CassandraWriter {
+       private Session session;
+       private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
+       private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+       CassandraWriter() {
+               Session inis = CassandraNativeConnector.getSession();
+               setSession(inis);
+       }
+
+       public void setSession(Session s) {
+               assert s != null;
+               session = s;
+       }
+
+       public boolean JobisNotInsterted(String jobid) {
+               ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
+               if (results1.isExhausted()) {
+                       return true;
+               }
+               return false;
+       }
+
+       public boolean JobisNotArchived(String jobid) {
+               ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
+               if (results1.isExhausted()) {
+                       return true;
+               }
+               return false;
+       }
+
+       /*
+        * inserting data into the tables for queries
+        */
+       public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
+                       String statusFinal, String protein, List<FastaSequence> predictions) {
+               if (JobisNotInsterted(jobid)) {
+                       String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
+                                       + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
+                                       + "','" + protein + "');";
+                       session.execute(com1);
+
+                       String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
+                                       + "');";
+                       session.execute(com2);
+
+                       String allpredictions = "";
+                       for (FastaSequence pred : predictions) {
+                               String predictionname = pred.getId();
+                               String prediction = pred.getSequence().replaceAll("\n", "");
+                               allpredictions += "'" + predictionname + "':'" + prediction + "',";
+                       }
+                       String final_prediction = "";
+                       if (null != allpredictions) {
+                               final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
+                       }
+
+                       String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
+                       ResultSet results2 = session.execute(check2);
+                       if (results2.isExhausted()) {
+                               String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
+                                               + final_prediction + "});";
+                               session.execute(com3);
+                       }
+
+                       // update some internal tables 
+                       String check3 = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
+                       ResultSet results3 = session.execute(check3);
+                       boolean updateparameter = true;
+                       if (!results3.isExhausted()) {
+                               Row r = results3.one();
+                               if (jobtime >= Long.parseLong(r.getString("Value")))
+                                       updateparameter = false;
+                       }
+                       if (updateparameter) {
+                               String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(jobtime)
+                                               + "');";
+                               session.execute(com);
+                       }
+                       String check4 = "SELECT * FROM MainParameters WHERE Name = 'TotalNumberOfJobs';";
+                       ResultSet results4 = session.execute(check4);
+                       updateparameter = true;
+                       int njobs = 1;
+                       if (!results4.isExhausted()) {
+                               Row r = results4.one();
+                               njobs += Integer.parseInt(r.getString("Value"));
+                       }
+                       String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('TotalNumberOfJobs','" + String.valueOf(njobs) + "');";
+                       session.execute(com);
+
+                       return 1;
+               }
+               return 0;
+       }
+
+       /*
+        * insert data from a real Jpred job: timing+IP, Execution Status, Final
+        * status, protein sequence, predictions, alignment, LOG and tar.gz files
+        */
+       public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
+                       List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
+               if (JobisNotArchived(jobid)) {
+                       String log = LogFile.replaceAll("'", "");
+                       session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
+                                       + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
+                       if (false) {
+                               PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
+                               BoundStatement boundStatement = new BoundStatement(statement);
+                               session.execute(boundStatement.bind(jobid, archivepath));
+                       }
+
+                       for (FastaSequence p : predictions) {
+                               session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
+                                               + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+                       }
+
+                       for (FastaSequence s : seqs) {
+                               session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
+                                               + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+                       }
+                       return 1;
+               }
+               return 0;
+       }
+
+}
index 27f66cc..5687a83 100644 (file)
@@ -21,17 +21,17 @@ import java.util.List;
 import compbio.cassandra.JpredParser;
 
 public class JpredParserHTTP implements JpredParser {
-       private CassandraNativeConnector cc = new CassandraNativeConnector();
+       private CassandraWriter cw = new CassandraWriter();
        private String dirprefix;
        private List<FastaSequence> alignment;
        private List<FastaSequence> predictions;
        private String jnetpred;
 
-       JpredParserHTTP() {
+       public JpredParserHTTP() {
                dirprefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
        }
 
-       JpredParserHTTP(String sourceurl) {
+       public JpredParserHTTP(String sourceurl) {
                dirprefix = sourceurl;
        }
 
@@ -125,7 +125,7 @@ public class JpredParserHTTP implements JpredParser {
                                        // unknown_email jp_J9HBCBT
                                        String id = table[table.length - 1];
                                        totalcount++;
-                                       if (cc.JobisNotInsterted(id)) {
+                                       if (cw.JobisNotInsterted(id)) {
                                                URL dataurl = new URL(dirprefix + "/" + id + "/" + id + ".concise.fasta");
                                                URL archiveurl = new URL(dirprefix + "/" + id + "/" + id + ".tar.gz");
                                                URL logurl = new URL(dirprefix + "/" + id + "/LOG");
@@ -150,7 +150,7 @@ public class JpredParserHTTP implements JpredParser {
                                                                                String ip = table[2];
                                                                                String execstatus = "OK";
                                                                                String finalstatus = "OK";
-                                                                               countinsertions += cc.FormQueryTables(startdate.getTime(), table[0], table[1], ip, id, execstatus,
+                                                                               countinsertions += cw.FormQueryTables(startdate.getTime(), table[0], table[1], ip, id, execstatus,
                                                                                                finalstatus, protein, predictions);
 
                                                                                long exectime = (endtime.getTime() - starttime.getTime()) / 1000;
@@ -158,7 +158,7 @@ public class JpredParserHTTP implements JpredParser {
                                                                                if (199 < response2 && response2 < 300) {
                                                                                        log = parseLogFile(logurl.openStream());
                                                                                }
-                                                                               cc.ArchiveData(startdate.getTime(), exectime, ip, id, execstatus, finalstatus, protein,
+                                                                               cw.ArchiveData(startdate.getTime(), exectime, ip, id, execstatus, finalstatus, protein,
                                                                                                predictions, alignment, log, archiveurl.toString());
                                                                        } catch (ParseException e) {
                                                                                e.printStackTrace();
index 4b254ae..a379d4e 100644 (file)
@@ -16,18 +16,18 @@ import java.util.Date;
 import java.util.List;
 
 public class JpredParserLocalFile implements JpredParser {
-       private CassandraNativeConnector cc = new CassandraNativeConnector();
+       private CassandraWriter cw = new CassandraWriter();
        private String dirprefix;
 
        public void setSource(String newsourceprefix) {
                this.dirprefix = newsourceprefix;
        }
 
-       JpredParserLocalFile() {
+       public JpredParserLocalFile() {
                this.dirprefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
        }
 
-       JpredParserLocalFile(String sourceurl) {
+       public JpredParserLocalFile(String sourceurl) {
                this.dirprefix = sourceurl;
        }
 
@@ -103,7 +103,7 @@ public class JpredParserLocalFile implements JpredParser {
                                                        } catch (ParseException e) {
                                                                e.printStackTrace();
                                                        }
-                                                       countinsertions += cc.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
+                                                       countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
                                                }
                                                fr.close();
                                        } catch (IOException e) {