package compbio.cassandra; import java.util.List; import org.apache.log4j.Logger; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.QueryValidationException; import compbio.data.sequence.FastaSequence; import compbio.engine.JpredJob; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; public class CassandraWriter { private Session session; private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper(); private static Logger log = Logger.getLogger(CassandraNativeConnector.class); CassandraWriter() { Session inis = CassandraNativeConnector.getSession(); setSession(inis); } public void setSession(Session s) { assert s != null; session = s; } private ResultSet execute(String command) { try { ResultSet results = session.execute(command); return results; } catch (QueryExecutionException e) { String mess = "CassandraWriter: query execution exception..."; System.out.println(mess); log.error(mess); log.error(e.getLocalizedMessage(), e.getCause()); return null; } catch (QueryValidationException e) { String mess = "CassandraWriter: query validation exception... Command: " + command; System.out.println(mess); log.error(mess); log.error(e.getLocalizedMessage(), e.getCause()); return null; } } public boolean JobisNotInsterted(String jobid) { ResultSet results = execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"); if (null != results && results.isExhausted()) { return true; } return false; } public boolean JobisNotArchived(String jobid) { ResultSet results = execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';"); if (null != results && results.isExhausted()) { return true; } return false; } /* * inserting data into the tables for queries */ public int FormQueryTables(JpredJob job) { if (JobisNotInsterted(job.getJobID())) { String id = job.getJobID(); String protein = job.getProtein(); String finalstatus = job.getFinalStatus(); String execstatus = job.getExecutionStatus(); String ProgramName = job.getProgramName(); String ProgramVersion = job.getProgramVersion(); String com1 = "INSERT INTO ProteinLog (JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein, ProgramName, ProgramVersion) VALUES ('" + id + "','" + job.getIP() + "','" + job.getStartingTimeStr() + "','" + job.getEndTimeStr() + "','" + finalstatus + "','" + execstatus + "','" + protein + "','" + ProgramName + "','" + ProgramVersion + "');"; ResultSet insert = execute(com1); if (null == insert) { System.out.println("CassandraWriter.FormQueryTables: couldn't insert into ProteinLog"); // return 0; } if (finalstatus.equals("OK")) { String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, ExecTime, Protein)" + " VALUES (" + job.getStartingDate() + ",'" + id + "'," + job.getExecutionTime() + ",'" + protein + "');"; if (null == execute(com2)) { System.out.println("CassandraWriter.FormQueryTables: couldn't insert into ProteinData"); // return 0; } String allpredictions = ""; List pr = job.getPredictions(); for (FastaSequence pred : pr) { String predictionname = pred.getId(); String prediction = pred.getSequence().replaceAll("\n", ""); allpredictions += "'" + predictionname + "':'" + prediction + "',"; } String final_prediction = ""; if (!allpredictions.equals("")) { final_prediction = allpredictions.substring(0, allpredictions.length() - 1); } ResultSet results2 = execute("SELECT * FROM ProteinRow WHERE JobID = '" + job.getJobID() + "';"); if (null != results2 && results2.isExhausted()) { String com3 = "INSERT INTO ProteinRow (Protein, JobID, Predictions) VALUES ('" + protein + "','" + id + "',{" + final_prediction + "});"; if (null == execute(com3)) { System.out.println("CassandraWriter.FormQueryTables: couldn't insert into ProteinRow"); return 0; } } } else { String com5 = "INSERT INTO FailLog (jobtime, JobID, ExecTime, ip, FinalStatus) VALUES (" + job.getStartingDate() + ",'" + id + "'," + job.getExecutionTime() + ",'" + job.getIP() + "', '" + finalstatus + "');"; if (null == execute(com5)) { System.out.println("CassandraWriter.FormQueryTables: couldn't insert into FailLog"); return 0; } } // update Main parameters if the job is the earliest job so far ResultSet results3 = execute("SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';"); if (null == results3) { System.out.println("CassandraWriter.FormQueryTables: couldn't get results from MainParameters"); // return 0; } boolean updateparameter = true; if (!results3.isExhausted()) { Row r = results3.one(); if (job.getStartingDate() >= Long.parseLong(r.getString("Value"))) updateparameter = false; } if (updateparameter) { String com6 = "INSERT INTO MainParameters (Name, Value) VALUES ('EarliestJobDate','" + job.getStartingDate() + "');"; if (null == execute(com6)) { System.out.println("CassandraWriter.FormQueryTables: couldn't insert into MainParameters"); return 0; } } // update internal job counts (used by the Daily Statistics // requests) // TODO I don't like the bit of code. There should not be so many // counters... int njobsTotal = 1; int njobsOk = 0; int njobsStop = 0; int njobsError = 0; int njobsTimeOut = 0; if (finalstatus.equals("OK")) njobsOk = 1; else if (finalstatus.equals("TIMEDOUT")) njobsTimeOut = 1; else if (finalstatus.equals("JPREDERROR")) njobsError = 1; else if (finalstatus.equals("STOPPED")) njobsStop = 1; ResultSet results4 = execute("SELECT * FROM JobDateInfo WHERE jobday = " + job.getStartingDate() + ";"); if (null == results4) { System.out.println("CassandraWriter.FormQueryTables: couldn't get data from JobDateInfo"); // return 0; } if (!results4.isExhausted()) { Row r = results4.one(); njobsTotal += r.getLong("Total"); njobsOk += r.getLong("TotalOK"); njobsError += r.getLong("TotalError"); njobsStop += r.getLong("TotalStopped"); njobsTimeOut += r.getLong("TotalTimeOut"); } String com = "INSERT INTO JobDateInfo " + "(jobday, Total, TotalOK, TotalStopped, TotalError, TotalTimeOut)" + " VALUES (" + job.getStartingDate() + "," + njobsTotal + "," + njobsOk + "," + njobsStop + "," + njobsError + "," + njobsTimeOut + ");"; if (null == execute(com)) { System.out.println("CassandraWriter.FormQueryTables: couldn't insert into JobDateInfo"); // return 0; } return 1; } return 0; } /* * insert data from a real Jpred job: timing+IP, Execution Status, Final * status, protein sequence, predictions, alignment, LOG and tar.gz files */ public int ArchiveData(JpredJob job, String archivepath) { if (JobisNotArchived(job.getJobID())) { String id = job.getJobID(); String log = job.getLog().replaceAll("'", ""); String com = "INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime, FinalStatus, ExecutionStatus, LOG, ArchiveLink, ProgramName, ProgramVersion) VALUES ('" + id + "','" + job.getProtein() + "','" + job.getIP() + "'," + job.getStartingTime() + "," + job.getExecutionTime() + ",'" + job.getFinalStatus() + "','" + job.getExecutionStatus() + "','" + log + "','" + archivepath + "','" + job.getProgramName() + "','" + job.getProgramVersion() + "');"; if (null == execute(com)) { System.out.println("CassandraWriter.ArchiveData: couldn't insert into JpredArchive"); } List predictions = job.getPredictions(); for (FastaSequence p : predictions) { if (null == execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'" + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';")) { System.out.println("CassandraWriter.ArchiveData: couldn't update data in JpredArchive"); } } List seqs = job.getAlignment(); for (FastaSequence s : seqs) { String com2 = "UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'" + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';"; if (null == execute(com2)) { System.out.println("CassandraWriter.ArchiveData: couldn't update data in JpredArchive"); } } return 1; } return 0; } }