1 package compbio.cassandra;
3 import java.io.IOException;
4 import java.util.Calendar;
5 import java.util.HashMap;
7 import java.util.ArrayList;
10 import org.apache.log4j.Logger;
12 import com.datastax.driver.core.Cluster;
13 import com.datastax.driver.core.Host;
14 import com.datastax.driver.core.Metadata;
15 import com.datastax.driver.core.Row;
16 import com.datastax.driver.core.Session;
17 import com.datastax.driver.core.ResultSet;
18 import com.datastax.driver.core.PreparedStatement;
19 import com.datastax.driver.core.BoundStatement;
21 import compbio.engine.ProteoCachePropertyHelperManager;
22 import compbio.util.PropertyHelper;
23 import compbio.util.Util;
25 public class CassandraNativeConnector {
26 private static Cluster cluster;
27 private static Session session;
28 private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
29 private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
31 public static String CASSANDRA_HOSTNAME = "localhost";
32 public static boolean READ_WEB_JPRED = false;
33 public static boolean READ_LOCALFILE_JPRED = false;
35 private static boolean initBooleanValue(String key) {
37 String status = ph.getProperty(key);
38 log.debug("Loading property: " + key + " with value: " + status);
39 if (Util.isEmpty(status)) {
42 return new Boolean(status.trim()).booleanValue();
46 * connect to the cluster and look whether all tables exist
48 public void Connect() {
50 String cassandrahostname = ph.getProperty("cassandra.host");
51 if (null != cassandrahostname) {
52 CASSANDRA_HOSTNAME = cassandrahostname;
54 READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web");
55 READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local");
57 cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build();
59 Metadata metadata = cluster.getMetadata();
60 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
61 for (Host host : metadata.getAllHosts()) {
62 System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
64 session = cluster.connect();
66 System.out.println("Cassandra connected");
69 private void CreateTables() {
70 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
71 session.execute("USE ProteinKeyspace");
73 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow "
74 + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
75 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog "
76 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
77 + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
78 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData "
79 + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
80 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive "
81 + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, "
82 + "predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
84 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
85 session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
89 * parsing data source and filling the database
91 public void Parsing() throws IOException {
93 // if (source.equals("http")) {
94 // get data from real Jpred production server
95 System.out.println("Parsing web data source......");
96 String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
97 String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
98 JpredParserHTTP parser = new JpredParserHTTP(prefix);
99 parser.Parsing(datasrc, 5);
101 if (READ_LOCALFILE_JPRED) {
102 // if (source.equals("file")) {
103 // get irtifical data generated for the DB stress tests
104 System.out.println("Parsing local file data source......");
105 String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
106 String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
107 JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
108 parser.Parsing(datasrc, 190);
// Ends the use of the Cassandra connection and reports it on stdout.
// NOTE(review): the statements between the signature and the message are not
// visible in this listing; presumably they release `session` and `cluster` -
// confirm before relying on the message below.
112 public void Closing() {
// Console confirmation only; it does not itself release any resource.
115 System.out.println("Cassandra has been shut down");
118 public boolean JobisNotInsterted(String jobid) {
119 ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
120 if (results1.isExhausted()) {
126 public boolean JobisNotArchived(String jobid) {
127 ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
128 if (results1.isExhausted()) {
135 * inserting data into the tables for queries
137 public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
138 String statusFinal, String protein, List<FastaSequence> predictions) {
139 if (JobisNotInsterted(jobid)) {
140 String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
141 + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
142 + "','" + protein + "');";
143 session.execute(com1);
145 String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
147 session.execute(com2);
149 String allpredictions = "";
150 for (FastaSequence pred : predictions) {
151 String predictionname = pred.getId();
152 String prediction = pred.getSequence().replaceAll("\n", "");
153 allpredictions += "'" + predictionname + "':'" + prediction + "',";
155 String final_prediction = "";
156 if (null != allpredictions) {
157 final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
160 String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
161 ResultSet results2 = session.execute(check2);
162 if (results2.isExhausted()) {
163 String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
164 + final_prediction + "});";
165 session.execute(com3);
173 * insert data from a real Jpred job: timing+IP, Execution Status, Final
174 * status, protein sequence, predictions, alignment, LOG and tar.gz files
// Stores the data of one Jpred job in the JpredArchive table, but only when
// JobisNotArchived(jobid) reports that the job is not there yet.
// Visible steps: insert the scalar columns, insert (JobID, archive), then add
// every prediction and every alignment sequence to the two map columns.
// NOTE(review): statusEx and statusFinal are not used in any visible line.
// NOTE(review): the return statements and a few lines around the archive
// insert are not visible in this listing - check the full file before editing.
176 public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
177 List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
178 if (JobisNotArchived(jobid)) {
// Single quotes are stripped because the text is embedded in a quoted CQL
// literal below. This local `log` shadows the class-level logger.
179 String log = LogFile.replaceAll("'", "");
// Values are concatenated into the statement text; a quote in jobid, protein
// or ip would break it.
180 session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
181 + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
// NOTE(review): archivepath is a String, while CreateTables declares the
// `archive` column as blob - confirm the driver accepts this binding and
// whether the file content rather than its path was meant to be stored.
183 PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
184 BoundStatement boundStatement = new BoundStatement(statement);
185 session.execute(boundStatement.bind(jobid, archivepath));
// One UPDATE per prediction: adds id -> sequence (line breaks removed) to the map.
188 for (FastaSequence p : predictions) {
189 session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
190 + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
// Same pattern for the alignment sequences.
193 for (FastaSequence s : seqs) {
194 session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
195 + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
203 * getting data from the db
205 public List<Pair<String, String>> ReadProteinDataTable() {
206 final long startTime = System.currentTimeMillis();
207 String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
208 System.out.println("Command: " + com);
209 ResultSet results = session.execute(com);
210 final long queryTime = System.currentTimeMillis();
211 List<Row> rows = results.all();
212 System.out.println("Query time is " + (queryTime - startTime) + " msec");
214 List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
217 Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
221 final long endTime = System.currentTimeMillis();
222 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
227 * getting data from the db ProteinData
// Queries ProteinData for the rows whose jobtime equals queryDate and prints
// the CQL command plus query and processing timings to stdout.
// NOTE(review): ReadDateTable(long) is declared a second time further down in
// this file with the same signature; Java rejects two methods with identical
// signatures in one class, so one of the two declarations has to be removed.
// NOTE(review): the return statements are not visible in this listing;
// presumably the row count is returned - confirm.
229 public Integer ReadDateTable(long queryDate) {
230 final long startTime = System.currentTimeMillis();
// queryDate is a long, so concatenating it cannot change the statement structure.
231 String com = "SELECT jobtime, JobID FROM ProteinKeyspace.ProteinData WHERE jobtime = " + queryDate + ";";
232 System.out.println("Command: " + com);
233 ResultSet results = session.execute(com);
234 final long queryTime = System.currentTimeMillis();
235 System.out.println("Query time is " + (queryTime - startTime) + " msec");
// Empty result: the statement taken in this branch is not visible here.
236 if (results.isExhausted())
// Materialises all remaining rows in memory.
238 List<Row> rows = results.all();
239 final long endTime = System.currentTimeMillis();
240 System.out.println ("Processing time is " + (endTime - queryTime) + " msec");
245 * getting whole protein sequence from the db ProteinRow
247 public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
248 final long startTime = System.currentTimeMillis();
249 String com = "SELECT JobID, Predictions FROM ProteinKeyspace.ProteinRow WHERE Protein = '" + queryProtein + "';";
250 System.out.println("Command: " + com);
251 ResultSet results = session.execute(com);
252 if (results.isExhausted())
254 final long queryTime = System.currentTimeMillis();
255 List<Row> rows = results.all();
256 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
257 System.out.println (" rows analysed, " + rows.size());
258 List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
261 StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));
265 final long endTime = System.currentTimeMillis();
266 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
271 * getting data from the db ProteinData
// Queries ProteinData for the rows whose jobtime equals queryDate and prints
// the CQL command and the query time to stdout.
// NOTE(review): this is the second declaration of ReadDateTable(long) in this
// file; an earlier one has the identical signature. Java rejects duplicate
// signatures in one class, so one of the two declarations has to be removed.
// NOTE(review): the return statements are not visible in this listing;
// presumably the row count is returned - confirm.
273 public Integer ReadDateTable(long queryDate) {
274 final long startTime = System.currentTimeMillis();
// queryDate is a long, so concatenating it cannot change the statement structure.
275 String com = "SELECT jobtime, JobID FROM ProteinKeyspace.ProteinData WHERE jobtime = " + queryDate + ";";
276 System.out.println("Command: " + com);
277 ResultSet results = session.execute(com);
// Empty result: the statement taken in this branch is not visible here.
// Unlike the earlier declaration, the emptiness check comes before the
// query time is taken and printed.
278 if (results.isExhausted())
280 final long queryTime = System.currentTimeMillis();
// Materialises all remaining rows in memory.
281 List<Row> rows = results.all();
282 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
287 * getting part of protein sequence from the db ProteinRow
// Collects the predictions of every job whose stored protein contains the
// given fragment. The statement has no WHERE clause: all rows of ProteinRow
// are fetched and the filtering is done on the client.
// NOTE(review): the loop header, the counter handling and the return
// statements are not visible in this listing.
289 public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
290 final long startTime = System.currentTimeMillis();
291 String com = "SELECT * FROM ProteinKeyspace.ProteinRow;";
292 System.out.println("Command: " + com);
293 ResultSet results = session.execute(com);
// Empty table: the statement taken in this branch is not visible here.
294 if (results.isExhausted())
296 final long queryTime = System.currentTimeMillis();
// Loads the whole table into memory.
297 List<Row> rows = results.all();
298 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
299 System.out.println (" rows analysed, " + rows.size());
300 List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
303 String prot = r.getString("Protein");
// NOTE(review): queryProtein is inserted into a regular expression, so regex
// metacharacters in it are interpreted (and malformed ones make matches()
// throw), and the pattern is recompiled for every row. If a plain substring
// test is intended, prot.contains(queryProtein) would express it directly.
304 if (prot.matches("(.*)" + queryProtein + "(.*)")) {
// The result carries the full stored protein, not the queried fragment.
305 StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));
310 final long endTime = System.currentTimeMillis();
311 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
316 * getting protein sequences by counter
// Builds a map from protein sequence to a counter by reading the Protein
// column of every row in ProteinRow; counting happens on the client.
// NOTE(review): the loop header, the branch taken for a protein seen for the
// first time, the handling of `c` and the return statements are not visible
// in this listing.
318 public Map<String, Integer> ReadProteinDataByCounter() {
319 final long startTime = System.currentTimeMillis();
320 String com = "SELECT Protein FROM ProteinKeyspace.ProteinRow;";
321 System.out.println("Command: " + com);
322 ResultSet results = session.execute(com);
// Empty table: the statement taken in this branch is not visible here.
323 if (results.isExhausted())
325 final long queryTime = System.currentTimeMillis();
// Loads the whole table into memory.
326 List<Row> rows = results.all();
327 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
328 System.out.println (" rows analysed, " + rows.size());
329 Map<String, Integer> res = new HashMap<String, Integer>();
332 String protein = r.getString("Protein");
// A protein already in the map has its counter incremented by one.
333 if (res.containsKey(protein))
334 res.put(protein, res.get(protein) + 1);
338 final long endTime = System.currentTimeMillis();
339 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
344 * getting protein sequences by counter
346 public StructureJobLog ReadJobLog(String jobid) {
347 final long startTime = System.currentTimeMillis();
348 String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
349 System.out.println("Command: " + com);
350 ResultSet results = session.execute(com);
351 if (results.isExhausted())
353 final long queryTime = System.currentTimeMillis();
354 Row row = results.one();
355 String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
356 System.out.println("Command: " + com1);
357 ResultSet results1 = session.execute(com1);
358 if (results1.isExhausted())
360 Row row1 = results1.one();
361 StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
362 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
363 final long endTime = System.currentTimeMillis();
364 System.out.println (" rows analysed, execution time is " + (endTime - startTime) + " msec");
369 * getting earliest date of jobs from the db
// Scans every row of ProteinData, reading its jobtime, and prints the CQL
// command and the timings to stdout.
// NOTE(review): the lines that compare d1 with res, the counter `c` and the
// return statement are not visible in this listing; judging by the method
// name the smallest jobtime is presumably kept and returned - confirm.
371 public long getEarliestDateInDB() {
372 final long startTime = System.currentTimeMillis();
373 String com = "SELECT jobtime,JobID FROM ProteinKeyspace.ProteinData;";
374 System.out.println("Command: " + com);
375 ResultSet results = session.execute(com);
376 final long queryTime = System.currentTimeMillis();
377 System.out.println("Query time is " + (queryTime - startTime) + " msec");
// The current time in milliseconds is the starting value of the result.
379 Calendar cal = Calendar.getInstance();
380 long res = cal.getTimeInMillis();
// Rows are consumed one at a time instead of being loaded with all().
382 while (!results.isExhausted()) {
383 Row r = results.one();
384 long d1 = r.getLong("jobtime");
390 final long endTime = System.currentTimeMillis();
391 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");