Re-engineering of database I/O
[proteocache.git] / datadb / compbio / cassandra / CassandraWriter.java
diff --git a/datadb/compbio/cassandra/CassandraWriter.java b/datadb/compbio/cassandra/CassandraWriter.java
new file mode 100644 (file)
index 0000000..6ee62fd
--- /dev/null
@@ -0,0 +1,142 @@
+package compbio.cassandra;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.BoundStatement;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
+
+public class CassandraWriter {
+       private Session session;
+       private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
+       private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+       CassandraWriter() {
+               Session inis = CassandraNativeConnector.getSession();
+               setSession(inis);
+       }
+
+       public void setSession(Session s) {
+               assert s != null;
+               session = s;
+       }
+
+       public boolean JobisNotInsterted(String jobid) {
+               ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
+               if (results1.isExhausted()) {
+                       return true;
+               }
+               return false;
+       }
+
+       public boolean JobisNotArchived(String jobid) {
+               ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
+               if (results1.isExhausted()) {
+                       return true;
+               }
+               return false;
+       }
+
+       /*
+        * inserting data into the tables for queries
+        */
+       public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
+                       String statusFinal, String protein, List<FastaSequence> predictions) {
+               if (JobisNotInsterted(jobid)) {
+                       String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
+                                       + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
+                                       + "','" + protein + "');";
+                       session.execute(com1);
+
+                       String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
+                                       + "');";
+                       session.execute(com2);
+
+                       String allpredictions = "";
+                       for (FastaSequence pred : predictions) {
+                               String predictionname = pred.getId();
+                               String prediction = pred.getSequence().replaceAll("\n", "");
+                               allpredictions += "'" + predictionname + "':'" + prediction + "',";
+                       }
+                       String final_prediction = "";
+                       if (null != allpredictions) {
+                               final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
+                       }
+
+                       String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
+                       ResultSet results2 = session.execute(check2);
+                       if (results2.isExhausted()) {
+                               String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
+                                               + final_prediction + "});";
+                               session.execute(com3);
+                       }
+
+                       // update some internal tables 
+                       String check3 = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
+                       ResultSet results3 = session.execute(check3);
+                       boolean updateparameter = true;
+                       if (!results3.isExhausted()) {
+                               Row r = results3.one();
+                               if (jobtime >= Long.parseLong(r.getString("Value")))
+                                       updateparameter = false;
+                       }
+                       if (updateparameter) {
+                               String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(jobtime)
+                                               + "');";
+                               session.execute(com);
+                       }
+                       String check4 = "SELECT * FROM MainParameters WHERE Name = 'TotalNumberOfJobs';";
+                       ResultSet results4 = session.execute(check4);
+                       updateparameter = true;
+                       int njobs = 1;
+                       if (!results4.isExhausted()) {
+                               Row r = results4.one();
+                               njobs += Integer.parseInt(r.getString("Value"));
+                       }
+                       String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('TotalNumberOfJobs','" + String.valueOf(njobs) + "');";
+                       session.execute(com);
+
+                       return 1;
+               }
+               return 0;
+       }
+
+       /*
+        * insert data from a real Jpred job: timing+IP, Execution Status, Final
+        * status, protein sequence, predictions, alignment, LOG and tar.gz files
+        */
+       public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
+                       List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
+               if (JobisNotArchived(jobid)) {
+                       String log = LogFile.replaceAll("'", "");
+                       session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
+                                       + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
+                       if (false) {
+                               PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
+                               BoundStatement boundStatement = new BoundStatement(statement);
+                               session.execute(boundStatement.bind(jobid, archivepath));
+                       }
+
+                       for (FastaSequence p : predictions) {
+                               session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
+                                               + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+                       }
+
+                       for (FastaSequence s : seqs) {
+                               session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
+                                               + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+                       }
+                       return 1;
+               }
+               return 0;
+       }
+
+}