From: Sasha Sherstnev Date: Sat, 9 Nov 2013 11:48:07 +0000 (+0000) Subject: Merge branch 'servlets' X-Git-Url: http://source.jalview.org/gitweb/?p=proteocache.git;a=commitdiff_plain;h=3803c9757f61c3a4309252b3820fd5d9cc1a7eb8;hp=642acd8e446f4ec6765eded00dabd01649e39830 Merge branch 'servlets' Conflicts: datadb/compbio/cassandra/CassandraNativeConnector.java server/compbio/statistic/CassandraRequester.java --- diff --git a/conf/Proteocache.properties b/conf/Proteocache.properties index 1f1c5cf..35cfe56 100644 --- a/conf/Proteocache.properties +++ b/conf/Proteocache.properties @@ -5,5 +5,22 @@ cassandra.host=localhost ################################################################################# # Jpred sources -cassandra.jpred.web=true -cassandra.jpred.local=false \ No newline at end of file +# real Jpred web-server +cassandra.jpred.web.update=true +cassandra.jpred.web.inidelay=0 +cassandra.jpred.web.updaterate=200 + +# update time period (in days) +# by defauls for 100 past days +cassandra.jpred.web.period=5 + + +################################################################################# +# local test job source +cassandra.jpred.local.update=false +cassandra.jpred.local.inidelay=10 +cassandra.jpred.local.updaterate=200 + +# update time period (in days) +# by defauls for 100 past days +cassandra.jpred.local.period=300 diff --git a/datadb/compbio/cassandra/CassandraNativeConnector.java b/datadb/compbio/cassandra/CassandraNativeConnector.java index 8e76d43..acb50de 100644 --- a/datadb/compbio/cassandra/CassandraNativeConnector.java +++ b/datadb/compbio/cassandra/CassandraNativeConnector.java @@ -1,26 +1,18 @@ package compbio.cassandra; -import java.io.IOException; import java.util.Calendar; -import java.util.HashMap; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; import org.apache.log4j.Logger; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Host; import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; -import compbio.util.Util; public class CassandraNativeConnector { private static Cluster cluster; @@ -29,17 +21,9 @@ public class CassandraNativeConnector { private static Logger log = Logger.getLogger(CassandraNativeConnector.class); public static String CASSANDRA_HOSTNAME = "localhost"; - public static boolean READ_WEB_JPRED = false; - public static boolean READ_LOCALFILE_JPRED = false; - private static boolean initBooleanValue(String key) { - assert key != null; - String status = ph.getProperty(key); - log.debug("Loading property: " + key + " with value: " + status); - if (Util.isEmpty(status)) { - return false; - } - return new Boolean(status.trim()).booleanValue(); + public static Session getSession () { + return session; } /* @@ -51,8 +35,6 @@ public class CassandraNativeConnector { if (null != cassandrahostname) { CASSANDRA_HOSTNAME = cassandrahostname; } - READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web"); - READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local"); cluster = Cluster.builder().addContactPoint(CASSANDRA_HOSTNAME).build(); @@ -62,320 +44,58 @@ public class CassandraNativeConnector { System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); } session = cluster.connect(); - CreateTables(); + CreateMainTables(); System.out.println("Cassandra connected"); } - private void CreateTables() { + private void CreateMainTables() { session.execute("CREATE KEYSPACE IF NOT EXISTS ProteinKeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};"); session.execute("USE ProteinKeyspace"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinRow " + session.execute("CREATE TABLE IF NOT EXISTS MainParameters " + + "(Name ascii, Value ascii, PRIMARY KEY(Name));"); + + session.execute("CREATE TABLE IF NOT EXISTS ProteinRow " + "(Protein ascii, JobID ascii, Predictions map, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinLog " + + session.execute("CREATE TABLE IF NOT EXISTS ProteinLog " + "(JobID ascii, DataBegin ascii, DataEnd ascii, ip ascii, FinalStatus ascii, " + "ExecutionStatus ascii, Protein ascii, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS ProteinData " + + session.execute("CREATE TABLE IF NOT EXISTS ProteinData " + "(jobtime bigint, JobID ascii, Protein ascii, PRIMARY KEY(JobID));"); - session.execute("CREATE COLUMNFAMILY IF NOT EXISTS JpredArchive " + + session.execute("CREATE TABLE IF NOT EXISTS JpredArchive " + "(JobID ascii, Protein varchar, IP ascii, StartTime bigint, ExecTime int, alignment map, " + "predictions map, archive blob, LOG varchar, PRIMARY KEY(JobID));"); + + session.execute("CREATE TABLE IF NOT EXISTS JobDateInfo " + + "(jobday bigint, Total bigint, PRIMARY KEY(jobday));"); session.execute("CREATE INDEX IF NOT EXISTS ProteinSeq ON ProteinRow (protein);"); session.execute("CREATE INDEX IF NOT EXISTS JobDateStamp ON ProteinData (jobtime);"); } - /* - * parsing data source and filling the database - */ - public void Parsing() throws IOException { - if (READ_WEB_JPRED) { - // if (source.equals("http")) { - // get data from real Jpred production server - System.out.println("Parsing web data source......"); - String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat"; - String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results"; - JpredParserHTTP parser = new JpredParserHTTP(prefix); - parser.Parsing(datasrc, 5); - } - if (READ_LOCALFILE_JPRED) { - // if (source.equals("file")) { - // get irtifical data generated for the DB stress tests - System.out.println("Parsing local file data source......"); - String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat"; - String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata"; - JpredParserLocalFile parser = new JpredParserLocalFile(prefix); - parser.Parsing(datasrc, 190); - } - } - public void Closing() { session.shutdown(); cluster.shutdown(); System.out.println("Cassandra has been shut down"); } - public boolean JobisNotInsterted(String jobid) { - ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"); - if (results1.isExhausted()) { - return true; - } - return false; - } - - public boolean JobisNotArchived(String jobid) { - ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';"); - if (results1.isExhausted()) { - return true; - } - return false; - } - - /* - * inserting data into the tables for queries - */ - public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, - String statusFinal, String protein, List predictions) { - if (JobisNotInsterted(jobid)) { - String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" - + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx - + "','" + protein + "');"; - session.execute(com1); - - String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein - + "');"; - session.execute(com2); - - String allpredictions = ""; - for (FastaSequence pred : predictions) { - String predictionname = pred.getId(); - String prediction = pred.getSequence().replaceAll("\n", ""); - allpredictions += "'" + predictionname + "':'" + prediction + "',"; - } - String final_prediction = ""; - if (null != allpredictions) { - final_prediction = allpredictions.substring(0, allpredictions.length() - 1); - } - - String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';"; - ResultSet results2 = session.execute(check2); - if (results2.isExhausted()) { - String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{" - + final_prediction + "});"; - session.execute(com3); - } - return 1; - } - return 0; - } - - /* - * insert data from a real Jpred job: timing+IP, Execution Status, Final - * status, protein sequence, predictions, alignment, LOG and tar.gz files - */ - public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein, - List predictions, List seqs, String LogFile, String archivepath) { - if (JobisNotArchived(jobid)) { - String log = LogFile.replaceAll("'", ""); - session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein - + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');"); - if (false) { - PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);"); - BoundStatement boundStatement = new BoundStatement(statement); - session.execute(boundStatement.bind(jobid, archivepath)); - } - - for (FastaSequence p : predictions) { - session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'" - + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); - } - - for (FastaSequence s : seqs) { - session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'" - + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); - } - return 1; - } - return 0; - } - - /* - * getting data from the db - */ - public List> ReadProteinDataTable() { - final long startTime = System.currentTimeMillis(); - String com = "SELECT DataBegin,DataEnd FROM ProteinLog;"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - - List> res = new ArrayList>(); - int c = 0; - for (Row r : rows) { - Pair pair = new Pair(r.getString("DataBegin"), r.getString("DataEnd")); - res.add(pair); - ++c; - } - final long endTime = System.currentTimeMillis(); - System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } - - /* - * getting data from the db ProteinData - */ - public Integer ReadDateTable(long queryDate) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT count(*) FROM ProteinData WHERE jobtime = " + queryDate + ";"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - final long queryTime = System.currentTimeMillis(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - Row row = results.one(); - Integer count = (int) row.getLong(0); - final long endTime = System.currentTimeMillis(); - System.out.println("Processing time is " + (endTime - queryTime) + " msec"); - return count; - } - - /* - * getting whole protein sequence from the db ProteinRow - */ - public List ReadWholeSequence(String queryProtein) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - System.out.println(" rows analysed, " + rows.size()); - List res = new ArrayList(); - int c = 0; - for (Row r : rows) { - StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap( - "Predictions", String.class, String.class)); - res.add(structure); - ++c; - } - final long endTime = System.currentTimeMillis(); - System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } - - /* - * getting part of protein sequence from the db ProteinRow - */ - public List ReadPartOfSequence(String queryProtein) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT * FROM ProteinRow;"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - System.out.println(" rows analysed, " + rows.size()); - List res = new ArrayList(); - int c = 0; - for (Row r : rows) { - String prot = r.getString("Protein"); - if (prot.matches("(.*)" + queryProtein + "(.*)")) { - StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", - String.class, String.class)); - res.add(structure); - ++c; - } - } - final long endTime = System.currentTimeMillis(); - System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } - - /* - * getting protein sequences by counter - */ - public Map ReadProteinDataByCounter() { - final long startTime = System.currentTimeMillis(); - String com = "SELECT Protein FROM ProteinRow;"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - List rows = results.all(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - System.out.println(" rows analysed, " + rows.size()); - Map res = new HashMap(); - int c = 0; - for (Row r : rows) { - String protein = r.getString("Protein"); - if (res.containsKey(protein)) - res.put(protein, res.get(protein) + 1); - else - res.put(protein, 1); - } - final long endTime = System.currentTimeMillis(); - System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } - - /* - * getting protein sequences by counter - */ - public StructureJobLog ReadJobLog(String jobid) { - final long startTime = System.currentTimeMillis(); - String com = "SELECT * FROM ProteinKeyspace.ProteinLog WHERE JobID = '" + jobid + "';"; - System.out.println("Command: " + com); - ResultSet results = session.execute(com); - if (results.isExhausted()) - return null; - final long queryTime = System.currentTimeMillis(); - Row row = results.one(); - String com1 = "SELECT * FROM ProteinKeyspace.ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;"; - System.out.println("Command: " + com1); - ResultSet results1 = session.execute(com1); - if (results1.isExhausted()) - return null; - Row row1 = results1.one(); - StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), - row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class)); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - final long endTime = System.currentTimeMillis(); - System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; - } - /* * getting earlest date of jobs from the db */ - public long getEarliestDateInDB() { - final long startTime = System.currentTimeMillis(); - String com = "SELECT jobtime,JobID FROM ProteinData;"; + public static long getEarliestDateInDB() { + String com = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';"; System.out.println("Command: " + com); ResultSet results = session.execute(com); - final long queryTime = System.currentTimeMillis(); - System.out.println("Query time is " + (queryTime - startTime) + " msec"); - Calendar cal = Calendar.getInstance(); - long res = cal.getTimeInMillis(); - int c = 0; - while (!results.isExhausted()) { + if (!results.isExhausted()) { Row r = results.one(); - long d1 = r.getLong("jobtime"); - if (res > d1) { - res = d1; - } - ++c; + return Long.parseLong(r.getString("Value")); } - final long endTime = System.currentTimeMillis(); - System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); - return res; + Calendar cal = Calendar.getInstance(); + return cal.getTimeInMillis(); } - + } diff --git a/datadb/compbio/cassandra/CassandraNewTableWriter.java b/datadb/compbio/cassandra/CassandraNewTableWriter.java new file mode 100644 index 0000000..1832750 --- /dev/null +++ b/datadb/compbio/cassandra/CassandraNewTableWriter.java @@ -0,0 +1,94 @@ +package compbio.cassandra; + +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +import org.apache.log4j.Logger; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.BoundStatement; + +import compbio.engine.ProteoCachePropertyHelperManager; +import compbio.cassandra.CassandraNativeConnector; +import compbio.util.PropertyHelper; + +public class CassandraNewTableWriter { + private Session session; + private static Logger log = Logger.getLogger(CassandraNativeConnector.class); + + public CassandraNewTableWriter() { + Session inis = CassandraNativeConnector.getSession(); + setSession(inis); + } + + public void setSession(Session s) { + assert s != null; + session = s; + } + + public boolean JobisNotInsterted(String jobid) { + ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"); + if (results1.isExhausted()) { + return true; + } + return false; + } + + public boolean JobisNotArchived(String jobid) { + ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';"); + if (results1.isExhausted()) { + return true; + } + return false; + } + + /* + * fill new table + */ + public void FillNewTable() { + long date1 = CassandraNativeConnector.getEarliestDateInDB(); + Calendar start = Calendar.getInstance(); + start.setTime(new Date(date1)); + Calendar endcal = Calendar.getInstance(); + Date end = endcal.getTime(); + for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) { + String query1 = "SELECT * FROM ProteinData WHERE jobtime = " + date.getTime() + ";"; + System.out.println("Query db: " + query1); + ResultSet results = session.execute(query1); + String query2 = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + date.getTime() + "," + results.all().size()+ ");"; + System.out.println("Insert DB: " + query2); + session.execute(query2); + } + System.out.println("Table JobDateInfo filled"); + } + + /* + * fill new table + */ + public void FillParameters() { + Date bubu = new Date(CassandraNativeConnector.getEarliestDateInDB()); + System.out.println("Old EarliestJobDate is " + bubu.toString()); + + String query1 = "SELECT jobtime FROM ProteinData LIMIT 2000000;"; + System.out.println("Query db: " + query1); + ResultSet results = session.execute(query1); + Calendar endcal = Calendar.getInstance(); + long newearliestday = endcal.getTime().getTime(); + while (!results.isExhausted()) { + Row r = results.one(); + long day = r.getLong("jobtime"); + if (day < newearliestday) { + newearliestday = day; + } + } + String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(newearliestday) + + "');"; + session.execute(com); + Date gaga = new Date(newearliestday); + System.out.println("New EarliestJobDate is " + gaga.toString()); + } +} diff --git a/datadb/compbio/cassandra/CassandraReader.java b/datadb/compbio/cassandra/CassandraReader.java index d2d2e1b..c71243b 100644 --- a/datadb/compbio/cassandra/CassandraReader.java +++ b/datadb/compbio/cassandra/CassandraReader.java @@ -1,16 +1,184 @@ package compbio.cassandra; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; + +import org.apache.log4j.Logger; + +import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.ResultSet; + +import compbio.engine.ProteoCachePropertyHelperManager; +import compbio.util.PropertyHelper; + +public class CassandraReader { + private Session session; + private static Logger log = Logger.getLogger(CassandraNativeConnector.class); + + public CassandraReader() { + Session inis = CassandraNativeConnector.getSession(); + setSession (inis); + } + + public void setSession(Session s) { + assert s != null; + session = s; + } + + /* + * getting data from the db + */ + public List> ReadProteinDataTable() { + final long startTime = System.currentTimeMillis(); + String com = "SELECT DataBegin,DataEnd FROM ProteinLog;"; + System.out.println("Command: " + com); + ResultSet results = session.execute(com); + final long queryTime = System.currentTimeMillis(); + List rows = results.all(); + System.out.println("Query time is " + (queryTime - startTime) + " msec"); -public interface CassandraReader { + List> res = new ArrayList>(); + int c = 0; + for (Row r : rows) { + Pair pair = new Pair(r.getString("DataBegin"), r.getString("DataEnd")); + res.add(pair); + ++c; + } + final long endTime = System.currentTimeMillis(); + System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); + return res; + } /* - * Defines a source file with metainformation of Jpred Jobs - **/ - void setSession (Session s); - + * getting data from the db ProteinData + */ + public Integer ReadDateTable(long queryDate) { + final long startTime = System.currentTimeMillis(); + String com = "SELECT jobtime, JobID FROM ProteinData WHERE jobtime = " + queryDate + ";"; + System.out.println("Command: " + com); + ResultSet results = session.execute(com); + final long queryTime = System.currentTimeMillis(); + System.out.println("Query time is " + (queryTime - startTime) + " msec"); + if (results.isExhausted()) + return 0; + List rows = results.all(); + final long endTime = System.currentTimeMillis(); + System.out.println("Processing time is " + (endTime - queryTime) + " msec"); + return rows.size(); + } + + /* + * getting whole protein sequence from the db ProteinRow + */ + public List ReadWholeSequence(String queryProtein) { + final long startTime = System.currentTimeMillis(); + String com = "SELECT JobID, Predictions FROM ProteinRow WHERE Protein = '" + queryProtein + "';"; + System.out.println("Command: " + com); + ResultSet results = session.execute(com); + if (results.isExhausted()) + return null; + final long queryTime = System.currentTimeMillis(); + List rows = results.all(); + System.out.println("Query time is " + (queryTime - startTime) + " msec"); + System.out.println(" rows analysed, " + rows.size()); + List res = new ArrayList(); + int c = 0; + for (Row r : rows) { + StructureProteinPrediction structure = new StructureProteinPrediction(queryProtein, r.getString("JobID"), r.getMap( + "Predictions", String.class, String.class)); + res.add(structure); + ++c; + } + final long endTime = System.currentTimeMillis(); + System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); + return res; + } + + /* + * getting part of protein sequence from the db ProteinRow + */ + public List ReadPartOfSequence(String queryProtein) { + final long startTime = System.currentTimeMillis(); + String com = "SELECT * FROM ProteinRow;"; + System.out.println("Command: " + com); + ResultSet results = session.execute(com); + if (results.isExhausted()) + return null; + final long queryTime = System.currentTimeMillis(); + List rows = results.all(); + System.out.println("Query time is " + (queryTime - startTime) + " msec"); + System.out.println(" rows analysed, " + rows.size()); + List res = new ArrayList(); + int c = 0; + for (Row r : rows) { + String prot = r.getString("Protein"); + if (prot.matches("(.*)" + queryProtein + "(.*)")) { + StructureProteinPrediction structure = new StructureProteinPrediction(prot, r.getString("JobID"), r.getMap("Predictions", + String.class, String.class)); + res.add(structure); + ++c; + } + } + final long endTime = System.currentTimeMillis(); + System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); + return res; + } + + /* + * getting protein sequences by counter + */ + public Map ReadProteinDataByCounter() { + final long startTime = System.currentTimeMillis(); + String com = "SELECT Protein FROM ProteinRow;"; + System.out.println("Command: " + com); + ResultSet results = session.execute(com); + if (results.isExhausted()) + return null; + final long queryTime = System.currentTimeMillis(); + List rows = results.all(); + System.out.println("Query time is " + (queryTime - startTime) + " msec"); + System.out.println(" rows analysed, " + rows.size()); + Map res = new HashMap(); + int c = 0; + for (Row r : rows) { + String protein = r.getString("Protein"); + if (res.containsKey(protein)) + res.put(protein, res.get(protein) + 1); + else + res.put(protein, 1); + } + final long endTime = System.currentTimeMillis(); + System.out.println(c + " rows analysed, execution time is " + (endTime - startTime) + " msec"); + return res; + } + /* - * Makes real parsing of the source file - **/ - /*void getResults();*/ + * getting protein sequences by counter + */ + public StructureJobLog ReadJobLog(String jobid) { + final long startTime = System.currentTimeMillis(); + String com = "SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"; + System.out.println("Command: " + com); + ResultSet results = session.execute(com); + if (results.isExhausted()) + return null; + final long queryTime = System.currentTimeMillis(); + Row row = results.one(); + String com1 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "' ALLOW FILTERING;"; + System.out.println("Command: " + com1); + ResultSet results1 = session.execute(com1); + if (results1.isExhausted()) + return null; + Row row1 = results1.one(); + StructureJobLog res = new StructureJobLog(row.getString("Protein"), row.getString("JobID"), row.getString("DataBegin"), + row.getString("DataEnd"), row.getString("ip"), row1.getMap("Predictions", String.class, String.class)); + System.out.println("Query time is " + (queryTime - startTime) + " msec"); + final long endTime = System.currentTimeMillis(); + System.out.println(" rows analysed, execution time is " + (endTime - startTime) + " msec"); + return res; + } } diff --git a/datadb/compbio/cassandra/CassandraWriter.java b/datadb/compbio/cassandra/CassandraWriter.java new file mode 100644 index 0000000..6ee62fd --- /dev/null +++ b/datadb/compbio/cassandra/CassandraWriter.java @@ -0,0 +1,142 @@ +package compbio.cassandra; + +import java.io.IOException; +import java.util.List; + +import org.apache.log4j.Logger; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.BoundStatement; + +import compbio.engine.ProteoCachePropertyHelperManager; +import compbio.util.PropertyHelper; + +public class CassandraWriter { + private Session session; + private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper(); + private static Logger log = Logger.getLogger(CassandraNativeConnector.class); + + CassandraWriter() { + Session inis = CassandraNativeConnector.getSession(); + setSession(inis); + } + + public void setSession(Session s) { + assert s != null; + session = s; + } + + public boolean JobisNotInsterted(String jobid) { + ResultSet results1 = session.execute("SELECT * FROM ProteinLog WHERE JobID = '" + jobid + "';"); + if (results1.isExhausted()) { + return true; + } + return false; + } + + public boolean JobisNotArchived(String jobid) { + ResultSet results1 = session.execute("SELECT * FROM JpredArchive WHERE JobID = '" + jobid + "';"); + if (results1.isExhausted()) { + return true; + } + return false; + } + + /* + * inserting data into the tables for queries + */ + public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx, + String statusFinal, String protein, List predictions) { + if (JobisNotInsterted(jobid)) { + String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)" + + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx + + "','" + protein + "');"; + session.execute(com1); + + String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein + + "');"; + session.execute(com2); + + String allpredictions = ""; + for (FastaSequence pred : predictions) { + String predictionname = pred.getId(); + String prediction = pred.getSequence().replaceAll("\n", ""); + allpredictions += "'" + predictionname + "':'" + prediction + "',"; + } + String final_prediction = ""; + if (null != allpredictions) { + final_prediction = allpredictions.substring(0, allpredictions.length() - 1); + } + + String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';"; + ResultSet results2 = session.execute(check2); + if (results2.isExhausted()) { + String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{" + + final_prediction + "});"; + session.execute(com3); + } + + // update some internal tables + String check3 = "SELECT * FROM MainParameters WHERE Name = 'EarliestJobDate';"; + ResultSet results3 = session.execute(check3); + boolean updateparameter = true; + if (!results3.isExhausted()) { + Row r = results3.one(); + if (jobtime >= Long.parseLong(r.getString("Value"))) + updateparameter = false; + } + if (updateparameter) { + String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(jobtime) + + "');"; + session.execute(com); + } + String check4 = "SELECT * FROM MainParameters WHERE Name = 'TotalNumberOfJobs';"; + ResultSet results4 = session.execute(check4); + updateparameter = true; + int njobs = 1; + if (!results4.isExhausted()) { + Row r = results4.one(); + njobs += Integer.parseInt(r.getString("Value")); + } + String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('TotalNumberOfJobs','" + String.valueOf(njobs) + "');"; + session.execute(com); + + return 1; + } + return 0; + } + + /* + * insert data from a real Jpred job: timing+IP, Execution Status, Final + * status, protein sequence, predictions, alignment, LOG and tar.gz files + */ + public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein, + List predictions, List seqs, String LogFile, String archivepath) { + if (JobisNotArchived(jobid)) { + String log = LogFile.replaceAll("'", ""); + session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein + + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');"); + if (false) { + PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);"); + BoundStatement boundStatement = new BoundStatement(statement); + session.execute(boundStatement.bind(jobid, archivepath)); + } + + for (FastaSequence p : predictions) { + session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'" + + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); + } + + for (FastaSequence s : seqs) { + session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'" + + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';"); + } + return 1; + } + return 0; + } + +} diff --git a/datadb/compbio/cassandra/JpredParserHTTP.java b/datadb/compbio/cassandra/JpredParserHTTP.java index 27f66cc..5687a83 100644 --- a/datadb/compbio/cassandra/JpredParserHTTP.java +++ b/datadb/compbio/cassandra/JpredParserHTTP.java @@ -21,17 +21,17 @@ import java.util.List; import compbio.cassandra.JpredParser; public class JpredParserHTTP implements JpredParser { - private CassandraNativeConnector cc = new CassandraNativeConnector(); + private CassandraWriter cw = new CassandraWriter(); private String dirprefix; private List alignment; private List predictions; private String jnetpred; - JpredParserHTTP() { + public JpredParserHTTP() { dirprefix = "http://www.compbio.dundee.ac.uk/www-jpred/results"; } - JpredParserHTTP(String sourceurl) { + public JpredParserHTTP(String sourceurl) { dirprefix = sourceurl; } @@ -125,7 +125,7 @@ public class JpredParserHTTP implements JpredParser { // unknown_email jp_J9HBCBT String id = table[table.length - 1]; totalcount++; - if (cc.JobisNotInsterted(id)) { + if (cw.JobisNotInsterted(id)) { URL dataurl = new URL(dirprefix + "/" + id + "/" + id + ".concise.fasta"); URL archiveurl = new URL(dirprefix + "/" + id + "/" + id + ".tar.gz"); URL logurl = new URL(dirprefix + "/" + id + "/LOG"); @@ -150,7 +150,7 @@ public class JpredParserHTTP implements JpredParser { String ip = table[2]; String execstatus = "OK"; String finalstatus = "OK"; - countinsertions += cc.FormQueryTables(startdate.getTime(), table[0], table[1], ip, id, execstatus, + countinsertions += cw.FormQueryTables(startdate.getTime(), table[0], table[1], ip, id, execstatus, finalstatus, protein, predictions); long exectime = (endtime.getTime() - starttime.getTime()) / 1000; @@ -158,7 +158,7 @@ public class JpredParserHTTP implements JpredParser { if (199 < response2 && response2 < 300) { log = parseLogFile(logurl.openStream()); } - cc.ArchiveData(startdate.getTime(), exectime, ip, id, execstatus, finalstatus, protein, + cw.ArchiveData(startdate.getTime(), exectime, ip, id, execstatus, finalstatus, protein, predictions, alignment, log, archiveurl.toString()); } catch (ParseException e) { e.printStackTrace(); diff --git a/datadb/compbio/cassandra/JpredParserLocalFile.java b/datadb/compbio/cassandra/JpredParserLocalFile.java index 4b254ae..a379d4e 100644 --- a/datadb/compbio/cassandra/JpredParserLocalFile.java +++ b/datadb/compbio/cassandra/JpredParserLocalFile.java @@ -16,18 +16,18 @@ import java.util.Date; import java.util.List; public class JpredParserLocalFile implements JpredParser { - private CassandraNativeConnector cc = new CassandraNativeConnector(); + private CassandraWriter cw = new CassandraWriter(); private String dirprefix; public void setSource(String newsourceprefix) { this.dirprefix = newsourceprefix; } - JpredParserLocalFile() { + public JpredParserLocalFile() { this.dirprefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat"; } - JpredParserLocalFile(String sourceurl) { + public JpredParserLocalFile(String sourceurl) { this.dirprefix = sourceurl; } @@ -103,7 +103,7 @@ public class JpredParserLocalFile implements JpredParser { } catch (ParseException e) { e.printStackTrace(); } - countinsertions += cc.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs); + countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs); } fr.close(); } catch (IOException e) { diff --git a/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java b/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java index 8648aeb..347ebe3 100644 --- a/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java +++ b/datadb/compbio/cassandra/readers/CassandraReaderExecutionTime.java @@ -18,7 +18,7 @@ import compbio.cassandra.Pair; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; -public class CassandraReaderExecutionTime implements CassandraReader { +public class CassandraReaderExecutionTime { private Session session; public static String CASSANDRA_HOSTNAME = "localhost"; diff --git a/server/compbio/listeners/ContextListener.java b/server/compbio/listeners/ContextListener.java index 75f4275..5b05a8b 100644 --- a/server/compbio/listeners/ContextListener.java +++ b/server/compbio/listeners/ContextListener.java @@ -9,9 +9,17 @@ import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; +import org.apache.log4j.Logger; + +import com.datastax.driver.core.Session; + import compbio.cassandra.CassandraNativeConnector; +import compbio.cassandra.CassandraNewTableWriter; +import compbio.cassandra.JpredParserHTTP; +import compbio.cassandra.JpredParserLocalFile; import compbio.engine.ProteoCachePropertyHelperManager; import compbio.util.PropertyHelper; +import compbio.util.Util; /** * Application Lifecycle Listener implementation class ContextListener @@ -20,28 +28,124 @@ import compbio.util.PropertyHelper; @WebListener public class ContextListener implements ServletContextListener { private ScheduledExecutorService webjob_scheduler; + private ScheduledExecutorService localjob_scheduler; CassandraNativeConnector db = new CassandraNativeConnector(); static PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper(); + private static Logger log = Logger.getLogger(ContextListener.class); + public static boolean READ_WEB_JPRED = false; + public static boolean READ_LOCALFILE_JPRED = false; + + private static boolean initBooleanValue(String key) { + assert key != null; + String status = ph.getProperty(key); + log.debug("Loading property: " + key + " with value: " + status); + if (Util.isEmpty(status)) { + return false; + } + return new Boolean(status.trim()).booleanValue(); + } /** * @see ServletContextListener#contextInitialized(ServletContextEvent) */ public void contextInitialized(ServletContextEvent arg0) { System.out.println("ProteoCache session start......"); + // connect to the db and create table if needed db.Connect(); + CassandraNewTableWriter updater = new CassandraNewTableWriter(); + + // updater.FillParameters(); + // updater.FillNewTable(); + + READ_WEB_JPRED = initBooleanValue("cassandra.jpred.web.update"); + READ_LOCALFILE_JPRED = initBooleanValue("cassandra.jpred.local.update"); + + if (READ_WEB_JPRED) { + // get data from real Jpred production server + final String datasrc = "http://www.compbio.dundee.ac.uk/www-jpred/results/usage-new/alljobs.dat"; + final String prefix = "http://www.compbio.dundee.ac.uk/www-jpred/results"; + final JpredParserHTTP parser = new JpredParserHTTP(prefix); + + int initialdelay = 300; + int updaterate = 600; + int newinitialdelay = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.web.inidelay")); + if (0 <= newinitialdelay) { + initialdelay = newinitialdelay; + } + int newupdaterate = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.web.updaterate")); + if (0 < newupdaterate) { + updaterate = newupdaterate; + } + final int updateperiod = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.web.period")); - webjob_scheduler = Executors.newSingleThreadScheduledExecutor(); - webjob_scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - db.Parsing(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + webjob_scheduler = Executors.newSingleThreadScheduledExecutor(); + System.out.println("Initializating web job scheduler"); + System.out.println(" initial delay = " + initialdelay + " seconds"); + System.out.println(" update rate = " + updaterate + " seconds"); + if (0 < updateperiod) + System.out.println(" update period = " + updateperiod + " days"); + else + System.out.println(" update period = 5 days"); + + webjob_scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (0 < updateperiod) { + parser.Parsing(datasrc, updateperiod); + } else { + parser.Parsing(datasrc, 5); + } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } + }, initialdelay, updaterate, TimeUnit.SECONDS); + } + + if (READ_LOCALFILE_JPRED) { + // get irtifical data generated for the DB stress tests + final String datasrc = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/data.dat"; + final String prefix = "/home/asherstnev/Projects/Java.projects/proteocache/data_stress_test/Jpreddata"; + final JpredParserLocalFile parser = new JpredParserLocalFile(prefix); + + int initialdelay = 300; + int updaterate = 600; + int newinitialdelay = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.local.inidelay")); + if (0 <= newinitialdelay) { + initialdelay = newinitialdelay; + } + int newupdaterate = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.local.updaterate")); + if (0 < newupdaterate) { + updaterate = newupdaterate; } - }, 0, 600, TimeUnit.SECONDS); + final int updateperiod = ProteoCachePropertyHelperManager.getIntProperty(ph.getProperty("cassandra.jpred.local.period")); + + localjob_scheduler = Executors.newSingleThreadScheduledExecutor(); + System.out.println("Initializating local job scheduler"); + System.out.println(" initial delay = " + initialdelay + " seconds"); + System.out.println(" update rate = " + updaterate + " seconds"); + if (0 < updateperiod) + System.out.println(" update period = " + updateperiod + " days"); + else + System.out.println(" update period = 5 days"); + localjob_scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (0 < updateperiod) { + parser.Parsing(datasrc, updateperiod); + } else { + parser.Parsing(datasrc, 100); + } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }, initialdelay, updaterate, TimeUnit.SECONDS); + } } @@ -51,7 +155,12 @@ public class ContextListener implements ServletContextListener { public void contextDestroyed(ServletContextEvent arg0) { db.Closing(); System.out.println("Shut down ProteoCache......"); - webjob_scheduler.shutdownNow(); + if (READ_WEB_JPRED) { + webjob_scheduler.shutdownNow(); + } + if (READ_LOCALFILE_JPRED) { + localjob_scheduler.shutdownNow(); + } } } diff --git a/server/compbio/statistic/CassandraRequester.java b/server/compbio/statistic/CassandraRequester.java index 92ca531..4bb1966 100755 --- a/server/compbio/statistic/CassandraRequester.java +++ b/server/compbio/statistic/CassandraRequester.java @@ -10,13 +10,14 @@ import java.util.List; import java.util.Map; import compbio.cassandra.CassandraNativeConnector; +import compbio.cassandra.CassandraReader; import compbio.cassandra.DataBase; import compbio.cassandra.Pair; import compbio.cassandra.StructureJobLog; import compbio.cassandra.StructureProteinPrediction; public class CassandraRequester { - private CassandraNativeConnector DBInstance = new CassandraNativeConnector(); + private CassandraReader db = new CassandraReader(); private ArrayList query; private static long currentDate = 0; private static long earlestDate = 0; @@ -54,7 +55,7 @@ public class CassandraRequester { List totalTime = new ArrayList(); for (int i = 0; i < nbins; i++) totalTime.add(i, 0); - List> res = DBInstance.ReadProteinDataTable(); + List> res = db.ReadProteinDataTable(); List> numres = new ArrayList>(); for (Pair entry : res) { @@ -140,15 +141,14 @@ public class CassandraRequester { end.setTime(new Date(dateEnd)); query = new ArrayList(); for (Date date = start.getTime(); !start.after(end); start.add(Calendar.DATE, 1), date = start.getTime()) { - Integer res = DBInstance.ReadDateTable(date.getTime()); - System.out.println("Command: " + res); + Integer res = db.ReadDateTable(date.getTime()); if (res == null) continue; DataBase db = new DataBase(); db.setTotal(res); db.setDate(DateFormat(date.getTime())); query.add(db); - } + } System.out.println("StatisticsProt.readLength: total number of dates = " + query.size()); return query; } @@ -159,9 +159,9 @@ public class CassandraRequester { query = new ArrayList(); List res; if (flag.equals("whole")) - res = DBInstance.ReadWholeSequence(protIn); + res = db.ReadWholeSequence(protIn); else - res = DBInstance.ReadPartOfSequence(protIn); + res = db.ReadPartOfSequence(protIn); for (StructureProteinPrediction entry : res) { Map pred = entry.getPrediction(); Iterator it = pred.entrySet().iterator(); @@ -184,7 +184,7 @@ public class CassandraRequester { * */ public List readProteinByCounter(int counter) { query = new ArrayList(); - Map map = DBInstance.ReadProteinDataByCounter(); + Map map = db.ReadProteinDataByCounter(); for (Map.Entry entry : map.entrySet()) { if (entry.getValue() > counter) { DataBase db = new DataBase(); @@ -201,7 +201,7 @@ public class CassandraRequester { */ public DataBase readJobLog(String jobid) { // query = new ArrayList(); - StructureJobLog res = DBInstance.ReadJobLog(jobid); + StructureJobLog res = db.ReadJobLog(jobid); DataBase query = new DataBase(); query.setLogInfo(res); // query.setres); @@ -282,7 +282,7 @@ public class CassandraRequester { * find the earliest date in the database */ public long earliestDate() { - earlestDate = DBInstance.getEarliestDateInDB(); + earlestDate = CassandraNativeConnector.getEarliestDateInDB(); return earlestDate; }