package compbio.cassandra; import java.io.IOException; import java.util.Calendar; import java.util.HashMap; import java.util.List; import java.util.ArrayList; import java.util.Map; import org.apache.log4j.Logger; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.BoundStatement; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; import compbio.util.Util; public class CassandraNativeConnector { private static Cluster cluster; private static Session session; private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper(); private static Logger log = Logger.getLogger(CassandraNativeConnector.class); public static String CASSANDRA_HOSTNAME = "localhost"; public static boolean READ_WEB_JPRED = false; public static boolean READ_LOCALFILE_JPRED = false; private static boolean initBooleanValue(String key) { assert key != null; String status = ph.getProperty(key); log.debug("Loading property: " + key + " with value: " + status); if (Util.isEmpty(status)) { return false; } return new Boolean(status.trim()).booleanValue(); } /* * connect to the cluster and look whether all tables exist */ public void Connect() { String cassandrahostname = ph.getProperty("cassandra.host"); if (null != cassandrahostname) { CASSANDRA_HOSTNAME = cassandrahostname; } READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web"); READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local"); cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build(); Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); CreateTables(); System.out.println("Cassandra connected"); } private void CreateTables() { session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};"); session.execute("USE ProteinKeyspace"); session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow " + "(Protein ascii, JobID ascii, Predictions map, PRIMARY KEY(JobID));"); session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog " + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, " + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));"); session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData " + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));"); session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive " + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map, " + "predictions map, archive blob, LOG varchar, PRIMARY KEY(JobID));"); session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);"); session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);"); } /* * parsing data source and filling the database */ public void Parsing() throws IOException { if (READ_WEB_JPRED) { // if (source.equals("http")) { // get data from real Jpred production server System.out.println("Parsing web data source......"); String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat"; String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results"; JpredParserHTTP parser = new JpredParserHTTP(prefix); parser.Parsing(datasrc, 5); } if (READ_LOCALFILE_JPRED) { // if (source.equals("file")) { // get irtifical data generated for the DB stress tests System.out.println("Parsing local file data source......"); String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat"; String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata"; JpredParserLocalFile parser = new JpredParserLocalFile(prefix); parser.Parsing(datasrc, 190); } } public void Closing() { session.shutdown(); cluster.shutdown(); System.out.println("Cassandra has been shut down"); } public boolean JobisNotInsterted(String jobid) { ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"); if (results1.isExhausted()) { return true; } return false; } public boolean JobisNotArchived(String jobid) { ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';"); if (results1.isExhausted()) { return true; } return false; } /* * inserting data into the tables for queries */ public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal, String protein, List predictions) { if (JobisNotInsterted(jobid)) { String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');"; session.execute(com1); String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein + "');"; session.execute(com2); String allpredictions = ""; for (FastaSequence pred : predictions) { String predictionname = pred.getId(); String prediction = pred.getSequence().replaceAll("\n", ""); allpredictions += "'" + predictionname + "':'" + prediction + "',"; } String final_prediction = ""; if (null != allpredictions) { final_prediction = allpredictions.substring(0, allpredictions.length() - 1); } String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';"; ResultSet results2 = session.execute(check2); if (results2.isExhausted()) { String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{" + final_prediction + "});"; session.execute(com3); } return 1; } return 0; } /* * insert data from a real Jpred job: timing+IP, Execution Status, Final * status, protein sequence, predictions, alignment, LOG and tar.gz files */ public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein, List predictions, List seqs, String LogFile, String archivepath) { if (JobisNotArchived(jobid)) { String log = LogFile.replaceAll("'", ""); session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');"); if (false) { PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);"); BoundStatement boundStatement = new BoundStatement(statement); session.execute(boundStatement.bind(jobid, archivepath)); } for (FastaSequence p : predictions) { session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'" + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); } for (FastaSequence s : seqs) { session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'" + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); } return 1; } return 0; } /* * getting data from the db */ public List> ReadProteinDataTable() { final long startTime = System.currentTimeMillis(); String com = "SELECT DataBegin,DataEnd FROM ProteinLog;"; System.out.println("Command: " + com); ResultSet results = session.execute(com); final long queryTime = System.currentTimeMillis(); List rows = results.all(); System.out.println("Query time is " + (queryTime - startTime) + " msec"); List> res = new ArrayList>(); int c = 0; for (Row r : rows) { Pair pair = new Pair(r.getString("DataBegin"), r.getString("DataEnd")); res.add(pair); ++c; } final long endTime = System.currentTimeMillis(); System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); return res; } /* * getting data from the db ProteinData */ public Integer ReadDateTable(long queryDate) { final long startTime = System.currentTimeMillis(); String com = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";"; System.out.println("Command: " + com); ResultSet results = session.execute(com); final long queryTime = System.currentTimeMillis(); System.out.println("Query time is " + (queryTime - startTime) + " msec"); if (results.isExhausted()) return 0; List rows = results.all(); final long endTime = System.currentTimeMillis(); System.out.println("Processing time is " + (endTime - queryTime) + " msec"); return rows.size(); } /* * getting whole protein sequence from the db ProteinRow */ public List ReadWholeSequence(String queryProtein) { final long startTime = System.currentTimeMillis(); String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';"; System.out.println("Command: " + com); ResultSet results = session.execute(com); if (results.isExhausted()) return null; final long queryTime = System.currentTimeMillis(); List rows = results.all(); System.out.println("Query time is " + (queryTime - startTime) + " msec"); System.out.println(" rows analysed, " + rows.size()); List res = new ArrayList(); int c = 0; for (Row r : rows) { StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap( "Predictions", String.class, String.class)); res.add(structure); ++c; } final long endTime = System.currentTimeMillis(); System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); return res; } /* * getting part of protein sequence from the db ProteinRow */ public List ReadPartOfSequence(String queryProtein) { final long startTime = System.currentTimeMillis(); String com = "SELECT * FROM ProteinRow;"; System.out.println("Command: " + com); ResultSet results = session.execute(com); if (results.isExhausted()) return null; final long queryTime = System.currentTimeMillis(); List rows = results.all(); System.out.println("Query time is " + (queryTime - startTime) + " msec"); System.out.println(" rows analysed, " + rows.size()); List res = new ArrayList(); int c = 0; for (Row r : rows) { String prot = r.getString("Protein"); if (prot.matches("(.*)" + queryProtein + "(.*)")) { StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", String.class, String.class)); res.add(structure); ++c; } } final long endTime = System.currentTimeMillis(); System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); return res; } /* * getting protein sequences by counter */ public Map ReadProteinDataByCounter() { final long startTime = System.currentTimeMillis(); String com = "SELECT Protein FROM ProteinRow;"; System.out.println("Command: " + com); ResultSet results = session.execute(com); if (results.isExhausted()) return null; final long queryTime = System.currentTimeMillis(); List rows = results.all(); System.out.println("Query time is " + (queryTime - startTime) + " msec"); System.out.println(" rows analysed, " + rows.size()); Map res = new HashMap(); int c = 0; for (Row r : rows) { String protein = r.getString("Protein"); if (res.containsKey(protein)) res.put(protein, res.get(protein) + 1); else res.put(protein, 1); } final long endTime = System.currentTimeMillis(); System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); return res; } /* * getting protein sequences by counter */ public StructureJobLog ReadJobLog(String jobid) { final long startTime = System.currentTimeMillis(); String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';"; System.out.println("Command: " + com); ResultSet results = session.execute(com); if (results.isExhausted()) return null; final long queryTime = System.currentTimeMillis(); Row row = results.one(); String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;"; System.out.println("Command: " + com1); ResultSet results1 = session.execute(com1); if (results1.isExhausted()) return null; Row row1 = results1.one(); StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class)); System.out.println("Query time is " + (queryTime - startTime) + " msec"); final long endTime = System.currentTimeMillis(); System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec"); return res; } /* * getting earlest date of jobs from the db */ public long getEarliestDateInDB() { final long startTime = System.currentTimeMillis(); String com = "SELECT jobtime,JobID FROM ProteinData;"; System.out.println("Command: " + com); ResultSet results = session.execute(com); final long queryTime = System.currentTimeMillis(); System.out.println("Query time is " + (queryTime - startTime) + " msec"); Calendar cal = Calendar.getInstance(); long res = cal.getTimeInMillis(); int c = 0; while (!results.isExhausted()) { Row r = results.one(); long d1 = r.getLong("jobtime"); if (res > d1) { res = d1; } ++c; } final long endTime = System.currentTimeMillis(); System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); return res; } }