Mac binaries
[jabaws.git] / website / archive / binaries / mac / src / globplot / biopython-1.50 / Bio / SeqIO / InsdcIO.py
1 # Copyright 2007-2009 by Peter Cock.  All rights reserved.
2 #
3 # This code is part of the Biopython distribution and governed by its
4 # license.  Please see the LICENSE file that should have been included
5 # as part of this package..
6
7 """Bio.SeqIO support for the "genbank" and "embl" file formats.
8
9 You are expected to use this module via the Bio.SeqIO functions.
10 Note that internally this module calls Bio.GenBank to do the actual
11 parsing of both GenBank and EMBL files.
12
13 See also:
14
15 International Nucleotide Sequence Database Collaboration
16 http://www.insdc.org/
17  
18 GenBank
19 http://www.ncbi.nlm.nih.gov/Genbank/
20
21 EMBL Nucleotide Sequence Database
22 http://www.ebi.ac.uk/embl/
23
24 DDBJ (DNA Data Bank of Japan)
25 http://www.ddbj.nig.ac.jp/
26 """
27
28 from Bio.Seq import UnknownSeq
29 from Bio.GenBank.Scanner import GenBankScanner, EmblScanner
30 from Bio import Alphabet
31 from Interfaces import SequentialSequenceWriter
32
33 # NOTE
34 # ====
35 # The "brains" for parsing GenBank and EMBL files (and any
36 # other flat file variants from the INSDC in future) is in
37 # Bio.GenBank.Scanner (plus the _FeatureConsumer in Bio.GenBank)
38
39 def GenBankIterator(handle) :
40     """Breaks up a Genbank file into SeqRecord objects.
41
42     Every section from the LOCUS line to the terminating // becomes
43     a single SeqRecord with associated annotation and features.
44     
45     Note that for genomes or chromosomes, there is typically only
46     one record."""
47     #This calls a generator function:
48     return GenBankScanner(debug=0).parse_records(handle)
49
50 def EmblIterator(handle) :
51     """Breaks up an EMBL file into SeqRecord objects.
52
53     Every section from the LOCUS line to the terminating // becomes
54     a single SeqRecord with associated annotation and features.
55     
56     Note that for genomes or chromosomes, there is typically only
57     one record."""
58     #This calls a generator function:
59     return EmblScanner(debug=0).parse_records(handle)
60
61 def GenBankCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein) :
62     """Breaks up a Genbank file into SeqRecord objects for each CDS feature.
63
64     Every section from the LOCUS line to the terminating // can contain
65     many CDS features.  These are returned as with the stated amino acid
66     translation sequence (if given).
67     """
68     #This calls a generator function:
69     return GenBankScanner(debug=0).parse_cds_features(handle, alphabet)
70     
71 def EmblCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein) :
72     """Breaks up a EMBL file into SeqRecord objects for each CDS feature.
73
74     Every section from the LOCUS line to the terminating // can contain
75     many CDS features.  These are returned as with the stated amino acid
76     translation sequence (if given).
77     """
78     #This calls a generator function:
79     return EmblScanner(debug=0).parse_cds_features(handle, alphabet)
80
81 class GenBankWriter(SequentialSequenceWriter) :
82     HEADER_WIDTH = 12
83     MAX_WIDTH = 80
84     
85     def _write_single_line(self, tag, text) :
86         "Used in the the 'header' of each GenBank record."""
87         assert len(tag) < self.HEADER_WIDTH
88         assert len(text) < self.MAX_WIDTH - self.HEADER_WIDTH, \
89                "Annotation %s too long for %s line" % (repr(text), tag)
90         self.handle.write("%s%s\n" % (tag.ljust(self.HEADER_WIDTH),
91                                       text.replace("\n"," ")))
92
93     def _write_multi_line(self, tag, text) :
94         "Used in the the 'header' of each GenBank record."""
95         #TODO - Do the line spliting while preserving white space?
96         max_len = self.MAX_WIDTH - self.HEADER_WIDTH
97         assert len(tag) < self.HEADER_WIDTH
98         text = text.strip()
99         if len(text) < max_len :
100             self._write_single_line(tag, text)
101             return
102
103         words = text.split()
104         assert max([len(w) for w in words]) < max_len, \
105                "Your description cannot be broken into nice lines!"
106         text = ""
107         while words and len(text) + 1 + len(words[0]) < max_len :
108             text += " " + words.pop(0)
109             text = text.strip()
110         assert len(text) < max_len
111         self._write_single_line(tag, text)
112         while words :
113             text = ""
114             while words and len(text) + 1 + len(words[0]) < max_len :
115                 text += " " + words.pop(0)
116                 text = text.strip()
117             assert len(text) < max_len
118             self._write_single_line("", text)
119         assert not words
120
121     def _write_the_first_line(self, record) :
122         """Write the LOCUS line."""
123         
124         locus = record.name
125         if not locus or locus == "<unknown name>" :
126             locus = record.id
127         if not locus or locus == "<unknown id>" :
128             locus = self._get_annotation_str(record, "accession", just_first=True)
129         if len(locus) > 16 :
130             raise ValueError("Locus identifier %s is too long" % repr(locus))
131
132         if len(record) > 99999999999 :
133             #Currently GenBank only officially support up to 350000, but
134             #the length field can take eleven digits
135             raise ValueError("Sequence too long!")
136
137         #Get the base alphabet (underneath any Gapped or StopCodon encoding)
138         a = Alphabet._get_base_alphabet(record.seq.alphabet)
139         if not isinstance(a, Alphabet.Alphabet) :
140             raise TypeError("Invalid alphabet")
141         elif isinstance(a, Alphabet.ProteinAlphabet) :
142             units = "bp"
143         elif isinstance(a, Alphabet.NucleotideAlphabet) :
144             units = "aa"
145         else :
146             #Must be something like NucleotideAlphabet or
147             #just the generic Alphabet (default for fasta files)
148             raise ValueError("Need a Nucleotide or Protein alphabet")
149
150         #Get the molecule type
151         #TODO - record this explicitly in the parser?
152         if isinstance(a, Alphabet.ProteinAlphabet) :
153             mol_type = ""
154         elif isinstance(a, Alphabet.DNAAlphabet) :
155             mol_type = "DNA"
156         elif isinstance(a, Alphabet.RNAAlphabet) :
157             mol_type = "RNA"
158         else :
159             #Must be something like NucleotideAlphabet or
160             #just the generic Alphabet (default for fasta files)
161             raise ValueError("Need a DNA, RNA or Protein alphabet")
162         
163         try :
164             division = record.annotations["data_file_division"]
165         except KeyError :
166             division = "UNK"
167         if division not in ["PRI","ROD","MAM","VRT","INV","PLN","BCT",
168                             "VRL","PHG","SYN","UNA","EST","PAT","STS",
169                             "GSS","HTG","HTC","ENV"] :
170             division = "UNK"
171         
172         assert len(units) == 2
173         assert len(division) == 3
174         #TODO - date
175         #TODO - mol_type
176         line = "LOCUS       %s %s %s    %s           %s 01-JAN-1980\n" \
177                      % (locus.ljust(16),
178                         str(len(record)).rjust(11),
179                         units,
180                         mol_type.ljust(6),
181                         division)
182         assert len(line) == 79+1, repr(line) #plus one for new line
183
184         assert line[12:28].rstrip() == locus, \
185                'LOCUS line does not contain the locus at the expected position:\n' + line
186         assert line[28:29] == " "
187         assert line[29:40].lstrip() == str(len(record)), \
188                'LOCUS line does not contain the length at the expected position:\n' + line
189
190         #Tests copied from Bio.GenBank.Scanner
191         assert line[40:44] in [' bp ', ' aa '] , \
192                'LOCUS line does not contain size units at expected position:\n' + line
193         assert line[44:47] in ['   ', 'ss-', 'ds-', 'ms-'], \
194                'LOCUS line does not have valid strand type (Single stranded, ...):\n' + line
195         assert line[47:54].strip() == "" \
196         or line[47:54].strip().find('DNA') != -1 \
197         or line[47:54].strip().find('RNA') != -1, \
198                'LOCUS line does not contain valid sequence type (DNA, RNA, ...):\n' + line
199         assert line[54:55] == ' ', \
200                'LOCUS line does not contain space at position 55:\n' + line
201         assert line[55:63].strip() in ['','linear','circular'], \
202                'LOCUS line does not contain valid entry (linear, circular, ...):\n' + line
203         assert line[63:64] == ' ', \
204                'LOCUS line does not contain space at position 64:\n' + line
205         assert line[67:68] == ' ', \
206                'LOCUS line does not contain space at position 68:\n' + line
207         assert line[70:71] == '-', \
208                'LOCUS line does not contain - at position 71 in date:\n' + line
209         assert line[74:75] == '-', \
210                'LOCUS line does not contain - at position 75 in date:\n' + line
211
212         self.handle.write(line)
213
214     def _get_annotation_str(self, record, key, default=".", just_first=False) :
215         """Get an annotation dictionary entry (as a string).
216
217         Some entries are lists, in which case if just_first=True the first entry
218         is returned.  If just_first=False (default) this verifies there is only
219         one entry before returning it."""
220         try :
221             answer = record.annotations[key]
222         except KeyError :
223             return default
224         if isinstance(answer, list) :
225             if not just_first : assert len(answer) == 1
226             return str(answer[0])
227         else :
228             return str(answer)
229
230     def _write_sequence(self, record):
231         #Loosely based on code from Howard Salis
232         #TODO - Force lower case?
233         LETTERS_PER_LINE = 60
234         SEQUENCE_INDENT = 9
235
236         if isinstance(record.seq, UnknownSeq) :
237             #We have already recorded the length, and there is no need
238             #to record a long sequence of NNNNNNN...NNN or whatever.
239             return
240
241         data = self._get_seq_string(record) #Catches sequence being None
242         seq_len = len(data)
243         for line_number in range(0,seq_len,LETTERS_PER_LINE):
244             self.handle.write(str(line_number+1).rjust(SEQUENCE_INDENT))
245             for words in range(line_number,min(line_number+LETTERS_PER_LINE,seq_len),10):
246                 self.handle.write(" %s" % data[words:words+10])
247             self.handle.write("\n")
248         
249     def write_record(self, record):
250         """Write a single record to the output file."""
251         handle = self.handle
252         self._write_the_first_line(record)
253
254         accession = self._get_annotation_str(record, "accession",
255                                              record.id.split(".",1)[0],
256                                              just_first=True)
257         acc_with_version = accession
258         if record.id.startswith(accession+".") :
259             try :
260                 acc_with_version = "%s.%i" \
261                                    % (accession, int(record.id.split(".",1)[1]))
262             except ValueError :
263                 pass
264         gi = self._get_annotation_str(record, "gi", just_first=True)
265
266         descr = record.description
267         if descr == "<unknown description>" : descr = "."
268         self._write_multi_line("DEFINITION", descr)
269         
270         self._write_single_line("ACCESSION", accession)
271         if gi != "." :
272             self._write_single_line("VERSION", "%s  GI:%s" % (acc_with_version,gi))
273         else :
274             self._write_single_line("VERSION", "%s" % (acc_with_version))
275
276         try :
277             #List of strings
278             keywords = "; ".join(record.annotations["keywords"])
279         except KeyError :
280             keywords = "."
281         self._write_multi_line("KEYWORDS", keywords)
282
283         self._write_multi_line("SOURCE", \
284                                 self._get_annotation_str(record, "source"))
285         #The ORGANISM line MUST be a single line, as any continuation is the taxonomy
286         org = self._get_annotation_str(record, "organism")
287         if len(org) > self.MAX_WIDTH - self.HEADER_WIDTH :
288             org = org[:self.MAX_WIDTH - self.HEADER_WIDTH-4]+"..."
289         self._write_single_line("  ORGANISM", org)
290         try :
291             #List of strings
292             taxonomy = "; ".join(record.annotations["taxonomy"])
293         except KeyError :
294             taxonomy = "."
295         self._write_multi_line("", taxonomy)
296
297         #TODO - References...
298         handle.write("FEATURES             Location/Qualifiers\n")
299         for feature in record.features :
300             self._write_feature(feature) 
301         handle.write("ORIGIN\n")
302         self._write_sequence(record)
303         handle.write("//\n")
304
305     def _write_feature(self, feature):
306         """Write a single SeqFeature object to features table.
307
308         Not implemented yet, but this stub exists in the short term to
309         facilitate working on writing GenBank files with a sub-class."""
310         #TODO - Features...
311         pass
312
313 if __name__ == "__main__" :
314     print "Quick self test"
315     import os
316     from StringIO import StringIO
317
318     def check_genbank_writer(records) :
319         handle = StringIO()
320         GenBankWriter(handle).write_file(records)
321         handle.seek(0)
322
323         records2 = list(GenBankIterator(handle))
324
325         assert len(records) == len(records2)
326         for r1, r2 in zip(records, records2) :
327             #The SwissProt parser may leave \n in the description...
328             assert r1.description.replace("\n", " ") == r2.description
329             assert r1.id == r2.id
330             assert r1.name == r2.name
331             assert str(r1.seq) == str(r2.seq)
332             for key in ["gi", "keywords", "source", "taxonomy"] :
333                 if key in r1.annotations :
334                     assert r1.annotations[key] == r2.annotations[key], key
335             for key in ["organism"] :
336                 if key in r1.annotations :
337                     v1 = r1.annotations[key]
338                     v2 = r2.annotations[key]
339                     assert isinstance(v1, str) and isinstance(v2, str)
340                     #SwissProt organism can be too long to record in GenBank format
341                     assert v1 == v2 or \
342                            (v2.endswith("...") and v1.startswith(v2[:-3])), key
343
344     for filename in os.listdir("../../Tests/GenBank") :
345         if not filename.endswith(".gbk") and not filename.endswith(".gb") :
346             continue
347         print filename
348         
349         handle = open("../../Tests/GenBank/%s" % filename)
350         records = list(GenBankIterator(handle))
351         handle.close()
352
353         check_genbank_writer(records)
354
355     for filename in os.listdir("../../Tests/EMBL") :
356         if not filename.endswith(".embl") :
357             continue
358         print filename
359         
360         handle = open("../../Tests/EMBL/%s" % filename)
361         records = list(EmblIterator(handle))
362         handle.close()
363
364         check_genbank_writer(records)
365
366     from Bio import SeqIO
367     for filename in os.listdir("../../Tests/SwissProt") :
368         if not filename.startswith("sp") :
369             continue
370         print filename
371         
372         handle = open("../../Tests/SwissProt/%s" % filename)
373         records = list(SeqIO.parse(handle,"swiss"))
374         handle.close()
375
376         check_genbank_writer(records)
377