Add delay for reading new jobs
[proteocache.git] / datadb / compbio / cassandra / JpredParserHTTP.java
1 package compbio.cassandra;
2
3 import java.io.BufferedReader;
4 import java.io.FileNotFoundException;
5 import java.io.IOException;
6 import java.io.InputStream;
7 import java.io.InputStreamReader;
8 import java.net.HttpURLConnection;
9 import java.net.MalformedURLException;
10 import java.net.URL;
11 import java.net.URLConnection;
12 import java.text.ParseException;
13 import java.text.SimpleDateFormat;
14 import java.util.ArrayList;
15 import java.util.Calendar;
16 import java.util.Date;
17 import java.util.List;
18 import java.util.regex.Matcher;
19 import java.util.regex.Pattern;
20
21 import compbio.cassandra.JpredParser;
22 import compbio.data.sequence.FastaReader;
23 import compbio.data.sequence.FastaSequence;
24 import compbio.engine.JpredJob;
25 import compbio.engine.ProteoCachePropertyHelperManager;
26 import compbio.engine.archive.Archive;
27 import compbio.engine.archive.ArchivedJob;
28 import compbio.util.PropertyHelper;
29 import compbio.util.Util;
30
31 public class JpredParserHTTP implements JpredParser {
32         private CassandraWriter cw = new CassandraWriter();
33         private static Archive archive;
34         private String dirprefix;
35         private List<FastaSequence> alignment;
36         private List<FastaSequence> predictions;
37         private int countNoData;
38         private static boolean archiving = false;
39         private static final PropertyHelper ph = ProteoCachePropertyHelperManager.getPropertyHelper();
40         static SimpleDateFormat timeformatter = new SimpleDateFormat("yyyy/MM/dd:H:m:s");
41
42         public JpredParserHTTP() {
43                 dirprefix = "http://www.compbio.dundee.ac.uk/www-jpred/results";
44         }
45
46         public JpredParserHTTP(String sourceurl) {
47                 dirprefix = sourceurl;
48         }
49
50         public void setSource(String newsourceprefix) {
51                 dirprefix = newsourceprefix;
52         }
53
54         private boolean initBooleanValue(String key) {
55                 assert key != null;
56                 String status = ph.getProperty(key);
57                 if (Util.isEmpty(status)) {
58                         return false;
59                 }
60                 return new Boolean(status.trim()).booleanValue();
61         }
62
63         public void Parsing(String source, int nDays) throws IOException {
64                 Calendar cal = Calendar.getInstance();
65                 cal.add(Calendar.DATE, -nDays);
66                 archiving = initBooleanValue("archive.enable");
67                 if (archiving) {
68                         archive = new Archive();
69                 }
70                 for (int i = 0; i < nDays; ++i) {
71                         cal.add(Calendar.DATE, 1);
72                         String date = cal.get(Calendar.YEAR) + "/" + (cal.get(Calendar.MONTH) + 1) + "/" + cal.get(Calendar.DATE);
73                         ParsingOneDay(source, date);
74                 }
75         }
76
77         /*
78          * The method parses the Jpred output concise file in the FASTA format If
79          * there is a record with ID = QUERY or jobid, this a "one protein" job
80          * otherwise this is an alignment job
81          */
82         private String parsePredictions(final InputStream stream, String jobid) throws FileNotFoundException {
83                 final FastaReader fr = new FastaReader(stream);
84                 String protein = "";
85                 while (fr.hasNext()) {
86                         final FastaSequence fs = fr.next();
87                         String seqid = fs.getId();
88                         String seq = fs.getSequence().replaceAll("\n", "");
89                         if (seqid.equals("jnetpred") || seqid.equals("Lupas_21") || seqid.equals("Lupas_14") || seqid.equals("Lupas_28")
90                                         || seqid.equals("JNETSOL25") || seqid.equals("JNETSOL5") || seqid.equals("JNETSOL0") || seqid.equals("JNETCONF")
91                                         || seqid.equals("JNETHMM") || seqid.equals("JNETPSSM") || seqid.equals("JNETCONF")) {
92                                 predictions.add(fs);
93                         } else {
94                                 alignment.add(fs);
95                                 if (seqid.equals("QUERY") || seqid.equals(jobid))
96                                         protein = seq;
97                         }
98                 }
99                 return protein;
100         }
101
102         private String parseSeqFile(final InputStream stream, String jobid) throws FileNotFoundException {
103                 final FastaReader fr = new FastaReader(stream);
104                 String protein = "";
105                 final FastaSequence fs = fr.next();
106                 protein = fs.getSequence().replaceAll("\n", "");
107                 if (fr.hasNext()) {
108                         // this is an aligment job...
109                         return "alignment";
110                 }
111                 return protein;
112         }
113
114         private String parseLogFile(final InputStream stream, JpredJob job) throws IOException {
115                 String out = "";
116                 BufferedReader buffer = new BufferedReader(new InputStreamReader(stream));
117                 String line;
118                 if (null != (out = buffer.readLine()) && (out.contains("version"))) {
119                         Matcher matcher = Pattern.compile("((\\d|\\.)+)").matcher(out);
120                         if (matcher.find())
121                                 job.setProgramVersion(matcher.group(0));
122                 }
123                 while (null != (line = buffer.readLine())) {
124                         out += line;            
125                 }
126                 return out;
127         }
128
129         private int analyseJob(String[] jobinfo) throws IOException {
130                 alignment = new ArrayList<FastaSequence>();
131                 predictions = new ArrayList<FastaSequence>();
132                 boolean running = true;
133                 boolean ConcisefileExists = false;
134                 boolean LogfileExists = false;
135                 JpredJob job = new JpredJob(jobinfo[jobinfo.length - 1], jobinfo[0], jobinfo[1]);
136                 job.setIP(jobinfo[2]);
137                 job.setProgramName("Jpred");
138                 Date currDate = new Date();
139                 String maindir = dirprefix + "/" + job.getJobID() + "/";
140
141                 try {
142                         Date finishTime = timeformatter.parse(jobinfo[1]);
143                         long delay = currDate.getTime() / 1000 - finishTime.getTime() / 1000;
144                         if (delay < 120) return 0;
145                 } catch (ParseException e) {
146                         e.printStackTrace();
147                 }
148
149                 try {
150                         URL dirurl = new URL(maindir);
151                         HttpURLConnection httpConnection_dirurl = (HttpURLConnection) dirurl.openConnection();
152                         if (httpConnection_dirurl.getResponseCode() < 199 || 300 <= httpConnection_dirurl.getResponseCode()) {
153                                 return 0;
154                         }
155                         URL conciseurl = new URL(maindir + job.getJobID() + ".concise.fasta");
156                         URL logurl = new URL(maindir + "LOG");
157                         HttpURLConnection httpConnection_conciseurl = (HttpURLConnection) conciseurl.openConnection();
158                         HttpURLConnection httpConnection_logurl = (HttpURLConnection) logurl.openConnection();
159                         if (199 < httpConnection_conciseurl.getResponseCode() && httpConnection_conciseurl.getResponseCode() < 300) {
160                                 ConcisefileExists = true;
161                                 running = false;
162                                 try {                           
163                                         job.setProtein(parsePredictions(conciseurl.openStream(), job.getJobID()));
164                                 } catch (IOException e) {
165                                         e.printStackTrace();
166                                 }
167                         } else {
168                                 // The job still can be running of failed...
169                                 ++countNoData;
170                         }
171                         if (199 < httpConnection_logurl.getResponseCode() && httpConnection_logurl.getResponseCode() < 300) {
172                                 LogfileExists = true;
173                                 job.setLog(parseLogFile(logurl.openStream(), job));
174                         } else {
175                                 // The job has not been started at all...
176                                 System.out.println ("WARNING! Job " + job.getJobID() + " has status FAIL/STOPPED");
177                                 job.setExecutionStatus("FAIL");
178                                 job.setFinalStatus("STOPPED");
179                                 running = false;
180                         }
181                         if (job.getLog().matches("(.*)TIMEOUT\\syour\\sjob\\stimed\\sout(.*)")) {
182                                 // blast job was too long (more than 3600 secs by default)...
183                                 job.setExecutionStatus("FAIL");
184                                 job.setFinalStatus("TIMEDOUT");
185                                 running = false;
186                         } else if (job.getLog().matches("(.*)Jpred\\serror:\\sDied\\sat(.*)")) {
187                                 // an internal Jpred error...
188                                 job.setExecutionStatus("FAIL");
189                                 job.setFinalStatus("JPREDERROR");
190                                 running = false;
191                         } else if ((currDate.getTime() - job.getEndTime()) / 1000 > 3601 && LogfileExists && !ConcisefileExists) {
192                                 // the job was stopped with unknown reason...
193                                 job.setExecutionStatus("FAIL");
194                                 job.setFinalStatus("STOPPED");
195                                 running = false;
196                         }
197
198                         httpConnection_conciseurl.disconnect();
199                         httpConnection_logurl.disconnect();
200                 } catch (MalformedURLException e) {
201                         e.printStackTrace();
202                 }
203
204                 if (!running) {
205                         // logging the job
206                         job.setAlignment(alignment);
207                         job.setPredictions(predictions);
208                         if (job.getExecutionStatus().equals("FAIL")) {
209                                 URL sequrl = new URL(maindir + job.getJobID() + ".seq");
210                                 HttpURLConnection httpConnection_sequrl = (HttpURLConnection) sequrl.openConnection();
211                                 if (199 < httpConnection_sequrl.getResponseCode() && httpConnection_sequrl.getResponseCode() < 300) {
212                                         try {
213                                                 job.setProtein(parseSeqFile(sequrl.openStream(), job.getJobID()));
214                                         } catch (IOException e) {
215                                                 e.printStackTrace();
216                                         }
217                                 }
218                         }
219                         cw.FormQueryTables(job);
220
221                         // archiving the job
222                         if (archiving) {
223                                 ArchivedJob ajob = new ArchivedJob(job.getJobID());
224                                 String arlink = archive.createJob(job.getJobID());
225                                 if (job.getFinalStatus().equals("OK")) {
226                                         ajob.setArchivePath(arlink);
227                                         ajob.copyArchiveFromWeb(maindir + job.getJobID() + ".tar.gz");
228                                         cw.ArchiveData(job, arlink);
229                                 } else {
230                                         cw.ArchiveData(job, "undefined");
231                                 }
232                         }
233                         return 1;
234                 }
235
236                 return 0;
237         }
238
239         private void ParsingOneDay(String input, String date) {
240                 int totalcount = 0;
241                 int countinsertions = 0;
242                 int countinserted = 0;
243                 int countNotanalyzed = 0;
244                 countNoData = 0;
245
246                 System.out.println("Inserting jobs for " + date);
247                 try {
248                         URL url = new URL(input);
249                         URLConnection conn = url.openConnection();
250                         BufferedReader alljobs = new BufferedReader(new InputStreamReader(conn.getInputStream()));
251                         String line;
252
253                         while ((line = alljobs.readLine()) != null) {
254                                 if (line.matches(date + ":(.*)jp_[^\\s]+")) {
255                                         totalcount++;
256                                         String[] job = line.split("\\s+");
257                                         String jobid = job[job.length - 1];
258                                         if (cw.JobisNotInsterted(jobid)) {
259                                                 countinsertions += analyseJob(job);
260                                         } else {
261                                                 ++countinserted;
262                                         }
263                                 } else {
264                                         ++countNotanalyzed;
265                                 }
266                         }
267                         alljobs.close();
268                         System.out.println("Total number of jobs = " + totalcount);
269                         System.out.println("   " + countinserted + " jobs inserted already");
270                         System.out.println("   " + countNotanalyzed + " not analysed jobs");
271                         System.out.println("   " + countNoData + " jobs without *.concise.fasta file (RUNNING or FAILED)");
272                         System.out.println("   " + countinsertions + " new job insertions\n");
273                 } catch (MalformedURLException e) {
274                         e.printStackTrace();
275                 } catch (IOException e) {
276                         e.printStackTrace();
277                 }
278                 ;
279         }
280 };