X-Git-Url: http://source.jalview.org/gitweb/?a=blobdiff_plain;f=datadb%2Fcompbio%2Fcassandra%2FCassandraNativeConnector.java;h=3360ad1dc201e70971f5bec8dc474477ae61fe66;hb=5fb4cb600b34a9b33e1a96aae9d66cdd1c3201dc;hp=d87f89e08bb5f9aad525ad78e716f8ce5e4acb65;hpb=91de317f0e7f8697c3b2d036c7f22a5d6df6cabc;p=proteocache.git diff --git a/datadb/compbio/cassandra/CassandraNativeConnector.java b/datadb/compbio/cassandra/CassandraNativeConnector.java index d87f89e..3360ad1 100644 --- a/datadb/compbio/cassandra/CassandraNativeConnector.java +++ b/datadb/compbio/cassandra/CassandraNativeConnector.java @@ -1,26 +1,29 @@ package compbio.cassandra; -import java.io.IOException; import java.util.Calendar; -import java.util.HashMap; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; import org.apache.log4j.Logger; +import org.springframework.dao.DataIntegrityViolationException; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.MetricsOptions; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolOptions; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.SocketOptions; + import com.datastax.driver.core.Session; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import com.datastax.driver.core.policies.Policies; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; -import compbio.util.Util; public class CassandraNativeConnector { private static Cluster cluster; @@ -29,17 +32,9 @@ public class CassandraNativeConnector { private static Logger log = Logger.getLogger(CassandraNativeConnector.class); public static String CASSANDRA_HOSTNAME = "localhost"; - public static boolean READ_WEB_JPRED = false; - public static boolean READ_LOCALFILE_JPRED = false; - private static boolean initBooleanValue(String key) { - assert key != null; - String status = ph.getProperty(key); - log.debug("Loading property: " + key + " with value: " + status); - if (Util.isEmpty(status)) { - return false; - } - return new Boolean(status.trim()).booleanValue(); + public static Session getSession() { + return session; } /* @@ -51,10 +46,11 @@ public class CassandraNativeConnector { if (null != cassandrahostname) { CASSANDRA_HOSTNAME = cassandrahostname; } - READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web"); - READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local"); - cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build(); + Cluster.Builder builder = Cluster.builder(); + builder.addContactPoint(CASSANDRA_HOSTNAME); + // PrintClusterConfiguration( builder.getConfiguration()); + cluster = builder.build(); Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); @@ -62,318 +58,88 @@ public class CassandraNativeConnector { System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); - CreateTables(); + CreateMainTables(); System.out.println("Cassandra connected"); } - private void CreateTables() { + private void CreateMainTables() { session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};"); session.execute("USE ProteinKeyspace"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow " - + "(Protein ascii, JobID ascii, Predictions map, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog " - + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, " - + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData " - + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive " - + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map, " - + "predictions map, archive blob, LOG varchar, PRIMARY KEY(JobID));"); - - session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);"); - session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);"); - } - - /* - * parsing data source and filling the database - */ - public void Parsing() throws IOException { - if (READ_WEB_JPRED) { - // if (source.equals("http")) { - // get data from real Jpred production server - System.out.println("Parsing web data source......"); - String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat"; - String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results"; - JpredParserHTTP parser = new JpredParserHTTP(prefix); - parser.Parsing(datasrc, 5); - } - if (READ_LOCALFILE_JPRED) { - // if (source.equals("file")) { - // get irtifical data generated for the DB stress tests - System.out.println("Parsing local file data source......"); - String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat"; - String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata"; - JpredParserLocalFile parser = new JpredParserLocalFile(prefix); - parser.Parsing(datasrc, 190); - } - } - - public void Closing() { - session.shutdown(); - cluster.shutdown(); - System.out.println("Cassandra has been shut down"); - } - - public boolean JobisNotInsterted(String jobid) { - ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"); - if (results1.isExhausted()) { - return true; - } - return false; - } - - public boolean JobisNotArchived(String jobid) { - ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';"); - if (results1.isExhausted()) { - return true; - } - return false; - } - - /* - * inserting data into the tables for queries - */ - public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, - String statusFinal, String protein, List predictions) { - if (JobisNotInsterted(jobid)) { - String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" - + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx - + "','" + protein + "');"; - session.execute(com1); - - String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein - + "');"; - session.execute(com2); - - String allpredictions = ""; - for (FastaSequence pred : predictions) { - String predictionname = pred.getId(); - String prediction = pred.getSequence().replaceAll("\n", ""); - allpredictions += "'" + predictionname + "':'" + prediction + "',"; - } - String final_prediction = ""; - if (null != allpredictions) { - final_prediction = allpredictions.substring(0, allpredictions.length() - 1); - } - - String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';"; - ResultSet results2 = session.execute(check2); - if (results2.isExhausted()) { - String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{" - + final_prediction + "});"; - session.execute(com3); - } - return 1; - } - return 0; - } + session.execute("CREATE TABLE IF NOT EXISTS MainParameters " + "(Name ascii, Value ascii, PRIMARY KEY(Name));"); - /* - * insert data from a real Jpred job: timing+IP, Execution Status, Final - * status, protein sequence, predictions, alignment, LOG and tar.gz files - */ - public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein, - List predictions, List seqs, String LogFile, String archivepath) { - if (JobisNotArchived(jobid)) { - String log = LogFile.replaceAll("'", ""); - session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein - + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');"); - if (false) { - PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);"); - BoundStatement boundStatement = new BoundStatement(statement); - session.execute(boundStatement.bind(jobid, archivepath)); - } + session.execute("CREATE TABLE IF NOT EXISTS ProteinRow " + + "(Protein ascii, JobID ascii, Predictions map, PRIMARY KEY(JobID));"); - for (FastaSequence p : predictions) { - session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'" - + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); - } + session.execute("CREATE TABLE IF NOT EXISTS ProteinLog " + + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, " + + "ExecutionStatus ascii, Protein ascii, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));"); - for (FastaSequence s : seqs) { - session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'" - + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); - } - return 1; - } - return 0; - } + session.execute("CREATE TABLE IF NOT EXISTS ProteinData " + + "(jobtime bigint, JobID ascii, ExecTime int, Protein ascii, PRIMARY KEY(jobtime, JobID));"); - /* - * getting data from the db - */ - public List> ReadProteinDataTable() { - final long startTime = System.currentTimeMillis(); - String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); + session.execute("CREATE TABLE IF NOT EXISTS FailLog " + + "(jobtime bigint, JobID ascii, ExecTime int, ip ascii, FinalStatus ascii, PRIMARY KEY(jobtime, JobID));"); - List> res = new ArrayList>(); - int c = 0; - for (Row r : rows) { - Pair pair = new Pair(r.getString("DataBegin"), r.getString("DataEnd")); - res.add(pair); - ++c; - } - final long endTime = System.currentTimeMillis(); - System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } + session.execute("CREATE TABLE IF NOT EXISTS JpredArchive " + + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, FinalStatus ascii, ExecutionStatus ascii, alignment map, " + + "predictions map, ArchiveLink varchar, LOG varchar, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));"); - /* - * getting data from the db ProteinData - */ - public Integer ReadDateTable(long queryDate) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT jobtime, JobID FROM ProteinKeyspace.ProteinData WHERE jobtime = " + queryDate + ";"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - final long queryTime = System.currentTimeMillis(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - if (results.isExhausted()) - return 0; - List rows = results.all(); - final long endTime = System.currentTimeMillis(); - System.out.println ("Processing time is " + (endTime - queryTime) + " msec"); - return rows.size(); - } + session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo " + + "(jobday bigint, Total bigint, TotalOK bigint, TotalStopped bigint, TotalError bigint, TotalTimeOut bigint, Program varchar, Version varchar, PRIMARY KEY(jobday));"); - /* - * getting whole protein sequence from the db ProteinRow - */ - public List ReadWholeSequence(String queryProtein) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT JobID, Predictions FROM ProteinKeyspace.ProteinRow WHERE Protein = '" + queryProtein + "';"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println ("Query time is " + (queryTime - startTime) + " msec"); - System.out.println (" rows analysed, " + rows.size()); - List res = new ArrayList(); - int c = 0; - for (Row r : rows) { - StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap("Predictions", String.class, String.class)); - res.add(structure); - ++c; - } - final long endTime = System.currentTimeMillis(); - System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } + String com = "CREATE TABLE IF NOT EXISTS Users " + + "(name varchar, id bigint, email varchar, password varchar, organisation varchar, position varchar, signedtolist boolean, registrationdate bigint, PRIMARY KEY(id));"; - /* - * getting part of protein sequence from the db ProteinRow - */ - public List ReadPartOfSequence(String queryProtein) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT * FROM ProteinKeyspace.ProteinRow;"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println ("Query time is " + (queryTime - startTime) + " msec"); - System.out.println (" rows analysed, " + rows.size()); - List res = new ArrayList(); - int c = 0; - for (Row r : rows) { - String prot = r.getString("Protein"); - if (prot.matches("(.*)" + queryProtein + "(.*)")) { - StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", String.class, String.class)); - res.add(structure); - ++c; - } - } - final long endTime = System.currentTimeMillis(); - System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } + // session.execute("ALTER TABLE ProteinLog ADD ProgramName ascii;"); + // session.execute("ALTER TABLE ProteinLog ADD ProgramVersion ascii;"); + // session.execute("ALTER TABLE JpredArchive ADD ProgramName ascii;"); + // session.execute("ALTER TABLE JpredArchive ADD ProgramVersion ascii;"); - /* - * getting protein sequences by counter - */ - public Map ReadProteinDataByCounter() { - final long startTime = System.currentTimeMillis(); - String com = "SELECT Protein FROM ProteinKeyspace.ProteinRow;"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println ("Query time is " + (queryTime - startTime) + " msec"); - System.out.println (" rows analysed, " + rows.size()); - Map res = new HashMap(); - int c = 0; - for (Row r : rows) { - String protein = r.getString("Protein"); - if (res.containsKey(protein)) - res.put(protein, res.get(protein) + 1); - else - res.put(protein, 1); - } - final long endTime = System.currentTimeMillis(); - System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; + session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);"); + session.execute("CREATE INDEX IF NOT EXISTS ProteinIp ON ProteinLog (ip);"); + session.execute("CREATE INDEX IF NOT EXISTS ON ProteinLog (ExecutionStatus);"); + session.execute("CREATE INDEX IF NOT EXISTS ON FailLog (FinalStatus);"); + session.execute("CREATE INDEX IF NOT EXISTS ON Users (email);"); + // session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);"); } - /* - * getting protein sequences by counter - */ - public StructureJobLog ReadJobLog(String jobid) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - Row row = results.one(); - String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;"; - System.out.println("Command: " + com1); - ResultSet results1 = session.execute(com1); - if (results1.isExhausted()) - return null; - Row row1 = results1.one(); - StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class)); - System.out.println ("Query time is " + (queryTime - startTime) + " msec"); - final long endTime = System.currentTimeMillis(); - System.out.println (" rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; + public void Closing() { + session.shutdown(); + cluster.shutdown(); + System.out.println("Cassandra has been shut down"); } /* * getting earlest date of jobs from the db */ - public long getEarliestDateInDB() { - final long startTime = System.currentTimeMillis(); - String com = "SELECT jobtime FROM ProteinKeyspace.ProteinData;"; + public static long getEarliestDateInDB() { + String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';"; System.out.println("Command: " + com); ResultSet results = session.execute(com); - final long queryTime = System.currentTimeMillis(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - Calendar cal = Calendar.getInstance(); - long res = cal.getTimeInMillis(); - int c = 0; - while (!results.isExhausted()) { + if (!results.isExhausted()) { Row r = results.one(); - long d1 = r.getLong("jobtime"); - if (res > d1) { - res = d1; - } - ++c; + return Long.parseLong(r.getString("Value")); } - final long endTime = System.currentTimeMillis(); - System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; + Calendar cal = Calendar.getInstance(); + return cal.getTimeInMillis(); + } + + private void PrintClusterConfiguration(Configuration cc) { + Policies policies = cc.getPolicies(); + SocketOptions sopt = cc.getSocketOptions(); + ProtocolOptions propt = cc.getProtocolOptions(); + PoolingOptions plopt = cc.getPoolingOptions(); + MetricsOptions mopt = cc.getMetricsOptions(); + QueryOptions qopt = cc.getQueryOptions(); + System.out.println("Cluster configuration:"); + System.out.println(" Policies = " + policies.toString()); + System.out.println(" Socket Options = " + sopt.toString()); + System.out.println(" Protocol Options: compression = " + propt.getCompression()); + System.out.println(" Pooling Options = " + plopt.toString()); + System.out.println(" Metrics Options = " + mopt.toString()); + System.out.println(" Query Options = " + qopt.toString()); } }