package compbio.cassandra; import java.io.IOException; import java.util.List; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; public class CassandraNativeConnector { private static Cluster cluster; private static Session session; /* * private static Keyspace ksp; private static Mutator mutatorLong; * private static Mutator mutatorString; private static * Mutator mutatorLog; StringSerializer ss = StringSerializer.get(); * LongSerializer ls = LongSerializer.get(); */ /* * connect to the cluster and look weather the dababase has any data inside */ public void Connect() { // local cassandra cluster cluster = Cluster.builder().addContactPoint("localhost").build(); // distributed cassandra cluster /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */ Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};"); session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii PRIMARY KEY, Predictions map);"); session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog " + "(JobID ascii PRIMARY KEY, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii);"); session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint PRIMARY KEY, JobID ascii, Protein ascii);"); System.out.println("Cassandra connected"); } /* * parsing data source and filling the database */ public void Parsing() throws IOException { if (false) { // if (source.equals("http")) { // get data from real Jpred production server System.out.println("Parsing web data source......"); String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat"; String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results"; JpredParserHTTP parser = new JpredParserHTTP(prefix); parser.Parsing(datasrc, 4); } if (true) { // if (source.equals("file")) { // get irtifical data generated for the DB stress tests System.out.println("Parsing local file data source......"); String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat"; String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata"; JpredParserLocalFile parser = new JpredParserLocalFile(prefix); parser.Parsing(datasrc, 190); } } public void Closing() { cluster.shutdown(); System.out.println("Cassandra has been shut down"); } /* * check whether the job id exists in the DB */ public boolean CheckID(String jobid) { String com = "SELECT * FROM ProteinKeyspace.ProteinData WHERE jobid = '" + jobid + "';"; System.out.println(com); ResultSet results = session.execute(com); if (null != results) { return true; } return false; } /* * prepare data for insertion into the db */ public void InsertData(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal, String protein, List predictions) { String check1 = "SELECT count(*) FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';"; //System.out.println(check1); ResultSet results1 = session.execute(check1); if (!results1.isExhausted()) { String com1 = "INSERT INTO ProteinKeyspace.ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');"; // System.out.println(com1); session.execute(com1); String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein + "');"; // System.out.println(com2); // session.execute(com2); String allpredictions = ""; for (FastaSequence pred : predictions) { String predictionname = pred.getId(); String prediction = pred.getSequence().replaceAll("\n", ""); allpredictions += "'" + predictionname + "':'" + prediction + "',"; } String final_prediction = ""; if (null != allpredictions) { final_prediction = allpredictions.substring(0, allpredictions.length() - 1); } String check2 = "SELECT count(*) FROM ProteinKeyspace.ProteinRow WHERE Protein = '" + protein + "';"; //System.out.println(check1); ResultSet results2 = session.execute(check2); if (results1.isExhausted()) { String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, " + jobid + ")" + " VALUES ('" + protein + "'," + "{" + final_prediction + "}" + ");"; System.out.println(com3); session.execute(com3); } else { String com4 = "ALTER TABLE ProteinKeyspace.ProteinRow ADD " + jobid + ");"; System.out.println(com4); session.execute(com4); String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(" + jobid + ")" + " VALUES ({" + final_prediction + "}" + ")" + " WHERE Protein = '" + protein + "';"; System.out.println(com3); session.execute(com3); } } } }