--- /dev/null
+# Copyright 1999 by Jeffrey Chang. All rights reserved.
+# This code is part of the Biopython distribution and governed by its
+# license. Please see the LICENSE file that should have been included
+# as part of this package.
+
+"""Code to support writing parsers.
+
+
+
+Classes:
+AbstractParser Base class for parsers.
+AbstractConsumer Base class of all Consumers.
+TaggingConsumer Consumer that tags output with its event. For debugging
+SGMLStrippingConsumer Consumer that strips SGML tags from output.
+EventGenerator Generate Biopython Events from Martel XML output
+ (note that Martel is now DEPRECATED)
+
+Functions:
+safe_readline Read a line from a handle, with check for EOF.
+safe_peekline Peek at next line, with check for EOF.
+read_and_call Read a line from a handle and pass it to a method.
+read_and_call_while Read many lines, as long as a condition is met.
+read_and_call_until Read many lines, until a condition is met.
+attempt_read_and_call Like read_and_call, but forgiving of errors.
+is_blank_line Test whether a line is blank.
+
+"""
+
+import sys
+import traceback
+from types import *
+
+from Bio import File
+
+# XML from python 2.0
+try:
+ from xml.sax import handler
+ xml_support = 1
+except ImportError:
+ sys.stderr.write("Warning: Could not import SAX for dealing with XML.\n" +
+ "This causes problems with some ParserSupport modules\n")
+ xml_support = 0
+
class AbstractParser:
    """Common base for parser classes.

    Subclasses supply parse(); parse_str() and parse_file() are
    convenience wrappers that build a handle and pass it to parse().

    """
    def parse(self, handle):
        """Parse the data on handle. Must be overridden by subclasses."""
        raise NotImplementedError("Please implement in a derived class")

    def parse_str(self, string):
        """Wrap string in a File.StringHandle and return parse() of it."""
        handle = File.StringHandle(string)
        return self.parse(handle)

    def parse_file(self, filename):
        """Open filename, return parse() of it, and always close the file."""
        handle = open(filename)
        try:
            return self.parse(handle)
        finally:
            # Runs on both the normal return and on an exception.
            handle.close()
+
class AbstractConsumer:
    """Base class for other Consumers.

    Derive Consumers from this class and implement appropriate
    methods for each event that you want to receive.  Any event that
    the subclass does not implement is silently ignored.

    """
    def _unhandled_section(self):
        """Do nothing for an unimplemented start_*/end_* section event."""
        pass

    def _unhandled(self, data):
        """Do nothing for an unimplemented data event; data is discarded."""
        pass

    def __getattr__(self, attr):
        """Return a do-nothing handler for any event not otherwise defined.

        Names starting with 'start_' or 'end_' get a handler that takes
        no arguments; every other event name gets a handler that takes
        one data argument.

        Raises AttributeError for double-underscore special names.

        """
        # Special-method probes (__deepcopy__, __getstate__, __setstate__,
        # ...) must fail normally.  Answering them with an event handler
        # makes copy.deepcopy() return None and makes hasattr() checks
        # for optional protocols succeed when they should not.
        if attr[:2] == '__' and attr[-2:] == '__':
            raise AttributeError(attr)
        if attr[:6] == 'start_' or attr[:4] == 'end_':
            return self._unhandled_section
        return self._unhandled
+
class TaggingConsumer(AbstractConsumer):
    """A Consumer that tags the data stream with the event and
    prints it to a handle.  Useful for debugging.

    """
    def __init__(self, handle=None, colwidth=15, maxwidth=80):
        """TaggingConsumer(handle=sys.stdout, colwidth=15, maxwidth=80)

        handle is the object whose write() receives the output,
        colwidth is the width of the event-name column, and maxwidth
        is the total width used to truncate each data line.

        """
        # I can't assign sys.stdout to handle in the argument list.
        # If I do that, handle will be assigned the value of sys.stdout
        # the first time this function is called.  This will fail if
        # the user has assigned sys.stdout to some other file, which may
        # be closed or invalid at a later time.
        if handle is None:
            handle = sys.stdout
        self._handle = handle
        self._colwidth = colwidth
        self._maxwidth = maxwidth

    def unhandled_section(self):
        """Print a section marker named 'unhandled_section'."""
        self._print_name('unhandled_section')

    def unhandled(self, data):
        """Print data tagged with the name 'unhandled'."""
        self._print_name('unhandled', data)

    def _print_name(self, name, data=None):
        """Write one line of output for the event called name.

        Without data, a row of '*' of width colwidth is written,
        followed by the name.  With data, the name is truncated and
        left-justified to colwidth, followed by ': ' and the data cut
        to the remaining width with trailing whitespace removed.

        """
        if data is None:
            # Write the name of a section.
            self._handle.write("%s %s\n" % ("*"*self._colwidth, name))
        else:
            # Write the tag and line.  The "- 2" accounts for the ": "
            # separator between the two columns.
            self._handle.write("%-*s: %s\n" % (
                self._colwidth, name[:self._colwidth],
                data[:self._maxwidth-self._colwidth-2].rstrip()))

    def __getattr__(self, attr):
        """Return a handler that prints the event called attr.

        start_*/end_* names get a handler taking no arguments; any
        other name gets a handler taking one data argument.

        Raises AttributeError for double-underscore special names.

        """
        # Special-method probes (__deepcopy__, __getstate__, ...) must
        # fail normally; otherwise protocol lookups by copy and similar
        # code would be printed as events and given a bogus result.
        if attr[:2] == '__' and attr[-2:] == '__':
            raise AttributeError(attr)
        if attr[:6] == 'start_' or attr[:4] == 'end_':
            method = lambda a=attr, s=self: s._print_name(a)
        else:
            method = lambda x, a=attr, s=self: s._print_name(a, x)
        return method
+
class SGMLStrippingConsumer:
    """A consumer that strips off SGML tags.

    This is meant to be used as a decorator for other consumers:
    data events are passed through File.SGMLStripper before being
    forwarded to the wrapped consumer, while start_*/end_* section
    events and non-method attributes are forwarded unchanged.

    """
    def __init__(self, consumer):
        """Wrap consumer.

        Raises ValueError if consumer is a class rather than an
        instance.

        """
        # Function-scope import keeps this block self-contained.
        import inspect
        # Testing against types.InstanceType rejected instances of
        # new-style classes, and the name is gone in Python 3.  What
        # this check can still catch is a class passed by mistake.
        if inspect.isclass(consumer):
            raise ValueError("consumer should be an instance")
        self._consumer = consumer
        self._prev_attr = None
        self._stripper = File.SGMLStripper()

    def _apply_clean_data(self, data):
        """Strip data and pass it to the most recently fetched handler."""
        clean = self._stripper.strip(data)
        self._prev_attr(clean)

    def __getattr__(self, name):
        """Look name up on the wrapped consumer.

        Data-event methods are returned wrapped so that their argument
        is stripped of SGML tags first.

        Raises AttributeError if this object's own state is missing.

        """
        # __getattr__ only runs when normal lookup has failed, so
        # reaching here for one of our own attributes means it was never
        # set (e.g. __init__ did not run).  Asking getattr(self, ...)
        # again would recurse without bound.
        if name in ('_consumer', '_prev_attr', '_stripper'):
            raise AttributeError(name)
        attr = getattr(self._consumer, name)
        # If this is not a method, then return it as is.
        if type(attr) is not MethodType:
            return attr
        # If it's a section method, then return it.
        if name[:6] == 'start_' or name[:4] == 'end_':
            return attr
        # Otherwise, it's an info event.  Still recorded for
        # _apply_clean_data, but the returned callable is bound to this
        # handler, so fetching a second handler before calling the
        # first cannot redirect the data.
        self._prev_attr = attr

        def apply_clean_data(data):
            attr(self._stripper.strip(data))
        return apply_clean_data
+
# only use the Event Generator if XML handling is okay
if xml_support:
    class EventGenerator(handler.ContentHandler):
        """Handler to generate events associated with a Martel parsed file.

        This acts like a normal SAX handler, and accepts XML generated by
        Martel during parsing. These events are then converted into
        'Biopython events', which can then be caught by a standard
        biopython consumer.

        Note that Martel is now DEPRECATED.
        """
        def __init__(self, consumer, interest_tags, callback_finalizer=None,
                     exempt_tags=None):
            """Initialize to begin catching and firing off events.

            Arguments:
            o consumer - The consumer that we'll send Biopython events to.

            o interest_tags - A listing of all the tags we are interested in.

            o callback_finalizer - A function to deal with the collected
            information before passing it on to the consumer. By default
            the collected information is a list of all of the lines read
            for a particular tag -- if there are multiple tags in a row
            like:

            <some_info>Spam</some_info>
            <some_info>More Spam</some_info>

            In this case the list of information would be:

            ['Spam', 'More Spam']

            This list of lines will be passed to the callback finalizer if
            it is present. Otherwise the consumer will be called with the
            list of content information.

            o exempt_tags - A listing of particular tags that are exempt from
            being processed by the callback_finalizer. This allows you to
            use a finalizer to deal with most tags, but leave those you don't
            want touched. Defaults to no exempt tags.
            """
            handler.ContentHandler.__init__(self)

            # A list literal as the default would be one object shared by
            # every instance, so build a fresh list per instance instead.
            if exempt_tags is None:
                exempt_tags = []

            self._consumer = consumer
            self.interest_tags = interest_tags
            self._finalizer = callback_finalizer
            self._exempt_tags = exempt_tags

            # a dictionary of content for each tag of interest
            # the information for each tag is held as a list of the lines.
            # This allows us to collect information from multiple tags
            # in a row, and return it all at once.
            self.info = {}
            for tag in self.interest_tags:
                self.info[tag] = []

            # the previous tag we were collecting information for.
            # We set a delay in sending info to the consumer so that we can
            # collect a bunch of tags in a row and append all of the info
            # together.
            self._previous_tag = ''

            # the current character information for a tag
            self._cur_content = []
            # whether we should be collecting information
            self._collect_characters = 0

        def startElement(self, name, attrs):
            """Determine if we should collect characters from this tag.
            """
            if name in self.interest_tags:
                self._collect_characters = 1

        def characters(self, content):
            """Extract the information if we are interested in it.
            """
            if self._collect_characters:
                self._cur_content.append(content)

        def endElement(self, name):
            """Send the information to the consumer.

            Once we've got the end element we've collected up all of the
            character information we need, and we need to send this on to
            the consumer to do something with it.

            We have a delay of one tag on doing this, so that we can collect
            all of the info from multiple calls to the same element at once.
            """
            # only deal with the tag if it is something we are
            # interested in and potentially have information for.
            # The membership test matters when an element we are not
            # interested in closes inside one we are collecting: it has
            # no entry in self.info, so its end is skipped and its text
            # stays part of the enclosing tag's content.
            if self._collect_characters and name in self.interest_tags:
                # add all of the information collected inside this tag
                self.info.setdefault(name, []).append(
                    "".join(self._cur_content))
                # reset our information and flags
                self._cur_content = []
                self._collect_characters = 0

                # if we are at a new tag, pass on the info from the last tag
                if self._previous_tag and self._previous_tag != name:
                    self._make_callback(self._previous_tag)

                # set this tag as the next to be passed
                self._previous_tag = name

        def _make_callback(self, name):
            """Call the callback function with the info with the given name.
            """
            # the consumer method has the same name as the tag
            callback_function = getattr(self._consumer, name)

            # --- pass back the information
            # if there is a finalizer, use that
            if self._finalizer is not None and name not in self._exempt_tags:
                info_to_pass = self._finalizer(self.info[name])
            # otherwise pass back the entire list of information
            else:
                info_to_pass = self.info[name]

            callback_function(info_to_pass)

            # reset the information for the tag
            self.info[name] = []

        def endDocument(self):
            """Make sure all of our information has been passed.

            This just flushes out any stored tags that need to be passed.
            """
            if self._previous_tag:
                self._make_callback(self._previous_tag)
+
def read_and_call(uhandle, method, **keywds):
    """read_and_call(uhandle, method[, start][, end][, contains][, blank][, has_re])

    Read one line from uhandle, validate it, and hand it to method.
    A ValueError is raised when the line fails any requested check.

    The optional keywords describe what the line must look like:
    start / end give text the line has to begin / end with (trailing
    whitespace, including EOL characters, is ignored for end),
    contains gives a substring that has to occur in the line, blank
    (when true) requires a blank line, and has_re is a compiled
    regular expression that must match somewhere in the line.

    """
    line = safe_readline(uhandle)
    problem = _fails_conditions(line, **keywds)
    if problem is None:
        method(line)
        return
    raise ValueError(problem)
+
def read_and_call_while(uhandle, method, **keywds):
    """read_and_call_while(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Keep reading lines from uhandle and passing them to method for as
    long as each line satisfies the conditions.  The first line that
    does not is pushed back with uhandle.saveline().  Returns how many
    lines were passed to method.

    The condition keywords are described in the read_and_call docstring.

    """
    count = 0
    line = safe_readline(uhandle)
    while not _fails_conditions(line, **keywds):
        method(line)
        count += 1
        line = safe_readline(uhandle)
    # This line broke the condition; leave it for the next reader.
    uhandle.saveline(line)
    return count
+
def read_and_call_until(uhandle, method, **keywds):
    """read_and_call_until(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Keep reading lines from uhandle and passing them to method until
    a line satisfies the conditions.  That line is pushed back with
    uhandle.saveline() rather than passed on.  Returns how many lines
    were passed to method.

    The condition keywords are described in the read_and_call docstring.

    """
    count = 0
    line = safe_readline(uhandle)
    while _fails_conditions(line, **keywds):
        method(line)
        count += 1
        line = safe_readline(uhandle)
    # This line met the condition; leave it for the next reader.
    uhandle.saveline(line)
    return count
+
def attempt_read_and_call(uhandle, method, **keywds):
    """attempt_read_and_call(uhandle, method, **keywds) -> boolean

    Like read_and_call, except that a line failing the checks does not
    raise: it is pushed back with uhandle.saveline() and a false value
    is returned.  A line that passes is handed to method and a true
    value is returned.  Running out of input still raises ValueError,
    since the line is read with safe_readline.

    The condition keywords are described in the read_and_call docstring.

    """
    line = safe_readline(uhandle)
    if _fails_conditions(line, **keywds):
        uhandle.saveline(line)
        return False
    method(line)
    return True
+
+def _fails_conditions(line, start=None, end=None, contains=None, blank=None,
+ has_re=None):
+ if start is not None:
+ if line[:len(start)] != start:
+ return "Line does not start with '%s':\n%s" % (start, line)
+ if end is not None:
+ if line.rstrip()[-len(end):] != end:
+ return "Line does not end with '%s':\n%s" % (end, line)
+ if contains is not None:
+ if line.find(contains) == -1:
+ return "Line does not contain '%s':\n%s" % (contains, line)
+ if blank is not None:
+ if blank:
+ if not is_blank_line(line):
+ return "Expected blank line, but got:\n%s" % line
+ else:
+ if is_blank_line(line):
+ return "Expected non-blank line, but got a blank one"
+ if has_re is not None:
+ if has_re.search(line) is None:
+ return "Line does not match regex '%s':\n%s" % (
+ has_re.pattern, line)
+ return None
+
def is_blank_line(line, allow_spaces=0):
    """is_blank_line(line, allow_spaces=0) -> boolean

    Tell whether line is blank.  An empty string always counts as
    blank.  With allow_spaces true, a line made up only of whitespace
    (including the end-of-line characters) is blank as well; otherwise
    the line is blank only if its first character is an end-of-line
    character.

    """
    if not line:
        return 1
    if allow_spaces:
        stripped = line.rstrip()
        return stripped == ''
    first = line[0]
    return first in ('\n', '\r')
+
def safe_readline(handle):
    """safe_readline(handle) -> line

    Return the next line from handle.readline().  An empty result
    means there is nothing left to read, in which case a ValueError
    is raised instead.

    """
    text = handle.readline()
    if text:
        return text
    raise ValueError("Unexpected end of stream.")
+
def safe_peekline(handle):
    """safe_peekline(handle) -> line

    Return the result of handle.peekline().  An empty result means
    there is nothing left to peek at, in which case a ValueError is
    raised instead.

    """
    text = handle.peekline()
    if text:
        return text
    raise ValueError("Unexpected end of stream.")