1 package compbio.cassandra;
3 import java.io.IOException;
4 import java.util.Calendar;
6 import java.util.ArrayList;
8 import com.datastax.driver.core.Cluster;
9 import com.datastax.driver.core.Host;
10 import com.datastax.driver.core.Metadata;
11 import com.datastax.driver.core.Row;
12 import com.datastax.driver.core.Session;
13 import com.datastax.driver.core.ResultSet;
15 public class CassandraNativeConnector {
16 private static Cluster cluster;
17 private static Session session;
20 * connect to the cluster and look weather the dababase has any data inside
22 public void Connect() {
23 // local cassandra cluster
24 cluster = Cluster.builder().addContactPoint("localhost").build();
25 // distributed cassandra cluster
26 /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
27 Metadata metadata = cluster.getMetadata();
28 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
29 for (Host host : metadata.getAllHosts()) {
30 System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
33 session = cluster.connect();
34 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
35 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
36 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
37 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
38 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
39 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.JpredArchive " +
40 "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
42 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinKeyspace.ProteinRow (protein);");
43 session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);");
45 System.out.println("Cassandra connected");
49 * parsing data source and filling the database
// Imports Jpred job data through the project's parser classes; declares IOException for callers.
//
// Two hard-coded data sources are visible below, one after the other:
//   1. the Jpred production web server, read with JpredParserHTTP;
//   2. a local stress-test data set, read with JpredParserLocalFile.
// In both cases the parser is constructed with a location prefix and its Parsing method is
// called with the job-list location and an integer (4 and 190); the meaning of that integer
// is not visible from here.
//
// NOTE(review): both sections declare datasrc/prefix/parser and each starts with a
// commented-out `if (source.equals(...))` line, so they look like alternatives whose
// enclosing conditions/braces are not visible in this view. Confirm against the complete
// file which section actually runs before changing anything here.
51 public void Parsing() throws IOException {
53 // if (source.equals("http")) {
54 // get data from the real Jpred production server
55 System.out.println("Parsing web data source......");
56 String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
57 String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
58 JpredParserHTTP parser = new JpredParserHTTP(prefix);
59 parser.Parsing(datasrc, 4);
62 // if (source.equals("file")) {
63 // get artificial data generated for the DB stress tests
64 System.out.println("Parsing local file data source......");
65 String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
66 String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
67 JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
68 parser.Parsing(datasrc, 190);
// Reports on standard output that Cassandra has been shut down.
// NOTE(review): no statement that actually closes the static session or cluster is visible in
// this view of the method (the original numbering skips two lines before the message).
// Confirm that the resources are released before this message is printed.
72 public void Closing() {
75 System.out.println("Cassandra has been shut down");
79 * inserting data into the db
81 public void FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
82 String protein, List<FastaSequence> predictions) {
84 String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
85 ResultSet results1 = session.execute(check1);
86 if (results1.isExhausted()) {
87 String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
88 + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
89 + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
90 session.execute(com1);
92 String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
93 + "','" + protein + "');";
94 session.execute(com2);
96 String allpredictions = "";
97 for (FastaSequence pred : predictions) {
98 String predictionname = pred.getId();
99 String prediction = pred.getSequence().replaceAll("\n", "");
100 allpredictions += "'" + predictionname + "':'" + prediction + "',";
102 String final_prediction = "";
103 if (null != allpredictions) {
104 final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
107 String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';";
108 ResultSet results2 = session.execute(check2);
109 if (results2.isExhausted()) {
110 String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','"
111 + jobid + "',{" + final_prediction + "});";
112 session.execute(com3);
117 public void ArchiveData(long starttime, int exectime, String ip, String jobid, String statusEx, String statusFinal,
118 String protein, List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile) {
120 String check1 = "SELECT * FROM ProteinKeyspace.JpredArchive WHERE JobID = '" + jobid + "';";
121 ResultSet results1 = session.execute(check1);
122 if (results1.isExhausted()) {
123 String allpredictions = "";
124 for (FastaSequence pred : predictions) {
125 String predictionname = pred.getId();
126 String prediction = pred.getSequence().replaceAll("\n", "");
127 allpredictions += "'" + predictionname + "':'" + prediction + "',";
129 String final_allpredictions = "";
130 if (null != allpredictions) {
131 final_allpredictions = allpredictions.substring(0, allpredictions.length() - 1);
133 String alignment = "";
134 for (FastaSequence seq : seqs) {
135 String predictionname = seq.getId();
136 String prediction = seq.getSequence().replaceAll("\n", "");
137 alignment += "'" + predictionname + "':'" + prediction + "',";
139 String final_alignment = "";
140 if (null != allpredictions) {
141 final_alignment = alignment.substring(0, allpredictions.length() - 1);
144 String com1 = "INSERT INTO ProteinKeyspace.JpredArchive "
145 + "(JobID, Protein, IP, StartTime, ExecTime, alignment, predictions, LOG))"
147 + jobid + "','" + protein + "','" + ip + "'," + starttime + "," + exectime
148 + "',[" + final_allpredictions + "],[" + final_alignment + "],'" + LogFile + "]);";
149 session.execute(com1);
156 * getting data from the db
158 public List<Pair<String, String>> ReadProteinDataTable() {
159 final long startTime = System.currentTimeMillis();
160 String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
161 System.out.println("Command: " + com);
162 ResultSet results = session.execute(com);
163 final long queryTime = System.currentTimeMillis();
164 List<Row> rows = results.all();
165 System.out.println("Query time is " + (queryTime - startTime) + " msec");
167 List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
170 Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
174 final long endTime = System.currentTimeMillis();
175 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
180 * getting the earliest date of jobs from the db
182 public long getEarliestDateInDB() {
183 final long startTime = System.currentTimeMillis();
184 String com = "SELECT jobtime FROM ProteinKeyspace.ProteinData;";
185 System.out.println("Command: " + com);
186 ResultSet results = session.execute(com);
187 final long queryTime = System.currentTimeMillis();
188 System.out.println("Query time is " + (queryTime - startTime) + " msec");
190 Calendar cal = Calendar.getInstance();
191 long res = cal.getTimeInMillis();
193 while (!results.isExhausted()) {
194 Row r = results.one();
195 long d1 = r.getLong("jobtime");
201 final long endTime = System.currentTimeMillis();
202 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");