package compbio.cassandra; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.List; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import compbio.cassandra.readers.CassandraReader; /** * The class removes jobs from the cassandra database. 4 different strategies * are possiable: 1. remove 1 job with given job ID 2. remove jobs launched from * an IP 3. remove jobs with particular protein sequence 4. remove jobs launched * within a time range (date1, data2) * * @author Alexander Sherstnev * @author Natasha Sherstneva * @version 1.0 * @since Nov 2013 */ public class CassandraRemover extends CassandraReader { static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd"); /** * private method for real deleting one job * * @param jobid * job ID * @param date * job execution date * * @return nothing */ private int RemoveJob(String jobid, long date) { if (date < 0L) { log.error("CassandraRemover error: job " + jobid + " with date " + date + " can not be deleted in JobDateInfo. Daily statistics is inconsistent"); return 0; } String status = FindStatus(jobid); String com1 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';"; System.out.println("Command: " + com1); CassandraQuery(com1); String com2 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;"; System.out.println("Command: " + com2); CassandraQuery(com2); String com3 = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";"; System.out.println("Command: " + com3); ResultSet results = CassandraQuery(com3); if (results.isExhausted()) { log.error("CassandraRemover error: job " + jobid + " with date " + date + " can not be deleted in JobDateInfo. Daily statistics is inconsistent"); return 0; } Row row = results.one(); if (status.equals("OK")) { long njobsOK = row.getLong("TotalOK") - 1; String com4 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';"; System.out.println("Command: " + com4); CassandraQuery(com4); String com5 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";"; System.out.println("Command: " + com5); CassandraQuery(com5); UpdateJobDateInfo(date, "TotalOK", njobsOK); } else { String com6 = "DELETE FROM FailLog WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";"; System.out.println("Command: " + com6); CassandraQuery(com6); if (status.equals("STOPPED")) { long njobsStopped = row.getLong("TotalStopped") - 1; UpdateJobDateInfo(date, "TotalStopped", njobsStopped); } else if (status.equals("ERROR")) { long njobsError = row.getLong("TotalError") - 1; UpdateJobDateInfo(date, "TotalError", njobsError); } else if (status.equals("TIMEDOUT")) { long njobsTimeOut = row.getLong("TotalTimeOut") - 1; UpdateJobDateInfo(date, "TotalTimeOut", njobsTimeOut); } } System.out.println("Job " + jobid + " removed..."); return 1; } /** * update a pariticular column in the JobDateInfo table * * @param jobid * job ID * * @return nothing * */ private void UpdateJobDateInfo(long date, String ColumnName, long totalcol) { String com = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + " WHERE jobday = " + date + ";"; CassandraQuery(com); } /** * external method for deleting job with given job ID (strategy 1) * * @param jobid * job ID * * @return a number of deleted jobs * */ public int RemoveJobById(String jobid) { if (jobid == null) return 0; long date = FindJobDate(jobid); return RemoveJob(jobid, date); } /** * external method for deleting jobs within a time range (strategy 4) * * @param date1 * starting date * * @param date2 * ending date * * @return a number of deleted jobs * */ public int RemoveJobByDate(String date1, String date2) { if (date1 == null || date2 == null) return 0; int njobs = 0; Long dateBegin = convertDate(date1); Long dateEnd = convertDate(date2); Calendar start = Calendar.getInstance(); start.setTime(new Date(dateBegin)); Calendar end = Calendar.getInstance(); end.setTime(new Date(dateEnd)); for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) { String com1 = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";"; System.out.println("Command: " + com1); ResultSet results = CassandraQuery(com1); if (!results.isExhausted()) { List rows = results.all(); for (Row r : rows) { String jobid = r.getString("JobID"); if (jobid != null) { njobs += RemoveJob(jobid, date.getTime()); } } } String com2 = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";"; ResultSet resultsfail = CassandraQuery(com2); if (!resultsfail.isExhausted()) { List rows = resultsfail.all(); for (Row r : rows) { String jobid = r.getString("JobID"); if (jobid != null) { njobs += RemoveJob(jobid, date.getTime()); } } } } return njobs; } /** * external method for deleting jobs launched from a particular IP (strategy * 2) * * @param ip * the IP * * @return a number of deleted jobs * */ public int RemoveJobByIp(String ip) { int njobs = 0; if (ip == null) return 0; String com = "SELECT databegin, JobID FROM ProteinLog WHERE ip = '" + ip + "';"; ResultSet results = CassandraQuery(com); if (!results.isExhausted()) { List rows = results.all(); for (Row r : rows) { Long date = convertDate(r.getString("databegin")); String jobid = r.getString("JobID"); if (date != null || jobid != null) { njobs += RemoveJob(jobid, date); } } } return njobs; } /** * external method for deleting jobs with a protein sequence (strategy 3) * * @param sequence * the sequence * * @return a number of deleted jobs * */ public int RemoveJobBySequence(String sequence) { int njobs = 0; if (sequence == null) return 0; String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + sequence + "';"; ResultSet results = CassandraQuery(com); if (!results.isExhausted()) { List rows = results.all(); for (Row r : rows) { String jobid = r.getString("JobID"); long date = FindJobDate(jobid); njobs += RemoveJob(jobid, date); } } return njobs; } private long FindJobDate(String jobid) { String com = "SELECT databegin FROM ProteinLog WHERE JobID = '" + jobid + "';"; ResultSet results = CassandraQuery(com); if (!results.isExhausted()) { return convertDate(results.one().getString("databegin")); } return -1L; } private String FindStatus(String jobid) { String com = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';"; System.out.println("Command: " + com); ResultSet results = CassandraQuery(com); if (!results.isExhausted()) { return results.one().getString("FinalStatus"); } return "UNKNOWN"; } protected long convertDate(String d) { try { if (null != d) { Date startdate = dateformatter.parse(d); return startdate.getTime(); } } catch (ParseException e) { e.printStackTrace(); } return 0L; } }