1 package compbio.cassandra;
3 import java.io.IOException;
4 import java.util.Calendar;
5 import java.util.HashMap;
7 import java.util.ArrayList;
10 import com.datastax.driver.core.Cluster;
11 import com.datastax.driver.core.Host;
12 import com.datastax.driver.core.Metadata;
13 import com.datastax.driver.core.Row;
14 import com.datastax.driver.core.Session;
15 import com.datastax.driver.core.ResultSet;
17 public class CassandraNativeConnector {
18 private static Cluster cluster;
19 private static Session session;
21 * connect to the cluster and check whether the database has any data inside
// Connects to the Cassandra node at "localhost", prints the cluster name and every known host,
// opens a session, and creates the ProteinKeyspace keyspace with its three tables
// (ProteinRow, ProteinLog, ProteinData) and two secondary indexes if they do not exist yet.
// The Cluster and Session objects are stored in the static fields of this class.
23 public void Connect() {
24 // local cassandra cluster
25 cluster = Cluster.builder().addContactPoint("localhost").build();
26 // distributed cassandra cluster
27 /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
28 Metadata metadata = cluster.getMetadata();
29 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
// List every host the driver knows about.
30 for (Host host : metadata.getAllHosts()) {
31 System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
34 session = cluster.connect();
// NOTE(review): replication_factor is 3 while the contact point is a single "localhost" node - confirm this is intended.
35 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
// All three tables use JobID as their primary key.
36 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
37 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
38 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
39 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
// Secondary indexes on the non-key columns that this class filters on (Protein and jobtime).
41 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinKeyspace.ProteinRow (protein);");
42 session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);");
44 System.out.println("Cassandra connected");
48 * parsing data source and filling the database
// Runs the Jpred parsers that read job data from a data source. Two sources are set up below:
// the Jpred production web server (JpredParserHTTP) and a local stress-test data set
// (JpredParserLocalFile).
// NOTE(review): both sections declare datasrc, prefix and parser, so each must live in its own
// scope - confirm which of the two sources is actually enabled.
// NOTE(review): IOException is presumably propagated from the parsers - confirm.
50 public void Parsing() throws IOException {
52 // if (source.equals("http")) {
53 // get data from real Jpred production server
54 System.out.println("Parsing web data source......");
55 String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
56 String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
57 JpredParserHTTP parser = new JpredParserHTTP(prefix);
// TODO(review): document what the second argument (4 here, 190 below) means; it is defined by the parser.
58 parser.Parsing(datasrc, 4);
61 // if (source.equals("file")) {
62 // get artificial data generated for the DB stress tests
63 System.out.println("Parsing local file data source......");
// NOTE(review): absolute paths into one developer's home directory - only valid on that machine.
64 String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
65 String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
66 JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
67 parser.Parsing(datasrc, 190);
// Reports on standard output that the Cassandra connection has been shut down.
// NOTE(review): confirm that the static session and cluster are released before this message is printed.
71 public void Closing() {
74 System.out.println("Cassandra has been shut down");
78 * inserting data into the db
80 public void InsertData(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
81 String protein, List<FastaSequence> predictions) {
82 String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
83 ResultSet results1 = session.execute(check1);
84 if (results1.isExhausted()) {
85 String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
86 + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
87 + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
88 session.execute(com1);
89 String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
90 + "','" + protein + "');";
91 session.execute(com2);
92 String allpredictions = "";
93 for (FastaSequence pred : predictions) {
94 String predictionname = pred.getId();
95 String prediction = pred.getSequence().replaceAll("\n", "");
96 allpredictions += "'" + predictionname + "':'" + prediction + "',";
98 String final_prediction = "";
99 if (null != allpredictions) {
100 final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
102 String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
103 ResultSet results2 = session.execute(check2);
104 if (results2.isExhausted()) {
105 String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('"
106 + protein + "','" + jobid + "',{" + final_prediction + "});";
107 session.execute(com3);
109 String check3 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';";
114 * getting data from the db
// Reads the DataBegin and DataEnd columns of every row in ProteinLog and turns each row into a
// (DataBegin, DataEnd) pair. The query time and the total execution time are printed to
// standard output.
116 public List<Pair<String, String>> ReadProteinDataTable() {
117 final long startTime = System.currentTimeMillis();
118 String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
119 System.out.println("Command: " + com);
120 ResultSet results = session.execute(com);
// queryTime is taken right after execute() returns, before the rows are collected with all().
121 final long queryTime = System.currentTimeMillis();
122 List<Row> rows = results.all();
123 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
125 List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
// One pair per row r: first element DataBegin, second element DataEnd.
128 Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"),r.getString("DataEnd"));
132 final long endTime = System.currentTimeMillis();
// c is the row counter of the loop above.
133 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
138 * getting data from the db ProteinData
// Queries ProteinData for the rows whose jobtime equals queryDate and collects them.
// NOTE(review): the result is presumably the number of matching rows - confirm, and confirm
// what is returned when the query yields no rows.
// queryDate is a long, so appending it to the CQL text cannot inject quote characters.
140 public Integer ReadDateTable(long queryDate) {
141 final long startTime = System.currentTimeMillis();
// jobtime is not the primary key; this filter presumably relies on the JobDateStamp secondary index.
142 String com = "SELECT jobtime, JobID FROM ProteinKeyspace.ProteinData WHERE jobtime = " + queryDate + ";";
143 System.out.println("Command: " + com);
144 ResultSet results = session.execute(com);
// Early exit when the query produced no rows.
145 if (results.isExhausted())
147 final long queryTime = System.currentTimeMillis();
148 List<Row> rows = results.all();
149 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
154 * getting whole protein sequence from the db ProteinRow
156 public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
157 final long startTime = System.currentTimeMillis();
158 String com = "SELECT JobID, Predictions FROM ProteinKeyspace.ProteinRow WHERE Protein = '" + queryProtein + "';";
159 System.out.println("Command: " + com);
160 ResultSet results = session.execute(com);
161 if (results.isExhausted())
163 final long queryTime = System.currentTimeMillis();
164 List<Row> rows = results.all();
165 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
166 System.out.println (" rows analysed, " + rows.size());
167 List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
170 StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));
174 final long endTime = System.currentTimeMillis();
175 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
180 * getting part of protein sequence from the db ProteinRow
182 public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
183 final long startTime = System.currentTimeMillis();
184 String com = "SELECT * FROM ProteinKeyspace.ProteinRow;";
185 System.out.println("Command: " + com);
186 ResultSet results = session.execute(com);
187 if (results.isExhausted())
189 final long queryTime = System.currentTimeMillis();
190 List<Row> rows = results.all();
191 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
192 System.out.println (" rows analysed, " + rows.size());
193 List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
196 String prot = r.getString("Protein");
197 if (prot.matches("(.*)" + queryProtein + "(.*)")) {
198 // System.out.println(prot);
199 StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));
204 final long endTime = System.currentTimeMillis();
205 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
210 * getting protein sequences by counter
// Reads the Protein column of every row in ProteinRow and counts how many rows carry each
// protein sequence. The command, the query time and the total execution time are printed to
// standard output.
212 public Map<String, Integer> ReadProteinDataByCounter() {
213 final long startTime = System.currentTimeMillis();
214 String com = "SELECT Protein FROM ProteinKeyspace.ProteinRow;";
215 System.out.println("Command: " + com);
216 ResultSet results = session.execute(com);
// Early exit when the table has no rows.
217 if (results.isExhausted())
219 final long queryTime = System.currentTimeMillis();
220 List<Row> rows = results.all();
221 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
222 System.out.println (" rows analysed, " + rows.size());
// Maps protein sequence -> number of rows seen with that sequence.
223 Map<String, Integer> res = new HashMap<String, Integer>();
226 String protein = r.getString("Protein");
// Sequence seen before: increment its counter.
227 if (res.containsKey(protein))
228 res.put(protein, res.get(protein) + 1);
232 final long endTime = System.currentTimeMillis();
233 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
239 * getting the log record and the predictions of one job by its JobID
241 public StructureJobLog ReadJobLog(String jobid) {
242 final long startTime = System.currentTimeMillis();
243 String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
244 System.out.println("Command: " + com);
245 ResultSet results = session.execute(com);
246 if (results.isExhausted())
248 final long queryTime = System.currentTimeMillis();
249 Row row = results.one();
250 String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
251 System.out.println("Command: " + com1);
252 ResultSet results1 = session.execute(com1);
253 if (results1.isExhausted())
255 Row row1 = results1.one();
256 StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
257 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
258 final long endTime = System.currentTimeMillis();
259 System.out.println (" rows analysed, execution time is " + (endTime - startTime) + " msec");
264 * getting the earliest date of jobs from the db
// Scans the jobtime column of every row in ProteinData. Judging by the method name, the goal is
// the smallest jobtime stored in the table - NOTE(review): confirm against the loop body.
// The command, the query time and the total execution time are printed to standard output.
266 public long getEarliestDateInDB() {
267 final long startTime = System.currentTimeMillis();
268 String com = "SELECT jobtime,JobID FROM ProteinKeyspace.ProteinData;";
269 System.out.println("Command: " + com);
270 ResultSet results = session.execute(com);
271 final long queryTime = System.currentTimeMillis();
272 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
// res starts at the current time in milliseconds.
274 Calendar cal = Calendar.getInstance();
275 long res = cal.getTimeInMillis();
// Rows are consumed one at a time instead of being collected with all().
277 while (!results.isExhausted()) {
278 Row r = results.one();
279 long d1 = r.getLong("jobtime");
285 final long endTime = System.currentTimeMillis();
// c is the row counter of the loop above.
286 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");