fix query log info, jobs by counter
[proteocache.git] / datadb / compbio / cassandra / CassandraNativeConnector.java
1 package compbio.cassandra;
2
3 import java.io.IOException;
4 import java.util.Calendar;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.ArrayList;
8 import java.util.Map;
9
10 import com.datastax.driver.core.Cluster;
11 import com.datastax.driver.core.Host;
12 import com.datastax.driver.core.Metadata;
13 import com.datastax.driver.core.Row;
14 import com.datastax.driver.core.Session;
15 import com.datastax.driver.core.ResultSet;
16
17 public class CassandraNativeConnector {
18         private static Cluster cluster;
19         private static Session session;
20         /*
21          * connect to the cluster and look whether the database has any data inside
22          */
23         public void Connect() {
24                 // local cassandra cluster
25                 cluster = Cluster.builder().addContactPoint("localhost").build();
26                 // distributed cassandra cluster
27                 /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
28                 Metadata metadata = cluster.getMetadata();
29                 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
30                 for (Host host : metadata.getAllHosts()) {
31                         System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
32                 }
33
34                 session = cluster.connect();
35                 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
36                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
37                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
38                                 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
39                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
40
41                 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinKeyspace.ProteinRow (protein);");
42                 session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);");
43
44                 System.out.println("Cassandra connected");
45         }
46
47         /*
48          * parsing data source and filling the database
49          */
50         public void Parsing() throws IOException {
51                 if (true) {
52                         // if (source.equals("http")) {
53                         // get data from real Jpred production server
54                         System.out.println("Parsing web data source......");
55                         String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
56                         String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
57                         JpredParserHTTP parser = new JpredParserHTTP(prefix);
58                         parser.Parsing(datasrc, 4);
59                 }
60                 if (false) {
61                         // if (source.equals("file")) {
62                         // get irtifical data generated for the DB stress tests
63                         System.out.println("Parsing local file data source......");
64                         String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
65                         String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
66                         JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
67                         parser.Parsing(datasrc, 190);
68                 }
69         }
70
71         public void Closing() {
72                 session.shutdown();
73                 cluster.shutdown();
74                 System.out.println("Cassandra has been shut down");
75         }
76
77         /*
78          * inserting data into the db
79          */
80         public void InsertData(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
81                         String protein, List<FastaSequence> predictions) {
82                 String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
83                 ResultSet results1 = session.execute(check1);
84                 if (results1.isExhausted()) {
85                         String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
86                                         + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
87                                         + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
88                         session.execute(com1);
89                         String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
90                                         + "','" + protein + "');";
91                         session.execute(com2);
92                         String allpredictions = "";
93                         for (FastaSequence pred : predictions) {
94                                 String predictionname = pred.getId();
95                                 String prediction = pred.getSequence().replaceAll("\n", "");
96                                 allpredictions += "'" + predictionname + "':'" + prediction + "',";
97                         }
98                         String final_prediction = "";
99                         if (null != allpredictions) {
100                                 final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
101                         }
102                         String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
103                         ResultSet results2 = session.execute(check2);
104                         if (results2.isExhausted()) {
105                                 String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" 
106                         + protein + "','" + jobid + "',{" + final_prediction + "});";
107                                 session.execute(com3);
108                         }
109                         String check3 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';";
110                 }
111         }
112
113         /*
114          * getting data from the db
115          */
116         public List<Pair<String, String>> ReadProteinDataTable() {
117                 final long startTime = System.currentTimeMillis();
118                 String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
119                 System.out.println("Command: " + com);
120                 ResultSet results = session.execute(com);
121                 final long queryTime = System.currentTimeMillis();
122                 List<Row> rows = results.all();
123                 System.out.println ("Query time is " + (queryTime - startTime) + " msec");
124
125                 List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
126                 int c = 0;
127                 for (Row r : rows) {
128                         Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"),r.getString("DataEnd"));
129                         res.add(pair);
130                         ++c;
131                 }
132                 final long endTime = System.currentTimeMillis();
133                 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
134                 return res;
135         }
136         
137         /*
138          * getting data from the db ProteinData
139          */
140         public Integer ReadDateTable(long queryDate) {
141                 final long startTime = System.currentTimeMillis();
142                 String com = "SELECT jobtime, JobID FROM ProteinKeyspace.ProteinData WHERE jobtime = " + queryDate + ";";
143                 System.out.println("Command: " + com);
144                 ResultSet results = session.execute(com);
145                 if (results.isExhausted())
146                         return null;
147                 final long queryTime = System.currentTimeMillis();
148                 List<Row> rows = results.all();
149                 System.out.println ("Query time is " + (queryTime - startTime) + " msec");        
150                 return rows.size();
151         }
152
153         /*
154          * getting whole protein sequence from the db ProteinRow
155          */
156         public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
157                 final long startTime = System.currentTimeMillis();
158                 String com = "SELECT JobID, Predictions FROM ProteinKeyspace.ProteinRow WHERE Protein = '" + queryProtein + "';";
159                 System.out.println("Command: " + com);
160                 ResultSet results = session.execute(com);
161                 if (results.isExhausted())
162                         return null;
163                 final long queryTime = System.currentTimeMillis();
164                 List<Row> rows = results.all();
165                 System.out.println ("Query time is " + (queryTime - startTime) + " msec");   
166                 System.out.println (" rows analysed,  " + rows.size());
167                 List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
168                 int c = 0;
169                 for (Row r : rows) {
170                         StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));         
171                         res.add(structure);
172                         ++c;
173                 }
174                 final long endTime = System.currentTimeMillis();
175                 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
176                 return res;
177         }
178         
179         /*
180          * getting part of protein sequence from the db ProteinRow
181          */
182         public List<StructureProteinPrediction>  ReadPartOfSequence(String queryProtein) {
183                 final long startTime = System.currentTimeMillis();
184                 String com = "SELECT * FROM ProteinKeyspace.ProteinRow;";
185                 System.out.println("Command: " + com);
186                 ResultSet results = session.execute(com);
187                 if (results.isExhausted())
188                         return null;
189                 final long queryTime = System.currentTimeMillis();
190                 List<Row> rows = results.all();
191                 System.out.println ("Query time is " + (queryTime - startTime) + " msec");   
192                 System.out.println (" rows analysed,  " + rows.size());
193                 List<StructureProteinPrediction>  res = new ArrayList<StructureProteinPrediction>();
194                 int c = 0;
195                 for (Row r : rows) {
196                         String prot = r.getString("Protein");
197                         if (prot.matches("(.*)" + queryProtein + "(.*)")) {
198                         //      System.out.println(prot);
199                                 StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));         
200                                 res.add(structure);
201                                 ++c;
202                         }
203                 }
204                 final long endTime = System.currentTimeMillis();
205                 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
206                 return res;
207         }
208         
209         /*
210          * getting protein sequences by counter
211          */
212         public Map<String, Integer>  ReadProteinDataByCounter() {
213                 final long startTime = System.currentTimeMillis();
214                 String com = "SELECT Protein FROM ProteinKeyspace.ProteinRow;";
215                 System.out.println("Command: " + com);
216                 ResultSet results = session.execute(com);
217                 if (results.isExhausted())
218                         return null;
219                 final long queryTime = System.currentTimeMillis();
220                 List<Row> rows = results.all();
221                 System.out.println ("Query time is " + (queryTime - startTime) + " msec");   
222                 System.out.println (" rows analysed,  " + rows.size());
223                 Map<String, Integer> res = new HashMap<String, Integer>();
224                 int c = 0;
225                 for (Row r : rows) {
226                         String protein = r.getString("Protein");
227                         if (res.containsKey(protein)) 
228                                 res.put(protein, res.get(protein) + 1);
229                         else
230                                 res.put(protein, 1);
231                 }
232                 final long endTime = System.currentTimeMillis();
233                 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
234                 return res;
235         }
236         
237         
238         /*
239          * getting the log record and the predictions of a job by its job id
240          */
241         public StructureJobLog ReadJobLog(String jobid) {
242                 final long startTime = System.currentTimeMillis();
243                 String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
244                 System.out.println("Command: " + com);
245                 ResultSet results = session.execute(com);
246                 if (results.isExhausted())
247                         return null;
248                 final long queryTime = System.currentTimeMillis();
249                 Row row = results.one();
250                 String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
251                 System.out.println("Command: " + com1);
252                 ResultSet results1 = session.execute(com1);
253                 if (results1.isExhausted())
254                         return null;
255                 Row row1 = results1.one();
256                 StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
257                 System.out.println ("Query time is " + (queryTime - startTime) + " msec");   
258                 final long endTime = System.currentTimeMillis();
259                 System.out.println (" rows analysed, execution time is " + (endTime - startTime) + " msec");
260                 return res;
261         }
262         
263         /*
264          * getting earliest date of jobs from the db
265          */
266         public long getEarliestDateInDB() {
267                 final long startTime = System.currentTimeMillis();
268                 String com = "SELECT jobtime,JobID FROM ProteinKeyspace.ProteinData;";
269                 System.out.println("Command: " + com);
270                 ResultSet results = session.execute(com);
271                 final long queryTime = System.currentTimeMillis();
272                 System.out.println ("Query time is  " + (queryTime - startTime) + " msec");
273
274                 Calendar cal = Calendar.getInstance();
275                 long res = cal.getTimeInMillis();
276                 int c = 0;
277                 while (!results.isExhausted()) {
278                         Row r = results.one();
279                         long d1 = r.getLong("jobtime");
280                         if (res > d1) {
281                                 res = d1;
282                         }
283                         ++c;
284                 }
285                 final long endTime = System.currentTimeMillis();
286                 System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
287                 return res;
288         }
289         
290 }