Merge branch 'master' into servlets
[proteocache.git] / datadb / compbio / cassandra / CassandraNativeConnector.java
index 7109c78..6e6992d 100644 (file)
@@ -2,8 +2,12 @@ package compbio.cassandra;
 
 import java.io.IOException;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Host;
@@ -11,54 +15,90 @@ import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.BoundStatement;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
+import compbio.util.Util;
 
 public class CassandraNativeConnector {
        private static Cluster cluster;
        private static Session session;
+       private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
+       private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+       public static String CASSANDRA_HOSTNAME = "localhost";
+       public static boolean READ_WEB_JPRED = false;
+       public static boolean READ_LOCALFILE_JPRED = false;
+
+       private static boolean initBooleanValue(String key) { // reads property `key` as a boolean flag
+               assert key != null;
+               String status = ph.getProperty(key);
+               log.debug("Loading property: " + key + " with value: " + status);
+               if (Util.isEmpty(status)) { // presumably null or empty (see compbio.util.Util): the flag stays off
+                       return false;
+               }
+               return Boolean.parseBoolean(status.trim()); // true only for "true" in any letter case; no Boolean object is allocated
+       }
 
        /*
-        * connect to the cluster and look weather the dababase has any data inside
+        * connect to the cluster and look whether all tables exist
         */
        public void Connect() {
-               // local cassandra cluster
-               cluster = Cluster.builder().addContactPoint("localhost").build();
-               // distributed cassandra cluster
-               /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
+
+               String cassandrahostname = ph.getProperty("cassandra.host");
+               if (null != cassandrahostname) {
+                       CASSANDRA_HOSTNAME = cassandrahostname;
+               }
+               READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web");
+               READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local");
+
+               cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build();
+
                Metadata metadata = cluster.getMetadata();
                System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
                for (Host host : metadata.getAllHosts()) {
                        System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
                }
-
                session = cluster.connect();
+               CreateTables();
+               System.out.println("Cassandra connected");
+       }
+
+       private void CreateTables() {
                session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
-                               + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
-               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.JpredArchive " + 
-               "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
+               session.execute("USE ProteinKeyspace");
 
-               session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinKeyspace.ProteinRow (protein);");
-               session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);");
+               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow "
+                               + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
+               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog "
+                               + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
+                               + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
+               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData "
+                               + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
+               session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive "
+                               + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, "
+                               + "predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
 
-               System.out.println("Cassandra connected");
+               session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
+               session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
        }
 
        /*
         * parsing data source and filling the database
         */
        public void Parsing() throws IOException {
-               if (true) {
+               if (READ_WEB_JPRED) {
                        // if (source.equals("http")) {
                        // get data from real Jpred production server
                        System.out.println("Parsing web data source......");
                        String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
                        String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
                        JpredParserHTTP parser = new JpredParserHTTP(prefix);
-                       parser.Parsing(datasrc, 4);
+                       parser.Parsing(datasrc, 5);
                }
-               if (false) {
+               if (READ_LOCALFILE_JPRED) {
                        // if (source.equals("file")) {
                        // get irtifical data generated for the DB stress tests
                        System.out.println("Parsing local file data source......");
@@ -75,22 +115,35 @@ public class CassandraNativeConnector {
                System.out.println("Cassandra has been shut down");
        }
 
-       /*
-        * inserting data into the db
-        */
-       public void FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
-                       String protein, List<FastaSequence> predictions) {
+       public boolean JobisNotInsterted(String jobid) { // true when ProteinLog has no row for this job id
+               final String lookup = "SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';";
+               final ResultSet found = session.execute(lookup);
+               // an exhausted result set means the query matched nothing
+               final boolean missing = found.isExhausted();
+               return missing;
+       }
 
-               String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
-               ResultSet results1 = session.execute(check1);
+       public boolean JobisNotArchived(String jobid) {
+               ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
                if (results1.isExhausted()) {
-                       String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
-                                       + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
-                                       + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
-                       session.execute(com1);
+                       return true;
+               }
+               return false;
+       }
 
-                       String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
+       /*
+        * inserting data into the tables for queries
+        */
+       public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
+                       String statusFinal, String protein, List<FastaSequence> predictions) {
+               if (JobisNotInsterted(jobid)) {
+                       String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
+                                       + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
                                        + "','" + protein + "');";
+                       session.execute(com1);
+
+                       String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
+                                       + "');";
                        session.execute(com2);
 
                        String allpredictions = "";
@@ -104,60 +157,54 @@ public class CassandraNativeConnector {
                                final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
                        }
 
-                       String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';";
+                       String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
                        ResultSet results2 = session.execute(check2);
                        if (results2.isExhausted()) {
-                               String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','"
-                                               + jobid + "',{" + final_prediction + "});";
+                               String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
+                                               + final_prediction + "});";
                                session.execute(com3);
                        }
+                       return 1;
                }
+               return 0;
        }
 
-       public void ArchiveData(long starttime, int exectime, String ip, String jobid, String statusEx, String statusFinal,
-                       String protein, List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile) {
-
-               String check1 = "SELECT * FROM ProteinKeyspace.JpredArchive WHERE JobID = '" + jobid + "';";
-               ResultSet results1 = session.execute(check1);
-               if (results1.isExhausted()) {
-                       String allpredictions = "";
-                       for (FastaSequence pred : predictions) {
-                               String predictionname = pred.getId();
-                               String prediction = pred.getSequence().replaceAll("\n", "");
-                               allpredictions += "'" + predictionname + "':'" + prediction + "',";
-                       }
-                       String final_allpredictions = "";
-                       if (null != allpredictions) {
-                               final_allpredictions = allpredictions.substring(0, allpredictions.length() - 1);
+       /*
+        * insert data from a real Jpred job: timing+IP, Execution Status, Final
+        * status, protein sequence, predictions, alignment, LOG and tar.gz files
+        */
+       public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
+                       List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
+               if (JobisNotArchived(jobid)) {
+                       String log = LogFile.replaceAll("'", "");
+                       session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
+                                       + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
+                       if (false) {
+                               PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
+                               BoundStatement boundStatement = new BoundStatement(statement);
+                               session.execute(boundStatement.bind(jobid, archivepath));
                        }
-                       String alignment = "";
-                       for (FastaSequence seq : seqs) {
-                               String predictionname = seq.getId();
-                               String prediction = seq.getSequence().replaceAll("\n", "");
-                               alignment += "'" + predictionname + "':'" + prediction + "',";
+
+                       for (FastaSequence p : predictions) {
+                               session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
+                                               + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
                        }
-                       String final_alignment = "";
-                       if (null != allpredictions) {
-                               final_alignment = alignment.substring(0, allpredictions.length() - 1);
+
+                       for (FastaSequence s : seqs) {
+                               session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
+                                               + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
                        }
-                       
-                       String com1 = "INSERT INTO ProteinKeyspace.JpredArchive "
-                                       + "(JobID, Protein, IP, StartTime, ExecTime, alignment, predictions, LOG))"
-                                       + " VALUES ('" 
-                                       + jobid + "','" + protein + "','" + ip + "'," + starttime + "," + exectime
-                                       + "',[" + final_allpredictions + "],[" + final_alignment + "],'" + LogFile + "]);";
-                       session.execute(com1);
+                       return 1;
                }
+               return 0;
        }
 
-       
-       
        /*
         * getting data from the db
         */
        public List<Pair<String, String>> ReadProteinDataTable() {
                final long startTime = System.currentTimeMillis();
-               String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
+               String com = "SELECT DataBegin,DataEnd FROM ProteinLog;";
                System.out.println("Command: " + com);
                ResultSet results = session.execute(com);
                final long queryTime = System.currentTimeMillis();
@@ -177,15 +224,144 @@ public class CassandraNativeConnector {
        }
 
        /*
+        * getting data from the db ProteinData
+        */
+       public Integer ReadDateTable(long queryDate) { // number of ProteinData rows whose jobtime equals queryDate
+               final long begin = System.currentTimeMillis();
+               final String cql = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";";
+               System.out.println("Command: " + cql);
+               final ResultSet matches = session.execute(cql);
+               final long fetched = System.currentTimeMillis();
+               System.out.println("Query time is " + (fetched - begin) + " msec");
+               if (matches.isExhausted()) { // nothing stored for that date: the processing report is skipped
+                       return 0;
+               }
+               final int count = matches.all().size();
+               System.out.println("Processing time is " + (System.currentTimeMillis() - fetched) + " msec");
+               return count;
+       }
+
+       /*
+        * getting whole protein sequence from the db ProteinRow
+        */
+       public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) { // exact-match lookup by sequence
+               final long begin = System.currentTimeMillis();
+               final String cql = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';";
+               System.out.println("Command: " + cql);
+               final ResultSet hits = session.execute(cql);
+               if (hits.isExhausted()) { // callers get null, not an empty list, when nothing matches
+                       return null;
+               }
+               final long fetched = System.currentTimeMillis();
+               final List<Row> matched = hits.all();
+               System.out.println("Query time is " + (fetched - begin) + " msec");
+               System.out.println(" rows analysed,  " + matched.size());
+               final List<StructureProteinPrediction> found = new ArrayList<StructureProteinPrediction>();
+               for (final Row hit : matched) {
+                       final String job = hit.getString("JobID");
+                       final Map<String, String> predictions = hit.getMap("Predictions", String.class, String.class);
+                       found.add(new StructureProteinPrediction(queryProtein, job, predictions));
+               }
+               final long finished = System.currentTimeMillis();
+               // one entry was added per row, so the list size is the row count
+               System.out.println(found.size() + " rows analysed, execution time is " + (finished - begin) + " msec");
+               return found;
+       }
+
+       /*
+        * getting part of protein sequence from the db ProteinRow
+        */
+       public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) { // substring search over every stored sequence
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT * FROM ProteinRow;"; // the whole table is fetched and filtered in this method
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               if (results.isExhausted()) // empty table: callers receive null rather than an empty list
+                       return null;
+               final long queryTime = System.currentTimeMillis();
+               List<Row> rows = results.all();
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               System.out.println(" rows analysed,  " + rows.size());
+               List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
+               int c = 0; // number of rows whose sequence contains the query
+               for (Row r : rows) {
+                       String prot = r.getString("Protein");
+                       if (null != prot && prot.contains(queryProtein)) { // literal match: the query is never parsed as a regex
+                               StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions",
+                                               String.class, String.class));
+                               res.add(structure);
+                               ++c;
+                       }
+               }
+               final long endTime = System.currentTimeMillis();
+               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+               return res;
+       }
+
+       /*
+        * getting protein sequences by counter
+        */
+       public Map<String, Integer> ReadProteinDataByCounter() { // maps each stored sequence to the number of rows holding it
+               final long startTime = System.currentTimeMillis();
+               String com = "SELECT Protein FROM ProteinRow;";
+               System.out.println("Command: " + com);
+               ResultSet results = session.execute(com);
+               if (results.isExhausted()) // empty table: callers receive null rather than an empty map
+                       return null;
+               final long queryTime = System.currentTimeMillis();
+               List<Row> rows = results.all();
+               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               System.out.println(" rows analysed,  " + rows.size());
+               Map<String, Integer> res = new HashMap<String, Integer>();
+               int c = 0;
+               for (Row r : rows) {
+                       String protein = r.getString("Protein");
+                       Integer seen = res.get(protein); // null the first time a sequence shows up
+                       res.put(protein, null == seen ? 1 : seen + 1);
+                       ++c; // was never incremented, so the summary below always reported 0 rows
+                       // (one lookup per row now replaces the containsKey/get pair)
+               }
+               final long endTime = System.currentTimeMillis();
+               System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+               return res;
+       }
+
+       /*
+        * getting the log record and predictions of one job by its id
+        */
+       public StructureJobLog ReadJobLog(String jobid) { // combines a job's ProteinLog row with its ProteinRow predictions
+               final long begin = System.currentTimeMillis();
+               final String logCql = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
+               System.out.println("Command: " + logCql);
+               final ResultSet logHits = session.execute(logCql);
+               if (logHits.isExhausted()) { return null; } // no ProteinLog row for this id
+               final long fetched = System.currentTimeMillis();
+               final Row logRow = logHits.one();
+               final String rowCql = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
+               System.out.println("Command: " + rowCql);
+               final ResultSet rowHits = session.execute(rowCql);
+               if (rowHits.isExhausted()) { return null; } // no ProteinRow row for this id
+               final Row predictionRow = rowHits.one();
+               final String protein = logRow.getString("Protein");
+               final String id = logRow.getString("JobID");
+               final String started = logRow.getString("DataBegin");
+               final String ended = logRow.getString("DataEnd");
+               final StructureJobLog record = new StructureJobLog(protein, id, started, ended, logRow.getString("ip"), predictionRow.getMap("Predictions", String.class, String.class));
+               System.out.println("Query time is " + (fetched - begin) + " msec");
+               System.out.println(" rows analysed, execution time is " + (System.currentTimeMillis() - begin) + " msec");
+               return record;
+       }
+
+       /*
         * getting earlest date of jobs from the db
         */
        public long getEarliestDateInDB() {
                final long startTime = System.currentTimeMillis();
-               String com = "SELECT jobtime FROM ProteinKeyspace.ProteinData;";
+               String com = "SELECT jobtime,JobID FROM ProteinData;";
                System.out.println("Command: " + com);
                ResultSet results = session.execute(com);
                final long queryTime = System.currentTimeMillis();
-               System.out.println("Query time is " + (queryTime - startTime) + " msec");
+               System.out.println("Query time is  " + (queryTime - startTime) + " msec");
 
                Calendar cal = Calendar.getInstance();
                long res = cal.getTimeInMillis();