Add posibility to look at failed jobs
[proteocache.git] / datadb / compbio / cassandra / CassandraNativeConnector.java
1 package compbio.cassandra;
2
3 import java.util.Calendar;
4
5 import org.apache.log4j.Logger;
6
7 import com.datastax.driver.core.Cluster;
8 import com.datastax.driver.core.Configuration;
9 import com.datastax.driver.core.Host;
10 import com.datastax.driver.core.Metadata;
11 import com.datastax.driver.core.MetricsOptions;
12 import com.datastax.driver.core.PoolingOptions;
13 import com.datastax.driver.core.ProtocolOptions;
14 import com.datastax.driver.core.QueryOptions;
15 import com.datastax.driver.core.ResultSet;
16 import com.datastax.driver.core.Row;
17 import com.datastax.driver.core.SocketOptions;
18
19 import com.datastax.driver.core.Session;
20 import com.datastax.driver.core.policies.Policies;
21
22 import compbio.engine.ProteoCachePropertyHelperManager;
23 import compbio.util.PropertyHelper;
24
25 public class CassandraNativeConnector {
26         private static Cluster cluster;
27         private static Session session;
28         private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
29         private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
30
31         public static String CASSANDRA_HOSTNAME = "localhost";
32
33         public static Session getSession () {
34                 return session;
35         }
36
37         /*
38          * connect to the cluster and look whether all tables exist
39          */
40         public void Connect() {
41
42                 String cassandrahostname = ph.getProperty("cassandra.host");
43                 if (null != cassandrahostname) {
44                         CASSANDRA_HOSTNAME = cassandrahostname;
45                 }
46
47                 Cluster.Builder builder = Cluster.builder();
48                 builder.addContactPoint(CASSANDRA_HOSTNAME);
49                 //PrintClusterConfiguration( builder.getConfiguration());
50                 cluster = builder.build();
51
52                 Metadata metadata = cluster.getMetadata();
53                 System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
54                 for (Host host : metadata.getAllHosts()) {
55                         System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
56                 }
57                 session = cluster.connect();
58                 CreateMainTables();
59                 System.out.println("Cassandra connected");
60         }
61
62         private void CreateMainTables() {
63                 session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
64                 session.execute("USE ProteinKeyspace");
65
66                 session.execute("CREATE TABLE IF NOT EXISTS MainParameters "
67                                 + "(Name ascii, Value ascii, PRIMARY KEY(Name));");
68                 
69                 session.execute("CREATE TABLE IF NOT EXISTS ProteinRow "
70                                 + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
71
72                 session.execute("CREATE TABLE IF NOT EXISTS ProteinLog "
73                                 + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
74                                 + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
75
76                 session.execute("CREATE TABLE IF NOT EXISTS ProteinData "
77                                 + "(jobtime bigint, JobID ascii, ExecTime int, Protein ascii, PRIMARY KEY(jobtime, JobID));");
78
79                 session.execute("CREATE TABLE IF NOT EXISTS FailLog "
80                                 + "(jobtime bigint, JobID ascii, ExecTime int, ip ascii, FinalStatus ascii, PRIMARY KEY(jobtime, JobID));");
81                 
82                 session.execute("CREATE TABLE IF NOT EXISTS JpredArchive "
83                                 + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, FinalStatus ascii, ExecutionStatus ascii, alignment map<ascii,ascii>, "
84                                 + "predictions map<ascii,ascii>, ArchiveLink varchar, LOG varchar, PRIMARY KEY(JobID));");
85                 
86                 session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo "
87                                 + "(jobday bigint, Total bigint,  TotalOK bigint, TotalStopped bigint, TotalError bigint, TotalTimeOut bigint, Program varchar, Version varchar, PRIMARY KEY(jobday));");
88
89                 session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
90                 session.execute("CREATE INDEX IF NOT EXISTS ProteinIp ON ProteinLog (ip);");
91                 session.execute("CREATE INDEX IF NOT EXISTS ON ProteinLog (ExecutionStatus);");
92                 session.execute("CREATE INDEX IF NOT EXISTS ON FailLog (FinalStatus);");
93         //      session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
94         } 
95
96         public void Closing() {
97                 session.shutdown();
98                 cluster.shutdown();
99                 System.out.println("Cassandra has been shut down");
100         }
101
102         /*
103          * getting earlest date of jobs from the db
104          */
105         public static long getEarliestDateInDB() {
106                 String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
107                 System.out.println("Command: " + com);
108                 ResultSet results = session.execute(com);
109
110                 if (!results.isExhausted()) {
111                         Row r = results.one();
112                         return Long.parseLong(r.getString("Value"));
113                 }
114                 Calendar cal = Calendar.getInstance();
115                 return cal.getTimeInMillis();
116         }
117         
118         private void PrintClusterConfiguration(Configuration cc) {
119                 Policies policies = cc.getPolicies();
120                 SocketOptions sopt = cc.getSocketOptions();
121                 ProtocolOptions propt = cc.getProtocolOptions();
122                 PoolingOptions plopt = cc.getPoolingOptions();
123                 MetricsOptions mopt = cc.getMetricsOptions();
124                 QueryOptions qopt =  cc.getQueryOptions();
125                 System.out.println("Cluster configuration:");
126                 System.out.println("   Policies = " + policies.toString());
127                 System.out.println("   Socket Options = " + sopt.toString());
128                 System.out.println("   Protocol Options: compression = " + propt.getCompression());
129                 System.out.println("   Pooling Options = " + plopt.toString());
130                 System.out.println("   Metrics Options = " + mopt.toString());
131                 System.out.println("   Query Options = " + qopt.toString());
132         }
133
134 }