5e5cf5a4379a7003fe91ae33861534aed25d2eda
[proteocache.git] / datadb / compbio / cassandra / CassandraNativeConnector.java
1 package compbio.cassandra;
2
3 import java.util.Calendar;
4
5 import org.apache.log4j.Logger;
6 import org.springframework.dao.DataIntegrityViolationException;
7
8 import com.datastax.driver.core.Cluster;
9 import com.datastax.driver.core.Configuration;
10 import com.datastax.driver.core.Host;
11 import com.datastax.driver.core.Metadata;
12 import com.datastax.driver.core.MetricsOptions;
13 import com.datastax.driver.core.PoolingOptions;
14 import com.datastax.driver.core.ProtocolOptions;
15 import com.datastax.driver.core.QueryOptions;
16 import com.datastax.driver.core.ResultSet;
17 import com.datastax.driver.core.Row;
18 import com.datastax.driver.core.SocketOptions;
19
20 import com.datastax.driver.core.Session;
21 import com.datastax.driver.core.exceptions.QueryExecutionException;
22 import com.datastax.driver.core.exceptions.QueryValidationException;
23 import com.datastax.driver.core.policies.Policies;
24
25 import compbio.engine.ProteoCachePropertyHelperManager;
26 import compbio.util.PropertyHelper;
27
28 public class CassandraNativeConnector {
29         private static Cluster cluster;
30         private static Session session;
31         private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
32         private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
33
34         public static String CASSANDRA_HOSTNAME = "localhost";
35
36         public static Session getSession () {
37                 return session;
38         }
39
40         /*
41          * connect to the cluster and look whether all tables exist
42          */
43         public void Connect() {
44
45                 String cassandrahostname = ph.getProperty("cassandra.host");
46                 if (null != cassandrahostname) {
47                         CASSANDRA_HOSTNAME = cassandrahostname;
48                 }
49
50                 Cluster.Builder builder = Cluster.builder();
51                 builder.addContactPoint(CASSANDRA_HOSTNAME);
52                 //PrintClusterConfiguration( builder.getConfiguration());
53                 cluster = builder.build();
54
55                 Metadata metadata = cluster.getMetadata();
56                 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
57                 for (Host host : metadata.getAllHosts()) {
58                         System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
59                 }
60                 session = cluster.connect();
61                 CreateMainTables();
62                 System.out.println("Cassandra connected");
63         }
64
65         private void CreateMainTables() {
66                 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
67                 session.execute("USE ProteinKeyspace");
68
69                 session.execute("CREATE TABLE IF NOT EXISTS MainParameters "
70                                 + "(Name ascii, Value ascii, PRIMARY KEY(Name));");
71                 
72                 session.execute("CREATE TABLE IF NOT EXISTS ProteinRow "
73                                 + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
74
75                 session.execute("CREATE TABLE IF NOT EXISTS ProteinLog "
76                                 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
77                                 + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
78
79                 session.execute("CREATE TABLE IF NOT EXISTS ProteinData "
80                                 + "(jobtime bigint, JobID ascii, ExecTime int, Protein ascii, PRIMARY KEY(jobtime, JobID));");
81
82                 session.execute("CREATE TABLE IF NOT EXISTS FailLog "
83                                 + "(jobtime bigint, JobID ascii, ExecTime int, ip ascii, FinalStatus ascii, PRIMARY KEY(jobtime, JobID));");
84                 
85                 session.execute("CREATE TABLE IF NOT EXISTS JpredArchive "
86                                 + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, FinalStatus ascii, ExecutionStatus ascii, alignment map<ascii,ascii>, "
87                                 + "predictions map<ascii,ascii>, ArchiveLink varchar, LOG varchar, PRIMARY KEY(JobID));");
88
89                 session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo "
90                                 + "(jobday bigint, Total bigint,  TotalOK bigint, TotalStopped bigint, TotalError bigint, TotalTimeOut bigint, Program varchar, Version varchar, PRIMARY KEY(jobday));");
91
92                 String com = "CREATE TABLE IF NOT EXISTS Users "
93                                 + "(name varchar, id bigint, email varchar, password varchar, organisation varchar, position varchar, signedtolist boolean, registrationdate bigint, PRIMARY KEY(id));";
94                 
95                 
96                 try {
97                         session.execute(com);
98                 } catch (QueryExecutionException e) {
99                         String mess = "CassandraNativeConnector.CreateMainTables: query execution exception...";
100                         System.out.println(mess);
101                         log.error(mess);
102                         log.error(e.getLocalizedMessage(), e.getCause());
103                 } catch (QueryValidationException e) {
104                         String mess = "CassandraNativeConnector.CreateMainTables: query validation exception... Command: " + com;
105                         System.out.println(mess);
106                         log.error(mess);
107                         log.error(e.getLocalizedMessage(), e.getCause());
108                 }
109
110                 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
111                 session.execute("CREATE INDEX IF NOT EXISTS ProteinIp ON ProteinLog (ip);");
112                 session.execute("CREATE INDEX IF NOT EXISTS ON ProteinLog (ExecutionStatus);");
113                 session.execute("CREATE INDEX IF NOT EXISTS ON FailLog (FinalStatus);");
114                 session.execute("CREATE INDEX IF NOT EXISTS ON Users (email);");
115         //      session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
116         } 
117
118         public void Closing() {
119                 session.shutdown();
120                 cluster.shutdown();
121                 System.out.println("Cassandra has been shut down");
122         }
123
124         /*
125          * getting earlest date of jobs from the db
126          */
127         public static long getEarliestDateInDB() {
128                 String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
129                 System.out.println("Command: " + com);
130                 ResultSet results = session.execute(com);
131
132                 if (!results.isExhausted()) {
133                         Row r = results.one();
134                         return Long.parseLong(r.getString("Value"));
135                 }
136                 Calendar cal = Calendar.getInstance();
137                 return cal.getTimeInMillis();
138         }
139         
140         private void PrintClusterConfiguration(Configuration cc) {
141                 Policies policies = cc.getPolicies();
142                 SocketOptions sopt = cc.getSocketOptions();
143                 ProtocolOptions propt = cc.getProtocolOptions();
144                 PoolingOptions plopt = cc.getPoolingOptions();
145                 MetricsOptions mopt = cc.getMetricsOptions();
146                 QueryOptions qopt =  cc.getQueryOptions();
147                 System.out.println("Cluster configuration:");
148                 System.out.println("   Policies = " + policies.toString());
149                 System.out.println("   Socket Options = " + sopt.toString());
150                 System.out.println("   Protocol Options: compression = " + propt.getCompression());
151                 System.out.println("   Pooling Options = " + plopt.toString());
152                 System.out.println("   Metrics Options = " + mopt.toString());
153                 System.out.println("   Query Options = " + qopt.toString());
154         }
155
156 }