Re-design the parser for inserting alignment-only and failed jobs
[proteocache.git] / datadb / compbio / cassandra / JpredParserHTTP.java
index 3616818..bf4c460 100644 (file)
@@ -1,7 +1,11 @@
 package compbio.cassandra;
 
 import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -17,22 +21,25 @@ import java.util.List;
 import compbio.cassandra.JpredParser;
 
 public class JpredParserHTTP implements JpredParser {
-       private CassandraCreate cc = new CassandraCreate();
+       private CassandraWriter cw = new CassandraWriter();
        private String dirprefix;
-       
-       JpredParserHTTP() {
-               this.dirprefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
+       private List<FastaSequence> alignment;
+       private List<FastaSequence> predictions;
+       private int countNoData;
+
+       /**
+        * Creates a parser that looks up job result directories under the
+        * hard-coded Jpred results URL.
+        */
+       public JpredParserHTTP() {
+               dirprefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
        }
-       
-       JpredParserHTTP(String sourceurl) {
-               this.dirprefix = sourceurl;
+
+       /**
+        * Creates a parser that looks up job result directories under the given
+        * URL prefix.
+        *
+        * @param sourceurl
+        *            base URL of the results area; stored as given, without
+        *            validation
+        */
+       public JpredParserHTTP(String sourceurl) {
+               dirprefix = sourceurl;
        }
 
-       public void setSource (String newsourceprefix) {
-               this.dirprefix = newsourceprefix;
+       /**
+        * Replaces the URL prefix under which job result directories are looked
+        * up.
+        *
+        * @param newsourceprefix
+        *            new base URL; stored as given, without validation
+        */
+       public void setSource(String newsourceprefix) {
+               dirprefix = newsourceprefix;
        }
 
-       public void Parsing(String source, int nDays) {
+       public void Parsing(String source, int nDays) throws IOException {
                Calendar cal = Calendar.getInstance();
                cal.add(Calendar.DATE, -nDays);
                for (int i = 0; i < nDays; ++i) {
@@ -45,14 +52,163 @@ public class JpredParserHTTP implements JpredParser {
                }
        }
 
+       /*
+        * The method parses the Jpred output concise file in the FASTA format If
+        * there is a record with ID = QUERY or jobid, this a "one protein" job
+        * otherwise this is an alignment job
+        */
+       private String parsePredictions(final InputStream stream, String jobid) throws FileNotFoundException {
+               final FastaReader fr = new FastaReader(stream);
+               String protein = "";
+               alignment = new ArrayList<FastaSequence>();
+               predictions = new ArrayList<FastaSequence>();
+               while (fr.hasNext()) {
+                       final FastaSequence fs = fr.next();
+                       String seqid = fs.getId();
+                       String seq = fs.getSequence().replaceAll("\n", "");
+                       if (seqid.equals("jnetpred") || seqid.equals("Lupas_21") || seqid.equals("Lupas_14") || seqid.equals("Lupas_28")
+                                       || seqid.equals("JNETSOL25") || seqid.equals("JNETSOL5") || seqid.equals("JNETSOL0") || seqid.equals("JNETCONF")
+                                       || seqid.equals("JNETHMM") || seqid.equals("JNETPSSM") || seqid.equals("JNETCONF")) {
+                               predictions.add(fs);
+                       } else {
+                               alignment.add(fs);
+                               if (seqid.equals("QUERY") || seqid.equals(jobid))
+                                       protein = seq;
+                       }
+               }
+               return protein;
+       }
+
+       private String parseLogFile(final InputStream stream) throws IOException {
+               String out = "";
+               BufferedReader buffer = new BufferedReader(new InputStreamReader(stream));
+               String line;
+               while (null != (line = buffer.readLine())) {
+                       out += line;
+               }
+               return out;
+       }
+
+       private List<Byte> parseArchiveFile(final InputStream stream) throws IOException {
+               DataInputStream data_in = new DataInputStream(stream);
+               List<Byte> out = new ArrayList<Byte>();
+               while (true) {
+                       try {
+                               out.add(data_in.readByte());
+                       } catch (EOFException eof) {
+                               break;
+                       }
+               }
+               return out;
+       }
+
+       /**
+        * Analyses one record of the job list and, unless the job still seems to
+        * be running, passes it to the CassandraWriter.
+        *
+        * The job status is derived from the files in the job result directory:
+        * <ul>
+        * <li>concise FASTA file present: the job is finished and its
+        * predictions/alignment are parsed;</li>
+        * <li>LOG file absent: FAIL/STOPPED;</li>
+        * <li>LOG reports a timeout: FAIL/TIMEDOUT;</li>
+        * <li>LOG reports an internal Jpred error: FAIL/JPREDERROR;</li>
+        * <li>LOG present, no concise file and the recorded end time is more than
+        * 3601 seconds ago: FAIL/STOPPED;</li>
+        * <li>otherwise the job is considered to be running and is not stored.</li>
+        * </ul>
+        *
+        * @param job
+        *            whitespace separated fields of one job list line: job[0] is
+        *            the start time and job[1] the end time (both
+        *            yyyy/MM/dd:H:m:s), job[2] the IP and the last field the job
+        *            ID
+        * @return 1 if the job was passed to the CassandraWriter, 0 if the record
+        *         is too short, the result directory does not answer with a 2xx
+        *         code, a URL is malformed or the job is still running
+        * @throws IOException
+        *             if an HTTP request or reading the LOG file fails
+        */
+       private int analyseJob(String[] job) throws IOException {
+               if (job.length < 3) {
+                       // job[2] (the IP) is read below; skip the record instead of
+                       // aborting the whole run with an ArrayIndexOutOfBoundsException
+                       System.out.println("job record with " + job.length + " fields is skipped");
+                       return 0;
+               }
+               boolean running = true;
+               boolean concisefileExists = false;
+               boolean logfileExists = false;
+               String id = job[job.length - 1];
+               String startdatestring = job[0].substring(0, job[0].indexOf(":"));
+               Date startdate = new Date(0);
+               Date starttime = new Date(0);
+               Date endtime = new Date(0);
+               Date currDate = new Date();
+               String ip = job[2];
+               String execstatus = "OK";
+               String finalstatus = "OK";
+               String protein = "";
+               long exectime = 0;
+               String log = "";
+               String maindir = dirprefix + "/" + id + "/";
+               String concisefile = maindir + id + ".concise.fasta";
+               String archivefile = maindir + id + ".tar.gz";
+               String logfile = maindir + "LOG";
+               SimpleDateFormat dateformatter = new SimpleDateFormat("yyyy/MM/dd");
+               SimpleDateFormat timeformatter = new SimpleDateFormat("yyyy/MM/dd:H:m:s");
+               try {
+                       startdate = dateformatter.parse(startdatestring);
+                       starttime = timeformatter.parse(job[0]);
+                       endtime = timeformatter.parse(job[1]);
+                       exectime = (endtime.getTime() - starttime.getTime()) / 1000;
+               } catch (ParseException e) {
+                       e.printStackTrace();
+               }
+
+               // start from empty lists so that data of a previously analysed job
+               // can never be stored for this one if parsing fails early
+               alignment = new ArrayList<FastaSequence>();
+               predictions = new ArrayList<FastaSequence>();
+
+               HttpURLConnection dirConnection = null;
+               HttpURLConnection conciseConnection = null;
+               HttpURLConnection logConnection = null;
+               try {
+                       dirConnection = (HttpURLConnection) new URL(maindir).openConnection();
+                       int dircode = dirConnection.getResponseCode();
+                       if (dircode < 200 || 300 <= dircode) {
+                               return 0;
+                       }
+                       conciseConnection = (HttpURLConnection) new URL(concisefile).openConnection();
+                       logConnection = (HttpURLConnection) new URL(logfile).openConnection();
+
+                       int concisecode = conciseConnection.getResponseCode();
+                       if (199 < concisecode && concisecode < 300) {
+                               concisefileExists = true;
+                               running = false;
+                               InputStream concisestream = null;
+                               try {
+                                       // read the body of the request that was already made
+                                       // instead of requesting the file a second time
+                                       concisestream = conciseConnection.getInputStream();
+                                       protein = parsePredictions(concisestream, id);
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               } finally {
+                                       if (null != concisestream) {
+                                               try {
+                                                       concisestream.close();
+                                               } catch (IOException ignored) {
+                                                       // the data has been parsed already; a failure
+                                                       // to close must not fail the whole job
+                                               }
+                                       }
+                               }
+                       } else {
+                               // The job can still be running or it has failed...
+                               ++countNoData;
+                       }
+
+                       int logcode = logConnection.getResponseCode();
+                       if (199 < logcode && logcode < 300) {
+                               logfileExists = true;
+                               InputStream logstream = logConnection.getInputStream();
+                               try {
+                                       log = parseLogFile(logstream);
+                               } finally {
+                                       // closing an already closed stream has no effect
+                                       logstream.close();
+                               }
+                       } else {
+                               // The job has not been started at all...
+                               execstatus = "FAIL";
+                               finalstatus = "STOPPED";
+                               running = false;
+                       }
+
+                       if (log.matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) {
+                               // blast job was too long (more than 3600 secs by default)...
+                               execstatus = "FAIL";
+                               finalstatus = "TIMEDOUT";
+                               running = false;
+                       } else if (log.matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) {
+                               // an internal Jpred error...
+                               execstatus = "FAIL";
+                               finalstatus = "JPREDERROR";
+                               running = false;
+                       } else if ((currDate.getTime() - endtime.getTime()) / 1000 > 3601 && logfileExists && !concisefileExists) {
+                               // the job was stopped for an unknown reason...
+                               execstatus = "FAIL";
+                               finalstatus = "STOPPED";
+                               running = false;
+                       }
+               } catch (MalformedURLException e) {
+                       e.printStackTrace();
+               } finally {
+                       // release the connections on every path, including the early
+                       // return and exceptions
+                       if (null != dirConnection) {
+                               dirConnection.disconnect();
+                       }
+                       if (null != conciseConnection) {
+                               conciseConnection.disconnect();
+                       }
+                       if (null != logConnection) {
+                               logConnection.disconnect();
+                       }
+               }
+
+               if (running) {
+                       System.out.println("job " + id + " is running");
+                       return 0;
+               }
+               long t = startdate.getTime();
+               cw.FormQueryTables(t, job[0], job[1], ip, id, execstatus, finalstatus, protein, predictions);
+               cw.ArchiveData(t, exectime, ip, id, execstatus, finalstatus, protein, predictions, alignment, log, archivefile);
+               return 1;
+       }
+
        private void ParsingForDate(String input, String date) {
                int totalcount = 0;
-               int countNoData = 0;
-               int countUnclearFASTAid = 0;
                int countinsertions = 0;
                int countinserted = 0;
-               int counAlignments = 0;
-               int countStrange = 0;
+               int countNotanalyzed = 0;
+               countNoData = 0;
 
                System.out.println("Inserting jobs for " + date);
                try {
@@ -62,74 +218,30 @@ public class JpredParserHTTP implements JpredParser {
                        String line;
 
                        while ((line = alljobs.readLine()) != null) {
-                               if (line.matches(date + "(.*)jp_[^\\s]+")) {
-                                       String[] table = line.split("\\s+");
-                                       String id = table[table.length - 1];
+                               if (line.matches(date + ":(.*)jp_[^\\s]+")) {
                                        totalcount++;
-                                       if (!cc.CheckID(id)) {
-                                               URL urltable = new URL(dirprefix + "/" + id + "/" + id + ".concise.fasta");
-                                               HttpURLConnection httpConnection = (HttpURLConnection) urltable.openConnection();
-                                               int responsecode = httpConnection.getResponseCode();
-                                               if (199 < responsecode && responsecode < 300) {
-                                                       try {
-                                                               final FastaReader fr = new FastaReader(urltable.openStream());
-                                                               final List<FastaSequence> seqs = new ArrayList<FastaSequence>();
-                                                               String newprotein = "";
-                                                               while (fr.hasNext()) {
-                                                                       final FastaSequence fs = fr.next();
-                                                                       if (fs.getId().equals("QUERY") || fs.getId().equals(id))
-                                                                               newprotein = fs.getSequence().replaceAll("\n", "");
-                                                                       else
-                                                                               seqs.add(fs);
-                                                               }
-                                                               if (newprotein.equals("")) {
-                                                                       countUnclearFASTAid++;
-                                                               } else {
-                                                                       SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd");
-                                                                       String dateInString1 = table[0].substring(0, table[0].indexOf(":"));
-                                                                       long dateWork1 = 0;
-                                                                       try {
-                                                                               Date dat1 = formatter.parse(dateInString1);
-                                                                               dateWork1 = dat1.getTime();
-                                                                       } catch (ParseException e) {
-                                                                               e.printStackTrace();
-                                                                       }
-                                                                       cc.InsertData(dateWork1, table[0], table[1], table[2], id, "OK", "OK", newprotein, seqs);
-                                                                       ++countinsertions;
-                                                                       // flush every 100 insertions
-                                                                       if (0 == countinsertions % 100) {
-                                                                               cc.flushData();
-                                                                       }
-                                                               }
-                                                       } catch (IOException e) {
-                                                               e.printStackTrace();
-                                                       }
-                                               } else {
-                                                       countNoData++;
-                                               }
+                                       String[] job = line.split("\\s+");
+                                       String jobid = job[job.length - 1];
+                                       if (cw.JobisNotInsterted(jobid)) {
+                                               countinsertions += analyseJob(job);
                                        } else {
                                                ++countinserted;
                                        }
                                } else {
-                                       if (line.matches(date + "(.*)Sequence0/(.*)")) {
-                                               ++counAlignments;
-                                       } else {
-                                               ++countStrange;
-                                       }
+                                       ++countNotanalyzed;
                                }
                        }
                        alljobs.close();
                        System.out.println("Total number of jobs = " + totalcount);
                        System.out.println("   " + countinserted + " jobs inserted already");
-                       System.out.println("   " + counAlignments + " jalview jobs");
-                       System.out.println("   " + countStrange + " not analysed jobs");
-                       System.out.println("   " + countNoData + " jobs without *.concise.fasta file");
-                       System.out.println("   " + countUnclearFASTAid + " jobs with unclear FASTA protein id in *.concise.fasta");
+                       System.out.println("   " + countNotanalyzed + " not analysed jobs");
+                       System.out.println("   " + countNoData + " jobs without *.concise.fasta file (RUNNING or FAILED)");
                        System.out.println("   " + countinsertions + " new job insertions\n");
                } catch (MalformedURLException e) {
                        e.printStackTrace();
                } catch (IOException e) {
                        e.printStackTrace();
                }
+               ;
        }
 }