package compbio.cassandra; import java.util.Calendar; import org.apache.log4j.Logger; import org.springframework.dao.DataIntegrityViolationException; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Configuration; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; import com.datastax.driver.core.MetricsOptions; import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.ProtocolOptions; import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.policies.Policies; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; public class CassandraNativeConnector { private static Cluster cluster; private static Session session; private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper(); private static Logger log = Logger.getLogger(CassandraNativeConnector.class); public static String CASSANDRA_HOSTNAME = "localhost"; public static Session getSession() { return session; } /* * connect to the cluster and look whether all tables exist */ public void Connect() { String cassandrahostname = ph.getProperty("cassandra.host"); if (null != cassandrahostname) { CASSANDRA_HOSTNAME = cassandrahostname; } Cluster.Builder builder = Cluster.builder(); builder.addContactPoint(CASSANDRA_HOSTNAME); // PrintClusterConfiguration( builder.getConfiguration()); cluster = builder.build(); Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); CreateMainTables(); System.out.println("Cassandra connected"); } private void CreateMainTables() { session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};"); session.execute("USE ProteinKeyspace"); session.execute("CREATE TABLE IF NOT EXISTS MainParameters " + "(Name ascii, Value ascii, PRIMARY KEY(Name));"); session.execute("CREATE TABLE IF NOT EXISTS ProteinRow " + "(Protein ascii, JobID ascii, Predictions map, PRIMARY KEY(JobID));"); session.execute("CREATE TABLE IF NOT EXISTS ProteinLog " + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, " + "ExecutionStatus ascii, Protein ascii, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));"); session.execute("CREATE TABLE IF NOT EXISTS ProteinData " + "(jobtime bigint, JobID ascii, ExecTime int, Protein ascii, PRIMARY KEY(jobtime, JobID));"); session.execute("CREATE TABLE IF NOT EXISTS FailLog " + "(jobtime bigint, JobID ascii, ExecTime int, ip ascii, FinalStatus ascii, PRIMARY KEY(jobtime, JobID));"); session.execute("CREATE TABLE IF NOT EXISTS JpredArchive " + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, FinalStatus ascii, ExecutionStatus ascii, alignment map, " + "predictions map, ArchiveLink varchar, LOG varchar, ProgramName ascii, ProgramVersion ascii, PRIMARY KEY(JobID));"); session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo " + "(jobday bigint, Total bigint, TotalOK bigint, TotalStopped bigint, TotalError bigint, TotalTimeOut bigint, Program varchar, Version varchar, PRIMARY KEY(jobday));"); session.execute("CREATE TABLE IF NOT EXISTS Programs " + "(Program varchar, Version varchar, Description varchar, weblink varchar, PRIMARY KEY(Program,Version));"); session.execute("CREATE TABLE IF NOT EXISTS Users " + "(name varchar, id bigint, email varchar, password varchar, organisation varchar, position varchar, signedtolist boolean, registrationdate bigint, PRIMARY KEY(id));"); // session.execute("ALTER TABLE ProteinLog ADD ProgramName ascii;"); // session.execute("ALTER TABLE ProteinLog ADD ProgramVersion ascii;"); // session.execute("ALTER TABLE JpredArchive ADD ProgramName ascii;"); // session.execute("ALTER TABLE JpredArchive ADD ProgramVersion ascii;"); session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);"); session.execute("CREATE INDEX IF NOT EXISTS ProteinIp ON ProteinLog (ip);"); session.execute("CREATE INDEX IF NOT EXISTS ON ProteinLog (ExecutionStatus);"); session.execute("CREATE INDEX IF NOT EXISTS ON FailLog (FinalStatus);"); session.execute("CREATE INDEX IF NOT EXISTS ON Users (email);"); // session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);"); } public void Closing() { session.shutdown(); cluster.shutdown(); System.out.println("Cassandra has been shut down"); } /* * getting earlest date of jobs from the db */ public static long getEarliestDateInDB() { String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';"; ResultSet results = session.execute(com); if (!results.isExhausted()) { Row r = results.one(); return Long.parseLong(r.getString("Value")); } Calendar cal = Calendar.getInstance(); return cal.getTimeInMillis(); } private void PrintClusterConfiguration(Configuration cc) { Policies policies = cc.getPolicies(); SocketOptions sopt = cc.getSocketOptions(); ProtocolOptions propt = cc.getProtocolOptions(); PoolingOptions plopt = cc.getPoolingOptions(); MetricsOptions mopt = cc.getMetricsOptions(); QueryOptions qopt = cc.getQueryOptions(); System.out.println("Cluster configuration:"); System.out.println(" Policies = " + policies.toString()); System.out.println(" Socket Options = " + sopt.toString()); System.out.println(" Protocol Options: compression = " + propt.getCompression()); System.out.println(" Pooling Options = " + plopt.toString()); System.out.println(" Metrics Options = " + mopt.toString()); System.out.println(" Query Options = " + qopt.toString()); } }