From cbdbc382e3dc3c0f3fc406d68945b618a137d766 Mon Sep 17 00:00:00 2001 From: Sasha Sherstnev Date: Mon, 27 Jan 2014 14:49:54 +0000 Subject: [PATCH] Improve stability of Cassandra API calls (commands are checked) --- .../compbio/cassandra/CassandraNewTableWriter.java | 3 - datadb/compbio/cassandra/CassandraReaderOld.java | 3 +- datadb/compbio/cassandra/CassandraRemover.java | 241 ++++++++++++-------- .../compbio/cassandra/readers/CassandraReader.java | 8 +- .../readers/CassandraReaderExecutionTime.java | 6 +- .../cassandra/readers/ExecutionTimeReader.java | 2 +- 6 files changed, 153 insertions(+), 110 deletions(-) diff --git a/datadb/compbio/cassandra/CassandraNewTableWriter.java b/datadb/compbio/cassandra/CassandraNewTableWriter.java index 4ee10c8..3b6c741 100644 --- a/datadb/compbio/cassandra/CassandraNewTableWriter.java +++ b/datadb/compbio/cassandra/CassandraNewTableWriter.java @@ -9,11 +9,8 @@ import org.apache.log4j.Logger; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.exceptions.QueryExecutionException; -import compbio.engine.ProteoCachePropertyHelperManager; import compbio.cassandra.CassandraNativeConnector; public class CassandraNewTableWriter { diff --git a/datadb/compbio/cassandra/CassandraReaderOld.java b/datadb/compbio/cassandra/CassandraReaderOld.java index ff37145..6055e60 100644 --- a/datadb/compbio/cassandra/CassandraReaderOld.java +++ b/datadb/compbio/cassandra/CassandraReaderOld.java @@ -17,6 +17,7 @@ import compbio.beans.JobBean; import compbio.beans.ProteinBean; import compbio.beans.Total; import compbio.engine.JobStatus; +import compbio.engine.Pair; public class CassandraReaderOld { private Session session; @@ -255,7 +256,6 @@ public class CassandraReaderOld { int c = 0; for (Row r : rows) { String protein = r.getString("Protein"); - String 
id = r.getString("JobID"); if (res.containsKey(protein)) res.put(protein, res.get(protein) + 1); else @@ -284,7 +284,6 @@ public class CassandraReaderOld { int c = 0; for (Row r : rows) { String ip = r.getString("ip"); - String id = r.getString("JobID"); if (res.containsKey(ip)) res.put(ip, res.get(ip) + 1); else diff --git a/datadb/compbio/cassandra/CassandraRemover.java b/datadb/compbio/cassandra/CassandraRemover.java index 76cc787..d7aeafb 100644 --- a/datadb/compbio/cassandra/CassandraRemover.java +++ b/datadb/compbio/cassandra/CassandraRemover.java @@ -6,58 +6,75 @@ import java.util.Calendar; import java.util.Date; import java.util.List; -import org.apache.log4j.Logger; - import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.exceptions.QueryExecutionException; -import com.datastax.driver.core.exceptions.QueryValidationException; +import compbio.cassandra.readers.CassandraReader; -public class CassandraRemover { - private Session session; +/** + * The class removes jobs from the cassandra database. 4 different strategies + * are possible: 1. remove 1 job with given job ID 2. remove jobs launched from + * an IP 3. remove jobs with particular protein sequence 4. 
remove jobs launched + * within a time range (date1, date2) + * + * @author Alexander Sherstnev + * @author Natasha Sherstneva + * @version 1.0 + * @since Nov 2013 + */ +public class CassandraRemover extends CassandraReader { static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd"); - private static Logger log = Logger.getLogger(CassandraNativeConnector.class); - public CassandraRemover() { - Session inis = CassandraNativeConnector.getSession(); - setSession(inis); - } + /** + * private method for real deleting one job + * + * @param jobid + * job ID + * @param date + * job execution date + * + * @return 1 if the job was removed, 0 otherwise + */ + private int RemoveJob(String jobid, long date) { - public void setSession(Session s) { - assert s != null; - session = s; - } + if (date < 0L) { + log.error("CassandraRemover error: job " + jobid + " with date " + date + + " can not be deleted in JobDateInfo. Daily statistics is inconsistent"); + return 0; + } - /* - * delete a record from CF for current jobId - */ - private void RemoveJob(String jobid, long date) { String status = FindStatus(jobid); - String com0 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';"; - System.out.println("Command: " + com0); - session.execute(com0); - String com3 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;"; + String com1 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';"; + System.out.println("Command: " + com1); + CassandraQuery(com1); + + String com2 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;"; + System.out.println("Command: " + com2); + CassandraQuery(com2); + + String com3 = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";"; System.out.println("Command: " + com3); - session.execute(com3); - String com = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); + ResultSet results = CassandraQuery(com3); + if 
(results.isExhausted()) { + log.error("CassandraRemover error: job " + jobid + " with date " + date + + " can not be deleted in JobDateInfo. Daily statistics is inconsistent"); + return 0; + } Row row = results.one(); long njobs = row.getLong("Total") - 1; if (status.equals("OK")) { long njobsOK = row.getLong("TotalOK") - 1; - String com1 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';"; - System.out.println("Command: " + com1); - session.execute(com1); - String com2 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";"; - System.out.println("Command: " + com2); - session.execute(com2); + String com4 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';"; + System.out.println("Command: " + com4); + CassandraQuery(com4); + + String com5 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";"; + System.out.println("Command: " + com5); + CassandraQuery(com5); UpdateJobDateInfo(date, "TotalOK", njobsOK, njobs); } else { String com6 = "DELETE FROM FailLog WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";"; System.out.println("Command: " + com6); - session.execute(com6); + CassandraQuery(com6); if (status.equals("STOPPED")) { long njobsStopped = row.getLong("TotalStopped") - 1; UpdateJobDateInfo(date, "TotalStopped", njobsStopped, njobs); @@ -69,132 +86,166 @@ public class CassandraRemover { UpdateJobDateInfo(date, "TotalTimeOut", njobsTimeOut, njobs); } } - System.out.println("Remove jobs: " + jobid); + System.out.println("Job " + jobid + " removed..."); + return 1; } + /** + * update a particular column in the JobDateInfo table + * + * @param date + * jobday value selecting the row to update + * + * @return nothing + * + */ private void UpdateJobDateInfo(long date, String ColumnName, long totalcol, long total) { - String com4 = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + ", Total = " + total + " WHERE jobday = " + date + ";"; - System.out.println("Command: " + com4); - session.execute(com4); + String com 
= "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + ", Total = " + total + " WHERE jobday = " + date + ";"; + System.out.println("Command: " + com); + CassandraQuery(com); } + /** + * external method for deleting job with given job ID (strategy 1) + * + * @param jobid + * job ID + * + * @return a number of deleted jobs + * + */ public int RemoveJobById(String jobid) { if (jobid == null) return 0; - Long date = FindDate(jobid); - RemoveJob(jobid, date); - return 1; + long date = FindJobDate(jobid); + return RemoveJob(jobid, date); } + /** + * external method for deleting jobs within a time range (strategy 4) + * + * @param date1 + * starting date + * + * @param date2 + * ending date + * + * @return a number of deleted jobs + * + */ public int RemoveJobByDate(String date1, String date2) { - int numremover = 0; if (date1 == null || date2 == null) return 0; + + int njobs = 0; Long dateBegin = convertDate(date1); Long dateEnd = convertDate(date2); Calendar start = Calendar.getInstance(); start.setTime(new Date(dateBegin)); Calendar end = Calendar.getInstance(); end.setTime(new Date(dateEnd)); + for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) { - System.out.println("--------------------------------------------------------------------: "); - String com = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); + String com1 = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";"; + System.out.println("Command: " + com1); + ResultSet results = CassandraQuery(com1); if (!results.isExhausted()) { List rows = results.all(); for (Row r : rows) { String jobid = r.getString("JobID"); if (jobid != null) { - RemoveJob(jobid, date.getTime()); - numremover++; + njobs += RemoveJob(jobid, date.getTime()); } } } - String comm = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";"; - 
ResultSet resultsfail = session.execute(comm); + + String com2 = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";"; + ResultSet resultsfail = CassandraQuery(com2); if (!resultsfail.isExhausted()) { List rows = resultsfail.all(); for (Row r : rows) { String jobid = r.getString("JobID"); if (jobid != null) { - RemoveJob(jobid, date.getTime()); - numremover++; + njobs += RemoveJob(jobid, date.getTime()); } } } } - return numremover; + return njobs; } + /** + * external method for deleting jobs launched from a particular IP (strategy + * 2) + * + * @param ip + * the IP + * + * @return a number of deleted jobs + * + */ public int RemoveJobByIp(String ip) { - int numremover = 0; + int njobs = 0; if (ip == null) return 0; String com = "SELECT databegin, JobID FROM ProteinLog WHERE ip = '" + ip + "';"; - ResultSet results = session.execute(com); + ResultSet results = CassandraQuery(com); if (!results.isExhausted()) { List rows = results.all(); for (Row r : rows) { Long date = convertDate(r.getString("databegin")); String jobid = r.getString("JobID"); - if (date == null || jobid == null) - continue; - RemoveJob(jobid, date); - numremover++; + if (date != null || jobid != null) { + njobs += RemoveJob(jobid, date); + } } } - return numremover; + return njobs; } - public int RemoveJobBySequence(String seq) { - int numremover = 0; - if (seq == null) + /** + * external method for deleting jobs with a protein sequence (strategy 3) + * + * @param sequence + * the sequence + * + * @return a number of deleted jobs + * + */ + public int RemoveJobBySequence(String sequence) { + int njobs = 0; + if (sequence == null) return 0; - String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + seq + "';"; - ResultSet results = session.execute(com); + String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + sequence + "';"; + ResultSet results = CassandraQuery(com); if (!results.isExhausted()) { List rows = results.all(); for (Row r : rows) { String jobid = 
r.getString("JobID"); - Long date = FindDate(jobid); - RemoveJob(jobid, date); - numremover++; + long date = FindJobDate(jobid); + njobs += RemoveJob(jobid, date); } } - return numremover; + return njobs; } - private Long FindDate(String jobid) { + private long FindJobDate(String jobid) { String com = "SELECT databegin FROM ProteinLog WHERE JobID = '" + jobid + "';"; - ResultSet results = session.execute(com); - Long date = convertDate(results.one().getString("databegin")); - return date; + ResultSet results = CassandraQuery(com); + if (!results.isExhausted()) { + return convertDate(results.one().getString("databegin")); + } + return -1L; } private String FindStatus(String jobid) { - String status = "UNKNOWN"; - String command = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';"; - try { - System.out.println("Command: " + command); - ResultSet results = session.execute(command); - Row raw = results.one(); - if (null != raw) { - status = raw.getString("FinalStatus"); - } - } catch (QueryExecutionException e) { - String mess = "CassandraRemover: query execution exception..."; - System.out.println(mess); - log.error(mess); - log.error(e.getLocalizedMessage(), e.getCause()); - } catch (QueryValidationException e) { - String mess = "CassandraRemover: query validation exception... 
Command: " + command; - System.out.println(mess); - log.error(mess); - log.error(e.getLocalizedMessage(), e.getCause()); + String com = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';"; + System.out.println("Command: " + com); + ResultSet results = CassandraQuery(com); + if (!results.isExhausted()) { + return results.one().getString("FinalStatus"); } - System.out.println("*****status: " + status); - return status; + return "UNKNOWN"; } protected long convertDate(String d) { diff --git a/datadb/compbio/cassandra/readers/CassandraReader.java b/datadb/compbio/cassandra/readers/CassandraReader.java index 7de48e6..01dc122 100644 --- a/datadb/compbio/cassandra/readers/CassandraReader.java +++ b/datadb/compbio/cassandra/readers/CassandraReader.java @@ -9,8 +9,8 @@ import com.datastax.driver.core.exceptions.QueryValidationException; import compbio.cassandra.CassandraNativeConnector; public class CassandraReader { - private static long earlestDate = 0; - private Session session; + protected static long earlestDate = 0; + protected Session session; protected static Logger log = Logger.getLogger(CassandraNativeConnector.class); public CassandraReader() { @@ -28,13 +28,13 @@ public class CassandraReader { ResultSet results = session.execute(command); return results; } catch (QueryExecutionException e) { - String mess = "CassandraUserManagerImpl.findAllUsers: query execution exception..."; + String mess = "ProteoCache Cassandra DB interface: query execution exception...\n Command: " + command; System.out.println(mess); log.error(mess); log.error(e.getLocalizedMessage(), e.getCause()); return null; } catch (QueryValidationException e) { - String mess = "CassandraUserManagerImpl.findAllUsers: query validation exception... 
Command: " + command; + String mess = "CProteoCache Cassandra DB interface: query validation exception...\n Command: " + command; System.out.println(mess); log.error(mess); log.error(e.getLocalizedMessage(), e.getCause()); diff --git a/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java b/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java index 134a139..32bea66 100644 --- a/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java +++ b/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java @@ -8,7 +8,7 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; -import compbio.cassandra.Pair; +import compbio.engine.Pair; public class CassandraReaderExecutionTime { private Session session; @@ -22,10 +22,6 @@ public class CassandraReaderExecutionTime { session = s; } - private void setConditions() { - - } - public boolean JobisNotInsterted(String jobid) { ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"); if (results1.isExhausted()) { diff --git a/datadb/compbio/cassandra/readers/ExecutionTimeReader.java b/datadb/compbio/cassandra/readers/ExecutionTimeReader.java index bca4666..d019c35 100644 --- a/datadb/compbio/cassandra/readers/ExecutionTimeReader.java +++ b/datadb/compbio/cassandra/readers/ExecutionTimeReader.java @@ -11,7 +11,7 @@ import compbio.beans.DateBean; import compbio.beans.ExecutionTimeBean; import compbio.beans.TotalExecutionTime; import compbio.cassandra.DateFormatter; -import compbio.cassandra.Pair; +import compbio.engine.Pair; public class ExecutionTimeReader extends CassandraReader { -- 1.7.10.2