import java.util.Date;
import java.util.List;
-import org.apache.log4j.Logger;
-
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
-public class CassandraRemover {
- private Session session;
- static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");
- private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+import compbio.cassandra.readers.CassandraReader;
- public CassandraRemover() {
- Session inis = CassandraNativeConnector.getSession();
- setSession (inis);
- }
+/**
+ * The class removes jobs from the Cassandra database. 4 different strategies
+ * are possible: 1. remove 1 job with a given job ID 2. remove jobs launched from
+ * an IP 3. remove jobs with a particular protein sequence 4. remove jobs launched
+ * within a time range (date1, date2)
+ *
+ * @author Alexander Sherstnev
+ * @author Natasha Sherstneva
+ * @version 1.0
+ * @since Nov 2013
+ */
+public class CassandraRemover extends CassandraReader {
+ static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");
- public void setSession(Session s) {
- assert s != null;
- session = s;
- }
-
- /*
- * delete a record from CF for current jobId
+ /**
+ * private method for real deleting one job
+ *
+ * @param jobid
+ * job ID
+ * @param date
+ * job execution date
+ *
+ * @return nothing
*/
- private void RemoveJob(String jobid, long date) {
- String com0 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';";
- System.out.println("Command: " + com0);
- session.execute(com0);
- String com1 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';";
+ private int RemoveJob(String jobid, long date) {
+
+ if (date < 0L) {
+ log.error("CassandraRemover error: job " + jobid + " with date " + date
+ + " can not be deleted in JobDateInfo. Daily statistics is inconsistent");
+ return 0;
+ }
+
+ String status = FindStatus(jobid);
+ String com1 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';";
System.out.println("Command: " + com1);
- session.execute(com1);
- String com2 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
+ CassandraQuery(com1);
+
+ String com2 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;";
System.out.println("Command: " + com2);
- session.execute(com2);
- String com3 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;";
+ CassandraQuery(com2);
+
+ String com3 = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";";
System.out.println("Command: " + com3);
- session.execute(com3);
- String com = "SELECT total FROM JobDateInfo WHERE jobday = " + date + ";";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- long njobs = results.one().getLong("total");
- System.out.println("njobs: " + njobs);
- String com4 = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + date + "," + (njobs -1) + ");";
- System.out.println("Command: " + com4);
- session.execute(com4);
- System.out.println("Remove jobs: " + jobid);
+ ResultSet results = CassandraQuery(com3);
+ if (results.isExhausted()) {
+ log.error("CassandraRemover error: job " + jobid + " with date " + date
+ + " can not be deleted in JobDateInfo. Daily statistics is inconsistent");
+ return 0;
+ }
+ Row row = results.one();
+ if (status.equals("OK")) {
+ long njobsOK = row.getLong("TotalOK") - 1;
+ String com4 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';";
+ System.out.println("Command: " + com4);
+ CassandraQuery(com4);
+
+ String com5 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
+ System.out.println("Command: " + com5);
+ CassandraQuery(com5);
+ UpdateJobDateInfo(date, "TotalOK", njobsOK);
+ } else {
+ String com6 = "DELETE FROM FailLog WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
+ System.out.println("Command: " + com6);
+ CassandraQuery(com6);
+ if (status.equals("STOPPED")) {
+ long njobsStopped = row.getLong("TotalStopped") - 1;
+ UpdateJobDateInfo(date, "TotalStopped", njobsStopped);
+ } else if (status.equals("ERROR")) {
+ long njobsError = row.getLong("TotalError") - 1;
+ UpdateJobDateInfo(date, "TotalError", njobsError);
+ } else if (status.equals("TIMEDOUT")) {
+ long njobsTimeOut = row.getLong("TotalTimeOut") - 1;
+ UpdateJobDateInfo(date, "TotalTimeOut", njobsTimeOut);
+ }
+ }
+ System.out.println("Job " + jobid + " removed...");
+ return 1;
}
-
- public void RemoveJobById (String jobid) {
- if (jobid == null)
- return;
- Long date = FindDate(jobid);
- if (date == null)
- return;
- RemoveJob(jobid, date);
+
+ /**
+ * update a particular column in the JobDateInfo table
+ *
+ * @param date
+ * value of the jobday key selecting the row to update
+ * @param ColumnName
+ * name of the column to set; it is inserted into the statement text
+ * as is
+ * @param totalcol
+ * new value written to the column
+ *
+ */
+ private void UpdateJobDateInfo(long date, String ColumnName, long totalcol) {
+ String com = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + " WHERE jobday = " + date + ";";
+ CassandraQuery(com);
}
-
- public void RemoveJobByDate (String date1, String date2) {
- System.out.println("Start " + date1 + ", " + date2);
- if (date1 == null || date2 == null)
- return;
+
+ /**
+ * Removes the job with the given job ID (strategy 1).
+ *
+ * @param jobid
+ * job ID; null is accepted and removes nothing
+ *
+ * @return the number of deleted jobs (0 or 1)
+ *
+ */
+ public int RemoveJobById(String jobid) {
+ if (jobid != null) {
+ return RemoveJob(jobid, FindJobDate(jobid));
+ }
+ return 0;
+ }
+
+ /**
+ * external method for deleting jobs within a time range (strategy 4)
+ * <p>
+ * Both dates are converted with convertDate and the range is walked one
+ * calendar day at a time, the ending day included. For every day the job
+ * IDs found in ProteinData and in FailLog are passed to RemoveJob.
+ *
+ * @param date1
+ * starting date
+ *
+ * @param date2
+ * ending date
+ *
+ * @return a number of deleted jobs; 0 if either date is null
+ *
+ */
+ public int RemoveJobByDate(String date1, String date2) {
+ if (date1 == null || date2 == null)
+ return 0;
+
+ int njobs = 0;
Long dateBegin = convertDate(date1);
Long dateEnd = convertDate(date2);
- System.out.println("Date to long done!: ");
- if (dateBegin == null || dateEnd == null)
- return;
Calendar start = Calendar.getInstance();
start.setTime(new Date(dateBegin));
Calendar end = Calendar.getInstance();
end.setTime(new Date(dateEnd));
- System.out.println("Date to cal done!: ");
+
for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
- String com = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
+ String com1 = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";"; // jobs stored in ProteinData for this day
+ System.out.println("Command: " + com1);
+ ResultSet results = CassandraQuery(com1);
if (!results.isExhausted()) {
List<Row> rows = results.all();
for (Row r : rows) {
String jobid = r.getString("JobID");
- if (jobid != null)
- RemoveJob(jobid, date.getTime());
+ if (jobid != null) {
+ njobs += RemoveJob(jobid, date.getTime()); // RemoveJob returns 1 per removed job, 0 otherwise
+ }
+ }
+ }
+
+ String com2 = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";"; // jobs stored in FailLog for the same day
+ ResultSet resultsfail = CassandraQuery(com2);
+ if (!resultsfail.isExhausted()) {
+ List<Row> rows = resultsfail.all();
+ for (Row r : rows) {
+ String jobid = r.getString("JobID");
+ if (jobid != null) {
+ njobs += RemoveJob(jobid, date.getTime());
+ }
}
}
}
+ return njobs;
}
-
-
- public void RemoveJobByIp (String ip) {
- if (ip == null)
- return;
+
+ /**
+ * external method for deleting jobs launched from a particular IP (strategy
+ * 2)
+ * <p>
+ * NOTE(review): the IP is concatenated into the query text unescaped;
+ * confirm that callers only pass validated values.
+ *
+ * @param ip
+ * the IP; null removes nothing
+ *
+ * @return a number of deleted jobs
+ *
+ */
+ public int RemoveJobByIp(String ip) {
+ int njobs = 0;
+ if (ip == null)
+ return 0;
String com = "SELECT databegin, JobID FROM ProteinLog WHERE ip = '" + ip + "';";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
+ ResultSet results = CassandraQuery(com);
if (!results.isExhausted()) {
List<Row> rows = results.all();
for (Row r : rows) {
Long date = convertDate(r.getString("databegin"));
String jobid = r.getString("JobID");
- if (date == null || jobid == null)
- continue;
- RemoveJob(jobid, date);
+ // both values are required: with "||" a row with a null job ID was
+ // still passed on to RemoveJob
+ if (date != null && jobid != null) {
+ njobs += RemoveJob(jobid, date);
+ }
}
}
+ return njobs;
}
-
- public void RemoveJobBySequence (String seq) {
- if (seq == null)
- return;
- String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + seq + "';";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
+
+ /**
+ * external method for deleting jobs with a protein sequence (strategy 3)
+ * <p>
+ * NOTE(review): the sequence is concatenated into the query text unescaped;
+ * confirm that callers only pass validated values.
+ *
+ * @param sequence
+ * the sequence; null removes nothing
+ *
+ * @return a number of deleted jobs
+ *
+ */
+ public int RemoveJobBySequence(String sequence) {
+ int njobs = 0;
+ if (sequence == null)
+ return 0;
+ String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + sequence + "';";
+ ResultSet results = CassandraQuery(com);
if (!results.isExhausted()) {
List<Row> rows = results.all();
for (Row r : rows) {
String jobid = r.getString("JobID");
- if (jobid == null)
- continue;
- Long date = FindDate(jobid);
- if (date == null)
- continue;
- RemoveJob(jobid, date);
+ // rows without a job ID are skipped, as in RemoveJobByDate
+ if (jobid != null) {
+ long date = FindJobDate(jobid);
+ njobs += RemoveJob(jobid, date);
+ }
}
}
+ return njobs;
}
-
- private Long FindDate(String jobid) {
+
+ private long FindJobDate(String jobid) { // looks up the databegin value of the job in ProteinLog
String com = "SELECT databegin FROM ProteinLog WHERE JobID = '" + jobid + "';";
+ ResultSet results = CassandraQuery(com);
+ if (!results.isExhausted()) {
+ return convertDate(results.one().getString("databegin")); // databegin string converted to long by convertDate
+ }
+ return -1L; // the query returned no row for this job ID
+ }
+
+ private String FindStatus(String jobid) { // reads the FinalStatus column of the job's ProteinLog record
+ String com = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';";
System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- if (results.isExhausted())
- return null;
- Long date = convertDate(results.one().getString("databegin"));
- return date;
+ ResultSet results = CassandraQuery(com);
+ if (!results.isExhausted()) {
+ return results.one().getString("FinalStatus"); // only the first returned row is used
+ }
+ return "UNKNOWN"; // placeholder when the query returns no row
}
-
-
- protected long convertDate (String d) {
+
+ protected long convertDate(String d) {
try {
if (null != d) {
Date startdate = dateformatter.parse(d);