Add Jpred archive table
[proteocache.git] / datadb / compbio / cassandra / CassandraNativeConnector.java
1 package compbio.cassandra;
2
3 import java.io.IOException;
4 import java.util.Calendar;
5 import java.util.List;
6 import java.util.ArrayList;
7
8 import com.datastax.driver.core.Cluster;
9 import com.datastax.driver.core.Host;
10 import com.datastax.driver.core.Metadata;
11 import com.datastax.driver.core.Row;
12 import com.datastax.driver.core.Session;
13 import com.datastax.driver.core.ResultSet;
14
15 public class CassandraNativeConnector {
16         private static Cluster cluster;
17         private static Session session;
18
19         /*
20          * connect to the cluster and look weather the dababase has any data inside
21          */
22         public void Connect() {
23                 // local cassandra cluster
24                 cluster = Cluster.builder().addContactPoint("localhost").build();
25                 // distributed cassandra cluster
26                 /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
27                 Metadata metadata = cluster.getMetadata();
28                 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
29                 for (Host host : metadata.getAllHosts()) {
30                         System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
31                 }
32
33                 session = cluster.connect();
34                 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
35                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
36                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
37                                 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
38                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
39                 session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.JpredArchive " + 
40                 "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
41
42                 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinKeyspace.ProteinRow (protein);");
43                 session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);");
44
45                 System.out.println("Cassandra connected");
46         }
47
48         /*
49          * parsing data source and filling the database
50          */
51         public void Parsing() throws IOException {
52                 if (true) {
53                         // if (source.equals("http")) {
54                         // get data from real Jpred production server
55                         System.out.println("Parsing web data source......");
56                         String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
57                         String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
58                         JpredParserHTTP parser = new JpredParserHTTP(prefix);
59                         parser.Parsing(datasrc, 4);
60                 }
61                 if (false) {
62                         // if (source.equals("file")) {
63                         // get irtifical data generated for the DB stress tests
64                         System.out.println("Parsing local file data source......");
65                         String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
66                         String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
67                         JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
68                         parser.Parsing(datasrc, 190);
69                 }
70         }
71
72         public void Closing() {
73                 session.shutdown();
74                 cluster.shutdown();
75                 System.out.println("Cassandra has been shut down");
76         }
77
78         /*
79          * inserting data into the db
80          */
81         public void FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
82                         String protein, List<FastaSequence> predictions) {
83
84                 String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
85                 ResultSet results1 = session.execute(check1);
86                 if (results1.isExhausted()) {
87                         String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
88                                         + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
89                                         + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
90                         session.execute(com1);
91
92                         String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
93                                         + "','" + protein + "');";
94                         session.execute(com2);
95
96                         String allpredictions = "";
97                         for (FastaSequence pred : predictions) {
98                                 String predictionname = pred.getId();
99                                 String prediction = pred.getSequence().replaceAll("\n", "");
100                                 allpredictions += "'" + predictionname + "':'" + prediction + "',";
101                         }
102                         String final_prediction = "";
103                         if (null != allpredictions) {
104                                 final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
105                         }
106
107                         String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';";
108                         ResultSet results2 = session.execute(check2);
109                         if (results2.isExhausted()) {
110                                 String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','"
111                                                 + jobid + "',{" + final_prediction + "});";
112                                 session.execute(com3);
113                         }
114                 }
115         }
116
117         public void ArchiveData(long starttime, int exectime, String ip, String jobid, String statusEx, String statusFinal,
118                         String protein, List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile) {
119
120                 String check1 = "SELECT * FROM ProteinKeyspace.JpredArchive WHERE JobID = '" + jobid + "';";
121                 ResultSet results1 = session.execute(check1);
122                 if (results1.isExhausted()) {
123                         String allpredictions = "";
124                         for (FastaSequence pred : predictions) {
125                                 String predictionname = pred.getId();
126                                 String prediction = pred.getSequence().replaceAll("\n", "");
127                                 allpredictions += "'" + predictionname + "':'" + prediction + "',";
128                         }
129                         String final_allpredictions = "";
130                         if (null != allpredictions) {
131                                 final_allpredictions = allpredictions.substring(0, allpredictions.length() - 1);
132                         }
133                         String alignment = "";
134                         for (FastaSequence seq : seqs) {
135                                 String predictionname = seq.getId();
136                                 String prediction = seq.getSequence().replaceAll("\n", "");
137                                 alignment += "'" + predictionname + "':'" + prediction + "',";
138                         }
139                         String final_alignment = "";
140                         if (null != allpredictions) {
141                                 final_alignment = alignment.substring(0, allpredictions.length() - 1);
142                         }
143                         
144                         String com1 = "INSERT INTO ProteinKeyspace.JpredArchive "
145                                         + "(JobID, Protein, IP, StartTime, ExecTime, alignment, predictions, LOG))"
146                                         + " VALUES ('" 
147                                         + jobid + "','" + protein + "','" + ip + "'," + starttime + "," + exectime
148                                         + "',[" + final_allpredictions + "],[" + final_alignment + "],'" + LogFile + "]);";
149                         session.execute(com1);
150                 }
151         }
152
153         
154         
155         /*
156          * getting data from the db
157          */
158         public List<Pair<String, String>> ReadProteinDataTable() {
159                 final long startTime = System.currentTimeMillis();
160                 String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
161                 System.out.println("Command: " + com);
162                 ResultSet results = session.execute(com);
163                 final long queryTime = System.currentTimeMillis();
164                 List<Row> rows = results.all();
165                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
166
167                 List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
168                 int c = 0;
169                 for (Row r : rows) {
170                         Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
171                         res.add(pair);
172                         ++c;
173                 }
174                 final long endTime = System.currentTimeMillis();
175                 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
176                 return res;
177         }
178
179         /*
180          * getting earlest date of jobs from the db
181          */
182         public long getEarliestDateInDB() {
183                 final long startTime = System.currentTimeMillis();
184                 String com = "SELECT jobtime FROM ProteinKeyspace.ProteinData;";
185                 System.out.println("Command: " + com);
186                 ResultSet results = session.execute(com);
187                 final long queryTime = System.currentTimeMillis();
188                 System.out.println("Query time is " + (queryTime - startTime) + " msec");
189
190                 Calendar cal = Calendar.getInstance();
191                 long res = cal.getTimeInMillis();
192                 int c = 0;
193                 while (!results.isExhausted()) {
194                         Row r = results.one();
195                         long d1 = r.getLong("jobtime");
196                         if (res > d1) {
197                                 res = d1;
198                         }
199                         ++c;
200                 }
201                 final long endTime = System.currentTimeMillis();
202                 System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
203                 return res;
204         }
205
206 }