comparison gffparser_bcbio.py @ 5:6e589f267c14

Uploaded
author devteam
date Tue, 04 Nov 2014 12:15:19 -0500
comparing revision 4:619e0fcd9126 with revision 5:6e589f267c14
1 """Parse GFF files into features attached to Biopython SeqRecord objects.
2
3 This deals with GFF3 formatted files, a tab delimited format for storing
4 sequence features and annotations:
5
6 http://www.sequenceontology.org/gff3.shtml
7
8 It will also deal with older GFF versions (GTF/GFF2):
9
10 http://www.sanger.ac.uk/Software/formats/GFF/GFF_Spec.shtml
11 http://mblab.wustl.edu/GTF22.html
12
13 The implementation utilizes map/reduce parsing of GFF using Disco. Disco
14 (http://discoproject.org) is a Map-Reduce framework for Python utilizing
15 Erlang for parallelization. The code works on a single processor without
16 Disco using the same architecture.
17 """
18 import os
19 import copy
20 import re
21 import collections
22 import urllib
23 import itertools
24
25 # Make defaultdict available on versions of Python older than 2.5, which lack collections.defaultdict
26 try:
27 collections.defaultdict
28 except AttributeError:
29 import _utils
30 collections.defaultdict = _utils.defaultdict
31
32 from Bio.Seq import Seq, UnknownSeq
33 from Bio.SeqRecord import SeqRecord
34 from Bio.SeqFeature import SeqFeature, FeatureLocation
35 from Bio import SeqIO
36
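For orientation, here are two made-up records in the tab delimited layout described in the module docstring (nine columns for GFF3; GFF2/GTF shares the first eight and quotes its attribute values):

    # seqid  source    type  start  end   score  strand  phase  attributes
    # chrI   WormBase  gene  1001   9000  .      +       .      ID=gene1;Name=amx-2
    # chrI   curated   CDS   1201   1500  .      +       0      gene_id "amx-2"; transcript_id "amx-2.1"
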
37 def _gff_line_map(line, params):
38 """Map part of Map-Reduce; parses a line of GFF into a dictionary.
39
40 Given an input line from a GFF file, this:
41 - decides if the line passes our filtering limits
42 - if so:
43 - breaks it into component elements
44 - determines the type of attribute (flat, parent, child or annotation)
45 - generates a dictionary of GFF info which can be serialized as JSON
46 """
47 gff3_kw_pat = re.compile(r"\w+=")
48 def _split_keyvals(keyval_str):
49 """Split key-value pairs in a GFF2, GTF and GFF3 compatible way.
50
51 GFF3 has key value pairs like:
52 count=9;gene=amx-2;sequence=SAGE:aacggagccg
53 GFF2 and GTF have:
54 Sequence "Y74C9A" ; Note "Clone Y74C9A; Genbank AC024206"
55 name "fgenesh1_pg.C_chr_1000003"; transcriptId 869
56 """
57 quals = collections.defaultdict(list)
58 if keyval_str is None:
59 return quals
60 # ensembl GTF has a stray semi-colon at the end
61 if keyval_str[-1] == ';':
62 keyval_str = keyval_str[:-1]
63 # GFF2/GTF has a semi-colon with at least one space after it.
64 # It can have spaces on both sides; WormBase does this.
65 # GFF3 works with no spaces.
66 # Split at the first one we can recognize as working
67 parts = keyval_str.split(" ; ")
68 if len(parts) == 1:
69 parts = keyval_str.split("; ")
70 if len(parts) == 1:
71 parts = keyval_str.split(";")
72 # check if we have GFF3 style key-vals (with =)
73 is_gff2 = True
74 if gff3_kw_pat.match(parts[0]):
75 is_gff2 = False
76 key_vals = [p.split('=') for p in parts]
77 # otherwise, we are separated by a space with a key as the first item
78 else:
79 pieces = []
80 for p in parts:
81 # fix misplaced semi-colons in keys in some GFF2 files
82 if p and p[0] == ';':
83 p = p[1:]
84 pieces.append(p.strip().split(" "))
85 key_vals = [(p[0], " ".join(p[1:])) for p in pieces]
86 for item in key_vals:
87 # standard in-spec items are key=value
88 if len(item) == 2:
89 key, val = item
90 # out-of-spec files can have a bare key with no value. We set an empty
91 # value here, which is standardized to 'true' below.
92 else:
93 assert len(item) == 1, item
94 key = item[0]
95 val = ''
96 # remove quotes in GFF2 files
97 if (len(val) > 0 and val[0] == '"' and val[-1] == '"'):
98 val = val[1:-1]
99 if val:
100 quals[key].extend([v for v in val.split(',') if v])
101 # if we don't have a value, make this a key=True/False style
102 # attribute
103 else:
104 quals[key].append('true')
105 for key, vals in quals.items():
106 quals[key] = [urllib.unquote(v) for v in vals]
107 return quals, is_gff2
108
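An illustrative sketch of how the two attribute dialects come apart (the attribute strings are invented; quals is shown as a plain dict):

    # GFF3 style: '=' separated keys, ',' separated multiple values
    # _split_keyvals("ID=mRNA1;Parent=gene1,gene2")
    #   -> ({'ID': ['mRNA1'], 'Parent': ['gene1', 'gene2']}, False)
    # GFF2/GTF style: space separated keys with quoted values
    # _split_keyvals('gene_id "amx-2"; transcript_id "amx-2.1"')
    #   -> ({'gene_id': ['amx-2'], 'transcript_id': ['amx-2.1']}, True)
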
109 def _nest_gff2_features(gff_parts):
110 """Provide nesting of GFF2 transcript parts with transcript IDs.
111
112 Exons and coding sequences are mapped to a parent with a transcript_id
113 in GFF2. This is implemented differently at different genome centers
114 and this function attempts to resolve that and map things to the GFF3
115 way of doing them.
116 """
117 # map protein or transcript ids to a parent
118 for transcript_id in ["transcript_id", "transcriptId", "proteinId"]:
119 try:
120 gff_parts["quals"]["Parent"] = \
121 gff_parts["quals"][transcript_id]
122 break
123 except KeyError:
124 pass
125 # case for WormBase GFF -- everything labelled as Transcript or CDS
126 for flat_name in ["Transcript", "CDS"]:
127 if gff_parts["quals"].has_key(flat_name):
128 # parent types
129 if gff_parts["type"] in [flat_name]:
130 if not gff_parts["id"]:
131 gff_parts["id"] = gff_parts["quals"][flat_name][0]
132 gff_parts["quals"]["ID"] = [gff_parts["id"]]
133 # children types
134 elif gff_parts["type"] in ["intron", "exon", "three_prime_UTR",
135 "coding_exon", "five_prime_UTR", "CDS", "stop_codon",
136 "start_codon"]:
137 gff_parts["quals"]["Parent"] = gff_parts["quals"][flat_name]
138 break
139
140 return gff_parts
141
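A hedged sketch of the GFF2 nesting fix-up (values invented): an exon carrying only a transcript_id gains a GFF3-style Parent pointing at that transcript.

    # before: {'type': 'exon', 'id': '', 'quals': {'transcript_id': ['B0019.1']}}
    # after _nest_gff2_features: quals also contains 'Parent': ['B0019.1']
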
142 strand_map = {'+' : 1, '-' : -1, '?' : None, None: None}
143 line = line.strip()
144 if line[:2] == "##":
145 return [('directive', line[2:])]
146 elif line and line[0] != "#":
147 parts = line.split('\t')
148 should_do = True
149 if params.limit_info:
150 for limit_name, limit_values in params.limit_info.items():
151 cur_id = tuple([parts[i] for i in
152 params.filter_info[limit_name]])
153 if cur_id not in limit_values:
154 should_do = False
155 break
156 if should_do:
157 assert len(parts) >= 8, line
158 # a conditional expression would be clearer but is not python2.4 compatible:
159 # gff_parts = [(None if p == '.' else p) for p in parts]
160 gff_parts = []
161 for p in parts:
162 if p == ".":
163 gff_parts.append(None)
164 else:
165 gff_parts.append(p)
166 gff_info = dict()
167 # collect all of the base qualifiers for this item
168 if len(parts) > 8:
169 quals, is_gff2 = _split_keyvals(gff_parts[8])
170 else:
171 quals, is_gff2 = collections.defaultdict(list), False
172 gff_info["is_gff2"] = is_gff2
173 if gff_parts[1]:
174 quals["source"].append(gff_parts[1])
175 if gff_parts[5]:
176 quals["score"].append(gff_parts[5])
177 if gff_parts[7]:
178 quals["phase"].append(gff_parts[7])
179 gff_info['quals'] = dict(quals)
180 gff_info['rec_id'] = gff_parts[0]
181 # if we are describing a location, then we are a feature
182 if gff_parts[3] and gff_parts[4]:
183 gff_info['location'] = [int(gff_parts[3]) - 1,
184 int(gff_parts[4])]
185 gff_info['type'] = gff_parts[2]
186 gff_info['id'] = quals.get('ID', [''])[0]
187 gff_info['strand'] = strand_map.get(gff_parts[6], None)
188 if is_gff2:
189 gff_info = _nest_gff2_features(gff_info)
190 # features that have parents need to link so we can pick up
191 # the relationship
192 if gff_info['quals'].has_key('Parent'):
193 # check for self referential parent/child relationships
194 # remove the ID, which is not useful
195 for p in gff_info['quals']['Parent']:
196 if p == gff_info['id']:
197 gff_info['id'] = ''
198 del gff_info['quals']['ID']
199 break
200 final_key = 'child'
201 elif gff_info['id']:
202 final_key = 'parent'
203 # Handle flat features
204 else:
205 final_key = 'feature'
206 # otherwise, associate these annotations with the full record
207 else:
208 final_key = 'annotation'
209 if params.jsonify:
210 return [(final_key, simplejson.dumps(gff_info))]
211 else:
212 return [(final_key, gff_info)]
213 return []
214
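A sketch of the map output for one invented GFF3 line, using the default parameters built by GFFExaminer (defined later in this file); jsonify is off, so the raw dictionary is returned:

    line = "chrI\tWormBase\tgene\t1001\t9000\t.\t+\t.\tID=gene1;Name=amx-2\n"
    key, info = _gff_line_map(line, GFFExaminer()._get_local_params())[0]
    # key               -> 'parent'        (has an ID, no Parent attribute)
    # info['rec_id']    -> 'chrI'
    # info['location']  -> [1000, 9000]    (start converted to 0-based)
    # info['strand']    -> 1
    # info['quals']['source'] -> ['WormBase']
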
215 def _gff_line_reduce(map_results, out, params):
216 """Reduce part of Map-Reduce; combines results of parsed features.
217 """
218 final_items = dict()
219 for gff_type, final_val in map_results:
220 if params.jsonify and gff_type not in ['directive']:
221 final_val = simplejson.loads(final_val)
222 try:
223 final_items[gff_type].append(final_val)
224 except KeyError:
225 final_items[gff_type] = [final_val]
226 for key, vals in final_items.items():
227 if params.jsonify:
228 vals = simplejson.dumps(vals)
229 out.add(key, vals)
230
231 class _MultiIDRemapper:
232 """Provide an ID remapping for cases where a parent has a non-unique ID.
233
234 Real life GFF3 cases have non-unique ID attributes, which we fix here
235 by using the unique sequence region to assign children to the right
236 parent.
237 """
238 def __init__(self, base_id, all_parents):
239 self._base_id = base_id
240 self._parents = all_parents
241
242 def remap_id(self, feature_dict):
243 rstart, rend = feature_dict['location']
244 for index, parent in enumerate(self._parents):
245 pstart, pend = parent['location']
246 if rstart >= pstart and rend <= pend:
247 if index > 0:
248 return ("%s_%s" % (self._base_id, index + 1))
249 else:
250 return self._base_id
251 raise ValueError("Did not find remapped ID location: %s, %s, %s" % (
252 self._base_id, [p['location'] for p in self._parents],
253 feature_dict['location']))
254
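An illustration with invented coordinates: two parents share the ID 'gene1', so children are assigned by containment and the second copy is renamed.

    parents = [{'location': (1000, 5000)}, {'location': (7000, 9000)}]
    remap = _MultiIDRemapper('gene1', parents)
    remap.remap_id({'location': (1200, 1500)})   # -> 'gene1'
    remap.remap_id({'location': (7100, 7400)})   # -> 'gene1_2'
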
255 class _AbstractMapReduceGFF:
256 """Base class providing general GFF parsing for local and remote classes.
257
258 This class should be subclassed to provide a concrete class to parse
259 GFF under specific conditions. These classes need to implement
260 the _gff_process function, which returns a dictionary of SeqRecord
261 information.
262 """
263 def __init__(self, create_missing=True):
264 """Initialize GFF parser
265
266 create_missing - If True, create blank records for GFF ids not in
267 the base_dict. If False, an error will be raised.
268 """
269 self._create_missing = create_missing
270 self._map_fn = _gff_line_map
271 self._reduce_fn = _gff_line_reduce
272 self._examiner = GFFExaminer()
273
274 def _gff_process(self, gff_files, limit_info, target_lines=None):
275 raise NotImplementedError("Derived class must define")
276
277 def parse(self, gff_files, base_dict=None, limit_info=None):
278 """Parse a GFF file, returning an iterator of SeqRecords.
279
280 limit_info - A dictionary specifying the regions of the GFF file
281 which should be extracted. This allows only relevant portions of a file
282 to be parsed.
283
284 base_dict - A base dictionary of SeqRecord objects which may be
285 pre-populated with sequences and other features. The new features from
286 the GFF file will be added to this dictionary.
287 """
288 for rec in self.parse_in_parts(gff_files, base_dict, limit_info):
289 yield rec
290
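A hedged sketch of the calling convention described above; the file names and the 'chr1' id are placeholders, and base_dict is keyed by the same ids the GFF file uses in its first column:

    base_dict = SeqIO.to_dict(SeqIO.parse(open("example.fa"), "fasta"))
    limit_info = dict(gff_id=["chr1"])       # only lines whose seqid is chr1
    parser = GFFParser()
    for rec in parser.parse(open("example.gff3"), base_dict, limit_info=limit_info):
        print rec.id, len(rec.features)
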
291 def parse_in_parts(self, gff_files, base_dict=None, limit_info=None,
292 target_lines=None):
293 """Parse a region of a GFF file specified, returning info as generated.
294
295 target_lines -- The number of lines in the file which should be used
296 for each partial parse. This should be determined based on available
297 memory.
298 """
299 for results in self.parse_simple(gff_files, limit_info, target_lines):
300 if base_dict is None:
301 cur_dict = dict()
302 else:
303 cur_dict = copy.deepcopy(base_dict)
304 cur_dict = self._results_to_features(cur_dict, results)
305 all_ids = cur_dict.keys()
306 all_ids.sort()
307 for cur_id in all_ids:
308 yield cur_dict[cur_id]
309
310 def parse_simple(self, gff_files, limit_info=None, target_lines=1):
311 """Simple parse which does not build or nest features.
312
313 This returns a simple dictionary representation of each line in the
314 GFF file.
315 """
316 # gracefully handle a single file passed
317 if not isinstance(gff_files, (list, tuple)):
318 gff_files = [gff_files]
319 limit_info = self._normalize_limit_info(limit_info)
320 for results in self._gff_process(gff_files, limit_info, target_lines):
321 yield results
322
323 def _normalize_limit_info(self, limit_info):
324 """Turn all limit information into tuples for identical comparisons.
325 """
326 final_limit_info = {}
327 if limit_info:
328 for key, values in limit_info.items():
329 final_limit_info[key] = []
330 for v in values:
331 if isinstance(v, str):
332 final_limit_info[key].append((v,))
333 else:
334 final_limit_info[key].append(tuple(v))
335 return final_limit_info
336
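Illustration (invented values): bare strings and tuples are both accepted from callers and end up as tuples for the per-line comparison.

    # {'gff_id': ['chr1'], 'gff_source_type': [('Coding_transcript', 'mRNA')]}
    #   -> {'gff_id': [('chr1',)], 'gff_source_type': [('Coding_transcript', 'mRNA')]}
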
337 def _results_to_features(self, base, results):
338 """Add parsed dictionaries of results to Biopython SeqFeatures.
339 """
340 base = self._add_annotations(base, results.get('annotation', []))
341 for feature in results.get('feature', []):
342 (_, base) = self._add_toplevel_feature(base, feature)
343 base = self._add_parent_child_features(base, results.get('parent', []),
344 results.get('child', []))
345 base = self._add_seqs(base, results.get('fasta', []))
346 base = self._add_directives(base, results.get('directive', []))
347 return base
348
349 def _add_directives(self, base, directives):
350 """Handle any directives or meta-data in the GFF file.
351
352 Relevant items are added as annotation meta-data to each record.
353 """
354 dir_keyvals = collections.defaultdict(list)
355 for directive in directives:
356 parts = directive.split()
357 if len(parts) > 1:
358 key = parts[0]
359 if len(parts) == 2:
360 val = parts[1]
361 else:
362 val = tuple(parts[1:])
363 dir_keyvals[key].append(val)
364 for key, vals in dir_keyvals.items():
365 for rec in base.values():
366 self._add_ann_to_rec(rec, key, vals)
367 return base
368
369 def _add_seqs(self, base, recs):
370 """Add sequence information contained in the GFF3 to records.
371 """
372 for rec in recs:
373 if base.has_key(rec.id):
374 base[rec.id].seq = rec.seq
375 else:
376 base[rec.id] = rec
377 return base
378
379 def _add_parent_child_features(self, base, parents, children):
380 """Add nested features with parent child relationships.
381 """
382 multi_remap = self._identify_dup_ids(parents)
383 # add children features
384 children_prep = collections.defaultdict(list)
385 for child_dict in children:
386 child_feature = self._get_feature(child_dict)
387 for pindex, pid in enumerate(child_feature.qualifiers['Parent']):
388 if multi_remap.has_key(pid):
389 pid = multi_remap[pid].remap_id(child_dict)
390 child_feature.qualifiers['Parent'][pindex] = pid
391 children_prep[pid].append((child_dict['rec_id'],
392 child_feature))
393 children = dict(children_prep)
394 # add children to parents that exist
395 for cur_parent_dict in parents:
396 cur_id = cur_parent_dict['id']
397 if multi_remap.has_key(cur_id):
398 cur_parent_dict['id'] = multi_remap[cur_id].remap_id(
399 cur_parent_dict)
400 cur_parent, base = self._add_toplevel_feature(base, cur_parent_dict)
401 cur_parent, children = self._add_children_to_parent(cur_parent,
402 children)
403 # create parents for children without them (GFF2 or split/bad files)
404 while len(children) > 0:
405 parent_id, cur_children = itertools.islice(children.items(),
406 1).next()
407 # one child, do not nest it
408 if len(cur_children) == 1:
409 rec_id, child = cur_children[0]
410 loc = (child.location.nofuzzy_start, child.location.nofuzzy_end)
411 rec, base = self._get_rec(base,
412 dict(rec_id=rec_id, location=loc))
413 rec.features.append(child)
414 del children[parent_id]
415 else:
416 cur_parent, base = self._add_missing_parent(base, parent_id,
417 cur_children)
418 cur_parent, children = self._add_children_to_parent(cur_parent,
419 children)
420 return base
421
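A sketch of the fallback path with invented IDs: if several exons name Parent 'tx1' but no feature with ID 'tx1' exists, an 'inferred_parent' feature is created spanning the smallest child start to the largest child end, with the exons as its sub_features; a single orphaned child is instead attached directly to its record.

    # inferred parent produced by _add_missing_parent (values invented):
    # SeqFeature type='inferred_parent', id='tx1', location spanning (1000, 4500)
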
422 def _identify_dup_ids(self, parents):
423 """Identify duplicated ID attributes in potential nested parents.
424
425 According to the GFF3 spec ID attributes are supposed to be unique
426 for a file, but this is not always true in practice. This looks
427 for duplicates, and provides unique IDs sorted by locations.
428 """
429 multi_ids = collections.defaultdict(list)
430 for parent in parents:
431 multi_ids[parent['id']].append(parent)
432 multi_ids = [(mid, parents) for (mid, parents) in multi_ids.items()
433 if len(parents) > 1]
434 multi_remap = dict()
435 for mid, parents in multi_ids:
436 multi_remap[mid] = _MultiIDRemapper(mid, parents)
437 return multi_remap
438
439 def _add_children_to_parent(self, cur_parent, children):
440 """Recursively add children to parent features.
441 """
442 if children.has_key(cur_parent.id):
443 cur_children = children[cur_parent.id]
444 for rec_id, cur_child in cur_children:
445 cur_child, children = self._add_children_to_parent(cur_child,
446 children)
447 cur_parent.location_operator = "join"
448 cur_parent.sub_features.append(cur_child)
449 del children[cur_parent.id]
450 return cur_parent, children
451
452 def _add_annotations(self, base, anns):
453 """Add annotation data from the GFF file to records.
454 """
455 # add these as a list of annotations, checking not to overwrite
456 # current values
457 for ann in anns:
458 rec, base = self._get_rec(base, ann)
459 for key, vals in ann['quals'].items():
460 self._add_ann_to_rec(rec, key, vals)
461 return base
462
463 def _add_ann_to_rec(self, rec, key, vals):
464 """Add a key/value annotation to the given SeqRecord.
465 """
466 if rec.annotations.has_key(key):
467 try:
468 rec.annotations[key].extend(vals)
469 except AttributeError:
470 rec.annotations[key] = [rec.annotations[key]] + vals
471 else:
472 rec.annotations[key] = vals
473
474 def _get_rec(self, base, info_dict):
475 """Retrieve a record to add features to.
476 """
477 max_loc = info_dict.get('location', (0, 1))[1]
478 try:
479 cur_rec = base[info_dict['rec_id']]
480 # update generated unknown sequences with the expected maximum length
481 if isinstance(cur_rec.seq, UnknownSeq):
482 cur_rec.seq._length = max([max_loc, cur_rec.seq._length])
483 return cur_rec, base
484 except KeyError:
485 if self._create_missing:
486 new_rec = SeqRecord(UnknownSeq(max_loc), info_dict['rec_id'])
487 base[info_dict['rec_id']] = new_rec
488 return new_rec, base
489 else:
490 raise
491
492 def _add_missing_parent(self, base, parent_id, cur_children):
493 """Add a new feature that is missing from the GFF file.
494 """
495 base_rec_id = list(set(c[0] for c in cur_children))
496 assert len(base_rec_id) == 1
497 feature_dict = dict(id=parent_id, strand=None,
498 type="inferred_parent", quals=dict(ID=[parent_id]),
499 rec_id=base_rec_id[0])
500 coords = [(c.location.nofuzzy_start, c.location.nofuzzy_end)
501 for r, c in cur_children]
502 feature_dict["location"] = (min([c[0] for c in coords]),
503 max([c[1] for c in coords]))
504 return self._add_toplevel_feature(base, feature_dict)
505
506 def _add_toplevel_feature(self, base, feature_dict):
507 """Add a toplevel non-nested feature to the appropriate record.
508 """
509 new_feature = self._get_feature(feature_dict)
510 rec, base = self._get_rec(base, feature_dict)
511 rec.features.append(new_feature)
512 return new_feature, base
513
514 def _get_feature(self, feature_dict):
515 """Retrieve a Biopython feature from our dictionary representation.
516 """
517 location = FeatureLocation(*feature_dict['location'])
518 new_feature = SeqFeature(location, feature_dict['type'],
519 id=feature_dict['id'], strand=feature_dict['strand'])
520 new_feature.qualifiers = feature_dict['quals']
521 return new_feature
522
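Illustration (invented values) of the dictionary-to-SeqFeature mapping used throughout the class:

    # _get_feature({'location': [1000, 9000], 'type': 'gene', 'id': 'gene1',
    #               'strand': 1, 'quals': {'ID': ['gene1']}})
    #   -> SeqFeature(FeatureLocation(1000, 9000), type='gene', id='gene1', strand=1)
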
523 def _parse_fasta(self, in_handle):
524 """Parse FASTA sequence information contained in the GFF3 file.
525 """
526 return list(SeqIO.parse(in_handle, "fasta"))
527
528 class _GFFParserLocalOut:
529 """Provide a collector for local GFF MapReduce file parsing.
530 """
531 def __init__(self, smart_breaks=False):
532 self._items = dict()
533 self._smart_breaks = smart_breaks
534 self._missing_keys = collections.defaultdict(int)
535 self._last_parent = None
536 self.can_break = True
537 self.num_lines = 0
538
539 def add(self, key, vals):
540 if self._smart_breaks:
541 # if this is not GFF2 we expect Parent attributes and only allow a break
542 # once no referenced parents are still missing
543 if key == 'directive':
544 if vals[0] == '#':
545 self.can_break = True
546 self._last_parent = None
547 elif not vals[0].get("is_gff2", False):
548 self._update_missing_parents(key, vals)
549 self.can_break = (len(self._missing_keys) == 0)
550 # break when we are done with stretches of child features
551 elif key != 'child':
552 self.can_break = True
553 self._last_parent = None
554 # break when we have lots of child features in a row
555 # and change between parents
556 else:
557 cur_parent = vals[0]["quals"]["Parent"][0]
558 if (self._last_parent):
559 self.can_break = (cur_parent != self._last_parent)
560 self._last_parent = cur_parent
561 self.num_lines += 1
562 try:
563 self._items[key].extend(vals)
564 except KeyError:
565 self._items[key] = vals
566
567 def _update_missing_parents(self, key, vals):
568 # heuristic for deciding whether it is safe to break here;
569 # if this proves too aggressive, fall back to never breaking
570 # in the middle of a run of child features
571 if key in ["child"]:
572 for val in vals:
573 for p_id in val["quals"]["Parent"]:
574 self._missing_keys[p_id] += 1
575 for val in vals:
576 try:
577 del self._missing_keys[val["quals"]["ID"][0]]
578 except KeyError:
579 pass
580
581 def has_items(self):
582 return len(self._items) > 0
583
584 def get_results(self):
585 self._last_parent = None
586 return self._items
587
588 class GFFParser(_AbstractMapReduceGFF):
589 """Local GFF parser providing standardized parsing of GFF3 and GFF2 files.
590 """
591 def __init__(self, line_adjust_fn=None, create_missing=True):
592 _AbstractMapReduceGFF.__init__(self, create_missing=create_missing)
593 self._line_adjust_fn = line_adjust_fn
594
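A hedged sketch of the line_adjust_fn hook: it is handed each parsed line dictionary (directives excepted) and must return the, possibly modified, dictionary. The chromosome renaming below is a hypothetical use, not something the tool requires.

    def drop_chr_prefix(line_dict):
        # hypothetical adjustment: turn 'chr1' style ids into '1'
        line_dict["rec_id"] = line_dict["rec_id"].replace("chr", "", 1)
        return line_dict

    parser = GFFParser(line_adjust_fn=drop_chr_prefix)
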
595 def _gff_process(self, gff_files, limit_info, target_lines):
596 """Process GFF addition without any parallelization.
597
598 In addition to limit filtering, this accepts a target_lines attribute
599 which provides a number of lines to parse before returning results.
600 This allows partial parsing of a file to prevent memory issues.
601 """
602 line_gen = self._file_line_generator(gff_files)
603 for out in self._lines_to_out_info(line_gen, limit_info, target_lines):
604 yield out
605
606 def _file_line_generator(self, gff_files):
607 """Generate single lines from a set of GFF files.
608 """
609 for gff_file in gff_files:
610 if hasattr(gff_file, "read"):
611 need_close = False
612 in_handle = gff_file
613 else:
614 need_close = True
615 in_handle = open(gff_file)
616 found_seqs = False
617 while 1:
618 line = in_handle.readline()
619 if not line:
620 break
621 yield line
622 if need_close:
623 in_handle.close()
624
625 def _lines_to_out_info(self, line_iter, limit_info=None,
626 target_lines=None):
627 """Generate SeqRecord and SeqFeatures from GFF file lines.
628 """
629 params = self._examiner._get_local_params(limit_info)
630 out_info = _GFFParserLocalOut((target_lines is not None and
631 target_lines > 1))
632 found_seqs = False
633 for line in line_iter:
634 results = self._map_fn(line, params)
635 if self._line_adjust_fn and results:
636 if results[0][0] not in ['directive']:
637 results = [(results[0][0],
638 self._line_adjust_fn(results[0][1]))]
639 self._reduce_fn(results, out_info, params)
640 if (target_lines and out_info.num_lines >= target_lines and
641 out_info.can_break):
642 yield out_info.get_results()
643 out_info = _GFFParserLocalOut((target_lines is not None and
644 target_lines > 1))
645 if (results and results[0][0] == 'directive' and
646 results[0][1] == 'FASTA'):
647 found_seqs = True
648 break
649
650 class FakeHandle:
651 def __init__(self, line_iter):
652 self._iter = line_iter
653 def read(self):
654 return "".join(l for l in self._iter)
655 def readline(self):
656 try:
657 return self._iter.next()
658 except StopIteration:
659 return ""
660
661 if found_seqs:
662 fasta_recs = self._parse_fasta(FakeHandle(line_iter))
663 out_info.add('fasta', fasta_recs)
664 if out_info.has_items():
665 yield out_info.get_results()
666
667 class DiscoGFFParser(_AbstractMapReduceGFF):
668 """GFF Parser with parallelization through Disco (http://discoproject.org.
669 """
670 def __init__(self, disco_host, create_missing=True):
671 """Initialize parser.
672
673 disco_host - Web reference to a Disco host which will be used for
674 parallelizing the GFF reading job.
675 """
676 _AbstractMapReduceGFF.__init__(self, create_missing=create_missing)
677 self._disco_host = disco_host
678
679 def _gff_process(self, gff_files, limit_info, target_lines=None):
680 """Process GFF addition, using Disco to parallelize the process.
681 """
682 assert target_lines is None, "Cannot split parallelized jobs"
683 # make these imports local; only need them when using disco
684 import simplejson
685 import disco
686 # absolute path names unless they are special disco files
687 full_files = []
688 for f in gff_files:
689 if f.split(":")[0] != "disco":
690 full_files.append(os.path.abspath(f))
691 else:
692 full_files.append(f)
693 results = disco.job(self._disco_host, name="gff_reader",
694 input=full_files,
695 params=disco.Params(limit_info=limit_info, jsonify=True,
696 filter_info=self._examiner._filter_info),
697 required_modules=["simplejson", "collections", "re"],
698 map=self._map_fn, reduce=self._reduce_fn)
699 processed = dict()
700 for out_key, out_val in disco.result_iterator(results):
701 processed[out_key] = simplejson.loads(out_val)
702 yield processed
703
704 def parse(gff_files, base_dict=None, limit_info=None, target_lines=None):
705 """High level interface to parse GFF files into SeqRecords and SeqFeatures.
706 """
707 parser = GFFParser()
708 for rec in parser.parse_in_parts(gff_files, base_dict, limit_info,
709 target_lines):
710 yield rec
711
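A hedged illustration of bounded-memory parsing (file name and chunk size are placeholders): target_lines flushes records after roughly that many lines, at points where no child is still waiting for its parent.

    for rec in parse("large_annotation.gff3", target_lines=10000):
        print rec.id, len(rec.features)
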
712 def _file_or_handle(fn):
713 """Decorator to handle either an input handle or a file.
714 """
715 def _file_or_handle_inside(*args, **kwargs):
716 in_file = args[1]
717 if hasattr(in_file, "read"):
718 need_close = False
719 in_handle = in_file
720 else:
721 need_close = True
722 in_handle = open(in_file)
723 args = (args[0], in_handle) + args[2:]
724 out = fn(*args, **kwargs)
725 if need_close:
726 in_handle.close()
727 return out
728 return _file_or_handle_inside
729
730 class GFFExaminer:
731 """Provide high level details about a GFF file to refine parsing.
732
733 GFF is a specification implemented by many different centers, and real life
734 files will present the same information in slightly different ways. Becoming
735 familiar with the file you are dealing with is the best way to extract the
736 information you need. This class provides high level summary details to
737 help in learning.
738 """
739 def __init__(self):
740 self._filter_info = dict(gff_id = [0], gff_source_type = [1, 2],
741 gff_source = [1], gff_type = [2])
742
743 def _get_local_params(self, limit_info=None):
744 class _LocalParams:
745 def __init__(self):
746 self.jsonify = False
747 params = _LocalParams()
748 params.limit_info = limit_info
749 params.filter_info = self._filter_info
750 return params
751
752 @_file_or_handle
753 def available_limits(self, gff_handle):
754 """Return dictionary information on possible limits for this file.
755
756 This returns a nested dictionary with the following structure:
757
758 keys -- names of items to filter by
759 values -- dictionary with:
760 keys -- filter choice
761 value -- counts of that filter in this file
762
763 Not a parallelized map-reduce implementation.
764 """
765 cur_limits = dict()
766 for filter_key in self._filter_info.keys():
767 cur_limits[filter_key] = collections.defaultdict(int)
768 for line in gff_handle:
769 # when we hit FASTA sequences, we are done with annotations
770 if line.startswith("##FASTA"):
771 break
772 # ignore empty and comment lines
773 if line.strip() and line.strip()[0] != "#":
774 parts = [p.strip() for p in line.split('\t')]
775 assert len(parts) == 9, line
776 for filter_key, cur_indexes in self._filter_info.items():
777 cur_id = tuple([parts[i] for i in cur_indexes])
778 cur_limits[filter_key][cur_id] += 1
779 # get rid of the default dicts
780 final_dict = dict()
781 for key, value_dict in cur_limits.items():
782 if len(key) == 1:
783 key = key[0]
784 final_dict[key] = dict(value_dict)
785 gff_handle.close()
786 return final_dict
787
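An illustrative sketch of the returned structure (names and counts invented); the top-level keys are exactly the names accepted in limit_info:

    # GFFExaminer().available_limits("example.gff3")
    # -> {'gff_id':          {('I',): 159, ('II',): 143},
    #     'gff_source':      {('Coding_transcript',): 301},
    #     'gff_type':        {('gene',): 120, ('mRNA',): 181},
    #     'gff_source_type': {('Coding_transcript', 'gene'): 120}}
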
788 @_file_or_handle
789 def parent_child_map(self, gff_handle):
790 """Provide a mapping of parent to child relationships in the file.
791
792 Returns a dictionary of parent child relationships:
793
794 keys -- tuple of (source, type) for each parent
795 values -- tuple of (source, type) as children of that parent
796
797 Not a parallelized map-reduce implementation.
798 """
799 # collect all of the parent and child types mapped to IDs
800 parent_sts = dict()
801 child_sts = collections.defaultdict(list)
802 for line in gff_handle:
803 # when we hit FASTA sequences, we are done with annotations
804 if line.startswith("##FASTA"):
805 break
806 if line.strip():
807 line_type, line_info = _gff_line_map(line,
808 self._get_local_params())[0]
809 if (line_type == 'parent' or (line_type == 'child' and
810 line_info['id'])):
811 parent_sts[line_info['id']] = (
812 line_info['quals']['source'][0], line_info['type'])
813 if line_type == 'child':
814 for parent_id in line_info['quals']['Parent']:
815 child_sts[parent_id].append((
816 line_info['quals']['source'][0], line_info['type']))
817 #print parent_sts, child_sts
818 # generate a dictionary of the unique final type relationships
819 pc_map = collections.defaultdict(list)
820 for parent_id, parent_type in parent_sts.items():
821 for child_type in child_sts[parent_id]:
822 pc_map[parent_type].append(child_type)
823 pc_final_map = dict()
824 for ptype, ctypes in pc_map.items():
825 unique_ctypes = list(set(ctypes))
826 unique_ctypes.sort()
827 pc_final_map[ptype] = unique_ctypes
828 return pc_final_map
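
A self-contained usage sketch, not part of the uploaded tool: running the module directly against a GFF file supplied on the command line exercises both the examiner and the parser (Python 2 syntax, matching the rest of the file).

    if __name__ == "__main__":
        import sys
        import pprint
        gff_file = sys.argv[1]                 # path supplied by the user
        examiner = GFFExaminer()
        pprint.pprint(examiner.available_limits(gff_file))
        pprint.pprint(examiner.parent_child_map(gff_file))
        for rec in parse(gff_file):
            print rec.id, len(rec.features)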