import java.util.ArrayList;
import java.util.Map;
+import org.apache.log4j.Logger;
+
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.BoundStatement;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
+import compbio.util.Util;
public class CassandraNativeConnector {
private static Cluster cluster;
private static Session session;
+ private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper(); // source of the cassandra.* settings read in Connect()
+ private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+ public static String CASSANDRA_HOSTNAME = "localhost"; // contact point; Connect() replaces it when property cassandra.host is set
+ public static boolean READ_WEB_JPRED = false; // set from property cassandra.jpred.web in Connect(); gates the web-source branch of Parsing()
+ public static boolean READ_LOCALFILE_JPRED = false; // set from property cassandra.jpred.local in Connect(); gates the local-file branch of Parsing()
+
+ /**
+  * Reads a boolean flag from the property helper.
+  *
+  * @param key name of the property to look up; must not be null
+  * @return false when Util.isEmpty reports the property value as empty;
+  *         otherwise true only if the trimmed value equals "true" ignoring case
+  */
+ private static boolean initBooleanValue(String key) {
+     assert key != null;
+     String status = ph.getProperty(key);
+     log.debug("Loading property: " + key + " with value: " + status);
+     if (Util.isEmpty(status)) {
+         return false;
+     }
+     // Boolean.parseBoolean applies the same rule as the deprecated
+     // Boolean(String) constructor without allocating a wrapper object.
+     return Boolean.parseBoolean(status.trim());
+ }
+
/*
- * connect to the cluster and look weather the dababase has any data inside
+ * connect to the cluster and create the keyspace, tables and indexes if they do not exist yet
*/
public void Connect() {
- // local cassandra cluster
- cluster = Cluster.builder().addContactPoint("localhost").build();
- // distributed cassandra cluster
- /* cluster = Cluster.builder().addContactPoint("10.0.115.190").build(); */
+
+ String cassandrahostname = ph.getProperty("cassandra.host"); // optional override of the default contact point
+ if (null != cassandrahostname) {
+ CASSANDRA_HOSTNAME = cassandrahostname;
+ }
+ READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web"); // both flags are consulted later by Parsing()
+ READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local");
+
+ cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build(); // single contact point taken from the static field set above
+
Metadata metadata = cluster.getMetadata();
System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
for (Host host : metadata.getAllHosts()) {
System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
}
-
session = cluster.connect();
+ CreateTables(); // issues the CREATE ... IF NOT EXISTS statements on the freshly opened session
+ System.out.println("Cassandra connected");
+ }
+
+ private void CreateTables() { // creates the keyspace, the four tables and the two indexes, each guarded by IF NOT EXISTS
session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinRow (Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinLog "
- + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinKeyspace.ProteinData (jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
+ session.execute("USE ProteinKeyspace"); // the statements below, and the queries elsewhere in this class, use unqualified table names
- session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinKeyspace.ProteinRow (protein);");
- session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinKeyspace.ProteinData (jobtime);");
+ session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow "
+ + "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
+ session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog "
+ + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
+ + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
+ session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData "
+ + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
+ session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive "
+ + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, "
+ + "predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));"); // table written by ArchiveData()
- System.out.println("Cassandra connected");
+ session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);"); // index on the column ReadWholeSequence() filters by
+ session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);"); // index on the column ReadDateTable() filters by
}
/*
* parsing data source and filling the database
*/
public void Parsing() throws IOException {
- if (true) {
+ if (READ_WEB_JPRED) {
// if (source.equals("http")) {
// get data from real Jpred production server
System.out.println("Parsing web data source......");
String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
JpredParserHTTP parser = new JpredParserHTTP(prefix);
- parser.Parsing(datasrc, 4);
+ parser.Parsing(datasrc, 5);
}
- if (false) {
+ if (READ_LOCALFILE_JPRED) {
// if (source.equals("file")) {
// get irtifical data generated for the DB stress tests
System.out.println("Parsing local file data source......");
System.out.println("Cassandra has been shut down");
}
+ /**
+  * Checks whether a job is still missing from the ProteinLog table.
+  *
+  * @param jobid job identifier, matched against the JobID column of ProteinLog
+  * @return true when the query finds no ProteinLog row for the job id
+  */
+ public boolean JobisNotInsterted(String jobid) {
+     // Double any embedded single quote (the CQL string-literal escape) so the
+     // id cannot terminate the literal and alter the statement. A null id is
+     // rendered as the text "null", exactly as plain concatenation did.
+     String quotedId = String.valueOf(jobid).replace("'", "''");
+     ResultSet rows = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + quotedId + "';");
+     return rows.isExhausted();
+ }
+
+ /**
+  * Checks whether a job is still missing from the JpredArchive table.
+  *
+  * @param jobid job identifier, matched against the JobID column of JpredArchive
+  * @return true when the query finds no JpredArchive row for the job id
+  */
+ public boolean JobisNotArchived(String jobid) {
+     // Double any embedded single quote (the CQL string-literal escape) so the
+     // id cannot terminate the literal and alter the statement. A null id is
+     // rendered as the text "null", exactly as plain concatenation did.
+     String quotedId = String.valueOf(jobid).replace("'", "''");
+     ResultSet rows = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + quotedId + "';");
+     return rows.isExhausted();
+ }
+
/*
- * inserting data into the db
+ * inserting data into the tables for queries
*/
- public void InsertData(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, String statusFinal,
- String protein, List<FastaSequence> predictions) {
- String check1 = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
- ResultSet results1 = session.execute(check1);
- if (results1.isExhausted()) {
- String com1 = "INSERT INTO ProteinKeyspace.ProteinLog "
- + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + " VALUES ('" + jobid + "','" + ip + "','"
- + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + "','" + protein + "');";
- session.execute(com1);
- String com2 = "INSERT INTO ProteinKeyspace.ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid
+ public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
+ String statusFinal, String protein, List<FastaSequence> predictions) {
+ if (JobisNotInsterted(jobid)) {
+ String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
+ + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
+ "','" + protein + "');";
+ session.execute(com1);
+
+ String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
+ + "');";
session.execute(com2);
+
String allpredictions = "";
for (FastaSequence pred : predictions) {
String predictionname = pred.getId();
if (null != allpredictions) {
final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
}
- String check2 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
+
+ String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
ResultSet results2 = session.execute(check2);
if (results2.isExhausted()) {
- String com3 = "INSERT INTO ProteinKeyspace.ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('"
- + protein + "','" + jobid + "',{" + final_prediction + "});";
+ String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
+ + final_prediction + "});";
session.execute(com3);
}
- String check3 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "';";
+ return 1;
}
+ return 0;
+ }
+
+ /*
+  * Escapes a value for embedding inside a single-quoted CQL string literal by
+  * doubling every single quote. A null value becomes the text "null", which is
+  * what plain string concatenation produces.
+  */
+ private static String escapeCql(String value) {
+     return String.valueOf(value).replace("'", "''");
+ }
+
+ /*
+  * Archives one real Jpred job in the JpredArchive table: job id, protein
+  * sequence, IP, start time, execution time and LOG text are inserted as one
+  * row, then every prediction and every alignment sequence is added to the
+  * row's predictions / alignment map columns (keyed by the sequence id, with
+  * newlines removed from the sequence text).
+  *
+  * statusEx, statusFinal and archivepath are part of the signature but are not
+  * written by this method.
+  *
+  * Returns 1 when the job was archived by this call and 0 when
+  * JobisNotArchived() reports that a row for the job id already exists.
+  */
+ public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
+         List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
+     if (!JobisNotArchived(jobid)) {
+         return 0;
+     }
+     // Every text value is escaped before it is spliced into the CQL text. The
+     // LOG content keeps its single quotes (escaped) instead of losing them.
+     String id = escapeCql(jobid);
+     session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + id + "','"
+             + escapeCql(protein) + "','" + escapeCql(ip) + "'," + starttime + "," + exectime + ",'" + escapeCql(LogFile) + "');");
+
+     // NOTE(review): the tar.gz archive referenced by archivepath is not stored.
+     // The previous disabled code bound the path string to the blob column
+     // "archive"; storing it needs the file content as a byte buffer.
+
+     for (FastaSequence p : predictions) {
+         session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + escapeCql(p.getId()) + "':'"
+                 + escapeCql(p.getSequence().replace("\n", "")) + "'} WHERE JobID = '" + id + "';");
+     }
+
+     for (FastaSequence s : seqs) {
+         session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + escapeCql(s.getId()) + "':'"
+                 + escapeCql(s.getSequence().replace("\n", "")) + "'} WHERE JobID = '" + id + "';");
+     }
+     return 1;
}
/*
*/
public List<Pair<String, String>> ReadProteinDataTable() {
final long startTime = System.currentTimeMillis();
- String com = "SELECT DataBegin,DataEnd FROM ProteinKeyspace.ProteinLog;";
+ String com = "SELECT DataBegin,DataEnd FROM ProteinLog;"; // no WHERE clause: every ProteinLog row is read
System.out.println("Command: " + com);
ResultSet results = session.execute(com);
final long queryTime = System.currentTimeMillis();
List<Row> rows = results.all();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
+ System.out.println("Query time is " + (queryTime - startTime) + " msec"); // queryTime was taken before results.all(), so row fetching is not included
List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
int c = 0;
for (Row r : rows) {
- Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"),r.getString("DataEnd"));
+ Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd")); // one (DataBegin, DataEnd) pair per row
res.add(pair);
++c;
}
final long endTime = System.currentTimeMillis();
- System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); // c equals the number of pairs in res
return res;
}
-
+
/*
* getting data from the db ProteinData
*/
public Integer ReadDateTable(long queryDate) {
final long startTime = System.currentTimeMillis();
- String com = "SELECT jobtime, JobID FROM ProteinKeyspace.ProteinData WHERE jobtime = " + queryDate + ";";
+ String com = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";"; // exact match on jobtime
System.out.println("Command: " + com);
ResultSet results = session.execute(com);
- if (results.isExhausted())
- return null;
final long queryTime = System.currentTimeMillis();
+ System.out.println("Query time is " + (queryTime - startTime) + " msec"); // printed before the empty check, so it is reported for empty results too
+ if (results.isExhausted())
+ return 0; // no matching rows: the count is 0 (the removed code returned null here)
List<Row> rows = results.all();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
+ final long endTime = System.currentTimeMillis();
+ System.out.println("Processing time is " + (endTime - queryTime) + " msec"); // time spent materialising the rows with results.all()
return rows.size();
}
*/
public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
final long startTime = System.currentTimeMillis();
- String com = "SELECT JobID, Predictions FROM ProteinKeyspace.ProteinRow WHERE Protein = '" + queryProtein + "';";
+ String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';"; // exact match on Protein; NOTE(review): queryProtein is concatenated into the CQL text unescaped
System.out.println("Command: " + com);
ResultSet results = session.execute(com);
if (results.isExhausted())
return null;
final long queryTime = System.currentTimeMillis();
List<Row> rows = results.all();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
- System.out.println (" rows analysed, " + rows.size());
+ System.out.println("Query time is " + (queryTime - startTime) + " msec"); // queryTime was taken before results.all()
+ System.out.println(" rows analysed, " + rows.size());
List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
int c = 0;
for (Row r : rows) {
- StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));
+ StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap(
+ "Predictions", String.class, String.class)); // the protein is taken from the query argument because the SELECT does not return that column
res.add(structure);
++c;
}
final long endTime = System.currentTimeMillis();
- System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); // c equals the number of structures in res
return res;
}
-
+
/*
* getting part of protein sequence from the db ProteinRow
*/
- public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
+ public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) { // returns the rows whose Protein contains queryProtein, or null when the table query yields nothing
final long startTime = System.currentTimeMillis();
- String com = "SELECT * FROM ProteinKeyspace.ProteinRow;";
+ String com = "SELECT * FROM ProteinRow;"; // reads every row; the filtering happens in the loop below
System.out.println("Command: " + com);
ResultSet results = session.execute(com);
if (results.isExhausted())
return null;
final long queryTime = System.currentTimeMillis();
List<Row> rows = results.all();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
- System.out.println (" rows analysed, " + rows.size());
- List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
+ System.out.println("Query time is " + (queryTime - startTime) + " msec"); // queryTime was taken before results.all()
+ System.out.println(" rows analysed, " + rows.size());
+ List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
int c = 0;
for (Row r : rows) {
String prot = r.getString("Protein");
if (prot.matches("(.*)" + queryProtein + "(.*)")) {
- // System.out.println(prot);
- StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", String.class, String.class));
+ StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions",
+ String.class, String.class)); // built from the row's own Protein value, not from the query fragment
res.add(structure);
++c;
}
}
final long endTime = System.currentTimeMillis();
- System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); // c counts only the rows that matched, not all rows scanned
return res;
}
-
+
/*
* getting protein sequences by counter
*/
- public Map<String, Integer> ReadProteinDataByCounter() {
+ public Map<String, Integer> ReadProteinDataByCounter() { // maps each Protein value to the number of ProteinRow rows holding it; null when the query yields nothing
final long startTime = System.currentTimeMillis();
- String com = "SELECT Protein FROM ProteinKeyspace.ProteinRow;";
+ String com = "SELECT Protein FROM ProteinRow;"; // reads the Protein column of every row
System.out.println("Command: " + com);
ResultSet results = session.execute(com);
if (results.isExhausted())
return null;
final long queryTime = System.currentTimeMillis();
List<Row> rows = results.all();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
- System.out.println (" rows analysed, " + rows.size());
+ System.out.println("Query time is " + (queryTime - startTime) + " msec"); // queryTime was taken before results.all()
+ System.out.println(" rows analysed, " + rows.size());
Map<String, Integer> res = new HashMap<String, Integer>();
int c = 0;
for (Row r : rows) {
String protein = r.getString("Protein");
- if (res.containsKey(protein)) 
+ if (res.containsKey(protein)) // already seen: bump the tally, otherwise start it at 1
res.put(protein, res.get(protein) + 1);
else
res.put(protein, 1);
}
final long endTime = System.currentTimeMillis();
- System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); // NOTE(review): c is never incremented in this method, so this always prints 0
return res;
}
-
-
+
/*
* getting protein sequences by counter
*/
if (results1.isExhausted())
return null;
Row row1 = results1.one();
- StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
+ StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"),
+ row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
final long endTime = System.currentTimeMillis();
- System.out.println (" rows analysed, execution time is " + (endTime - startTime) + " msec");
+ System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec");
return res;
}
-
+
/*
* getting earlest date of jobs from the db
*/
public long getEarliestDateInDB() {
final long startTime = System.currentTimeMillis();
- String com = "SELECT jobtime,JobID FROM ProteinKeyspace.ProteinData;";
+ String com = "SELECT jobtime,JobID FROM ProteinData;";
System.out.println("Command: " + com);
ResultSet results = session.execute(com);
final long queryTime = System.currentTimeMillis();
- System.out.println ("Query time is " + (queryTime - startTime) + " msec");
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
Calendar cal = Calendar.getInstance();
long res = cal.getTimeInMillis();
++c;
}
final long endTime = System.currentTimeMillis();
- System.out.println (c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
return res;
}
-
+
}