import java.util.Date;
import java.util.List;
-import org.apache.log4j.Logger;
-
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.QueryExecutionException;
-import com.datastax.driver.core.exceptions.QueryValidationException;
+import compbio.cassandra.readers.CassandraReader;
-public class CassandraRemover {
- private Session session;
+/**
+ * The class removes jobs from the cassandra database. 4 different strategies
+ * are possiable: 1. remove 1 job with given job ID 2. remove jobs launched from
+ * an IP 3. remove jobs with particular protein sequence 4. remove jobs launched
+ * within a time range (date1, data2)
+ *
+ * @author Alexander Sherstnev
+ * @author Natasha Sherstneva
+ * @version 1.0
+ * @since Nov 2013
+ */
+public class CassandraRemover extends CassandraReader {
static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");
- private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
- public CassandraRemover() {
- Session inis = CassandraNativeConnector.getSession();
- setSession(inis);
- }
+ /**
+ * private method for real deleting one job
+ *
+ * @param jobid
+ * job ID
+ * @param date
+ * job execution date
+ *
+ * @return nothing
+ */
+ private int RemoveJob(String jobid, long date) {
- public void setSession(Session s) {
- assert s != null;
- session = s;
- }
+ if (date < 0L) {
+ log.error("CassandraRemover error: job " + jobid + " with date " + date
+ + " can not be deleted in JobDateInfo. Daily statistics is inconsistent");
+ return 0;
+ }
- /*
- * delete a record from CF for current jobId
- */
- private void RemoveJob(String jobid, long date) {
String status = FindStatus(jobid);
- String com0 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';";
- System.out.println("Command: " + com0);
- session.execute(com0);
- String com3 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;";
+ String com1 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';";
+ System.out.println("Command: " + com1);
+ CassandraQuery(com1);
+
+ String com2 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;";
+ System.out.println("Command: " + com2);
+ CassandraQuery(com2);
+
+ String com3 = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";";
System.out.println("Command: " + com3);
- session.execute(com3);
- String com = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
+ ResultSet results = CassandraQuery(com3);
+ if (results.isExhausted()) {
+ log.error("CassandraRemover error: job " + jobid + " with date " + date
+ + " can not be deleted in JobDateInfo. Daily statistics is inconsistent");
+ return 0;
+ }
Row row = results.one();
long njobs = row.getLong("Total") - 1;
if (status.equals("OK")) {
long njobsOK = row.getLong("TotalOK") - 1;
- String com1 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';";
- System.out.println("Command: " + com1);
- session.execute(com1);
- String com2 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
- System.out.println("Command: " + com2);
- session.execute(com2);
+ String com4 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';";
+ System.out.println("Command: " + com4);
+ CassandraQuery(com4);
+
+ String com5 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
+ System.out.println("Command: " + com5);
+ CassandraQuery(com5);
UpdateJobDateInfo(date, "TotalOK", njobsOK, njobs);
} else {
String com6 = "DELETE FROM FailLog WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";";
System.out.println("Command: " + com6);
- session.execute(com6);
+ CassandraQuery(com6);
if (status.equals("STOPPED")) {
long njobsStopped = row.getLong("TotalStopped") - 1;
UpdateJobDateInfo(date, "TotalStopped", njobsStopped, njobs);
UpdateJobDateInfo(date, "TotalTimeOut", njobsTimeOut, njobs);
}
}
- System.out.println("Remove jobs: " + jobid);
+ System.out.println("Job " + jobid + " removed...");
+ return 1;
}
+ /**
+ * update a pariticular column in the JobDateInfo table
+ *
+ * @param jobid
+ * job ID
+ *
+ * @return nothing
+ *
+ */
private void UpdateJobDateInfo(long date, String ColumnName, long totalcol, long total) {
- String com4 = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + ", Total = " + total + " WHERE jobday = " + date + ";";
- System.out.println("Command: " + com4);
- session.execute(com4);
+ String com = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + ", Total = " + total + " WHERE jobday = " + date + ";";
+ System.out.println("Command: " + com);
+ CassandraQuery(com);
}
+ /**
+ * external method for deleting job with given job ID (strategy 1)
+ *
+ * @param jobid
+ * job ID
+ *
+ * @return a number of deleted jobs
+ *
+ */
public int RemoveJobById(String jobid) {
if (jobid == null)
return 0;
- Long date = FindDate(jobid);
- RemoveJob(jobid, date);
- return 1;
+ long date = FindJobDate(jobid);
+ return RemoveJob(jobid, date);
}
+ /**
+ * external method for deleting jobs within a time range (strategy 4)
+ *
+ * @param date1
+ * starting date
+ *
+ * @param date2
+ * ending date
+ *
+ * @return a number of deleted jobs
+ *
+ */
public int RemoveJobByDate(String date1, String date2) {
- int numremover = 0;
if (date1 == null || date2 == null)
return 0;
+
+ int njobs = 0;
Long dateBegin = convertDate(date1);
Long dateEnd = convertDate(date2);
Calendar start = Calendar.getInstance();
start.setTime(new Date(dateBegin));
Calendar end = Calendar.getInstance();
end.setTime(new Date(dateEnd));
+
for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
- System.out.println("--------------------------------------------------------------------: ");
- String com = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
+ String com1 = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";";
+ System.out.println("Command: " + com1);
+ ResultSet results = CassandraQuery(com1);
if (!results.isExhausted()) {
List<Row> rows = results.all();
for (Row r : rows) {
String jobid = r.getString("JobID");
if (jobid != null) {
- RemoveJob(jobid, date.getTime());
- numremover++;
+ njobs += RemoveJob(jobid, date.getTime());
}
}
}
- String comm = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";";
- ResultSet resultsfail = session.execute(comm);
+
+ String com2 = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";";
+ ResultSet resultsfail = CassandraQuery(com2);
if (!resultsfail.isExhausted()) {
List<Row> rows = resultsfail.all();
for (Row r : rows) {
String jobid = r.getString("JobID");
if (jobid != null) {
- RemoveJob(jobid, date.getTime());
- numremover++;
+ njobs += RemoveJob(jobid, date.getTime());
}
}
}
}
- return numremover;
+ return njobs;
}
+ /**
+ * external method for deleting jobs launched from a particular IP (strategy
+ * 2)
+ *
+ * @param ip
+ * the IP
+ *
+ * @return a number of deleted jobs
+ *
+ */
public int RemoveJobByIp(String ip) {
- int numremover = 0;
+ int njobs = 0;
if (ip == null)
return 0;
String com = "SELECT databegin, JobID FROM ProteinLog WHERE ip = '" + ip + "';";
- ResultSet results = session.execute(com);
+ ResultSet results = CassandraQuery(com);
if (!results.isExhausted()) {
List<Row> rows = results.all();
for (Row r : rows) {
Long date = convertDate(r.getString("databegin"));
String jobid = r.getString("JobID");
- if (date == null || jobid == null)
- continue;
- RemoveJob(jobid, date);
- numremover++;
+ if (date != null || jobid != null) {
+ njobs += RemoveJob(jobid, date);
+ }
}
}
- return numremover;
+ return njobs;
}
- public int RemoveJobBySequence(String seq) {
- int numremover = 0;
- if (seq == null)
+ /**
+ * external method for deleting jobs with a protein sequence (strategy 3)
+ *
+ * @param sequence
+ * the sequence
+ *
+ * @return a number of deleted jobs
+ *
+ */
+ public int RemoveJobBySequence(String sequence) {
+ int njobs = 0;
+ if (sequence == null)
return 0;
- String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + seq + "';";
- ResultSet results = session.execute(com);
+ String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + sequence + "';";
+ ResultSet results = CassandraQuery(com);
if (!results.isExhausted()) {
List<Row> rows = results.all();
for (Row r : rows) {
String jobid = r.getString("JobID");
- Long date = FindDate(jobid);
- RemoveJob(jobid, date);
- numremover++;
+ long date = FindJobDate(jobid);
+ njobs += RemoveJob(jobid, date);
}
}
- return numremover;
+ return njobs;
}
- private Long FindDate(String jobid) {
+ private long FindJobDate(String jobid) {
String com = "SELECT databegin FROM ProteinLog WHERE JobID = '" + jobid + "';";
- ResultSet results = session.execute(com);
- Long date = convertDate(results.one().getString("databegin"));
- return date;
+ ResultSet results = CassandraQuery(com);
+ if (!results.isExhausted()) {
+ return convertDate(results.one().getString("databegin"));
+ }
+ return -1L;
}
private String FindStatus(String jobid) {
- String status = "UNKNOWN";
- String command = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';";
- try {
- System.out.println("Command: " + command);
- ResultSet results = session.execute(command);
- Row raw = results.one();
- if (null != raw) {
- status = raw.getString("FinalStatus");
- }
- } catch (QueryExecutionException e) {
- String mess = "CassandraRemover: query execution exception...";
- System.out.println(mess);
- log.error(mess);
- log.error(e.getLocalizedMessage(), e.getCause());
- } catch (QueryValidationException e) {
- String mess = "CassandraRemover: query validation exception... Command: " + command;
- System.out.println(mess);
- log.error(mess);
- log.error(e.getLocalizedMessage(), e.getCause());
+ String com = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';";
+ System.out.println("Command: " + com);
+ ResultSet results = CassandraQuery(com);
+ if (!results.isExhausted()) {
+ return results.one().getString("FinalStatus");
}
- System.out.println("*****status: " + status);
- return status;
+ return "UNKNOWN";
}
protected long convertDate(String d) {