#################################################################################
# Jpred sources
-cassandra.jpred.web=true
-cassandra.jpred.local=false
\ No newline at end of file
+# real Jpred web-server
+cassandra.jpred.web.update=true
+cassandra.jpred.web.inidelay=0
+cassandra.jpred.web.updaterate=200
+
+# update time period (in days)
+# by default, covers the past 100 days
+cassandra.jpred.web.period=5
+
+
+#################################################################################
+# local test job source
+cassandra.jpred.local.update=false
+cassandra.jpred.local.inidelay=10
+cassandra.jpred.local.updaterate=200
+
+# update time period (in days)
+# by default, covers the past 100 days
+cassandra.jpred.local.period=300
package compbio.cassandra;
-import java.io.IOException;
import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
import org.apache.log4j.Logger;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
import compbio.engine.ProteoCachePropertyHelperManager;
import compbio.util.PropertyHelper;
-import compbio.util.Util;
public class CassandraNativeConnector {
private static Cluster cluster;
private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
public static String CASSANDRA_HOSTNAME = "localhost";
- public static boolean READ_WEB_JPRED = false;
- public static boolean READ_LOCALFILE_JPRED = false;
- private static boolean initBooleanValue(String key) {
- assert key != null;
- String status = ph.getProperty(key);
- log.debug("Loading property: " + key + " with value: " + status);
- if (Util.isEmpty(status)) {
- return false;
- }
- return new Boolean(status.trim()).booleanValue();
+ public static Session getSession () {
+ return session;
}
/*
if (null != cassandrahostname) {
CASSANDRA_HOSTNAME = cassandrahostname;
}
- READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web");
- READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local");
cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build();
System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
}
session = cluster.connect();
- CreateTables();
+ CreateMainTables();
System.out.println("Cassandra connected");
}
- private void CreateTables() {
+ private void CreateMainTables() {
session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};");
session.execute("USE ProteinKeyspace");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow "
+ session.execute("CREATE TABLE IF NOT EXISTS MainParameters "
+ + "(Name ascii, Value ascii, PRIMARY KEY(Name));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS ProteinRow "
+ "(Protein ascii, JobID ascii, Predictions map<ascii,ascii>, PRIMARY KEY(JobID));");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog "
+
+ session.execute("CREATE TABLE IF NOT EXISTS ProteinLog "
+ "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, "
+ "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData "
+
+ session.execute("CREATE TABLE IF NOT EXISTS ProteinData "
+ "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));");
- session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive "
+
+ session.execute("CREATE TABLE IF NOT EXISTS JpredArchive "
+ "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map<ascii,ascii>, "
+ "predictions map<ascii,ascii>, archive blob, LOG varchar, PRIMARY KEY(JobID));");
+
+ session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo "
+ + "(jobday bigint, Total bigint, PRIMARY KEY(jobday));");
session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);");
session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);");
}
- /*
- * parsing data source and filling the database
- */
- public void Parsing() throws IOException {
- if (READ_WEB_JPRED) {
- // if (source.equals("http")) {
- // get data from real Jpred production server
- System.out.println("Parsing web data source......");
- String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
- String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
- JpredParserHTTP parser = new JpredParserHTTP(prefix);
- parser.Parsing(datasrc, 5);
- }
- if (READ_LOCALFILE_JPRED) {
- // if (source.equals("file")) {
- // get irtifical data generated for the DB stress tests
- System.out.println("Parsing local file data source......");
- String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
- String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
- JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
- parser.Parsing(datasrc, 190);
- }
- }
-
public void Closing() {
session.shutdown();
cluster.shutdown();
System.out.println("Cassandra has been shut down");
}
- public boolean JobisNotInsterted(String jobid) {
- ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
- if (results1.isExhausted()) {
- return true;
- }
- return false;
- }
-
- public boolean JobisNotArchived(String jobid) {
- ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
- if (results1.isExhausted()) {
- return true;
- }
- return false;
- }
-
- /*
- * inserting data into the tables for queries
- */
- public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
- String statusFinal, String protein, List<FastaSequence> predictions) {
- if (JobisNotInsterted(jobid)) {
- String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
- + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
- + "','" + protein + "');";
- session.execute(com1);
-
- String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
- + "');";
- session.execute(com2);
-
- String allpredictions = "";
- for (FastaSequence pred : predictions) {
- String predictionname = pred.getId();
- String prediction = pred.getSequence().replaceAll("\n", "");
- allpredictions += "'" + predictionname + "':'" + prediction + "',";
- }
- String final_prediction = "";
- if (null != allpredictions) {
- final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
- }
-
- String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
- ResultSet results2 = session.execute(check2);
- if (results2.isExhausted()) {
- String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
- + final_prediction + "});";
- session.execute(com3);
- }
- return 1;
- }
- return 0;
- }
-
- /*
- * insert data from a real Jpred job: timing+IP, Execution Status, Final
- * status, protein sequence, predictions, alignment, LOG and tar.gz files
- */
- public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
- List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
- if (JobisNotArchived(jobid)) {
- String log = LogFile.replaceAll("'", "");
- session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
- + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
- if (false) {
- PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
- BoundStatement boundStatement = new BoundStatement(statement);
- session.execute(boundStatement.bind(jobid, archivepath));
- }
-
- for (FastaSequence p : predictions) {
- session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
- + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
- }
-
- for (FastaSequence s : seqs) {
- session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
- + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
- }
- return 1;
- }
- return 0;
- }
-
- /*
- * getting data from the db
- */
- public List<Pair<String, String>> ReadProteinDataTable() {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT DataBegin,DataEnd FROM ProteinLog;";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- final long queryTime = System.currentTimeMillis();
- List<Row> rows = results.all();
- System.out.println("Query time is " + (queryTime - startTime) + " msec");
-
- List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
- int c = 0;
- for (Row r : rows) {
- Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
- res.add(pair);
- ++c;
- }
- final long endTime = System.currentTimeMillis();
- System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
- }
-
- /*
- * getting data from the db ProteinData
- */
- public Integer ReadDateTable(long queryDate) {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT count(*) FROM ProteinData WHERE jobtime = " + queryDate + ";";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- final long queryTime = System.currentTimeMillis();
- System.out.println("Query time is " + (queryTime - startTime) + " msec");
- Row row = results.one();
- Integer count = (int) row.getLong(0);
- final long endTime = System.currentTimeMillis();
- System.out.println("Processing time is " + (endTime - queryTime) + " msec");
- return count;
- }
-
- /*
- * getting whole protein sequence from the db ProteinRow
- */
- public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- if (results.isExhausted())
- return null;
- final long queryTime = System.currentTimeMillis();
- List<Row> rows = results.all();
- System.out.println("Query time is " + (queryTime - startTime) + " msec");
- System.out.println(" rows analysed, " + rows.size());
- List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
- int c = 0;
- for (Row r : rows) {
- StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap(
- "Predictions", String.class, String.class));
- res.add(structure);
- ++c;
- }
- final long endTime = System.currentTimeMillis();
- System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
- }
-
- /*
- * getting part of protein sequence from the db ProteinRow
- */
- public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT * FROM ProteinRow;";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- if (results.isExhausted())
- return null;
- final long queryTime = System.currentTimeMillis();
- List<Row> rows = results.all();
- System.out.println("Query time is " + (queryTime - startTime) + " msec");
- System.out.println(" rows analysed, " + rows.size());
- List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
- int c = 0;
- for (Row r : rows) {
- String prot = r.getString("Protein");
- if (prot.matches("(.*)" + queryProtein + "(.*)")) {
- StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions",
- String.class, String.class));
- res.add(structure);
- ++c;
- }
- }
- final long endTime = System.currentTimeMillis();
- System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
- }
-
- /*
- * getting protein sequences by counter
- */
- public Map<String, Integer> ReadProteinDataByCounter() {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT Protein FROM ProteinRow;";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- if (results.isExhausted())
- return null;
- final long queryTime = System.currentTimeMillis();
- List<Row> rows = results.all();
- System.out.println("Query time is " + (queryTime - startTime) + " msec");
- System.out.println(" rows analysed, " + rows.size());
- Map<String, Integer> res = new HashMap<String, Integer>();
- int c = 0;
- for (Row r : rows) {
- String protein = r.getString("Protein");
- if (res.containsKey(protein))
- res.put(protein, res.get(protein) + 1);
- else
- res.put(protein, 1);
- }
- final long endTime = System.currentTimeMillis();
- System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
- }
-
- /*
- * getting protein sequences by counter
- */
- public StructureJobLog ReadJobLog(String jobid) {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';";
- System.out.println("Command: " + com);
- ResultSet results = session.execute(com);
- if (results.isExhausted())
- return null;
- final long queryTime = System.currentTimeMillis();
- Row row = results.one();
- String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
- System.out.println("Command: " + com1);
- ResultSet results1 = session.execute(com1);
- if (results1.isExhausted())
- return null;
- Row row1 = results1.one();
- StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"),
- row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
- System.out.println("Query time is " + (queryTime - startTime) + " msec");
- final long endTime = System.currentTimeMillis();
- System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
- }
-
/*
* getting earlest date of jobs from the db
*/
- public long getEarliestDateInDB() {
- final long startTime = System.currentTimeMillis();
- String com = "SELECT jobtime,JobID FROM ProteinData;";
+ public static long getEarliestDateInDB() {
+ String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
System.out.println("Command: " + com);
ResultSet results = session.execute(com);
- final long queryTime = System.currentTimeMillis();
- System.out.println("Query time is " + (queryTime - startTime) + " msec");
- Calendar cal = Calendar.getInstance();
- long res = cal.getTimeInMillis();
- int c = 0;
- while (!results.isExhausted()) {
+ if (!results.isExhausted()) {
Row r = results.one();
- long d1 = r.getLong("jobtime");
- if (res > d1) {
- res = d1;
- }
- ++c;
+ return Long.parseLong(r.getString("Value"));
}
- final long endTime = System.currentTimeMillis();
- System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
- return res;
+ Calendar cal = Calendar.getInstance();
+ return cal.getTimeInMillis();
}
-
+
}
--- /dev/null
+package compbio.cassandra;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.BoundStatement;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.cassandra.CassandraNativeConnector;
+import compbio.util.PropertyHelper;
+
+public class CassandraNewTableWriter {
+ private Session session;
+ private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+ public CassandraNewTableWriter() {
+ Session inis = CassandraNativeConnector.getSession();
+ setSession(inis);
+ }
+
+ public void setSession(Session s) {
+ assert s != null;
+ session = s;
+ }
+
+ public boolean JobisNotInsterted(String jobid) {
+ ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
+ if (results1.isExhausted()) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean JobisNotArchived(String jobid) {
+ ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
+ if (results1.isExhausted()) {
+ return true;
+ }
+ return false;
+ }
+
+ /*
+	 * fills the JobDateInfo table (jobs per day) from ProteinData
+ */
+ public void FillNewTable() {
+ long date1 = CassandraNativeConnector.getEarliestDateInDB();
+ Calendar start = Calendar.getInstance();
+ start.setTime(new Date(date1));
+ Calendar endcal = Calendar.getInstance();
+ Date end = endcal.getTime();
+ for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
+ String query1 = "SELECT * FROM ProteinData WHERE jobtime = " + date.getTime() + ";";
+ System.out.println("Query db: " + query1);
+ ResultSet results = session.execute(query1);
+ String query2 = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + date.getTime() + "," + results.all().size()+ ");";
+ System.out.println("Insert DB: " + query2);
+ session.execute(query2);
+ }
+ System.out.println("Table JobDateInfo filled");
+ }
+
+ /*
+	 * fills the MainParameters table (e.g. EarliestJobDate)
+ */
+ public void FillParameters() {
+ Date bubu = new Date(CassandraNativeConnector.getEarliestDateInDB());
+ System.out.println("Old EarliestJobDate is " + bubu.toString());
+
+ String query1 = "SELECT jobtime FROM ProteinData LIMIT 2000000;";
+ System.out.println("Query db: " + query1);
+ ResultSet results = session.execute(query1);
+ Calendar endcal = Calendar.getInstance();
+ long newearliestday = endcal.getTime().getTime();
+ while (!results.isExhausted()) {
+ Row r = results.one();
+ long day = r.getLong("jobtime");
+ if (day < newearliestday) {
+ newearliestday = day;
+ }
+ }
+ String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(newearliestday)
+ + "');";
+ session.execute(com);
+ Date gaga = new Date(newearliestday);
+ System.out.println("New EarliestJobDate is " + gaga.toString());
+ }
+}
package compbio.cassandra;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.ResultSet;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
+
+public class CassandraReader {
+ private Session session;
+ private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+ public CassandraReader() {
+ Session inis = CassandraNativeConnector.getSession();
+ setSession (inis);
+ }
+
+ public void setSession(Session s) {
+ assert s != null;
+ session = s;
+ }
+
+ /*
+ * getting data from the db
+ */
+ public List<Pair<String, String>> ReadProteinDataTable() {
+ final long startTime = System.currentTimeMillis();
+ String com = "SELECT DataBegin,DataEnd FROM ProteinLog;";
+ System.out.println("Command: " + com);
+ ResultSet results = session.execute(com);
+ final long queryTime = System.currentTimeMillis();
+ List<Row> rows = results.all();
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
-public interface CassandraReader {
+ List<Pair<String, String>> res = new ArrayList<Pair<String, String>>();
+ int c = 0;
+ for (Row r : rows) {
+ Pair<String, String> pair = new Pair<String, String>(r.getString("DataBegin"), r.getString("DataEnd"));
+ res.add(pair);
+ ++c;
+ }
+ final long endTime = System.currentTimeMillis();
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ return res;
+ }
/*
- * Defines a source file with metainformation of Jpred Jobs
- **/
- void setSession (Session s);
-
+ * getting data from the db ProteinData
+ */
+ public Integer ReadDateTable(long queryDate) {
+ final long startTime = System.currentTimeMillis();
+ String com = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";";
+ System.out.println("Command: " + com);
+ ResultSet results = session.execute(com);
+ final long queryTime = System.currentTimeMillis();
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
+ if (results.isExhausted())
+ return 0;
+ List<Row> rows = results.all();
+ final long endTime = System.currentTimeMillis();
+ System.out.println("Processing time is " + (endTime - queryTime) + " msec");
+ return rows.size();
+ }
+
+ /*
+ * getting whole protein sequence from the db ProteinRow
+ */
+ public List<StructureProteinPrediction> ReadWholeSequence(String queryProtein) {
+ final long startTime = System.currentTimeMillis();
+ String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';";
+ System.out.println("Command: " + com);
+ ResultSet results = session.execute(com);
+ if (results.isExhausted())
+ return null;
+ final long queryTime = System.currentTimeMillis();
+ List<Row> rows = results.all();
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
+ System.out.println(" rows analysed, " + rows.size());
+ List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
+ int c = 0;
+ for (Row r : rows) {
+ StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap(
+ "Predictions", String.class, String.class));
+ res.add(structure);
+ ++c;
+ }
+ final long endTime = System.currentTimeMillis();
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ return res;
+ }
+
+ /*
+ * getting part of protein sequence from the db ProteinRow
+ */
+ public List<StructureProteinPrediction> ReadPartOfSequence(String queryProtein) {
+ final long startTime = System.currentTimeMillis();
+ String com = "SELECT * FROM ProteinRow;";
+ System.out.println("Command: " + com);
+ ResultSet results = session.execute(com);
+ if (results.isExhausted())
+ return null;
+ final long queryTime = System.currentTimeMillis();
+ List<Row> rows = results.all();
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
+ System.out.println(" rows analysed, " + rows.size());
+ List<StructureProteinPrediction> res = new ArrayList<StructureProteinPrediction>();
+ int c = 0;
+ for (Row r : rows) {
+ String prot = r.getString("Protein");
+ if (prot.matches("(.*)" + queryProtein + "(.*)")) {
+ StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions",
+ String.class, String.class));
+ res.add(structure);
+ ++c;
+ }
+ }
+ final long endTime = System.currentTimeMillis();
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ return res;
+ }
+
+ /*
+ * getting protein sequences by counter
+ */
+ public Map<String, Integer> ReadProteinDataByCounter() {
+ final long startTime = System.currentTimeMillis();
+ String com = "SELECT Protein FROM ProteinRow;";
+ System.out.println("Command: " + com);
+ ResultSet results = session.execute(com);
+ if (results.isExhausted())
+ return null;
+ final long queryTime = System.currentTimeMillis();
+ List<Row> rows = results.all();
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
+ System.out.println(" rows analysed, " + rows.size());
+ Map<String, Integer> res = new HashMap<String, Integer>();
+ int c = 0;
+ for (Row r : rows) {
+ String protein = r.getString("Protein");
+ if (res.containsKey(protein))
+ res.put(protein, res.get(protein) + 1);
+ else
+ res.put(protein, 1);
+ }
+ final long endTime = System.currentTimeMillis();
+ System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec");
+ return res;
+ }
+
/*
- * Makes real parsing of the source file
- **/
- /*void getResults();*/
+ * getting protein sequences by counter
+ */
+ public StructureJobLog ReadJobLog(String jobid) {
+ final long startTime = System.currentTimeMillis();
+ String com = "SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';";
+ System.out.println("Command: " + com);
+ ResultSet results = session.execute(com);
+ if (results.isExhausted())
+ return null;
+ final long queryTime = System.currentTimeMillis();
+ Row row = results.one();
+ String com1 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;";
+ System.out.println("Command: " + com1);
+ ResultSet results1 = session.execute(com1);
+ if (results1.isExhausted())
+ return null;
+ Row row1 = results1.one();
+ StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"),
+ row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class));
+ System.out.println("Query time is " + (queryTime - startTime) + " msec");
+ final long endTime = System.currentTimeMillis();
+ System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec");
+ return res;
+ }
}
--- /dev/null
+package compbio.cassandra;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.BoundStatement;
+
+import compbio.engine.ProteoCachePropertyHelperManager;
+import compbio.util.PropertyHelper;
+
+public class CassandraWriter {
+ private Session session;
+ private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
+ private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
+
+ CassandraWriter() {
+ Session inis = CassandraNativeConnector.getSession();
+ setSession(inis);
+ }
+
+ public void setSession(Session s) {
+ assert s != null;
+ session = s;
+ }
+
+ public boolean JobisNotInsterted(String jobid) {
+ ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';");
+ if (results1.isExhausted()) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean JobisNotArchived(String jobid) {
+ ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';");
+ if (results1.isExhausted()) {
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * inserting data into the tables for queries
+ */
+ public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
+ String statusFinal, String protein, List<FastaSequence> predictions) {
+ if (JobisNotInsterted(jobid)) {
+ String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
+ + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
+ + "','" + protein + "');";
+ session.execute(com1);
+
+ String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
+ + "');";
+ session.execute(com2);
+
+ String allpredictions = "";
+ for (FastaSequence pred : predictions) {
+ String predictionname = pred.getId();
+ String prediction = pred.getSequence().replaceAll("\n", "");
+ allpredictions += "'" + predictionname + "':'" + prediction + "',";
+ }
+ String final_prediction = "";
+ if (null != allpredictions) {
+ final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
+ }
+
+ String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
+ ResultSet results2 = session.execute(check2);
+ if (results2.isExhausted()) {
+ String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
+ + final_prediction + "});";
+ session.execute(com3);
+ }
+
+ // update some internal tables
+ String check3 = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';";
+ ResultSet results3 = session.execute(check3);
+ boolean updateparameter = true;
+ if (!results3.isExhausted()) {
+ Row r = results3.one();
+ if (jobtime >= Long.parseLong(r.getString("Value")))
+ updateparameter = false;
+ }
+ if (updateparameter) {
+ String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(jobtime)
+ + "');";
+ session.execute(com);
+ }
+ String check4 = "SELECT * FROM MainParameters WHERE Name = 'TotalNumberOfJobs';";
+ ResultSet results4 = session.execute(check4);
+ updateparameter = true;
+ int njobs = 1;
+ if (!results4.isExhausted()) {
+ Row r = results4.one();
+ njobs += Integer.parseInt(r.getString("Value"));
+ }
+ String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('TotalNumberOfJobs','" + String.valueOf(njobs) + "');";
+ session.execute(com);
+
+ return 1;
+ }
+ return 0;
+ }
+
+ /*
+ * insert data from a real Jpred job: timing+IP, Execution Status, Final
+ * status, protein sequence, predictions, alignment, LOG and tar.gz files
+ */
+ public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
+ List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
+ if (JobisNotArchived(jobid)) {
+ String log = LogFile.replaceAll("'", "");
+ session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
+ + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
+ if (false) {
+ PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
+ BoundStatement boundStatement = new BoundStatement(statement);
+ session.execute(boundStatement.bind(jobid, archivepath));
+ }
+
+ for (FastaSequence p : predictions) {
+ session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
+ + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+ }
+
+ for (FastaSequence s : seqs) {
+ session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
+ + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+ }
+ return 1;
+ }
+ return 0;
+ }
+
+}
import compbio.cassandra.JpredParser;
public class JpredParserHTTP implements JpredParser {
- private CassandraNativeConnector cc = new CassandraNativeConnector();
+ private CassandraWriter cw = new CassandraWriter();
private String dirprefix;
private List<FastaSequence> alignment;
private List<FastaSequence> predictions;
private String jnetpred;
- JpredParserHTTP() {
+ public JpredParserHTTP() {
dirprefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
}
- JpredParserHTTP(String sourceurl) {
+ public JpredParserHTTP(String sourceurl) {
dirprefix = sourceurl;
}
// unknown_email jp_J9HBCBT
String id = table[table.length - 1];
totalcount++;
- if (cc.JobisNotInsterted(id)) {
+ if (cw.JobisNotInsterted(id)) {
URL dataurl = new URL(dirprefix + "/" + id + "/" + id + ".concise.fasta");
URL archiveurl = new URL(dirprefix + "/" + id + "/" + id + ".tar.gz");
URL logurl = new URL(dirprefix + "/" + id + "/LOG");
String ip = table[2];
String execstatus = "OK";
String finalstatus = "OK";
- countinsertions += cc.FormQueryTables(startdate.getTime(), table[0], table[1], ip, id, execstatus,
+ countinsertions += cw.FormQueryTables(startdate.getTime(), table[0], table[1], ip, id, execstatus,
finalstatus, protein, predictions);
long exectime = (endtime.getTime() - starttime.getTime()) / 1000;
if (199 < response2 && response2 < 300) {
log = parseLogFile(logurl.openStream());
}
- cc.ArchiveData(startdate.getTime(), exectime, ip, id, execstatus, finalstatus, protein,
+ cw.ArchiveData(startdate.getTime(), exectime, ip, id, execstatus, finalstatus, protein,
predictions, alignment, log, archiveurl.toString());
} catch (ParseException e) {
e.printStackTrace();
import java.util.List;
public class JpredParserLocalFile implements JpredParser {
- private CassandraNativeConnector cc = new CassandraNativeConnector();
+ private CassandraWriter cw = new CassandraWriter();
private String dirprefix;
public void setSource(String newsourceprefix) {
this.dirprefix = newsourceprefix;
}
- JpredParserLocalFile() {
+ public JpredParserLocalFile() {
this.dirprefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
}
- JpredParserLocalFile(String sourceurl) {
+ public JpredParserLocalFile(String sourceurl) {
this.dirprefix = sourceurl;
}
} catch (ParseException e) {
e.printStackTrace();
}
- countinsertions += cc.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
+ countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
}
fr.close();
} catch (IOException e) {
import compbio.engine.ProteoCachePropertyHelperManager;
import compbio.util.PropertyHelper;
-public class CassandraReaderExecutionTime implements CassandraReader {
+public class CassandraReaderExecutionTime {
private Session session;
public static String CASSANDRA_HOSTNAME = "localhost";
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
+import org.apache.log4j.Logger;
+
+import com.datastax.driver.core.Session;
+
import compbio.cassandra.CassandraNativeConnector;
+import compbio.cassandra.CassandraNewTableWriter;
+import compbio.cassandra.JpredParserHTTP;
+import compbio.cassandra.JpredParserLocalFile;
import compbio.engine.ProteoCachePropertyHelperManager;
import compbio.util.PropertyHelper;
+import compbio.util.Util;
/**
* Application Lifecycle Listener implementation class ContextListener
@WebListener
public class ContextListener implements ServletContextListener {
private ScheduledExecutorService webjob_scheduler;
+ private ScheduledExecutorService localjob_scheduler;
CassandraNativeConnector db = new CassandraNativeConnector();
static PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
+ private static Logger log = Logger.getLogger(ContextListener.class);
+ public static boolean READ_WEB_JPRED = false;
+ public static boolean READ_LOCALFILE_JPRED = false;
+
+	/**
+	 * Reads a boolean configuration property via the ProteoCache property
+	 * helper.
+	 *
+	 * @param key property name, must not be null
+	 * @return the parsed value, or false when the property is missing or empty
+	 */
+	private static boolean initBooleanValue(String key) {
+		assert key != null;
+		String status = ph.getProperty(key);
+		log.debug("Loading property: " + key + " with value: " + status);
+		if (Util.isEmpty(status)) {
+			return false;
+		}
+		// Boolean.parseBoolean instead of the deprecated "new Boolean(...)":
+		// identical result without creating a throwaway wrapper object
+		return Boolean.parseBoolean(status.trim());
+	}
	/**
	 * Bootstraps ProteoCache: connects to Cassandra and, depending on the
	 * cassandra.jpred.*.update flags, starts the web and/or local-file job
	 * ingestion schedulers.
	 *
	 * @see ServletContextListener#contextInitialized(ServletContextEvent)
	 */
	public void contextInitialized(ServletContextEvent arg0) {
		System.out.println("ProteoCache session start......");
+		// connect to the db and create table if needed
		db.Connect();
+		CassandraNewTableWriter updater = new CassandraNewTableWriter();
+
+		// NOTE(review): one-off backfill helpers, intentionally left disabled
+		// updater.FillParameters();
+		// updater.FillNewTable();
+
+		READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web.update");
+		READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local.update");
+
+		if (READ_WEB_JPRED) {
+			// get data from real Jpred production server
+			final String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat";
+			final String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
+			final JpredParserHTTP parser = new JpredParserHTTP(prefix);
+
+			// defaults, overridden below by valid cassandra.jpred.web.* properties
+			int initialdelay = 300;
+			int updaterate = 600;
+			int newinitialdelay = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.web.inidelay"));
+			if (0 <= newinitialdelay) {
+				initialdelay = newinitialdelay;
+			}
+			int newupdaterate = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.web.updaterate"));
+			if (0 < newupdaterate) {
+				updaterate = newupdaterate;
+			}
+			final int updateperiod = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.web.period"));
-			webjob_scheduler = Executors.newSingleThreadScheduledExecutor();
-			webjob_scheduler.scheduleAtFixedRate(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						db.Parsing();
-					} catch (IOException e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
+			webjob_scheduler = Executors.newSingleThreadScheduledExecutor();
+			System.out.println("Initializating web job scheduler");
+			System.out.println("   initial delay = " + initialdelay + " seconds");
+			System.out.println("   update rate = " + updaterate + " seconds");
+			if (0 < updateperiod)
+				System.out.println("   update period = " + updateperiod + " days");
+			else
+				System.out.println("   update period = 5 days");
+
+			webjob_scheduler.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						// fall back to a 5 day window when the period property
+						// is unset or invalid
+						if (0 < updateperiod) {
+							parser.Parsing(datasrc, updateperiod);
+						} else {
+							parser.Parsing(datasrc, 5);
+						}
+					} catch (IOException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
				}
+			}, initialdelay, updaterate, TimeUnit.SECONDS);
+		}
+
+		if (READ_LOCALFILE_JPRED) {
+			// get artificial data generated for the DB stress tests
+			final String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat";
+			final String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata";
+			final JpredParserLocalFile parser = new JpredParserLocalFile(prefix);
+
+			// same default/override scheme as the web source above
+			int initialdelay = 300;
+			int updaterate = 600;
+			int newinitialdelay = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.local.inidelay"));
+			if (0 <= newinitialdelay) {
+				initialdelay = newinitialdelay;
+			}
+			int newupdaterate = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.local.updaterate"));
+			if (0 < newupdaterate) {
+				updaterate = newupdaterate;
			}
-		}, 0, 600, TimeUnit.SECONDS);
+			final int updateperiod = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.local.period"));
+
+			localjob_scheduler = Executors.newSingleThreadScheduledExecutor();
+			System.out.println("Initializating local job scheduler");
+			System.out.println("   initial delay = " + initialdelay + " seconds");
+			System.out.println("   update rate = " + updaterate + " seconds");
+			if (0 < updateperiod)
+				System.out.println("   update period = " + updateperiod + " days");
+			else
+				System.out.println("   update period = 5 days");
+			localjob_scheduler.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						// local source falls back to a 100 day window when the
+						// period property is unset or invalid
+						if (0 < updateperiod) {
+							parser.Parsing(datasrc, updateperiod);
+						} else {
+							parser.Parsing(datasrc, 100);
+						}
+					} catch (IOException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				}
+			}, initialdelay, updaterate, TimeUnit.SECONDS);
+		}
	}
	public void contextDestroyed(ServletContextEvent arg0) {
		db.Closing();
		System.out.println("Shut down ProteoCache......");
-		webjob_scheduler.shutdownNow();
+		// stop only the schedulers that were actually started; the scheduler
+		// fields are uninitialized when the corresponding flag was off
+		if (READ_WEB_JPRED) {
+			webjob_scheduler.shutdownNow();
+		}
+		if (READ_LOCALFILE_JPRED) {
+			localjob_scheduler.shutdownNow();
+		}
	}
}
import java.util.Map;
import compbio.cassandra.CassandraNativeConnector;
+import compbio.cassandra.CassandraReader;
import compbio.cassandra.DataBase;
import compbio.cassandra.Pair;
import compbio.cassandra.StructureJobLog;
import compbio.cassandra.StructureProteinPrediction;
public class CassandraRequester {
- private CassandraNativeConnector DBInstance = new CassandraNativeConnector();
+ private CassandraReader db = new CassandraReader();
private ArrayList<DataBase> query;
private static long currentDate = 0;
private static long earlestDate = 0;
List<Integer> totalTime = new ArrayList<Integer>();
for (int i = 0; i < nbins; i++)
totalTime.add(i, 0);
- List<Pair<String, String>> res = DBInstance.ReadProteinDataTable();
+ List<Pair<String, String>> res = db.ReadProteinDataTable();
List<Pair<Date, Long>> numres = new ArrayList<Pair<Date, Long>>();
for (Pair<String, String> entry : res) {
end.setTime(new Date(dateEnd));
query = new ArrayList<DataBase>();
for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
- Integer res = DBInstance.ReadDateTable(date.getTime());
- System.out.println("Command: " + res);
+ Integer res = db.ReadDateTable(date.getTime());
if (res == null)
continue;
DataBase db = new DataBase();
db.setTotal(res);
db.setDate(DateFormat(date.getTime()));
query.add(db);
- }
+ }
System.out.println("StatisticsProt.readLength: total number of dates = " + query.size());
return query;
}
query = new ArrayList<DataBase>();
List<StructureProteinPrediction> res;
if (flag.equals("whole"))
- res = DBInstance.ReadWholeSequence(protIn);
+ res = db.ReadWholeSequence(protIn);
else
- res = DBInstance.ReadPartOfSequence(protIn);
+ res = db.ReadPartOfSequence(protIn);
for (StructureProteinPrediction entry : res) {
Map<String,String> pred = entry.getPrediction();
Iterator it = pred.entrySet().iterator();
* */
public List<DataBase> readProteinByCounter(int counter) {
query = new ArrayList<DataBase>();
- Map<String, Integer> map = DBInstance.ReadProteinDataByCounter();
+ Map<String, Integer> map = db.ReadProteinDataByCounter();
for (Map.Entry<String, Integer> entry : map.entrySet()) {
if (entry.getValue() > counter) {
DataBase db = new DataBase();
*/
public DataBase readJobLog(String jobid) {
// query = new ArrayList<DataBase>();
- StructureJobLog res = DBInstance.ReadJobLog(jobid);
+ StructureJobLog res = db.ReadJobLog(jobid);
DataBase query = new DataBase();
query.setLogInfo(res);
// query.setres);
* find the earliest date in the database
*/
	public long earliestDate() {
-		earlestDate = DBInstance.getEarliestDateInDB();
+		// now a static call on the native connector rather than an instance
+		// call on the removed DBInstance field
+		earlestDate = CassandraNativeConnector.getEarliestDateInDB();
		return earlestDate;
	}