# Copyright 1999 by Jeffrey Chang. All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license. Please see the LICENSE file that should have been included
# as part of this package.
"""Code to support writing parsers.

Classes:
AbstractParser         Base class for parsers.
AbstractConsumer       Base class of all Consumers.
TaggingConsumer        Consumer that tags output with its event.  For debugging.
SGMLStrippingConsumer  Consumer that strips SGML tags from output.
EventGenerator         Generate Biopython Events from Martel XML output
                       (note that Martel is now DEPRECATED)

Functions:
safe_readline          Read a line from a handle, with check for EOF.
safe_peekline          Peek at next line, with check for EOF.
read_and_call          Read a line from a handle and pass it to a method.
read_and_call_while    Read many lines, as long as a condition is met.
read_and_call_until    Read many lines, until a condition is met.
attempt_read_and_call  Like read_and_call, but forgiving of errors.
is_blank_line          Test whether a line is blank.

"""
# SAX is optional: if it cannot be imported, warn on stderr and record the
# fact in xml_support instead of failing the import of this module.
import sys

try:
    from xml.sax import handler
    xml_support = 1
except ImportError:
    sys.stderr.write("Warning: Could not import SAX for dealing with XML.\n" +
                     "This causes problems with some ParserSupport modules\n")
    xml_support = 0
class AbstractParser:
    """Base class for other parsers.

    Derived classes implement parse(handle).  parse_str and parse_file
    are conveniences that build a handle and delegate to parse.

    """
    def parse(self, handle):
        """Parse the data available on handle.

        Always raises NotImplementedError here; derived classes must
        override this method.
        """
        raise NotImplementedError("Please implement in a derived class")

    def parse_str(self, string):
        """Parse string by wrapping it in a File.StringHandle.

        Returns whatever parse returns.
        """
        return self.parse(File.StringHandle(string))

    def parse_file(self, filename):
        """Open filename, parse its contents and return the result.

        The file is closed before returning, even if parse raises.
        """
        h = open(filename)
        try:
            retval = self.parse(h)
        finally:
            h.close()
        return retval
class AbstractConsumer:
    """Base class for other Consumers.

    Derive Consumers from this class and implement appropriate
    methods for each event that you want to receive.  Events that have
    no matching method are accepted and ignored.

    """
    def _unhandled_section(self):
        """Ignore a section event (start_* or end_*) that has no handler."""
        pass

    def _unhandled(self, data):
        """Ignore an info event that has no handler; data is discarded."""
        pass

    def __getattr__(self, attr):
        """Return a do-nothing handler for an event that is not defined.

        Only called when normal attribute lookup fails.  Names starting
        with 'start_' or 'end_' get the no-argument section handler;
        every other name gets the one-argument info handler.

        Raises AttributeError for special (double underscore) names.
        """
        # Special-method probes (e.g. copy.deepcopy asking for
        # __deepcopy__) must see a missing attribute; handing them an
        # event handler makes deepcopy return None.
        if attr[:2] == '__' and attr[-2:] == '__':
            raise AttributeError(attr)
        if attr[:6] == 'start_' or attr[:4] == 'end_':
            method = self._unhandled_section
        else:
            method = self._unhandled
        return method
class TaggingConsumer(AbstractConsumer):
    """A Consumer that tags the data stream with the event and
    prints it to a handle.  Useful for debugging.

    """
    def __init__(self, handle=None, colwidth=15, maxwidth=80):
        """TaggingConsumer(handle=sys.stdout, colwidth=15, maxwidth=80)

        handle is the object whose write method receives the output.
        colwidth is the width of the column holding the event name.
        maxwidth is the total line width that data is truncated to fit.
        """
        # I can't assign sys.stdout to handle in the argument list.
        # If I do that, handle will be assigned the value of sys.stdout
        # the first time this function is called.  This will fail if
        # the user has assigned sys.stdout to some other file, which may
        # be closed or invalid at a later time.
        if handle is None:
            handle = sys.stdout
        self._handle = handle
        self._colwidth = colwidth
        self._maxwidth = maxwidth

    def unhandled_section(self):
        """Write a section line tagged 'unhandled_section'."""
        self._print_name('unhandled_section')

    def unhandled(self, data):
        """Write data tagged with the name 'unhandled'."""
        self._print_name('unhandled', data)

    def _print_name(self, name, data=None):
        """Write one line for the event name to the handle.

        Without data, a row of colwidth asterisks followed by the name
        is written.  With data, the name (cut to colwidth) is followed
        by ': ' and the data, truncated and stripped of trailing
        whitespace.
        """
        if data is None:
            # Write the name of a section.
            self._handle.write("%s %s\n" % ("*"*self._colwidth, name))
        else:
            # Write the tag and line.  The 2 leaves room for the ': '
            # separator between the tag column and the data.
            self._handle.write("%-*s: %s\n" % (
                self._colwidth, name[:self._colwidth],
                data[:self._maxwidth-self._colwidth-2].rstrip()))

    def __getattr__(self, attr):
        """Return a handler that prints the event named attr.

        Names starting with 'start_' or 'end_' get a no-argument
        handler; every other name gets a handler taking the data.

        Raises AttributeError for special (double underscore) names.
        """
        # Special-method probes (e.g. from the copy module) must see a
        # missing attribute instead of receiving a printing handler.
        if attr[:2] == '__' and attr[-2:] == '__':
            raise AttributeError(attr)
        if attr[:6] == 'start_' or attr[:4] == 'end_':
            method = lambda a=attr, s=self: s._print_name(a)
        else:
            method = lambda x, a=attr, s=self: s._print_name(a, x)
        return method
class SGMLStrippingConsumer:
    """A consumer that strips off SGML tags.

    This is meant to be used as a decorator for other consumers.

    """
    def __init__(self, consumer):
        """Wrap consumer so its info events receive tag-stripped data.

        Raises ValueError if consumer is not of type InstanceType
        (presumably types.InstanceType, i.e. an old-style class
        instance; confirm against the module's imports).
        """
        if type(consumer) is not InstanceType:
            raise ValueError("consumer should be an instance")
        self._consumer = consumer
        self._prev_attr = None
        self._stripper = File.SGMLStripper()

    def _apply_clean_data(self, data):
        """Strip data and pass it to the most recently requested method."""
        clean = self._stripper.strip(data)
        self._prev_attr(clean)

    def __getattr__(self, name):
        """Look name up on the wrapped consumer.

        Non-method attributes and section methods (start_* / end_*) are
        returned unchanged.  Any other method is remembered and
        _apply_clean_data is returned in its place, so the data is
        stripped before the real method sees it.

        Raises AttributeError if name is one of this object's own
        attributes and it has not been set.
        """
        # __getattr__ only runs when normal lookup has already failed,
        # so asking for our own attributes again (or delegating through
        # a missing self._consumer) would recurse without end.
        if name in ('_consumer', '_prev_attr', '_stripper'):
            raise AttributeError(name)
        attr = getattr(self._consumer, name)
        # If this is not a method, then return it as is.
        if type(attr) is not MethodType:
            return attr
        # If it's a section method, then return it.
        if name[:6] == 'start_' or name[:4] == 'end_':
            return attr
        # Otherwise, it's an info event, and return my method.
        self._prev_attr = attr
        return self._apply_clean_data
# only use the Event Generator if XML handling is okay
class EventGenerator(handler.ContentHandler):
    """Handler to generate events associated with a Martel parsed file.

    This acts like a normal SAX handler, and accepts XML generated by
    Martel during parsing.  These events are then converted into
    'Biopython events', which can then be caught by a standard
    consumer.

    Note that Martel is now DEPRECATED.
    """
    def __init__(self, consumer, interest_tags, callback_finalizer=None,
                 exempt_tags=None):
        """Initialize to begin catching and firing off events.

        Arguments:

        o consumer - The consumer that we'll send Biopython events to.

        o interest_tags - A listing of all the tags we are interested in.

        o callback_finalizer - A function to deal with the collected
        information before passing it on to the consumer.  By default
        the collected information is a list of all of the lines read
        for a particular tag -- if there are multiple tags in a row
        like:

        <some_info>Spam</some_info>
        <some_info>More Spam</some_info>

        In this case the list of information would be:

        ['Spam', 'More Spam']

        This list of lines will be passed to the callback finalizer if
        it is present.  Otherwise the consumer will be called with the
        list of content information.

        o exempt_tags - A listing of particular tags that are exempt from
        being processed by the callback_finalizer.  This allows you to
        use a finalizer to deal with most tags, but leave those you don't
        want touched.  Defaults to no exempt tags.
        """
        handler.ContentHandler.__init__(self)

        self._consumer = consumer
        self.interest_tags = interest_tags
        self._finalizer = callback_finalizer
        # None stands for "no exempt tags"; a fresh list per instance
        # avoids sharing one mutable default between all generators.
        if exempt_tags is None:
            exempt_tags = []
        self._exempt_tags = exempt_tags

        # a dictionary of content for each tag of interest
        # the information for each tag is held as a list of the lines.
        # This allows us to collect information from multiple tags
        # in a row, and return it all at once.
        self.info = {}
        for tag in self.interest_tags:
            self.info[tag] = []

        # the previous tag we were collecting information for.
        # We set a delay in sending info to the consumer so that we can
        # collect a bunch of tags in a row and append all of the info
        # together.
        self._previous_tag = ''

        # the current character information for a tag
        self._cur_content = []
        # whether we should be collecting information
        self._collect_characters = 0

    def startElement(self, name, attrs):
        """Determine if we should collect characters from this tag.
        """
        if name in self.interest_tags:
            self._collect_characters = 1

    def characters(self, content):
        """Extract the information if we are interested in it.
        """
        if self._collect_characters:
            self._cur_content.append(content)

    def endElement(self, name):
        """Send the information to the consumer.

        Once we've got the end element we've collected up all of the
        character information we need, and we need to send this on to
        the consumer to do something with it.

        We have a delay of one tag on doing this, so that we can collect
        all of the info from multiple calls to the same element at once.
        """
        # only deal with the tag if it is something we are
        # interested in and potentially have information for
        if self._collect_characters:
            # add all of the information collected inside this tag
            self.info[name].append("".join(self._cur_content))
            # reset our information and flags
            self._cur_content = []
            self._collect_characters = 0

            # if we are at a new tag, pass on the info from the last tag
            if self._previous_tag and self._previous_tag != name:
                self._make_callback(self._previous_tag)

            # set this tag as the next to be passed
            self._previous_tag = name

    def _make_callback(self, name):
        """Call the callback function with the info with the given name.

        The consumer method called name receives either the raw list
        collected for that tag or, when a finalizer is set and the tag
        is not exempt, the finalizer's result for that list.
        """
        callback_function = getattr(self._consumer, name)

        # --- pass back the information
        # if there is a finalizer, use that
        if self._finalizer is not None and name not in self._exempt_tags:
            info_to_pass = self._finalizer(self.info[name])
        # otherwise pass back the entire list of information
        else:
            info_to_pass = self.info[name]

        callback_function(info_to_pass)

        # reset the information for the tag
        self.info[name] = []

    def endDocument(self):
        """Make sure all of our information has been passed.

        This just flushes out any stored tags that need to be passed.
        """
        if self._previous_tag:
            self._make_callback(self._previous_tag)
def read_and_call(uhandle, method, **keywds):
    """read_and_call(uhandle, method[, start][, end][, contains][, blank][, has_re])

    Read a line from uhandle, check it, and pass it to the method.
    Raises a ValueError if the line does not pass the checks.

    start, end, contains, blank, and has_re specify optional conditions
    that the line must pass.  start and end specifies what the line must
    begin or end with (not counting EOL characters).  contains
    specifies a substring that must be found in the line.  If blank
    is a true value, then the line must be blank.  has_re should be
    a regular expression object with a pattern that the line must match
    somewhere.

    """
    line = safe_readline(uhandle)
    errmsg = _fails_conditions(line, **keywds)
    if errmsg is not None:
        raise ValueError(errmsg)
    method(line)
def read_and_call_while(uhandle, method, **keywds):
    """read_and_call_while(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Read a line from uhandle and pass it to the method as long as
    some condition is true.  Returns the number of lines that were read.

    The first line that fails the condition is handed back to uhandle
    with saveline and is not counted.

    See the docstring for read_and_call for a description of the parameters.

    """
    nlines = 0
    while True:
        line = safe_readline(uhandle)
        # If I've failed the condition, then stop reading the line.
        if _fails_conditions(line, **keywds):
            uhandle.saveline(line)
            break
        method(line)
        nlines = nlines + 1
    return nlines
def read_and_call_until(uhandle, method, **keywds):
    """read_and_call_until(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Read a line from uhandle and pass it to the method until
    some condition is true.  Returns the number of lines that were read.

    The first line that meets the condition is handed back to uhandle
    with saveline and is not counted.

    See the docstring for read_and_call for a description of the parameters.

    """
    nlines = 0
    while True:
        line = safe_readline(uhandle)
        # If I've met the condition, then stop reading the line.
        if not _fails_conditions(line, **keywds):
            uhandle.saveline(line)
            break
        method(line)
        nlines = nlines + 1
    return nlines
def attempt_read_and_call(uhandle, method, **keywds):
    """attempt_read_and_call(uhandle, method, **keywds) -> boolean

    Similar to read_and_call, but returns a boolean specifying
    whether the line has passed the checks.  Does not raise
    a ValueError when the checks fail; instead the line is handed back
    to uhandle with saveline and the method is not called.

    See docs for read_and_call for a description of the function
    arguments.

    """
    line = safe_readline(uhandle)
    passed = not _fails_conditions(line, **keywds)
    if passed:
        method(line)
    else:
        uhandle.saveline(line)
    return passed
def _fails_conditions(line, start=None, end=None, contains=None, blank=None,
                      has_re=None):
    """_fails_conditions(line[, start][, end][, contains][, blank][, has_re]) -> error message or None

    Check line against every condition that is not None and return a
    message describing the first one that fails, or None if all pass.

    start and end are what the line must begin and end with; trailing
    whitespace is stripped before the end check.  contains is a
    substring that must occur in the line.  A true blank requires a
    blank line and a false (non-None) blank requires a non-blank one.
    has_re is a compiled regular expression that must match somewhere
    in the line.

    """
    if start is not None and not line.startswith(start):
        return "Line does not start with '%s':\n%s" % (start, line)
    # endswith also accepts an empty end, which the slice [-len(end):]
    # mishandles because [-0:] selects the whole string.
    if end is not None and not line.rstrip().endswith(end):
        return "Line does not end with '%s':\n%s" % (end, line)
    if contains is not None and contains not in line:
        return "Line does not contain '%s':\n%s" % (contains, line)
    if blank is not None:
        if blank:
            if not is_blank_line(line):
                return "Expected blank line, but got:\n%s" % line
        elif is_blank_line(line):
            return "Expected non-blank line, but got a blank one"
    if has_re is not None and has_re.search(line) is None:
        return "Line does not match regex '%s':\n%s" % (
            has_re.pattern, line)
    return None
def is_blank_line(line, allow_spaces=0):
    """is_blank_line(line, allow_spaces=0) -> boolean

    Return whether a line is blank.  allow_spaces specifies whether to
    allow whitespaces in a blank line.  A true value signifies that a
    line containing whitespaces as well as end-of-line characters
    should be considered blank.

    An empty string counts as blank.

    """
    # Guard first: the final test indexes line[0].
    if not line:
        return True
    if allow_spaces:
        return line.rstrip() == ''
    return line[0] == '\n' or line[0] == '\r'
def safe_readline(handle):
    """safe_readline(handle) -> line

    Read a line from an UndoHandle and return it.  If there are no more
    lines to read, I will raise a ValueError.

    """
    line = handle.readline()
    # readline signals end of stream with an empty result.
    if not line:
        raise ValueError("Unexpected end of stream.")
    return line
def safe_peekline(handle):
    """safe_peekline(handle) -> line

    Peek at the next line in an UndoHandle and return it.  If there are no
    more lines to peek, I will raise a ValueError.

    """
    line = handle.peekline()
    # An empty result from peekline is treated as end of stream.
    if not line:
        raise ValueError("Unexpected end of stream.")
    return line