1 package compbio.cassandra;
3 import java.io.IOException;
6 import com.datastax.driver.core.Cluster;
7 import com.datastax.driver.core.Host;
8 import com.datastax.driver.core.Metadata;
9 import com.datastax.driver.core.Session;
10 import com.datastax.driver.core.ResultSet;
12 public class CassandraNativeConnector {
// Driver handles shared by all methods of this class. Both fields are static, so
// every instance uses the same cluster and session; they are assigned in Connect().
13 private static Cluster cluster;
14 private static Session session;
17 * private static Keyspace ksp; private static Mutator<Long> mutatorLong;
18 * private static Mutator<String> mutatorString; private static
19 * Mutator<String> mutatorLog; StringSerializer ss = StringSerializer.get();
20 * LongSerializer ls = LongSerializer.get();
24 * connect to the cluster and check whether the database has any data inside
// Builds a Cluster with the single contact point "localhost", prints the cluster
// name and every known host (datacenter, address, rack), opens a session and then
// issues CREATE ... IF NOT EXISTS statements for the ProteinKeyspace keyspace and
// its three tables. Takes no arguments and returns nothing; the resulting handles
// are stored in the static cluster/session fields.
// NOTE(review): the embedded line numbers skip (34 -> 37, 42 -> 44), so some lines
// of this method - at least its closing braces - are not visible in this view.
26 public void Connect() {
27 // local cassandra cluster
28 cluster = Cluster.builder().addContactPoint("localhost").build();
29 // distributed cassandra cluster
30 /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
31 Metadata metadata = cluster.getMetadata();
32 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
33 for (Host host : metadata.getAllHosts()) {
// NOTE(review): the label below is spelled "Datatacenter" in the printed output;
// presumably "Datacenter" was intended - confirm nothing parses this text before fixing.
34 System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
37 session = cluster.connect();
// Keyspace uses SimpleStrategy with replication_factor 3.
// NOTE(review): the only contact point above is localhost - confirm the target
// cluster really has enough nodes for three replicas.
38 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
// ProteinRow: keyed by Protein, with an ascii->ascii map column named Predictions.
39 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii PRIMARY KEY, Predictions map<ascii,ascii>);");
// ProteinLog: keyed by JobID; all other columns are ascii text.
40 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
41 + "(JobID ascii PRIMARY KEY, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii);");
// ProteinData: keyed by jobtime (bigint); JobID and Protein are regular columns.
42 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint PRIMARY KEY, JobID ascii, Protein ascii);");
44 System.out.println("Cassandra connected");
48 * parsing data source and filling the database
// Runs the Jpred job-list parsers. Two sections are visible: the first hands a
// JpredParserHTTP the job list URL on the Jpred production server, the second hands
// a JpredParserLocalFile a stress-test data file addressed by an absolute local path.
// Declared to throw IOException.
// NOTE(review): both sections declare datasrc, prefix and parser, so they must live
// in separate scopes; the guarding lines (embedded numbers 51 and 59-60 are absent)
// are not visible in this view - confirm which section is actually enabled.
50 public void Parsing() throws IOException {
52 // if (source.equals("http")) {
53 // get data from real Jpred production server
54 System.out.println("Parsing web data source......");
55 String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
56 String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
57 JpredParserHTTP parser = new JpredParserHTTP(prefix);
// NOTE(review): the meaning of the second argument (4 here, 190 below) is defined
// by the parser classes, which are not part of this view.
58 parser.Parsing(datasrc, 4);
61 // if (source.equals("file")) {
62 // get artificial data generated for the DB stress tests
63 System.out.println("Parsing local file data source......");
// NOTE(review): these are absolute paths inside one developer's home directory.
64 String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
65 String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
66 JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
67 parser.Parsing(datasrc, 190);
// Reports on stdout that Cassandra has been shut down.
// NOTE(review): only the println is visible in this view (embedded line 72 is
// absent); confirm that the cluster/session are actually released before this
// message is printed.
71 public void Closing() {
73 System.out.println("Cassandra has been shut down");
77 * check whether the job id exists in the DB
// Queries ProteinKeyspace.ProteinData for rows whose jobid equals the argument:
// the SELECT text is built by string concatenation, echoed to stdout and executed
// on the shared session.
// jobid - job identifier, inserted verbatim between single quotes in the CQL text.
// Returns a boolean; NOTE(review): the return statements are not visible in this
// view (embedded lines 84-88 are absent).
79 public boolean CheckID(String jobid) {
// NOTE(review): jobid is concatenated unescaped, so a quote character in the value
// changes the statement - consider a prepared statement with a bound parameter.
// NOTE(review): Connect() creates ProteinData with jobtime as its PRIMARY KEY, so
// filtering on jobid may need a secondary index - verify against the cluster.
80 String com = "SELECT * FROM ProteinKeyspace.ProteinData WHERE jobid = '" + jobid + "';";
81 System.out.println(com);
82 ResultSet results = session.execute(com);
// NOTE(review): this tests the ResultSet reference itself, not whether the query
// matched any row - confirm that is the intended existence check.
83 if (null != results) {
90 * prepare data for insertion into the db
// Writes one job record into the ProteinKeyspace tables using CQL statements built
// by string concatenation and executed on the shared session.
// jobtime     - numeric job time; only used in com2, whose execution is commented out
// startdate   - stored in ProteinLog.DataBegin
// enddate     - stored in ProteinLog.DataEnd
// ip          - stored in ProteinLog.IP
// jobid       - stored in ProteinLog.JobID; also spliced in as a column name for ProteinRow
// statusEx    - stored in ProteinLog.ExecutionStatus
// statusFinal - stored in ProteinLog.FinalStatus
// protein     - stored in ProteinLog.Protein and used as the ProteinRow key
// predictions - sequences rendered as 'id':'sequence' pairs for a map literal
// NOTE(review): the embedded line numbers skip (103 -> 105, 114 -> 116, 118 -> 121,
// 129 -> 131) and the method continues past the last visible line, so brace/else
// structure is not visible here; the notes below describe the visible lines only.
// NOTE(review): every value is concatenated unescaped into the CQL text - a quote
// in any argument changes the statement; consider prepared statements.
92 public void InsertData(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
93 String protein, List<FastaSequence> predictions) {
95 String check1 = "SELECT count(*) FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
96 //System.out.println(check1);
97 ResultSet results1 = session.execute(check1);
// NOTE(review): a count(*) query normally yields one row even when the count is 0,
// so isExhausted() probably does not tell existing jobs from new ones - verify and
// compare the returned count instead.
98 if (!results1.isExhausted()) {
99 String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
100 + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
101 + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
102 // System.out.println(com1);
103 session.execute(com1);
// com2 is built but never sent: its execute call below is commented out.
105 String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
106 + "','" + protein + "');";
107 // System.out.println(com2);
108 // session.execute(com2);
// Render each prediction as 'id':'sequence', (newlines removed from the sequence),
// leaving a trailing comma that is trimmed below.
110 String allpredictions = "";
111 for (FastaSequence pred : predictions) {
112 String predictionname = pred.getId();
113 String prediction = pred.getSequence().replaceAll("\n", "");
114 allpredictions += "'" + predictionname + "':'" + prediction + "',";
116 String final_prediction = "";
// NOTE(review): allpredictions starts as "" and is only appended to, so this null
// test always passes; with an empty predictions list substring(0, -1) would throw
// StringIndexOutOfBoundsException - an isEmpty() check looks intended.
117 if (null != allpredictions) {
118 final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
121 String check2 = "SELECT count(*) FROM ProteinKeyspace.ProteinRow WHERE Protein = '" + protein + "';";
122 //System.out.println(check1);
123 ResultSet results2 = session.execute(check2);
// NOTE(review): results2 was just fetched but the test below reads results1 again,
// which the earlier isExhausted() call already inspected - presumably results2 was meant.
125 if (results1.isExhausted()) {
// NOTE(review): jobid is used as a column name here, while Connect() creates
// ProteinRow with only the Protein and Predictions columns - confirm the schema.
126 String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, " + jobid + ")" + " VALUES ('" + protein + "'," + "{"
127 + final_prediction + "}" + ");";
128 System.out.println(com3);
129 session.execute(com3);
// NOTE(review): the text below ends in ");" with no opening parenthesis and names
// no column type - it looks malformed; verify against the CQL ALTER TABLE syntax.
131 String com4 = "ALTER TABLE ProteinKeyspace.ProteinRow ADD " + jobid + ");";
132 System.out.println(com4);
133 session.execute(com4);
// NOTE(review): this INSERT carries a WHERE clause; CQL INSERT does not appear to
// accept one (UPDATE ... WHERE is the usual form) - verify against the CQL reference.
134 String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(" + jobid + ")" + " VALUES ({" + final_prediction + "}" + ")"
135 + " WHERE Protein = '" + protein + "';";
136 System.out.println(com3);
137 session.execute(com3);