X-Git-Url: http://source.jalview.org/gitweb/?a=blobdiff_plain;f=datadb%2Fcompbio%2Fcassandra%2FCassandraNativeConnector.java;h=3360ad1dc201e70971f5bec8dc474477ae61fe66;hb=5fb4cb600b34a9b33e1a96aae9d66cdd1c3201dc;hp=1fb01fc3fd1b09afb25d6b37a41b26358de33800;hpb=7fafa396b8b56bb5cc7249dc7c12fd37edf5724b;p=proteocache.git diff --git a/datadb/compbio/cassandra/CassandraNativeConnector.java b/datadb/compbio/cassandra/CassandraNativeConnector.java index 1fb01fc..3360ad1 100644 --- a/datadb/compbio/cassandra/CassandraNativeConnector.java +++ b/datadb/compbio/cassandra/CassandraNativeConnector.java @@ -1,123 +1,145 @@ package compbio.cassandra; -import java.io.IOException; -import java.util.List; +import java.util.Calendar; + +import org.apache.log4j.Logger; +import org.springframework.dao.DataIntegrityViolationException; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.Session; +import com.datastax.driver.core.MetricsOptions; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolOptions; +import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SocketOptions; + +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import com.datastax.driver.core.policies.Policies; + +import compbio.engine.ProteoCachePropertyHelperManager; +import compbio.util.PropertyHelper; public class CassandraNativeConnector { private static Cluster cluster; private static Session session; + private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper(); + private static Logger log = Logger.getLogger(CassandraNativeConnector.class); - /* - * private static Keyspace ksp; private static Mutator mutatorLong; - * private static Mutator mutatorString; private static - * Mutator mutatorLog; StringSerializer ss = StringSerializer.get(); - * LongSerializer ls = LongSerializer.get(); - */ + public static String CASSANDRA_HOSTNAME = "localhost"; + + public static Session getSession() { + return session; + } /* - * connect to the cluster and look weather the dababase has any data inside + * connect to the cluster and look whether all tables exist */ public void Connect() { - // local cassandra cluster - cluster = Cluster.builder().addContactPoint("localhost").build(); - // distributed cassandra cluster - /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */ + + String cassandrahostname = ph.getProperty("cassandra.host"); + if (null != cassandrahostname) { + CASSANDRA_HOSTNAME = cassandrahostname; + } + + Cluster.Builder builder = Cluster.builder(); + builder.addContactPoint(CASSANDRA_HOSTNAME); + // PrintClusterConfiguration( builder.getConfiguration()); + cluster = builder.build(); + Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } - session = cluster.connect(); + CreateMainTables(); + System.out.println("Cassandra connected"); + } + + private void CreateMainTables() { session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog " - + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));"); + session.execute("USE ProteinKeyspace"); - session.execute("CREATE INDEX ProteinSeq ON ProteinKeyspace.ProteinRow (protein);"); - session.execute("CREATE INDEX JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);"); + session.execute("CREATE TABLE IF NOT EXISTS MainParameters " + "(Name ascii, Value ascii, PRIMARY KEY(Name));"); - System.out.println("Cassandra connected"); - } + session.execute("CREATE TABLE IF NOT EXISTS ProteinRow " + + "(Protein ascii, JobID ascii, Predictions map, PRIMARY KEY(JobID));"); - /* - * parsing data source and filling the database - */ - public void Parsing() throws IOException { - if (false) { - // if (source.equals("http")) { - // get data from real Jpred production server - System.out.println("Parsing web data source......"); - String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat"; - String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results"; - JpredParserHTTP parser = new JpredParserHTTP(prefix); - parser.Parsing(datasrc, 4); - } - if (true) { - // if (source.equals("file")) { - // get irtifical data generated for the DB stress tests - System.out.println("Parsing local file data source......"); - String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat"; - String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata"; - JpredParserLocalFile parser = new JpredParserLocalFile(prefix); - parser.Parsing(datasrc, 190); - } + session.execute("CREATE TABLE IF NOT EXISTS ProteinLog " + + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, " + + "ExecutionStatus ascii, Protein ascii, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));"); + + session.execute("CREATE TABLE IF NOT EXISTS ProteinData " + + "(jobtime bigint, JobID ascii, ExecTime int, Protein ascii, PRIMARY KEY(jobtime, JobID));"); + + session.execute("CREATE TABLE IF NOT EXISTS FailLog " + + "(jobtime bigint, JobID ascii, ExecTime int, ip ascii, FinalStatus ascii, PRIMARY KEY(jobtime, JobID));"); + + session.execute("CREATE TABLE IF NOT EXISTS JpredArchive " + + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, FinalStatus ascii, ExecutionStatus ascii, alignment map, " + + "predictions map, ArchiveLink varchar, LOG varchar, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));"); + + session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo " + + "(jobday bigint, Total bigint, TotalOK bigint, TotalStopped bigint, TotalError bigint, TotalTimeOut bigint, Program varchar, Version varchar, PRIMARY KEY(jobday));"); + + String com = "CREATE TABLE IF NOT EXISTS Users " + + "(name varchar, id bigint, email varchar, password varchar, organisation varchar, position varchar, signedtolist boolean, registrationdate bigint, PRIMARY KEY(id));"; + + // session.execute("ALTER TABLE ProteinLog ADD ProgramName ascii;"); + // session.execute("ALTER TABLE ProteinLog ADD ProgramVersion ascii;"); + // session.execute("ALTER TABLE JpredArchive ADD ProgramName ascii;"); + // session.execute("ALTER TABLE JpredArchive ADD ProgramVersion ascii;"); + + session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);"); + session.execute("CREATE INDEX IF NOT EXISTS ProteinIp ON ProteinLog (ip);"); + session.execute("CREATE INDEX IF NOT EXISTS ON ProteinLog (ExecutionStatus);"); + session.execute("CREATE INDEX IF NOT EXISTS ON FailLog (FinalStatus);"); + session.execute("CREATE INDEX IF NOT EXISTS ON Users (email);"); + // session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);"); } public void Closing() { + session.shutdown(); cluster.shutdown(); System.out.println("Cassandra has been shut down"); } /* - * prepare data for insertion into the db + * getting earlest date of jobs from the db */ - public void InsertData(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal, - String protein, List predictions) { - - String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';"; - //System.out.println(check1); - ResultSet results1 = session.execute(check1); - if (results1.isExhausted()) { - String com1 = "INSERT INTO ProteinKeyspace.ProteinLog " - + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','" - + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');"; - //System.out.println(com1); - session.execute(com1); - - String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid - + "','" + protein + "');"; - //System.out.println(com2); - session.execute(com2); - - String allpredictions = ""; - for (FastaSequence pred : predictions) { - String predictionname = pred.getId(); - String prediction = pred.getSequence().replaceAll("\n", ""); - allpredictions += "'" + predictionname + "':'" + prediction + "',"; - } - String final_prediction = ""; - if (null != allpredictions) { - final_prediction = allpredictions.substring(0, allpredictions.length() - 1); - } - - String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';"; - //System.out.println(check2); - ResultSet results2 = session.execute(check2); - if (results2.isExhausted()) { - String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" - + protein + "','" + jobid + "',{" + final_prediction + "});"; - //System.out.println(com3); - session.execute(com3); - } + public static long getEarliestDateInDB() { + String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';"; + System.out.println("Command: " + com); + ResultSet results = session.execute(com); + + if (!results.isExhausted()) { + Row r = results.one(); + return Long.parseLong(r.getString("Value")); } + Calendar cal = Calendar.getInstance(); + return cal.getTimeInMillis(); + } + + private void PrintClusterConfiguration(Configuration cc) { + Policies policies = cc.getPolicies(); + SocketOptions sopt = cc.getSocketOptions(); + ProtocolOptions propt = cc.getProtocolOptions(); + PoolingOptions plopt = cc.getPoolingOptions(); + MetricsOptions mopt = cc.getMetricsOptions(); + QueryOptions qopt = cc.getQueryOptions(); + System.out.println("Cluster configuration:"); + System.out.println(" Policies = " + policies.toString()); + System.out.println(" Socket Options = " + sopt.toString()); + System.out.println(" Protocol Options: compression = " + propt.getCompression()); + System.out.println(" Pooling Options = " + plopt.toString()); + System.out.println(" Metrics Options = " + mopt.toString()); + System.out.println(" Query Options = " + qopt.toString()); } }