Improve stability of Cassandra API calls (commands are checked)
author Sasha Sherstnev <a.sherstnev@dundee.ac.uk>
Mon, 27 Jan 2014 14:49:54 +0000 (14:49 +0000)
committer Sasha Sherstnev <a.sherstnev@dundee.ac.uk>
Mon, 27 Jan 2014 14:49:54 +0000 (14:49 +0000)
datadb/compbio/cassandra/CassandraNewTableWriter.java
datadb/compbio/cassandra/CassandraReaderOld.java
datadb/compbio/cassandra/CassandraRemover.java
datadb/compbio/cassandra/readers/CassandraReader.java
datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java
datadb/compbio/cassandra/readers/ExecutionTimeReader.java

index 4ee10c8..3b6c741 100644 (file)
@@ -9,11 +9,8 @@ import org.apache.log4j.Logger;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.exceptions.QueryExecutionException;
 
-import compbio.engine.ProteoCachePropertyHelperManager;
 import compbio.cassandra.CassandraNativeConnector;
 
 public class CassandraNewTableWriter {
index ff37145..6055e60 100644 (file)
@@ -17,6 +17,7 @@ import compbio.beans.JobBean;
 import compbio.beans.ProteinBean;
 import compbio.beans.Total;
 import compbio.engine.JobStatus;
+import compbio.engine.Pair;
 
 public class CassandraReaderOld {
        private Session session;
@@ -255,7 +256,6 @@ public class CassandraReaderOld {
                int c = 0;
                for (Row r : rows) {
                        String protein = r.getString("Protein");
-                       String id = r.getString("JobID");
                        if (res.containsKey(protein))
                                res.put(protein, res.get(protein) + 1);
                        else
@@ -284,7 +284,6 @@ public class CassandraReaderOld {
                int c = 0;
                for (Row r : rows) {
                        String ip = r.getString("ip");
-                       String id = r.getString("JobID");
                        if (res.containsKey(ip))
                                res.put(ip, res.get(ip) + 1);
                        else
index 76cc787..d7aeafb 100644 (file)
@@ -6,58 +6,75 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.log4j.Logger;
-
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.QueryExecutionException;
-import com.datastax.driver.core.exceptions.QueryValidationException;
+import compbio.cassandra.readers.CassandraReader;
 
-public class CassandraRemover {
-       private Session session;
+/**
+ * The class removes jobs from the cassandra database. 4 different strategies
+ * are possible: 1. remove 1 job with given job ID 2. remove jobs launched from
+ * an IP 3. remove jobs with particular protein sequence 4. remove jobs launched
+ * within a time range (date1, date2)
+ * 
+ * @author Alexander Sherstnev
+ * @author Natasha Sherstneva
+ * @version 1.0
+ * @since Nov 2013
+ */
+public class CassandraRemover extends CassandraReader {
        static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");
-       private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
 
-       public CassandraRemover() {
-               Session inis = CassandraNativeConnector.getSession();
-               setSession(inis);
-       }
+       /**
+        * private method that actually deletes one job
+        * 
+        * @param jobid
+        *            job ID
+        * @param date
+        *            job execution date
+        * 
+        * @return 1 if the job was removed, 0 if the removal was abandoned with an error
+        */
+       private int RemoveJob(String jobid, long date) {
 
-       public void setSession(Session s) {
-               assert s != null;
-               session = s;
-       }
+               if (date < 0L) {
+                       log.error("CassandraRemover error: job " + jobid + " with date " + date
+                                       + " can not be deleted in JobDateInfo. Daily statistics is inconsistent");
+                       return 0;
+               }
 
-       /*
-        * delete a record from CF for current jobId
-        */
-       private void RemoveJob(String jobid, long date) {
                String status = FindStatus(jobid);
-               String com0 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';";
-               System.out.println("Command: " + com0);
-               session.execute(com0);
-               String com3 = "UPDATE jpredarchive SET finalstatus = 'DELETED'  WHERE JobID = '" + jobid + "' ;";
+               String com1 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';";
+               System.out.println("Command: " + com1);
+               CassandraQuery(com1);
+
+               String com2 = "UPDATE jpredarchive SET finalstatus = 'DELETED'  WHERE JobID = '" + jobid + "' ;";
+               System.out.println("Command: " + com2);
+               CassandraQuery(com2);
+
+               String com3 = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";";
                System.out.println("Command: " + com3);
-               session.execute(com3);
-               String com = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";";
-               System.out.println("Command: " + com);
-               ResultSet results = session.execute(com);
+               ResultSet results = CassandraQuery(com3);
+               if (results.isExhausted()) {
+                       log.error("CassandraRemover error: job " + jobid + " with date " + date
+                                       + " can not be deleted in JobDateInfo. Daily statistics is inconsistent");
+                       return 0;
+               }
                Row row = results.one();
                long njobs = row.getLong("Total") - 1;
                if (status.equals("OK")) {
                        long njobsOK = row.getLong("TotalOK") - 1;
-                       String com1 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';";
-                       System.out.println("Command: " + com1);
-                       session.execute(com1);
-                       String com2 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
-                       System.out.println("Command: " + com2);
-                       session.execute(com2);
+                       String com4 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';";
+                       System.out.println("Command: " + com4);
+                       CassandraQuery(com4);
+
+                       String com5 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
+                       System.out.println("Command: " + com5);
+                       CassandraQuery(com5);
                        UpdateJobDateInfo(date, "TotalOK", njobsOK, njobs);
                } else {
                        String com6 = "DELETE FROM FailLog WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
                        System.out.println("Command: " + com6);
-                       session.execute(com6);
+                       CassandraQuery(com6);
                        if (status.equals("STOPPED")) {
                                long njobsStopped = row.getLong("TotalStopped") - 1;
                                UpdateJobDateInfo(date, "TotalStopped", njobsStopped, njobs);
@@ -69,132 +86,166 @@ public class CassandraRemover {
                                UpdateJobDateInfo(date, "TotalTimeOut", njobsTimeOut, njobs);
                        }
                }
-               System.out.println("Remove jobs: " + jobid);
+               System.out.println("Job " + jobid + " removed...");
+               return 1;
        }
 
+       /**
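+        * update a particular column and the Total column in the JobDateInfo table
+        * 
+        * @param date
+        *            job day identifying the JobDateInfo row
+        * @param ColumnName
+        *            name of the column to update
+        * @param totalcol
+        *            new value for that column
+        * @param total
+        *            new value for the Total column
+        * 
+        * @return nothing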
+        * 
+        */
        private void UpdateJobDateInfo(long date, String ColumnName, long totalcol, long total) {
-               String com4 = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + ", Total = " + total + " WHERE jobday = " + date + ";";
-               System.out.println("Command: " + com4);
-               session.execute(com4);
+               String com = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + ", Total = " + total + " WHERE jobday = " + date + ";";
+               System.out.println("Command: " + com);
+               CassandraQuery(com);
        }
 
+       /**
+        * external method for deleting job with given job ID (strategy 1)
+        * 
+        * @param jobid
+        *            job ID
+        * 
+        * @return a number of deleted jobs
+        * 
+        */
        public int RemoveJobById(String jobid) {
                if (jobid == null)
                        return 0;
-               Long date = FindDate(jobid);
-               RemoveJob(jobid, date);
-               return 1;
+               long date = FindJobDate(jobid);
+               return RemoveJob(jobid, date);
        }
 
+       /**
+        * external method for deleting jobs within a time range (strategy 4)
+        * 
+        * @param date1
+        *            starting date
+        * 
+        * @param date2
+        *            ending date
+        * 
+        * @return a number of deleted jobs
+        * 
+        */
        public int RemoveJobByDate(String date1, String date2) {
-               int numremover = 0;
                if (date1 == null || date2 == null)
                        return 0;
+
+               int njobs = 0;
                Long dateBegin = convertDate(date1);
                Long dateEnd = convertDate(date2);
                Calendar start = Calendar.getInstance();
                start.setTime(new Date(dateBegin));
                Calendar end = Calendar.getInstance();
                end.setTime(new Date(dateEnd));
+
                for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
-                       System.out.println("--------------------------------------------------------------------: ");
-                       String com = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";";
-                       System.out.println("Command: " + com);
-                       ResultSet results = session.execute(com);
+                       String com1 = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";";
+                       System.out.println("Command: " + com1);
+                       ResultSet results = CassandraQuery(com1);
                        if (!results.isExhausted()) {
                                List<Row> rows = results.all();
                                for (Row r : rows) {
                                        String jobid = r.getString("JobID");
                                        if (jobid != null) {
-                                               RemoveJob(jobid, date.getTime());
-                                               numremover++;
+                                               njobs += RemoveJob(jobid, date.getTime());
                                        }
                                }
                        }
-                       String comm = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";";
-                       ResultSet resultsfail = session.execute(comm);
+
+                       String com2 = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";";
+                       ResultSet resultsfail = CassandraQuery(com2);
                        if (!resultsfail.isExhausted()) {
                                List<Row> rows = resultsfail.all();
                                for (Row r : rows) {
                                        String jobid = r.getString("JobID");
                                        if (jobid != null) {
-                                               RemoveJob(jobid, date.getTime());
-                                               numremover++;
+                                               njobs += RemoveJob(jobid, date.getTime());
                                        }
                                }
                        }
                }
-               return numremover;
+               return njobs;
        }
 
+       /**
+        * external method for deleting jobs launched from a particular IP (strategy
+        * 2). Rows without a date or a job ID are skipped.
+        * 
+        * @param ip
+        *            the IP; nothing is deleted if it is null
+        * 
+        * @return a number of deleted jobs
+        * 
+        */
        public int RemoveJobByIp(String ip) {
-               int numremover = 0;
+               int njobs = 0;
                if (ip == null)
                        return 0;
                String com = "SELECT databegin, JobID FROM ProteinLog WHERE ip = '" + ip + "';";
-               ResultSet results = session.execute(com);
+               ResultSet results = CassandraQuery(com);
                if (!results.isExhausted()) {
                        List<Row> rows = results.all();
                        for (Row r : rows) {
                                Long date = convertDate(r.getString("databegin"));
                                String jobid = r.getString("JobID");
-                               if (date == null || jobid == null)
-                                       continue;
-                               RemoveJob(jobid, date);
-                               numremover++;
+                               // both values are required: the pre-change code skipped the row
+                               // when either was null, so the test must be a conjunction
+                               if (date != null && jobid != null) {
+                                       njobs += RemoveJob(jobid, date);
+                               }
                        }
                }
-               return numremover;
+               return njobs;
        }
 
-       public int RemoveJobBySequence(String seq) {
-               int numremover = 0;
-               if (seq == null)
+       /**
+        * external method for deleting jobs with a protein sequence (strategy 3)
+        * 
+        * @param sequence
+        *            the sequence
+        * 
+        * @return a number of deleted jobs
+        * 
+        */
+       public int RemoveJobBySequence(String sequence) {
+               int njobs = 0;
+               if (sequence == null)
                        return 0;
-               String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + seq + "';";
-               ResultSet results = session.execute(com);
+               String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + sequence + "';";
+               ResultSet results = CassandraQuery(com);
                if (!results.isExhausted()) {
                        List<Row> rows = results.all();
                        for (Row r : rows) {
                                String jobid = r.getString("JobID");
-                               Long date = FindDate(jobid);
-                               RemoveJob(jobid, date);
-                               numremover++;
+                               long date = FindJobDate(jobid);
+                               njobs += RemoveJob(jobid, date);
                        }
                }
-               return numremover;
+               return njobs;
        }
 
-       private Long FindDate(String jobid) {
+       /**
+        * find the start date of a job in the ProteinLog table
+        * 
+        * @param jobid
+        *            job ID
+        * 
+        * @return the converted databegin value of the job, or -1 if the job is
+        *         not found or the query produced no result set
+        */
+       private long FindJobDate(String jobid) {
                String com = "SELECT databegin FROM ProteinLog WHERE JobID = '" + jobid + "';";
-               ResultSet results = session.execute(com);
-               Long date = convertDate(results.one().getString("databegin"));
-               return date;
+               ResultSet results = CassandraQuery(com);
+               // NOTE(review): the query helper in CassandraReader returns null after
+               // logging a query exception (presumably this is CassandraQuery - confirm),
+               // so guard against null and fall through to the -1 sentinel
+               if (results != null && !results.isExhausted()) {
+                       return convertDate(results.one().getString("databegin"));
+               }
+               return -1L;
        }
 
+       /**
+        * find the final status of a job in the ProteinLog table
+        * 
+        * @param jobid
+        *            job ID
+        * 
+        * @return the FinalStatus value of the job, or "UNKNOWN" if the job is not
+        *         found or the query produced no result set
+        */
        private String FindStatus(String jobid) {
-               String status = "UNKNOWN";
-               String command = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';";
-               try {
-                       System.out.println("Command: " + command);
-                       ResultSet results = session.execute(command);
-                       Row raw = results.one();
-                       if (null != raw) {
-                               status = raw.getString("FinalStatus");
-                       }
-               } catch (QueryExecutionException e) {
-                       String mess = "CassandraRemover: query execution exception...";
-                       System.out.println(mess);
-                       log.error(mess);
-                       log.error(e.getLocalizedMessage(), e.getCause());
-               } catch (QueryValidationException e) {
-                       String mess = "CassandraRemover: query validation exception... Command: " + command;
-                       System.out.println(mess);
-                       log.error(mess);
-                       log.error(e.getLocalizedMessage(), e.getCause());
+               String com = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';";
+               System.out.println("Command: " + com);
+               ResultSet results = CassandraQuery(com);
+               // NOTE(review): the query helper in CassandraReader returns null after
+               // logging a query exception (presumably this is CassandraQuery - confirm);
+               // the removed code answered "UNKNOWN" in that case, so keep doing so
+               if (results != null && !results.isExhausted()) {
+                       return results.one().getString("FinalStatus");
                }
-               System.out.println("*****status: " + status);
-               return status;
+               return "UNKNOWN";
        }
 
        protected long convertDate(String d) {
index 7de48e6..01dc122 100644 (file)
@@ -9,8 +9,8 @@ import com.datastax.driver.core.exceptions.QueryValidationException;
 import compbio.cassandra.CassandraNativeConnector;
 
 public class CassandraReader {
-       private static long earlestDate = 0;
-       private Session session;
+       protected static long earlestDate = 0;
+       protected Session session;
        protected static Logger log = Logger.getLogger(CassandraNativeConnector.class);
 
        public CassandraReader() {
@@ -28,13 +28,13 @@ public class CassandraReader {
                        ResultSet results = session.execute(command);
                        return results;
                } catch (QueryExecutionException e) {
-                       String mess = "CassandraUserManagerImpl.findAllUsers: query execution exception...";
+                       String mess = "ProteoCache Cassandra DB interface: query execution exception...\n   Command: " + command;
                        System.out.println(mess);
                        log.error(mess);
                        log.error(e.getLocalizedMessage(), e.getCause());
                        return null;
                } catch (QueryValidationException e) {
-                       String mess = "CassandraUserManagerImpl.findAllUsers: query validation exception... Command: " + command;
+                       String mess = "CProteoCache Cassandra DB interface: query validation exception...\n   Command: " + command;
                        System.out.println(mess);
                        log.error(mess);
                        log.error(e.getLocalizedMessage(), e.getCause());
index 134a139..32bea66 100644 (file)
@@ -8,7 +8,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ResultSet;
 
-import compbio.cassandra.Pair;
+import compbio.engine.Pair;
 
 public class CassandraReaderExecutionTime {
        private Session session;
@@ -22,10 +22,6 @@ public class CassandraReaderExecutionTime {
                session = s;
        }
 
-       private void setConditions() {
-
-       }
-
        public boolean JobisNotInsterted(String jobid) {
                ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
                if (results1.isExhausted()) {
index bca4666..d019c35 100644 (file)
@@ -11,7 +11,7 @@ import compbio.beans.DateBean;
 import compbio.beans.ExecutionTimeBean;
 import compbio.beans.TotalExecutionTime;
 import compbio.cassandra.DateFormatter;
-import compbio.cassandra.Pair;
+import compbio.engine.Pair;
 
 public class ExecutionTimeReader extends CassandraReader {