import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
-import compbio.engine.ProteoCachePropertyHelperManager;
-import compbio.util.PropertyHelper;
-
public class CassandraReader {
private Session session;
private static Logger log = Logger.getLogger(CassandraNativeConnector.class);
package compbio.cassandra;
-import java.io.IOException;
import java.util.List;
import org.apache.log4j.Logger;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.BoundStatement;
+import compbio.engine.JpredJob;
import compbio.engine.ProteoCachePropertyHelperManager;
import compbio.util.PropertyHelper;
/*
* inserting data into the tables for queries
*/
- public int FormQueryTables(long jobtime, String startdate, String enddate, String ip, String jobid, String statusEx,
- String statusFinal, String protein, List<FastaSequence> predictions) {
- if (JobisNotInsterted(jobid)) {
+ public int FormQueryTables(JpredJob job) {
+ if (JobisNotInsterted(job.getJobID())) {
+ String id = job.getJobID();
+ String ip = job.getIP();
+ String protein = job.getProtein();
+ String finalstatus = job.getFinalStatus();
+ String execstatus = job.getExecutionStatus();
String com1 = "INSERT INTO ProteinLog " + "(JobID, IP, DataBegin, DataEnd, FinalStatus, ExecutionStatus, Protein)"
- + " VALUES ('" + jobid + "','" + ip + "','" + startdate + "','" + enddate + "','" + statusFinal + "','" + statusEx
- + "','" + protein + "');";
+ + " VALUES ('" + id + "','" + ip + "','" + job.getStartingTimeStr() + "','" + job.getEndTimeStr() + "','" + finalstatus
+ + "','" + execstatus + "','" + protein + "');";
session.execute(com1);
- String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + jobtime + ",'" + jobid + "','" + protein
- + "');";
+ String com2 = "INSERT INTO ProteinData " + "(jobtime, JobID, Protein)" + " VALUES (" + job.getStartingDate() + ",'" + id
+ + "','" + protein + "');";
session.execute(com2);
String allpredictions = "";
- for (FastaSequence pred : predictions) {
+ List<FastaSequence> pr = job.getPredictions();
+ for (FastaSequence pred : pr) {
String predictionname = pred.getId();
String prediction = pred.getSequence().replaceAll("\n", "");
allpredictions += "'" + predictionname + "':'" + prediction + "',";
final_prediction = allpredictions.substring(0, allpredictions.length() - 1);
}
- String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + jobid + "';";
+ String check2 = "SELECT * FROM ProteinRow WHERE JobID = '" + job.getJobID() + "';";
ResultSet results2 = session.execute(check2);
if (results2.isExhausted()) {
- String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + jobid + "',{"
+ String com3 = "INSERT INTO ProteinRow " + "(Protein, JobID, Predictions)" + " VALUES ('" + protein + "','" + id + "',{"
+ final_prediction + "});";
session.execute(com3);
}
boolean updateparameter = true;
if (!results3.isExhausted()) {
Row r = results3.one();
- if (jobtime >= Long.parseLong(r.getString("Value")))
+ if (job.getStartingDate() >= Long.parseLong(r.getString("Value")))
updateparameter = false;
}
if (updateparameter) {
- String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + String.valueOf(jobtime)
+ String com = "INSERT INTO MainParameters " + "(Name, Value)" + " VALUES ('EarliestJobDate','" + job.getStartingDateStr()
+ "');";
session.execute(com);
}
- String check4 = "SELECT * FROM JobDateInfo WHERE jobday = " + jobtime + ";";
+ String check4 = "SELECT * FROM JobDateInfo WHERE jobday = " + job.getStartingDate() + ";";
ResultSet results4 = session.execute(check4);
updateparameter = true;
int njobs = 1;
Row r = results4.one();
njobs += r.getLong("Total");
}
- String com = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + jobtime + "," + njobs + ");";
+ String com = "INSERT INTO JobDateInfo " + "(jobday, Total)" + " VALUES (" + job.getStartingDate() + "," + njobs + ");";
session.execute(com);
return 1;
* insert data from a real Jpred job: timing+IP, Execution Status, Final
* status, protein sequence, predictions, alignment, LOG and tar.gz files
*/
- public int ArchiveData(long starttime, long exectime, String ip, String jobid, String statusEx, String statusFinal, String protein,
- List<FastaSequence> predictions, List<FastaSequence> seqs, String LogFile, String archivepath) {
- if (JobisNotArchived(jobid)) {
- String log = LogFile.replaceAll("'", "");
- session.execute("INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG) VALUES ('" + jobid + "','" + protein
- + "','" + ip + "'," + starttime + "," + exectime + ",'" + log + "');");
- if (false) {
- PreparedStatement statement = session.prepare("INSERT INTO JpredArchive (JobID, archive) VALUES (?,?);");
- BoundStatement boundStatement = new BoundStatement(statement);
- session.execute(boundStatement.bind(jobid, archivepath));
- }
+ public int ArchiveData(JpredJob job, String archivepath) {
+ if (JobisNotArchived(job.getJobID())) {
+ String id = job.getJobID();
+ String log = job.getLog().replaceAll("'", "");
+ String com = "INSERT INTO JpredArchive (JobID, Protein, IP, StartTime, ExecTime,LOG, ArchiveLink) VALUES ('" + id + "','"
+ + job.getProtein() + "','" + job.getIP() + "'," + job.getStartingTime() + "," + job.getExecutionTime() + ",'" + log
+ + "','" + archivepath + "');";
+ session.execute(com);
+ List<FastaSequence> predictions = job.getPredictions();
for (FastaSequence p : predictions) {
session.execute("UPDATE JpredArchive SET predictions = predictions + {'" + p.getId() + "':'"
- + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+ + p.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';");
}
+ List<FastaSequence> seqs = job.getAlignment();
for (FastaSequence s : seqs) {
session.execute("UPDATE JpredArchive SET alignment = alignment + {'" + s.getId() + "':'"
- + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + jobid + "';");
+ + s.getSequence().replaceAll("\n", "") + "'} WHERE JobID = '" + id + "';");
}
return 1;
}
package compbio.cassandra;
import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import compbio.cassandra.JpredParser;
+import compbio.engine.JpredJob;
public class JpredParserHTTP implements JpredParser {
private CassandraWriter cw = new CassandraWriter();
cal.add(Calendar.DATE, -nDays);
for (int i = 0; i < nDays; ++i) {
cal.add(Calendar.DATE, 1);
- int month = cal.get(Calendar.MONTH) + 1;
- int year = cal.get(Calendar.YEAR);
- int day = cal.get(Calendar.DATE);
- String date = year + "/" + month + "/" + day;
+ String date = cal.get(Calendar.YEAR) + "/" + (cal.get(Calendar.MONTH) + 1) + "/" + cal.get(Calendar.DATE);
ParsingForDate(source, date);
}
}
return out;
}
- private List<Byte> parseArchiveFile(final InputStream stream) throws IOException {
- DataInputStream data_in = new DataInputStream(stream);
- List<Byte> out = new ArrayList<Byte>();
- while (true) {
- try {
- out.add(data_in.readByte());
- } catch (EOFException eof) {
- break;
- }
- }
- return out;
- }
-
- private int analyseJob(String[] job) throws IOException {
+ private int analyseJob(String[] jobinfo) throws IOException {
boolean running = true;
boolean ConcisefileExists = false;
boolean LogfileExists = false;
- String id = job[job.length - 1];
- String startdatestring = job[0].substring(0, job[0].indexOf(":"));
- Date startdate = new Date(0);
- Date starttime = new Date(0);
- Date endtime = new Date(0);
+ JpredJob job = new JpredJob (jobinfo[jobinfo.length - 1], jobinfo[0], jobinfo[1]);
+ job.setIP(jobinfo[2]);
Date currDate = new Date();
- String ip = job[2];
- String execstatus = "OK";
- String finalstatus = "OK";
- String protein = "";
- long exectime = 0;
- String log = "";
- String maindir = dirprefix + "/" + id + "/";
- String concisefile = dirprefix + "/" + id + "/" + id + ".concise.fasta";
- String archivefile = dirprefix + "/" + id + "/" + id + ".tar.gz";
- String logfile = dirprefix + "/" + id + "/LOG";
- SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");
- SimpleDateFormat timeformatter = new SimpleDateFormat("yyyy/MM/dd:H:m:s");
- try {
- startdate = dateformatter.parse(startdatestring);
- starttime = timeformatter.parse(job[0]);
- endtime = timeformatter.parse(job[1]);
- exectime = (endtime.getTime() - starttime.getTime()) / 1000;
- } catch (ParseException e) {
- e.printStackTrace();
- }
+ String maindir = dirprefix + "/" + job.getJobID() + "/";
+ //System.out.println("analyzing job " + job.getJobID());
try {
URL dirurl = new URL(maindir);
HttpURLConnection httpConnection_dirurl = (HttpURLConnection) dirurl.openConnection();
if (httpConnection_dirurl.getResponseCode() < 199 || 300 <= httpConnection_dirurl.getResponseCode()) {
return 0;
}
- URL conciseurl = new URL(concisefile);
- URL archiveurl = new URL(archivefile);
- URL logurl = new URL(logfile);
+ URL conciseurl = new URL(maindir + job.getJobID() + ".concise.fasta");
+ URL archiveurl = new URL(maindir + job.getJobID() + ".tar.gz");
+ URL logurl = new URL(maindir + "LOG");
HttpURLConnection httpConnection_conciseurl = (HttpURLConnection) conciseurl.openConnection();
HttpURLConnection httpConnection_logurl = (HttpURLConnection) logurl.openConnection();
HttpURLConnection httpConnection_archiveurl = (HttpURLConnection) archiveurl.openConnection();
ConcisefileExists = true;
running = false;
try {
- protein = parsePredictions(conciseurl.openStream(), id);
+ job.setProtein(parsePredictions(conciseurl.openStream(), job.getJobID()));
} catch (IOException e) {
e.printStackTrace();
}
} else {
// The job still can be running of failed...
++countNoData;
- alignment = new ArrayList<FastaSequence>();
- predictions = new ArrayList<FastaSequence>();
}
if (199 < httpConnection_logurl.getResponseCode() && httpConnection_logurl.getResponseCode() < 300) {
LogfileExists = true;
- log = parseLogFile(logurl.openStream());
+ job.setLog(parseLogFile(logurl.openStream()));
} else {
// The job has not been started at all...
- execstatus = "FAIL";
- finalstatus = "STOPPED";
+ job.setExecutionStatus("FAIL");
+ job.setFinalStatus("STOPPED");
running = false;
}
- if (log.matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) {
+ if (job.getLog().matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) {
// blast job was too long (more than 3600 secs by default)...
- execstatus = "FAIL";
- finalstatus = "TIMEDOUT";
+ job.setExecutionStatus("FAIL");
+ job.setFinalStatus("TIMEDOUT");
running = false;
- } else if (log.matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) {
+ } else if (job.getLog().matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) {
// an internal Jpred error...
- execstatus = "FAIL";
- finalstatus = "JPREDERROR";
+ job.setExecutionStatus("FAIL");
+ job.setFinalStatus("JPREDERROR");
running = false;
- } else if ((currDate.getTime() - endtime.getTime()) / 1000 > 3601 && LogfileExists && !ConcisefileExists) {
+ } else if ((currDate.getTime() - job.getEndTime()) / 1000 > 3601 && LogfileExists && !ConcisefileExists) {
// the job was stopped with unknown reason...
- execstatus = "FAIL";
- finalstatus = "STOPPED";
+ job.setExecutionStatus("FAIL");
+ job.setFinalStatus("STOPPED");
running = false;
}
}
if (!running) {
- long t = startdate.getTime();
- cw.FormQueryTables(t, job[0], job[1], ip, id, execstatus, finalstatus, protein, predictions);
- cw.ArchiveData(t, exectime, ip, id, execstatus, finalstatus, protein, predictions, alignment, log, archivefile);
+ job.setAlignment(alignment);
+ job.setPredictions(predictions);
+ cw.FormQueryTables(job);
+ cw.ArchiveData(job, "undefined");
return 1;
- } else
- System.out.println("job " + id + " is running");
+ }
return 0;
}
} catch (ParseException e) {
e.printStackTrace();
}
- countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
+ //countinsertions += cw.FormQueryTables(insertdate, starttime, finishtime, ip, id, "OK", "OK", newprotein, seqs);
}
fr.close();
} catch (IOException e) {