X-Git-Url: http://source.jalview.org/gitweb/?a=blobdiff_plain;f=datadb%2Fcompbio%2Fcassandra%2FCassandraRemover.java;h=dbad0180234525c4b42ee94a43ce4c467aa65989;hb=5e76924c64903dd27013651144b1b7e2fcfc7c27;hp=d066d8201c54f098327842fd1264cd9d6e319bcb;hpb=d4eea7f5a8fea637ab742ce69c84a0f835266567;p=proteocache.git diff --git a/datadb/compbio/cassandra/CassandraRemover.java b/datadb/compbio/cassandra/CassandraRemover.java index d066d82..dbad018 100644 --- a/datadb/compbio/cassandra/CassandraRemover.java +++ b/datadb/compbio/cassandra/CassandraRemover.java @@ -1,51 +1,261 @@ package compbio.cassandra; -import org.apache.log4j.Logger; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; +import compbio.cassandra.readers.CassandraReader; -public class CassandraRemover { - private Session session; - private static Logger log = Logger.getLogger(CassandraNativeConnector.class); +/** + * The class removes jobs from the cassandra database. 4 different strategies + * are possiable: 1. remove 1 job with given job ID 2. remove jobs launched from + * an IP 3. remove jobs with particular protein sequence 4. remove jobs launched + * within a time range (date1, data2) + * + * @author Alexander Sherstnev + * @author Natasha Sherstneva + * @version 1.0 + * @since Nov 2013 + */ +public class CassandraRemover extends CassandraReader { + static SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd"); - public CassandraRemover() { - Session inis = CassandraNativeConnector.getSession(); - setSession (inis); + /** + * private method for real deleting one job + * + * @param jobid + * job ID + * @param date + * job execution date + * + * @return nothing + */ + private int RemoveJob(String jobid, long date) { + + if (date < 0L) { + log.error("CassandraRemover error: job " + jobid + " with date " + date + + " can not be deleted in JobDateInfo. Daily statistics is inconsistent"); + return 0; + } + + String status = FindStatus(jobid); + String com1 = "DELETE FROM ProteinLog WHERE JobID = '" + jobid + "';"; + System.out.println("Command: " + com1); + CassandraQuery(com1); + + String com2 = "UPDATE jpredarchive SET finalstatus = 'DELETED' WHERE JobID = '" + jobid + "' ;"; + System.out.println("Command: " + com2); + CassandraQuery(com2); + + String com3 = "SELECT * FROM JobDateInfo WHERE jobday = " + date + ";"; + System.out.println("Command: " + com3); + ResultSet results = CassandraQuery(com3); + if (results.isExhausted()) { + log.error("CassandraRemover error: job " + jobid + " with date " + date + + " can not be deleted in JobDateInfo. Daily statistics is inconsistent"); + return 0; + } + Row row = results.one(); + if (status.equals("OK")) { + long njobsOK = row.getLong("TotalOK") - 1; + String com4 = "DELETE FROM ProteinRow WHERE JobID = '" + jobid + "';"; + System.out.println("Command: " + com4); + CassandraQuery(com4); + + String com5 = "DELETE FROM ProteinData WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";"; + System.out.println("Command: " + com5); + CassandraQuery(com5); + UpdateJobDateInfo(date, "TotalOK", njobsOK); + } else { + String com6 = "DELETE FROM FailLog WHERE JobID = '" + jobid + "' AND jobtime = " + date + ";"; + System.out.println("Command: " + com6); + CassandraQuery(com6); + if (status.equals("STOPPED")) { + long njobsStopped = row.getLong("TotalStopped") - 1; + UpdateJobDateInfo(date, "TotalStopped", njobsStopped); + } else if (status.equals("ERROR")) { + long njobsError = row.getLong("TotalError") - 1; + UpdateJobDateInfo(date, "TotalError", njobsError); + } else if (status.equals("TIMEDOUT")) { + long njobsTimeOut = row.getLong("TotalTimeOut") - 1; + UpdateJobDateInfo(date, "TotalTimeOut", njobsTimeOut); + } + } + System.out.println("Job " + jobid + " removed..."); + return 1; } - public void setSession(Session s) { - assert s != null; - session = s; + /** + * update a pariticular column in the JobDateInfo table + * + * @param jobid + * job ID + * + * @return nothing + * + */ + private void UpdateJobDateInfo(long date, String ColumnName, long totalcol) { + String com = "UPDATE JobDateInfo SET " + ColumnName + " = " + totalcol + " WHERE jobday = " + date + ";"; + CassandraQuery(com); + } + + /** + * external method for deleting job with given job ID (strategy 1) + * + * @param jobid + * job ID + * + * @return a number of deleted jobs + * + */ + public int RemoveJobById(String jobid) { + if (jobid == null) + return 0; + long date = FindJobDate(jobid); + return RemoveJob(jobid, date); } - - /* - * getting a record from CF for current jobId + + /** + * external method for deleting jobs within a time range (strategy 4) + * + * @param date1 + * starting date + * + * @param date2 + * ending date + * + * @return a number of deleted jobs + * */ - public StructureJobLog ReadJobLog(String jobid) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT Protein, StartTime FROM ProteinLog WHERE JobID = '" + jobid + "';"; + public int RemoveJobByDate(String date1, String date2) { + if (date1 == null || date2 == null) + return 0; + + int njobs = 0; + Long dateBegin = convertDate(date1); + Long dateEnd = convertDate(date2); + Calendar start = Calendar.getInstance(); + start.setTime(new Date(dateBegin)); + Calendar end = Calendar.getInstance(); + end.setTime(new Date(dateEnd)); + + for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) { + String com1 = "SELECT JobID FROM ProteinData WHERE jobtime = " + date.getTime() + ";"; + System.out.println("Command: " + com1); + ResultSet results = CassandraQuery(com1); + if (!results.isExhausted()) { + List rows = results.all(); + for (Row r : rows) { + String jobid = r.getString("JobID"); + if (jobid != null) { + njobs += RemoveJob(jobid, date.getTime()); + } + } + } + + String com2 = "SELECT JobID FROM FailLog WHERE jobtime = " + date.getTime() + ";"; + ResultSet resultsfail = CassandraQuery(com2); + if (!resultsfail.isExhausted()) { + List rows = resultsfail.all(); + for (Row r : rows) { + String jobid = r.getString("JobID"); + if (jobid != null) { + njobs += RemoveJob(jobid, date.getTime()); + } + } + } + } + return njobs; + } + + /** + * external method for deleting jobs launched from a particular IP (strategy + * 2) + * + * @param ip + * the IP + * + * @return a number of deleted jobs + * + */ + public int RemoveJobByIp(String ip) { + int njobs = 0; + if (ip == null) + return 0; + String com = "SELECT databegin, JobID FROM ProteinLog WHERE ip = '" + ip + "';"; + ResultSet results = CassandraQuery(com); + if (!results.isExhausted()) { + List rows = results.all(); + for (Row r : rows) { + Long date = convertDate(r.getString("databegin")); + String jobid = r.getString("JobID"); + if (date != null || jobid != null) { + njobs += RemoveJob(jobid, date); + } + } + } + return njobs; + } + + /** + * external method for deleting jobs with a protein sequence (strategy 3) + * + * @param sequence + * the sequence + * + * @return a number of deleted jobs + * + */ + public int RemoveJobBySequence(String sequence) { + int njobs = 0; + if (sequence == null) + return 0; + String com = "SELECT JobID FROM ProteinRow WHERE Protein = '" + sequence + "';"; + ResultSet results = CassandraQuery(com); + if (!results.isExhausted()) { + List rows = results.all(); + for (Row r : rows) { + String jobid = r.getString("JobID"); + long date = FindJobDate(jobid); + njobs += RemoveJob(jobid, date); + } + } + return njobs; + } + + private long FindJobDate(String jobid) { + String com = "SELECT databegin FROM ProteinLog WHERE JobID = '" + jobid + "';"; + ResultSet results = CassandraQuery(com); + if (!results.isExhausted()) { + return convertDate(results.one().getString("databegin")); + } + return -1L; + } + + private String FindStatus(String jobid) { + String com = "SELECT FinalStatus FROM ProteinLog WHERE JobID = '" + jobid + "';"; System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - Row row = results.one(); - String com1 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;"; - System.out.println("Command: " + com1); - ResultSet results1 = session.execute(com1); - if (results1.isExhausted()) - return null; - Row row1 = results1.one(); - StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), - row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class)); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - final long endTime = System.currentTimeMillis(); - System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } - - + ResultSet results = CassandraQuery(com); + if (!results.isExhausted()) { + return results.one().getString("FinalStatus"); + } + return "UNKNOWN"; + } + + protected long convertDate(String d) { + try { + if (null != d) { + Date startdate = dateformatter.parse(d); + return startdate.getTime(); + } + } catch (ParseException e) { + e.printStackTrace(); + } + return 0L; + } }