package compbio.cassandra;

import java.io.IOException;
import java.util.List;

import org.apache.log4j.Logger;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.BoundStatement;

import compbio.engine.ProteoCachePropertyHelperManager;
import compbio.util.PropertyHelper;

/**
 * Writes job records into the Cassandra tables ProteinLog, ProteinData,
 * ProteinRow, MainParameters and JpredArchive through a driver
 * {@link Session}.
 *
 * All statements are sent as CQL text. Every string value is turned into a
 * CQL string literal by {@link #quote(String)}, which doubles embedded single
 * quotes, so a value containing <code>'</code> can neither break the
 * statement nor inject additional CQL.
 */
public class CassandraWriter {
	private Session session;
	private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
	private static final Logger log = Logger.getLogger(CassandraWriter.class);

	/**
	 * Creates a writer bound to the session returned by
	 * CassandraNativeConnector.getSession().
	 */
	CassandraWriter() {
		Session inis = CassandraNativeConnector.getSession();
		setSession(inis);
	}

	/**
	 * Replaces the session used for all subsequent statements.
	 *
	 * @param s
	 *            the session to use; must not be null (checked only when
	 *            assertions are enabled)
	 */
	public void setSession(Session s) {
		assert s != null;
		session = s;
	}

	/**
	 * @param jobid
	 *            job identifier
	 * @return true if ProteinLog has no row with this JobID
	 */
	public boolean JobisNotInsterted(String jobid) {
		return hasNoRow("ProteinLog", "JobID", jobid);
	}

	/**
	 * @param jobid
	 *            job identifier
	 * @return true if JpredArchive has no row with this JobID
	 */
	public boolean JobisNotArchived(String jobid) {
		return hasNoRow("JpredArchive", "JobID", jobid);
	}

	/**
	 * Inserts one job into the query tables (ProteinLog, ProteinData,
	 * ProteinRow) and updates the EarliestJobDate and TotalNumberOfJobs rows
	 * of MainParameters. Nothing is written if the job is already present in
	 * ProteinLog.
	 *
	 * NOTE(review): the existence checks and the TotalNumberOfJobs counter are
	 * separate read-then-write round trips, so concurrent writers may
	 * interleave between them.
	 *
	 * @param jobtime
	 *            job time stored in ProteinData and compared against the
	 *            stored EarliestJobDate value
	 * @param startdate
	 *            value for ProteinLog.DataBegin
	 * @param enddate
	 *            value for ProteinLog.DataEnd
	 * @param ip
	 *            value for ProteinLog.IP
	 * @param jobid
	 *            job identifier
	 * @param statusEx
	 *            value for ProteinLog.ExecutionStatus
	 * @param statusFinal
	 *            value for ProteinLog.FinalStatus
	 * @param protein
	 *            protein sequence
	 * @param predictions
	 *            predictions stored as an id-to-sequence map in ProteinRow;
	 *            may be empty or null, in which case an empty map is written
	 * @return 1 if the job was inserted, 0 if it was already in ProteinLog
	 */
	public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
			String statusFinal, String protein, List<FastaSequence> predictions) {
		if (!JobisNotInsterted(jobid)) {
			return 0;
		}

		session.execute("INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
				+ " VALUES (" + quote(jobid) + "," + quote(ip) + "," + quote(startdate) + "," + quote(enddate) + ","
				+ quote(statusFinal) + "," + quote(statusEx) + "," + quote(protein) + ");");

		session.execute("INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ","
				+ quote(jobid) + "," + quote(protein) + ");");

		if (hasNoRow("ProteinRow", "JobID", jobid)) {
			session.execute("INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES (" + quote(protein)
					+ "," + quote(jobid) + ",{" + mapEntries(predictions) + "});");
		}

		// update some internal tables
		String earliest = readParameter("EarliestJobDate");
		if (earliest == null || jobtime < Long.parseLong(earliest)) {
			writeParameter("EarliestJobDate", String.valueOf(jobtime));
		}

		int njobs = 1;
		String total = readParameter("TotalNumberOfJobs");
		if (total != null) {
			njobs += Integer.parseInt(total);
		}
		writeParameter("TotalNumberOfJobs", String.valueOf(njobs));
		return 1;
	}

	/**
	 * Archives a Jpred job in JpredArchive: protein sequence, IP, start time,
	 * execution time and log text, followed by the predictions and the
	 * alignment, each added to its map column one entry per UPDATE. Nothing is
	 * written if the job is already present in JpredArchive.
	 *
	 * @param starttime
	 *            value for JpredArchive.StartTime
	 * @param exectime
	 *            value for JpredArchive.ExecTime
	 * @param ip
	 *            value for JpredArchive.IP
	 * @param jobid
	 *            job identifier
	 * @param statusEx
	 *            not used by this method
	 * @param statusFinal
	 *            not used by this method
	 * @param protein
	 *            protein sequence
	 * @param predictions
	 *            entries added to the predictions map; may be empty or null
	 * @param seqs
	 *            entries added to the alignment map; may be empty or null
	 * @param LogFile
	 *            log text; single quotes are removed before it is stored
	 * @param archivepath
	 *            not stored by this method
	 * @return 1 if the job was archived, 0 if it was already in JpredArchive
	 */
	public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal,
			String protein, List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile,
			String archivepath) {
		if (!JobisNotArchived(jobid)) {
			return 0;
		}

		// Single quotes have always been dropped from the stored log text;
		// kept as is so that newly archived logs match existing rows.
		String logText = LogFile.replace("'", "");
		session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ("
				+ quote(jobid) + "," + quote(protein) + "," + quote(ip) + "," + starttime + "," + exectime + ","
				+ quote(logText) + ");");

		appendToMap("predictions", predictions, jobid);
		appendToMap("alignment", seqs, jobid);
		return 1;
	}

	/** Returns true if the table has no row whose key column equals the given value. */
	private boolean hasNoRow(String table, String keyColumn, String keyValue) {
		ResultSet rows = session.execute("SELECT * FROM " + table + " WHERE " + keyColumn + " = " + quote(keyValue) + ";");
		return rows.isExhausted();
	}

	/** Returns the Value stored in MainParameters under the given Name, or null if there is none. */
	private String readParameter(String name) {
		ResultSet rows = session.execute("SELECT * FROM MainParameters WHERE Name = " + quote(name) + ";");
		if (rows.isExhausted()) {
			return null;
		}
		Row r = rows.one();
		return r.getString("Value");
	}

	/** Stores a Name/Value pair in MainParameters. */
	private void writeParameter(String name, String value) {
		session.execute("INSERT INTO MainParameters " + "(Name, Value)" + " VALUES (" + quote(name) + "," + quote(value)
				+ ");");
	}

	/** Adds each sequence as an id-to-sequence entry to a map column of the job's JpredArchive row. */
	private void appendToMap(String column, List<FastaSequence> entries, String jobid) {
		if (entries == null) {
			return;
		}
		for (FastaSequence entry : entries) {
			session.execute("UPDATE JpredArchive SET " + column + " = " + column + " + {" + mapEntry(entry)
					+ "} WHERE JobID = " + quote(jobid) + ";");
		}
	}

	/** Renders the sequences as comma-separated CQL map entries; empty string for an empty or null list. */
	private static String mapEntries(List<FastaSequence> sequences) {
		StringBuilder entries = new StringBuilder();
		if (sequences != null) {
			for (FastaSequence sequence : sequences) {
				if (entries.length() > 0) {
					entries.append(',');
				}
				entries.append(mapEntry(sequence));
			}
		}
		return entries.toString();
	}

	/** Renders one sequence as a CQL map entry 'id':'sequence' with line breaks removed from the sequence. */
	private static String mapEntry(FastaSequence sequence) {
		return quote(sequence.getId()) + ":" + quote(sequence.getSequence().replace("\n", ""));
	}

	/**
	 * Wraps a value in single quotes as a CQL string literal, doubling any
	 * embedded single quote. A null value is rendered as the text "null",
	 * which is what plain string concatenation would produce.
	 */
	private static String quote(String value) {
		return "'" + String.valueOf(value).replace("'", "''") + "'";
	}
}