package compbio.cassandra;
-import java.io.IOException;
import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
+
+import org.apache.log4j.Logger;
+import org.springframework.dao.DataIntegrityViolationException;
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.MetricsOptions;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SocketOptions;
+
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.datastax.driver.core.policies.Policies;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
public class CassandraNativeConnector {
private static Cluster cluster;
private static Session session;
- /*
- * connect to the cluster and look weather the dababase has any data inside
+ private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
+ private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+ public static String CASSANDRA_HOSTNAME = "localhost";
+
+ /**
+  * Gives callers access to the Cassandra session shared by this connector.
+  *
+  * @return the static session field; within this file it is only assigned
+  *         by {@link #Connect()}, so it is {@code null} before that call
+  */
+ public static Session getSession() {
+     return CassandraNativeConnector.session;
+ }
+
+ /**
+ * Connects to the Cassandra cluster, opens the shared session and makes
+ * sure the keyspace, tables and indexes exist (see CreateMainTables).
+ *
+ * The contact point is the value of the "cassandra.host" property when it
+ * is set; otherwise the current value of CASSANDRA_HOSTNAME is used.
+ * Cluster name and per-host details are printed to standard output.
*/
public void Connect() {
- // local cassandra cluster
- cluster = Cluster.builder().addContactPoint("localhost").build();
- // distributed cassandra cluster
- /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
+
+ // A configured host overrides the static default; the override is
+ // written back to the public static field, so it stays in effect afterwards.
+ String cassandrahostname = ph.getProperty("cassandra.host");
+ if (null != cassandrahostname) {
+ CASSANDRA_HOSTNAME = cassandrahostname;
+ }
+
+ Cluster.Builder builder = Cluster.builder();
+ builder.addContactPoint(CASSANDRA_HOSTNAME);
+ // PrintClusterConfiguration( builder.getConfiguration());
+ cluster = builder.build();
+
+ // Report the cluster name and every host known to the cluster metadata.
Metadata metadata = cluster.getMetadata();
System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
for (Host host : metadata.getAllHosts()) {
System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
}
-
+ // The session is stored in the static field returned by getSession().
session = cluster.connect();
+ // Schema bootstrap runs on every connect.
+ CreateMainTables();
+ System.out.println("Cassandra connected");
+ }
+
+ /**
+ * Creates the ProteinKeyspace keyspace plus the tables and secondary
+ * indexes used by the application, using the shared static session.
+ *
+ * Every CREATE statement uses IF NOT EXISTS. Tables created here:
+ * MainParameters, ProteinRow, ProteinLog, ProteinData, FailLog,
+ * JpredArchive, JobDateInfo, Programs and Users.
+ */
+ private void CreateMainTables() {
+ // NOTE(review): SimpleStrategy with replication_factor 3 while the default
+ // contact point is localhost; confirm this suits single-node deployments.
session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
- + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
+ // Switch the session to ProteinKeyspace; the table names below are unqualified.
+ session.execute("USE ProteinKeyspace");
- session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinKeyspace.ProteinRow (protein);");
- session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);");
+ // Name/Value store; getEarliestDateInDB reads the 'EarliestJobDate' row from it.
+ session.execute("CREATE TABLE IF NOT EXISTS MainParameters " + "(Name ascii, Value ascii, PRIMARY KEY(Name));");
- System.out.println("Cassandra connected");
- }
+ session.execute("CREATE TABLE IF NOT EXISTS ProteinRow "
+ + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
- /*
- * parsing data source and filling the database
- */
- public void Parsing() throws IOException {
- if (true) {
- // if (source.equals("http")) {
- // get data from real Jpred production server
- System.out.println("Parsing web data source......");
- String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
- String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
- JpredParserHTTP parser = new JpredParserHTTP(prefix);
- parser.Parsing(datasrc, 4);
- }
- if (false) {
- // if (source.equals("file")) {
- // get irtifical data generated for the DB stress tests
- System.out.println("Parsing local file data source......");
- String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
- String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
- JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
- parser.Parsing(datasrc, 190);
- }
+ session.execute("CREATE TABLE IF NOT EXISTS ProteinLog "
+ + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
+ + "ExecutionStatus ascii, Protein ascii, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS ProteinData "
+ + "(jobtime bigint, JobID ascii, ExecTime int, Protein ascii, PRIMARY KEY(jobtime, JobID));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS FailLog "
+ + "(jobtime bigint, JobID ascii, ExecTime int, ip ascii, FinalStatus ascii, PRIMARY KEY(jobtime, JobID));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS JpredArchive "
+ + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, FinalStatus ascii, ExecutionStatus ascii, alignment map<ascii,ascii>, "
+ + "predictions map<ascii,ascii>, ArchiveLink varchar, LOG varchar, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo "
+ + "(jobday bigint, TotalOK bigint, TotalStopped bigint, TotalError bigint, TotalTimeOut bigint, PRIMARY KEY(jobday));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS Programs "
+ + "(Program varchar, Version varchar, Description varchar, weblink varchar, PRIMARY KEY(Program,Version));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS Users "
+ + "(name varchar, id bigint, email varchar, password varchar, organisation varchar, position varchar, signedtolist boolean, registrationdate bigint, PRIMARY KEY(id));");
+
+ // Disabled ALTER statements for columns that the CREATE statements above
+ // already declare. Presumably one-off migrations for tables created
+ // before those columns existed; confirm before deleting.
+ // session.execute("ALTER TABLE ProteinLog ADD ProgramName ascii;");
+ // session.execute("ALTER TABLE ProteinLog ADD ProgramVersion ascii;");
+ // session.execute("ALTER TABLE JpredArchive ADD ProgramName ascii;");
+ // session.execute("ALTER TABLE JpredArchive ADD ProgramVersion ascii;");
+
+ // Secondary indexes on non-primary-key columns. The last three are
+ // created without an explicit index name.
+ session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
+ session.execute("CREATE INDEX IF NOT EXISTS ProteinIp ON ProteinLog (ip);");
+ session.execute("CREATE INDEX IF NOT EXISTS ON ProteinLog (ExecutionStatus);");
+ session.execute("CREATE INDEX IF NOT EXISTS ON FailLog (FinalStatus);");
+ session.execute("CREATE INDEX IF NOT EXISTS ON Users (email);");
+ // jobtime is now part of ProteinData's primary key, so this index stays disabled.
+ // session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
}
public void Closing() {
}
/*
- * inserting data into the db
- */
- public void InsertData(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
- String protein, List<FastaSequence> predictions) {
-
- String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
- ResultSet results1 = session.execute(check1);
- if (results1.isExhausted()) {
- String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
- + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
- + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
- session.execute(com1);
-
- String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
- + "','" + protein + "');";
- session.execute(com2);
-
- String allpredictions = "";
- for (FastaSequence pred : predictions) {
- String predictionname = pred.getId();
- String prediction = pred.getSequence().replaceAll("\n", "");
- allpredictions += "'" + predictionname + "':'" + prediction + "',";
- }
- String final_prediction = "";
- if (null != allpredictions) {
- final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
- }
-
- String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';";
- ResultSet results2 = session.execute(check2);
- if (results2.isExhausted()) {
- String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('"
- + protein + "','" + jobid + "',{" + final_prediction + "});";
- session.execute(com3);
- }
- }
- }
-
- /*
- * getting data from the db
- */
- public List<Pair<String, String>> ReadProteinDataTable() {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- final long queryTime = System.currentTimeMillis();
- List<Row> rows = results.all();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
-
- List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
- int c = 0;
- for (Row r : rows) {
- Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"),r.getString("DataEnd"));
- res.add(pair);
- ++c;
- }
- final long endTime = System.currentTimeMillis();
- System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
- }
-
- /*
* getting earliest date of jobs from the db
*/
- public long getEarliestDateInDB() {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT jobtime FROM ProteinKeyspace.ProteinData;";
- System.out.println("Command: " + com);
+ public static long getEarliestDateInDB() {
+ // Reads the 'EarliestJobDate' row of MainParameters instead of scanning
+ // all job times; returns the current time in milliseconds if no such
+ // row exists.
+ String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
ResultSet results = session.execute(com);
- final long queryTime = System.currentTimeMillis();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
- Calendar cal = Calendar.getInstance();
- long res = cal.getTimeInMillis();
- int c = 0;
- while (!results.isExhausted()) {
+ if (!results.isExhausted()) {
Row r = results.one();
- long d1 = r.getLong("jobtime");
- if (res > d1) {
- res = d1;
- }
- ++c;
+ // Value is read as a string and parsed as a decimal long.
+ // NOTE(review): Long.parseLong throws NumberFormatException on a null or
+ // non-numeric Value; confirm writers always store a valid number.
+ return Long.parseLong(r.getString("Value"));
}
- final long endTime = System.currentTimeMillis();
- System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
+ // No stored value: fall back to "now".
+ Calendar cal = Calendar.getInstance();
+ return cal.getTimeInMillis();
}
-
+
+ /**
+  * Dumps the driver configuration to standard output, one line per option
+  * group: policies, socket, protocol (compression only), pooling, metrics
+  * and query options.
+  *
+  * @param cc cluster configuration to describe
+  */
+ private void PrintClusterConfiguration(Configuration cc) {
+     // Assemble the whole report first, then emit it line by line.
+     final String[] report = {
+         "Cluster configuration:",
+         " Policies = " + cc.getPolicies().toString(),
+         " Socket Options = " + cc.getSocketOptions().toString(),
+         " Protocol Options: compression = " + cc.getProtocolOptions().getCompression(),
+         " Pooling Options = " + cc.getPoolingOptions().toString(),
+         " Metrics Options = " + cc.getMetricsOptions().toString(),
+         " Query Options = " + cc.getQueryOptions().toString()
+     };
+     for (String line : report) {
+         System.out.println(line);
+     }
+ }
+
}