Remove keyspace name from queries
[proteocache.git] / datadb / compbio / cassandra / CassandraNativeConnector.java
1 package compbio.cassandra;
2
3 import java.io.IOException;
4 import java.util.Calendar;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.ArrayList;
8 import java.util.Map;
9
10 import org.apache.log4j.Logger;
11
12 import com.datastax.driver.core.Cluster;
13 import com.datastax.driver.core.Host;
14 import com.datastax.driver.core.Metadata;
15 import com.datastax.driver.core.Row;
16 import com.datastax.driver.core.Session;
17 import com.datastax.driver.core.ResultSet;
18 import com.datastax.driver.core.PreparedStatement;
19 import com.datastax.driver.core.BoundStatement;
20
21 import compbio.engine.ProteoCachePropertyHelperManager;
22 import compbio.util.PropertyHelper;
23 import compbio.util.Util;
24
25 public class CassandraNativeConnector {
26         private static Cluster cluster;
27         private static Session session;
28         private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
29         private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
30
31         public static String CASSANDRA_HOSTNAME = "localhost";
32         public static boolean READ_WEB_JPRED = false;
33         public static boolean READ_LOCALFILE_JPRED = false;
34
35         private static boolean initBooleanValue(String key) {
36                 assert key != null;
37                 String status = ph.getProperty(key);
38                 log.debug("Loading property: " + key + " with value: " + status);
39                 if (Util.isEmpty(status)) {
40                         return false;
41                 }
42                 return new Boolean(status.trim()).booleanValue();
43         }
44
45         /*
46          * connect to the cluster and look whether all tables exist
47          */
48         public void Connect() {
49
50                 String cassandrahostname = ph.getProperty("cassandra.host");
51                 if (null != cassandrahostname) {
52                         CASSANDRA_HOSTNAME = cassandrahostname;
53                 }
54                 READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web");
55                 READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local");
56
57                 cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build();
58
59                 Metadata metadata = cluster.getMetadata();
60                 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
61                 for (Host host : metadata.getAllHosts()) {
62                         System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
63                 }
64                 session = cluster.connect();
65                 CreateTables();
66                 System.out.println("Cassandra connected");
67         }
68
69         private void CreateTables() {
70                 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
71                 session.execute("USE ProteinKeyspace");
72
73                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow "
74                                 + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
75                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog "
76                                 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
77                                 + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
78                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData "
79                                 + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
80                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive "
81                                 + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, "
82                                 + "predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
83
84                 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
85                 session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
86         }
87
88         /*
89          * parsing data source and filling the database
90          */
91         public void Parsing() throws IOException {
92                 if (READ_WEB_JPRED) {
93                         // if (source.equals("http")) {
94                         // get data from real Jpred production server
95                         System.out.println("Parsing web data source......");
96                         String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
97                         String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
98                         JpredParserHTTP parser = new JpredParserHTTP(prefix);
99                         parser.Parsing(datasrc, 5);
100                 }
101                 if (READ_LOCALFILE_JPRED) {
102                         // if (source.equals("file")) {
103                         // get irtifical data generated for the DB stress tests
104                         System.out.println("Parsing local file data source......");
105                         String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
106                         String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
107                         JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
108                         parser.Parsing(datasrc, 190);
109                 }
110         }
111
112         public void Closing() {
113                 session.shutdown();
114                 cluster.shutdown();
115                 System.out.println("Cassandra has been shut down");
116         }
117
118         public boolean JobisNotInsterted(String jobid) {
119                 ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
120                 if (results1.isExhausted()) {
121                         return true;
122                 }
123                 return false;
124         }
125
126         public boolean JobisNotArchived(String jobid) {
127                 ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
128                 if (results1.isExhausted()) {
129                         return true;
130                 }
131                 return false;
132         }
133
134         /*
135          * inserting data into the tables for queries
136          */
137         public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
138                         String statusFinal, String protein, List<FastaSequence> predictions) {
139                 if (JobisNotInsterted(jobid)) {
140                         String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
141                                         + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
142                                         + "','" + protein + "');";
143                         session.execute(com1);
144
145                         String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
146                                         + "');";
147                         session.execute(com2);
148
149                         String allpredictions = "";
150                         for (FastaSequence pred : predictions) {
151                                 String predictionname = pred.getId();
152                                 String prediction = pred.getSequence().replaceAll("\n", "");
153                                 allpredictions += "'" + predictionname + "':'" + prediction + "',";
154                         }
155                         String final_prediction = "";
156                         if (null != allpredictions) {
157                                 final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
158                         }
159
160                         String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
161                         ResultSet results2 = session.execute(check2);
162                         if (results2.isExhausted()) {
163                                 String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
164                                                 + final_prediction + "});";
165                                 session.execute(com3);
166                         }
167                         return 1;
168                 }
169                 return 0;
170         }
171
172         /*
173          * insert data from a real Jpred job: timing+IP, Execution Status, Final
174          * status, protein sequence, predictions, alignment, LOG and tar.gz files
175          */
176         public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
177                         List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
178                 if (JobisNotArchived(jobid)) {
179                         String log = LogFile.replaceAll("'", "");
180                         session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
181                                         + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
182                         if (false) {
183                                 PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
184                                 BoundStatement boundStatement = new BoundStatement(statement);
185                                 session.execute(boundStatement.bind(jobid, archivepath));
186                         }
187
188                         for (FastaSequence p : predictions) {
189                                 session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
190                                                 + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
191                         }
192
193                         for (FastaSequence s : seqs) {
194                                 session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
195                                                 + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
196                         }
197                         return 1;
198                 }
199                 return 0;
200         }
201
202         /*
203          * getting data from the db
204          */
205         public List<Pair<String, String>> ReadProteinDataTable() {
206                 final long startTime = System.currentTimeMillis();
207                 String com = "SELECT DataBegin,DataEnd FROM ProteinLog;";
208                 System.out.println("Command: " + com);
209                 ResultSet results = session.execute(com);
210                 final long queryTime = System.currentTimeMillis();
211                 List<Row> rows = results.all();
212                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
213
214                 List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
215                 int c = 0;
216                 for (Row r : rows) {
217                         Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
218                         res.add(pair);
219                         ++c;
220                 }
221                 final long endTime = System.currentTimeMillis();
222                 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
223                 return res;
224         }
225
226         /*
227          * getting data from the db ProteinData
228          */
229         public Integer ReadDateTable(long queryDate) {
230                 final long startTime = System.currentTimeMillis();
231                 String com = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";";
232                 System.out.println("Command: " + com);
233                 ResultSet results = session.execute(com);
234                 final long queryTime = System.currentTimeMillis();
235                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
236                 if (results.isExhausted())
237                         return 0;
238                 List<Row> rows = results.all();
239                 final long endTime = System.currentTimeMillis();
240                 System.out.println("Processing time is " + (endTime - queryTime) + " msec");
241                 return rows.size();
242         }
243
244         /*
245          * getting whole protein sequence from the db ProteinRow
246          */
247         public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
248                 final long startTime = System.currentTimeMillis();
249                 String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';";
250                 System.out.println("Command: " + com);
251                 ResultSet results = session.execute(com);
252                 if (results.isExhausted())
253                         return null;
254                 final long queryTime = System.currentTimeMillis();
255                 List<Row> rows = results.all();
256                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
257                 System.out.println(" rows analysed,  " + rows.size());
258                 List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
259                 int c = 0;
260                 for (Row r : rows) {
261                         StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap(
262                                         "Predictions", String.class, String.class));
263                         res.add(structure);
264                         ++c;
265                 }
266                 final long endTime = System.currentTimeMillis();
267                 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
268                 return res;
269         }
270
271         /*
272          * getting part of protein sequence from the db ProteinRow
273          */
274         public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
275                 final long startTime = System.currentTimeMillis();
276                 String com = "SELECT * FROM ProteinRow;";
277                 System.out.println("Command: " + com);
278                 ResultSet results = session.execute(com);
279                 if (results.isExhausted())
280                         return null;
281                 final long queryTime = System.currentTimeMillis();
282                 List<Row> rows = results.all();
283                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
284                 System.out.println(" rows analysed,  " + rows.size());
285                 List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
286                 int c = 0;
287                 for (Row r : rows) {
288                         String prot = r.getString("Protein");
289                         if (prot.matches("(.*)" + queryProtein + "(.*)")) {
290                                 StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions",
291                                                 String.class, String.class));
292                                 res.add(structure);
293                                 ++c;
294                         }
295                 }
296                 final long endTime = System.currentTimeMillis();
297                 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
298                 return res;
299         }
300
301         /*
302          * getting protein sequences by counter
303          */
304         public Map<String, Integer> ReadProteinDataByCounter() {
305                 final long startTime = System.currentTimeMillis();
306                 String com = "SELECT Protein FROM ProteinRow;";
307                 System.out.println("Command: " + com);
308                 ResultSet results = session.execute(com);
309                 if (results.isExhausted())
310                         return null;
311                 final long queryTime = System.currentTimeMillis();
312                 List<Row> rows = results.all();
313                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
314                 System.out.println(" rows analysed,  " + rows.size());
315                 Map<String, Integer> res = new HashMap<String, Integer>();
316                 int c = 0;
317                 for (Row r : rows) {
318                         String protein = r.getString("Protein");
319                         if (res.containsKey(protein))
320                                 res.put(protein, res.get(protein) + 1);
321                         else
322                                 res.put(protein, 1);
323                 }
324                 final long endTime = System.currentTimeMillis();
325                 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
326                 return res;
327         }
328
329         /*
330          * getting protein sequences by counter
331          */
332         public StructureJobLog ReadJobLog(String jobid) {
333                 final long startTime = System.currentTimeMillis();
334                 String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
335                 System.out.println("Command: " + com);
336                 ResultSet results = session.execute(com);
337                 if (results.isExhausted())
338                         return null;
339                 final long queryTime = System.currentTimeMillis();
340                 Row row = results.one();
341                 String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
342                 System.out.println("Command: " + com1);
343                 ResultSet results1 = session.execute(com1);
344                 if (results1.isExhausted())
345                         return null;
346                 Row row1 = results1.one();
347                 StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"),
348                                 row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
349                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
350                 final long endTime = System.currentTimeMillis();
351                 System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec");
352                 return res;
353         }
354
355         /*
356          * getting earlest date of jobs from the db
357          */
358         public long getEarliestDateInDB() {
359                 final long startTime = System.currentTimeMillis();
360                 String com = "SELECT jobtime,JobID FROM ProteinData;";
361                 System.out.println("Command: " + com);
362                 ResultSet results = session.execute(com);
363                 final long queryTime = System.currentTimeMillis();
364                 System.out.println("Query time is  " + (queryTime - startTime) + " msec");
365
366                 Calendar cal = Calendar.getInstance();
367                 long res = cal.getTimeInMillis();
368                 int c = 0;
369                 while (!results.isExhausted()) {
370                         Row r = results.one();
371                         long d1 = r.getLong("jobtime");
372                         if (res > d1) {
373                                 res = d1;
374                         }
375                         ++c;
376                 }
377                 final long endTime = System.currentTimeMillis();
378                 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
379                 return res;
380         }
381
382 }