package compbio.cassandra;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.apache.log4j.Logger;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

/**
 * Removes job records from the Cassandra tables ProteinLog, ProteinRow,
 * ProteinData and FailLog, marks the job as DELETED in jpredarchive and
 * decrements the per-day counters kept in JobDateInfo.
 *
 * Jobs can be selected by id, by a range of days, by client IP or by protein
 * sequence. Progress is reported on standard output.
 */
public class CassandraRemover {
    /** Prefix of the trace lines printed while FailLog entries are processed. */
    private static final String FAIL_TRACE_PREFIX = "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^: ";

    private Session session;

    // NOTE(review): SimpleDateFormat is not thread-safe and this instance is
    // shared and package-visible; confirm that it is never used concurrently.
    static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");

    private static Logger log = Logger.getLogger(CassandraRemover.class);

    /**
     * Creates a remover bound to the session supplied by
     * {@link CassandraNativeConnector#getSession()}.
     */
    public CassandraRemover() {
        Session inis = CassandraNativeConnector.getSession();
        setSession(inis);
    }

    /**
     * Replaces the session used for all statements.
     *
     * @param s the session to use; must not be null (checked by an assertion only)
     */
    public void setSession(Session s) {
        assert s != null;
        session = s;
    }

    /**
     * Deletes every record of one job and decrements the counters of its day.
     *
     * The job status is read from ProteinLog first, because the ProteinLog
     * record is the first thing to be deleted. When JobDateInfo has no row for
     * the given day the counters are left untouched and a warning is logged.
     *
     * @param jobid id of the job to remove
     * @param date  day of the job in milliseconds since the epoch; used as the
     *              jobday / jobtime key
     */
    private void RemoveJob(String jobid, long date) {
        String status = FindStatus(jobid);
        String id = quote(jobid);

        execute("DELETE FROM ProteinLog WHERE JobID = " + id + ";");
        execute("UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = " + id + " ;");

        Row counters = execute("SELECT * FROM JobDateInfo WHERE jobday = " + date + ";").one();
        if (counters == null) {
            log.warn("No JobDateInfo row for jobday " + date + "; counters are not updated for job " + jobid);
        }

        // Constant-first comparisons: the status is null when the job has no ProteinLog row.
        if ("OK".equals(status)) {
            execute("DELETE FROM ProteinRow WHERE JobID = " + id + ";");
            execute("DELETE FROM ProteinData WHERE JobID = " + id + " AND jobtime = " + date + ";");
            decrementCounter(counters, date, "TotalOK");
        } else {
            execute("DELETE FROM FailLog WHERE JobID = " + id + " AND jobtime = " + date + ";");
            if ("STOPPED".equals(status)) {
                decrementCounter(counters, date, "TotalStopped");
            } else if ("ERROR".equals(status)) {
                decrementCounter(counters, date, "TotalError");
            } else if ("TIMEDOUT".equals(status)) {
                decrementCounter(counters, date, "TotalTimeOut");
            }
            // Any other status: the records are removed but no counter is changed.
        }
        System.out.println("Remove jobs: " + jobid);
    }

    /**
     * Writes "Total" and one status counter, each reduced by one, back to JobDateInfo.
     *
     * @param counters the JobDateInfo row read for the day, or null when there is none
     * @param date     the jobday key
     * @param column   name of the status counter column to decrement
     */
    private void decrementCounter(Row counters, long date, String column) {
        if (counters == null) {
            return;
        }
        long total = counters.getLong("Total") - 1;
        long columnTotal = counters.getLong(column) - 1;
        UpdateJobDateInfo(date, column, columnTotal, total);
    }

    /**
     * Stores new counter values for one day.
     *
     * @param date       the jobday key
     * @param ColumnName name of the status counter column
     * @param totalcol   new value of that column
     * @param total      new value of the "Total" column
     */
    private void UpdateJobDateInfo(long date, String ColumnName, long totalcol, long total) {
        execute("UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + ", Total = " + total
                + " WHERE jobday = " + date + ";");
    }

    /**
     * Removes a single job.
     *
     * @param jobid id of the job; may be null
     * @return 1 when the job was removed, 0 when jobid is null or no usable
     *         start date is recorded for it in ProteinLog
     */
    public int RemoveJobById(String jobid) {
        if (jobid == null) {
            return 0;
        }
        Long date = FindDate(jobid);
        if (date == null) {
            log.warn("Job " + jobid + " is not removed: no usable start date in ProteinLog");
            return 0;
        }
        RemoveJob(jobid, date);
        return 1;
    }

    /**
     * Removes all jobs listed in ProteinData or FailLog for every day of an
     * inclusive range.
     *
     * @param date1 first day, formatted yyyy/MM/dd
     * @param date2 last day, formatted yyyy/MM/dd
     * @return number of removals performed; 0 when a bound is null or cannot be parsed
     */
    public int RemoveJobByDate(String date1, String date2) {
        if (date1 == null || date2 == null) {
            return 0;
        }
        Long dateBegin = parseDate(date1);
        Long dateEnd = parseDate(date2);
        if (dateBegin == null || dateEnd == null) {
            // Falling back to the epoch here would scan every day since 1970.
            log.warn("Nothing removed: invalid date range '" + date1 + "' - '" + date2 + "'");
            return 0;
        }

        Calendar start = Calendar.getInstance();
        start.setTime(new Date(dateBegin));
        Calendar end = Calendar.getInstance();
        end.setTime(new Date(dateEnd));

        int numremover = 0;
        for (; !start.after(end); start.add(Calendar.DATE, 1)) {
            long day = start.getTime().getTime();
            System.out.println("--------------------------------------------------------------------: ");

            ResultSet results = execute("SELECT JobID FROM ProteinData WHERE jobtime = " + day + ";");
            numremover += removeListedJobs(results, day, null);

            String comm = "SELECT JobID FROM FailLog WHERE jobtime = " + day + ";";
            System.out.println(FAIL_TRACE_PREFIX + comm);
            ResultSet resultsfail = session.execute(comm);
            System.out.println(FAIL_TRACE_PREFIX + comm);
            numremover += removeListedJobs(resultsfail, day, FAIL_TRACE_PREFIX);
        }
        return numremover;
    }

    /**
     * Removes every job whose id appears in the "JobID" column of a result set.
     *
     * @param results     rows carrying a "JobID" column
     * @param day         jobday / jobtime key shared by all listed jobs
     * @param tracePrefix when not null, each job id is printed with this prefix
     * @return number of jobs removed
     */
    private int removeListedJobs(ResultSet results, long day, String tracePrefix) {
        int removed = 0;
        List<Row> rows = results.all();
        for (Row r : rows) {
            String jobid = r.getString("JobID");
            if (tracePrefix != null) {
                System.out.println(tracePrefix + jobid);
            }
            if (jobid != null) {
                RemoveJob(jobid, day);
                removed++;
            }
        }
        return removed;
    }

    /**
     * Removes all jobs recorded in ProteinLog for a client IP. Rows without a
     * job id or without a parsable start date are skipped.
     *
     * @param ip client address; may be null
     * @return number of jobs removed
     */
    public int RemoveJobByIp(String ip) {
        if (ip == null) {
            return 0;
        }
        int numremover = 0;
        ResultSet results = session.execute("SELECT databegin, JobID FROM ProteinLog WHERE ip = " + quote(ip) + ";");
        List<Row> rows = results.all();
        for (Row r : rows) {
            Long date = parseDate(r.getString("databegin"));
            String jobid = r.getString("JobID");
            if (date == null || jobid == null) {
                continue;
            }
            RemoveJob(jobid, date);
            numremover++;
        }
        return numremover;
    }

    /**
     * Removes all jobs recorded in ProteinRow for a protein sequence. Jobs
     * without a usable start date in ProteinLog are skipped.
     *
     * @param seq protein sequence; may be null
     * @return number of jobs removed
     */
    public int RemoveJobBySequence(String seq) {
        if (seq == null) {
            return 0;
        }
        int numremover = 0;
        ResultSet results = session.execute("SELECT JobID FROM ProteinRow WHERE Protein = " + quote(seq) + ";");
        List<Row> rows = results.all();
        for (Row r : rows) {
            String jobid = r.getString("JobID");
            if (jobid == null) {
                continue;
            }
            Long date = FindDate(jobid);
            if (date == null) {
                log.warn("Job " + jobid + " is not removed: no usable start date in ProteinLog");
                continue;
            }
            RemoveJob(jobid, date);
            numremover++;
        }
        return numremover;
    }

    /**
     * Looks up the start day of a job in ProteinLog.
     *
     * @param jobid id of the job
     * @return the "databegin" value in milliseconds since the epoch, or null
     *         when the job has no ProteinLog row or the value cannot be parsed
     */
    private Long FindDate(String jobid) {
        ResultSet results = session.execute("SELECT databegin FROM ProteinLog WHERE JobID = " + quote(jobid) + ";");
        Row row = results.one();
        if (row == null) {
            return null;
        }
        return parseDate(row.getString("databegin"));
    }

    /**
     * Looks up the final status of a job in ProteinLog.
     *
     * @param jobid id of the job
     * @return the "FinalStatus" value, or null when the job has no ProteinLog row
     */
    private String FindStatus(String jobid) {
        ResultSet results = execute("SELECT FinalStatus FROM ProteinLog WHERE JobID = " + quote(jobid) + ";");
        Row row = results.one();
        String status = (row == null) ? null : row.getString("FinalStatus");
        System.out.println("*****status: " + status);
        return status;
    }

    /**
     * Converts a yyyy/MM/dd date into milliseconds since the epoch.
     *
     * @param d the date text; may be null
     * @return the parsed time, or 0 when d is null or cannot be parsed
     */
    protected long convertDate(String d) {
        Long millis = parseDate(d);
        return (millis == null) ? 0L : millis.longValue();
    }

    /**
     * Same conversion as {@link #convertDate(String)}, but reports failure as
     * null so that callers can tell a bad date from the epoch.
     */
    private Long parseDate(String d) {
        if (d == null) {
            return null;
        }
        try {
            return dateformatter.parse(d).getTime();
        } catch (ParseException e) {
            log.error("Cannot parse date '" + d + "', expected yyyy/MM/dd", e);
            return null;
        }
    }

    /** Prints a statement to standard output and executes it. */
    private ResultSet execute(String cql) {
        System.out.println("Command: " + cql);
        return session.execute(cql);
    }

    /**
     * Renders a value as a CQL string literal, doubling embedded single quotes
     * so that the value cannot terminate the literal.
     */
    private static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }
}