comparison jb2_GFF/GFFParser.py @ 17:4c201a3d4755 draft

planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author fubar
date Sun, 28 Jan 2024 06:48:52 +0000
1 """Parse GFF files into features attached to Biopython SeqRecord objects.
2
3 This deals with GFF3 formatted files, a tab delimited format for storing
4 sequence features and annotations:
5
6 http://www.sequenceontology.org/gff3.shtml
7
8 It will also deal with older GFF versions (GTF/GFF2):
9
10 http://www.sanger.ac.uk/Software/formats/GFF/GFF_Spec.shtml
11 http://mblab.wustl.edu/GTF22.html
12
13 The implementation utilizes map/reduce parsing of GFF using Disco. Disco
14 (http://discoproject.org) is a Map-Reduce framework for Python utilizing
15 Erlang for parallelization. The code works on a single processor without
16 Disco using the same architecture.
17 """
18 import os
19 import copy
20 import json
21 import re
22 import collections
23 import io
24 import itertools
25 import warnings
26 import six
27 from six.moves import urllib
28
29 from Bio.SeqRecord import SeqRecord
30 from Bio import SeqFeature
31 from Bio import SeqIO
32 from Bio import BiopythonDeprecationWarning
33
35
36 # Make defaultdict compatible with versions of python older than 2.4
37 try:
38 collections.defaultdict
39 except AttributeError:
40 import _utils
41
42 collections.defaultdict = _utils.defaultdict
43
44 unknown_seq_avail = False
45 try:
46 from Bio.Seq import UnknownSeq
47
48 unknown_seq_avail = True
49 except ImportError:
50 # Starting with Biopython 1.81, UnknownSeq has been removed
51 from Bio.Seq import _UndefinedSequenceData
52 from Bio.Seq import Seq
53
54
55 warnings.simplefilter("ignore", BiopythonDeprecationWarning)
56
57
58 def _gff_line_map(line, params):
59 """Map part of Map-Reduce; parses a line of GFF into a dictionary.
60
61 Given an input line from a GFF file, this:
62 - decides if the file passes our filtering limits
63 - if so:
64 - breaks it into component elements
65 - determines the type of attribute (flat, parent, child or annotation)
66 - generates a dictionary of GFF info which can be serialized as JSON
67 """
68
69 def _merge_keyvals(parts):
70 """Merge key-values escaped by quotes
71 that are improperly split at semicolons."""
72 out = []
73 for i, p in enumerate(parts):
74 if (
75 i > 0
76 and len(p) == 1
77 and p[0].endswith('"')
78 and not p[0].startswith('"')
79 ):
80 if out[-1][-1].startswith('"'):
81 prev_p = out.pop(-1)
82 to_merge = prev_p[-1]
83 prev_p[-1] = "%s; %s" % (to_merge, p[0])
84 out.append(prev_p)
85 else:
86 out.append(p)
87 return out
88
89 gff3_kw_pat = re.compile(r"\w+=")
90
91 def _split_keyvals(keyval_str):
92 """Split key-value pairs in a GFF2, GTF and GFF3 compatible way.
93
94 GFF3 has key value pairs like:
95 count=9;gene=amx-2;sequence=SAGE:aacggagccg
96 GFF2 and GTF have:
97 Sequence "Y74C9A" ; Note "Clone Y74C9A; Genbank AC024206"
98 name "fgenesh1_pg.C_chr_1000003"; transcriptId 869
99 """
100 quals = collections.defaultdict(list)
101 if keyval_str is None:
102 return quals
103 # ensembl GTF has a stray semi-colon at the end
104 if keyval_str[-1] == ";":
105 keyval_str = keyval_str[:-1]
106 # GFF2/GTF has a semi-colon with at least one space after it.
107 # It can have spaces on both sides; wormbase does this.
108 # GFF3 works with no spaces.
109 # Split at the first one we can recognize as working
110 parts = keyval_str.split(" ; ")
111 if len(parts) == 1:
112 parts = [x.strip() for x in keyval_str.split(";")]
113 # check if we have GFF3 style key-vals (with =)
114 is_gff2 = True
115 if gff3_kw_pat.match(parts[0]):
116 is_gff2 = False
117 key_vals = _merge_keyvals([p.split("=") for p in parts])
118 # otherwise, we are separated by a space with a key as the first item
119 else:
120 pieces = []
121 for p in parts:
122 # fix misplaced semi-colons in keys in some GFF2 files
123 if p and p[0] == ";":
124 p = p[1:]
125 pieces.append(p.strip().split(" "))
126 key_vals = [(p[0], " ".join(p[1:])) for p in pieces]
127 for item in key_vals:
128 # standard in-spec items are key=value
129 if len(item) == 2:
130 key, val = item
131 # out-of-spec files can have just key values. We set an empty value
132 # which will be changed to true later to standardize.
133 else:
134 assert len(item) == 1, item
135 key = item[0]
136 val = ""
137 # remove quotes in GFF2 files
138 quoted = False
139 if len(val) > 0 and val[0] == '"' and val[-1] == '"':
140 quoted = True
141 val = val[1:-1]
142 if val:
143 if quoted:
144 quals[key].append(val)
145 else:
146 quals[key].extend(
147 [v for v in val.split(",") if v]
148 )
149 # if we don't have a value, make this a key=True/False style
150 # attribute
151 else:
152 quals[key].append("true")
153 for key, vals in quals.items():
154 quals[key] = [urllib.parse.unquote(v) for v in vals]
155 return quals, is_gff2
156
157 def _nest_gff2_features(gff_parts):
158 """Provide nesting of GFF2 transcript parts with transcript IDs.
159
160 exons and coding sequences are mapped to a parent with a transcript_id
161 in GFF2. This is implemented differently at different genome centers
162 and this function attempts to resolve that and map things to the GFF3
163 way of doing them.
164 """
165 # map protein or transcript ids to a parent
166 for transcript_id in [
167 "transcript_id",
168 "transcriptId",
169 "proteinId",
170 ]:
171 try:
172 gff_parts["quals"]["Parent"] = gff_parts["quals"][
173 transcript_id
174 ]
175 break
176 except KeyError:
177 pass
178 # case for WormBase GFF -- everything labelled as Transcript or CDS
179 for flat_name in ["Transcript", "CDS"]:
180 if flat_name in gff_parts["quals"]:
181 # parent types
182 if gff_parts["type"] in [flat_name]:
183 if not gff_parts["id"]:
184 gff_parts["id"] = gff_parts["quals"][
185 flat_name
186 ][0]
187 gff_parts["quals"]["ID"] = [gff_parts["id"]]
188 # children types
189 elif gff_parts["type"] in [
190 "intron",
191 "exon",
192 "three_prime_UTR",
193 "coding_exon",
194 "five_prime_UTR",
195 "CDS",
196 "stop_codon",
197 "start_codon",
198 ]:
199 gff_parts["quals"]["Parent"] = gff_parts["quals"][
200 flat_name
201 ]
202 break
203
204 return gff_parts
205
206 strand_map = {"+": 1, "-": -1, "?": None, None: None}
207 line = line.strip()
208 if line[:2] == "##":
209 return [("directive", line[2:])]
210 elif line and line[0] != "#":
211 parts = line.split("\t")
212 should_do = True
213 if params.limit_info:
214 for limit_name, limit_values in params.limit_info.items():
215 cur_id = tuple(
216 [parts[i] for i in params.filter_info[limit_name]]
217 )
218 if cur_id not in limit_values:
219 should_do = False
220 break
221 if should_do:
222 assert len(parts) >= 8, line
223 # not python2.4 compatible but easier to understand
224 # gff_parts = [(None if p == '.' else p) for p in parts]
225 gff_parts = []
226 for p in parts:
227 if p == ".":
228 gff_parts.append(None)
229 else:
230 gff_parts.append(p)
231 gff_info = dict()
232 # collect all of the base qualifiers for this item
233 if len(parts) > 8:
234 quals, is_gff2 = _split_keyvals(gff_parts[8])
235 else:
236 quals, is_gff2 = collections.defaultdict(list), False
237 gff_info["is_gff2"] = is_gff2
238 if gff_parts[1]:
239 quals["source"].append(gff_parts[1])
240 if gff_parts[5]:
241 quals["score"].append(gff_parts[5])
242 if gff_parts[7]:
243 quals["phase"].append(gff_parts[7])
244 gff_info["quals"] = dict(quals)
245 gff_info["rec_id"] = gff_parts[0]
246 # if we are describing a location, then we are a feature
247 if gff_parts[3] and gff_parts[4]:
248 gff_info["location"] = [
249 int(gff_parts[3]) - 1,
250 int(gff_parts[4]),
251 ]
252 gff_info["type"] = gff_parts[2]
253 gff_info["id"] = quals.get("ID", [""])[0]
254 gff_info["strand"] = strand_map.get(
255 gff_parts[6], None
256 )
257 if is_gff2:
258 gff_info = _nest_gff2_features(gff_info)
259 # features that have parents need to link so we can pick up
260 # the relationship
261 if "Parent" in gff_info["quals"]:
262 # check for self referential parent/child relationships
263 # remove the ID, which is not useful
264 for p in gff_info["quals"]["Parent"]:
265 if p == gff_info["id"]:
266 gff_info["id"] = ""
267 del gff_info["quals"]["ID"]
268 break
269 final_key = "child"
270 elif gff_info["id"]:
271 final_key = "parent"
272 # Handle flat features
273 else:
274 final_key = "feature"
275 # otherwise, associate these annotations with the full record
276 else:
277 final_key = "annotation"
278 if params.jsonify:
279 return [(final_key, json.dumps(gff_info))]
280 else:
281 return [(final_key, gff_info)]
282 return []
283
284
285 def _gff_line_reduce(map_results, out, params):
286 """Reduce part of Map-Reduce; combines results of parsed features."""
287 final_items = dict()
288 for gff_type, final_val in map_results:
289 if params.jsonify and gff_type not in ["directive"]:
290 final_val = json.loads(final_val)
291 try:
292 final_items[gff_type].append(final_val)
293 except KeyError:
294 final_items[gff_type] = [final_val]
295 for key, vals in final_items.items():
296 if params.jsonify:
297 vals = json.dumps(vals)
298 out.add(key, vals)
299
300
301 class _MultiIDRemapper:
302 """Provide an ID remapping for cases where a parent has a non-unique ID.
303
304 Real life GFF3 cases have non-unique ID attributes, which we fix here
305 by using the unique sequence region to assign children to the right
306 parent.
307 """
308
309 def __init__(self, base_id, all_parents):
310 self._base_id = base_id
311 self._parents = all_parents
312
313 def remap_id(self, feature_dict):
314 rstart, rend = feature_dict["location"]
315 for index, parent in enumerate(self._parents):
316 pstart, pend = parent["location"]
317 if rstart >= pstart and rend <= pend:
318 if index > 0:
319 return "%s_%s" % (self._base_id, index + 1)
320 else:
321 return self._base_id
322 # if we haven't found a location match but the parent is unambiguous,
323 # return that
324 if len(self._parents) == 1:
325 return self._base_id
326 raise ValueError(
327 "Did not find remapped ID location: %s, %s, %s"
328 % (
329 self._base_id,
330 [p["location"] for p in self._parents],
331 feature_dict["location"],
332 )
333 )
334
335
336 class _AbstractMapReduceGFF:
337 """Base class providing general GFF parsing for local and remote classes.
338
339 This class should be subclassed to provide a concrete class to parse
340 GFF under specific conditions. These classes need to implement
341 the _gff_process function, which returns a dictionary of SeqRecord
342 information.
343 """
344
345 def __init__(self, create_missing=True):
346 """Initialize GFF parser
347
348 create_missing - If True, create blank records for GFF ids not in
349 the base_dict. If False, an error will be raised.
350 """
351 self._create_missing = create_missing
352 self._map_fn = _gff_line_map
353 self._reduce_fn = _gff_line_reduce
354 self._examiner = GFFExaminer()
355
356 def _gff_process(self, gff_files, limit_info, target_lines=None):
357 raise NotImplementedError("Derived class must define")
358
359 def parse(self, gff_files, base_dict=None, limit_info=None):
360 """Parse a GFF file, returning an iterator of SeqRecords.
361
362 limit_info - A dictionary specifying the regions of the GFF file
363 which should be extracted. This allows only relevant portions of a file
364 to be parsed.
365
366 base_dict - A base dictionary of SeqRecord objects which may be
367 pre-populated with sequences and other features. The new features from
368 the GFF file will be added to this dictionary.
369 """
370 for rec in self.parse_in_parts(
371 gff_files, base_dict, limit_info
372 ):
373 yield rec
374
375 def parse_in_parts(
376 self,
377 gff_files,
378 base_dict=None,
379 limit_info=None,
380 target_lines=None,
381 ):
382 """Parse a region of a GFF file specified, returning info as generated.
383
384 target_lines -- The number of lines in the file which should be used
385 for each partial parse. This should be determined based on available
386 memory.
387 """
388 for results in self.parse_simple(
389 gff_files, limit_info, target_lines
390 ):
391 if base_dict is None:
392 cur_dict = dict()
393 else:
394 cur_dict = copy.deepcopy(base_dict)
395 cur_dict = self._results_to_features(cur_dict, results)
396 all_ids = list(cur_dict.keys())
397 all_ids.sort()
398 for cur_id in all_ids:
399 yield cur_dict[cur_id]
400
401 def parse_simple(
402 self, gff_files, limit_info=None, target_lines=1
403 ):
404 """Simple parse which does not build or nest features.
405
406 This returns a simple dictionary representation of each line in the
407 GFF file.
408 """
409 # gracefully handle a single file passed
410 if not isinstance(gff_files, (list, tuple)):
411 gff_files = [gff_files]
412 limit_info = self._normalize_limit_info(limit_info)
413 for results in self._gff_process(
414 gff_files, limit_info, target_lines
415 ):
416 yield results
417
418 def _normalize_limit_info(self, limit_info):
419 """Turn all limit information into tuples for identical comparisons."""
420 final_limit_info = {}
421 if limit_info:
422 for key, values in limit_info.items():
423 final_limit_info[key] = []
424 for v in values:
425 if isinstance(v, str):
426 final_limit_info[key].append((v,))
427 else:
428 final_limit_info[key].append(tuple(v))
429 return final_limit_info
430
431 def _results_to_features(self, base, results):
432 """Add parsed dictionaries of results to Biopython SeqFeatures."""
433 base = self._add_annotations(
434 base, results.get("annotation", [])
435 )
436 for feature in results.get("feature", []):
437 (_, base) = self._add_toplevel_feature(base, feature)
438 base = self._add_parent_child_features(
439 base, results.get("parent", []), results.get("child", [])
440 )
441 base = self._add_seqs(base, results.get("fasta", []))
442 base = self._add_directives(
443 base, results.get("directive", [])
444 )
445 return base
446
447 def _add_directives(self, base, directives):
448 """Handle any directives or meta-data in the GFF file.
449
450 Relevant items are added as annotation meta-data to each record.
451 """
452 dir_keyvals = collections.defaultdict(list)
453 for directive in directives:
454 parts = directive.split()
455 if len(parts) > 1:
456 key = parts[0]
457 if len(parts) == 2:
458 val = parts[1]
459 else:
460 val = tuple(parts[1:])
461 # specific directives that need special handling
462 if (
463 key == "sequence-region"
464 ): # convert to Python 0-based coordinates
465 if len(val) == 2: # handle regions missing contig
466 val = (int(val[0]) - 1, int(val[1]))
467 elif len(val) == 3:
468 val = (val[0], int(val[1]) - 1, int(val[2]))
469 dir_keyvals[key].append(val)
470 for key, vals in dir_keyvals.items():
471 for rec in base.values():
472 self._add_ann_to_rec(rec, key, vals)
473 return base
474
475 def _get_matching_record_id(self, base, find_id):
476 """Find a matching base record with the test identifier, handling tricky cases.
477
478 NCBI IDs https://en.wikipedia.org/wiki/FASTA_format#NCBI_identifiers
479 """
480 # Straight matches for identifiers
481 if find_id in base:
482 return find_id
483 # NCBI style IDs in find_id
484 elif find_id and find_id.find("|") > 0:
485 for test_id in [
486 x.strip() for x in find_id.split("|")[1:]
487 ]:
488 if test_id and test_id in base:
489 return test_id
490 # NCBI style IDs in base IDs
491 else:
492 for base_id in base.keys():
493 if base_id.find("|") > 0:
494 for test_id in [
495 x.strip() for x in base_id.split("|")[1:]
496 ]:
497 if test_id and test_id == find_id:
498 return base_id
499 return None
500
501 def _add_seqs(self, base, recs):
502 """Add sequence information contained in the GFF3 to records."""
503 for rec in recs:
504 match_id = self._get_matching_record_id(base, rec.id)
505 if match_id:
506 base[match_id].seq = rec.seq
507 else:
508 base[rec.id] = rec
509 return base
510
511 def _add_parent_child_features(self, base, parents, children):
512 """Add nested features with parent child relationships."""
513 multi_remap = self._identify_dup_ids(parents)
514 # add children features
515 children_prep = collections.defaultdict(list)
516 for child_dict in children:
517 child_feature = self._get_feature(child_dict)
518 for pindex, pid in enumerate(
519 child_feature.qualifiers["Parent"]
520 ):
521 if pid in multi_remap:
522 pid = multi_remap[pid].remap_id(child_dict)
523 child_feature.qualifiers["Parent"][pindex] = pid
524 children_prep[pid].append(
525 (child_dict["rec_id"], child_feature)
526 )
527 children = dict(children_prep)
528 # add children to parents that exist
529 for cur_parent_dict in parents:
530 cur_id = cur_parent_dict["id"]
531 if cur_id in multi_remap:
532 cur_parent_dict["id"] = multi_remap[cur_id].remap_id(
533 cur_parent_dict
534 )
535 cur_parent, base = self._add_toplevel_feature(
536 base, cur_parent_dict
537 )
538 cur_parent, children = self._add_children_to_parent(
539 cur_parent, children
540 )
541 # create parents for children without them (GFF2 or split/bad files)
542 while len(children) > 0:
543 parent_id, cur_children = next(
544 itertools.islice(children.items(), 1)
545 )
546 # one child, do not nest it
547 if len(cur_children) == 1:
548 rec_id, child = cur_children[0]
549 loc = (child.location.start, child.location.end)
550 rec, base = self._get_rec(
551 base, dict(rec_id=rec_id, location=loc)
552 )
553 rec.features.append(child)
554 del children[parent_id]
555 else:
556 cur_parent, base = self._add_missing_parent(
557 base, parent_id, cur_children
558 )
559 cur_parent, children = self._add_children_to_parent(
560 cur_parent, children
561 )
562 return base
563
564 def _identify_dup_ids(self, parents):
565 """Identify duplicated ID attributes in potential nested parents.
566
567 According to the GFF3 spec ID attributes are supposed to be unique
568 for a file, but this is not always true in practice. This looks
569 for duplicates, and provides unique IDs sorted by locations.
570 """
571 multi_ids = collections.defaultdict(list)
572 for parent in parents:
573 multi_ids[parent["id"]].append(parent)
574 multi_ids = [
575 (mid, ps)
576 for (mid, ps) in multi_ids.items()
577 if len(ps) > 1
578 ]
579 multi_remap = dict()
580 for mid, parents in multi_ids:
581 multi_remap[mid] = _MultiIDRemapper(mid, parents)
582 return multi_remap
583
584 def _add_children_to_parent(self, cur_parent, children):
585 """Recursively add children to parent features."""
586 if cur_parent.id in children:
587 cur_children = children[cur_parent.id]
588 ready_children = []
589 for _, cur_child in cur_children:
590 cur_child, _ = self._add_children_to_parent(
591 cur_child, children
592 )
593 ready_children.append(cur_child)
594 # Pre-1.62 Biopython lacks CompoundLocation, so fall back to the
595 # older location_operator="join" behaviour there
596 if not hasattr(SeqFeature, "CompoundLocation"):
597 cur_parent.location_operator = "join"
598 for cur_child in ready_children:
599 cur_parent.sub_features.append(cur_child)
600 del children[cur_parent.id]
601 return cur_parent, children
602
603 def _add_annotations(self, base, anns):
604 """Add annotation data from the GFF file to records."""
605 # add these as a list of annotations, checking not to overwrite
606 # current values
607 for ann in anns:
608 rec, base = self._get_rec(base, ann)
609 for key, vals in ann["quals"].items():
610 self._add_ann_to_rec(rec, key, vals)
611 return base
612
613 def _add_ann_to_rec(self, rec, key, vals):
614 """Add a key/value annotation to the given SeqRecord."""
615 if key in rec.annotations:
616 try:
617 rec.annotations[key].extend(vals)
618 except AttributeError:
619 rec.annotations[key] = [rec.annotations[key]] + vals
620 else:
621 rec.annotations[key] = vals
622
623 def _get_rec(self, base, info_dict):
624 """Retrieve a record to add features to."""
625 max_loc = info_dict.get("location", (0, 1))[1]
626 match_id = self._get_matching_record_id(
627 base, info_dict["rec_id"]
628 )
629 if match_id:
630 cur_rec = base[match_id]
631 # update generated unknown sequences
632 if unknown_seq_avail and isinstance(
633 cur_rec.seq, UnknownSeq
634 ):
635 cur_rec.seq._length = max(
636 [max_loc, cur_rec.seq._length]
637 )
638 elif not unknown_seq_avail and isinstance(
639 cur_rec.seq._data, _UndefinedSequenceData
640 ):
641 cur_rec.seq._data._length = max(
642 [max_loc, cur_rec.seq._data._length]
643 )
644 return cur_rec, base
645 elif self._create_missing:
646 if unknown_seq_avail:
647 new_rec = SeqRecord(
648 UnknownSeq(max_loc), info_dict["rec_id"]
649 )
650 else:
651 new_rec = SeqRecord(
652 Seq(None, length=max_loc), info_dict["rec_id"]
653 )
654 base[info_dict["rec_id"]] = new_rec
655 return new_rec, base
656 else:
657 raise KeyError(
658 "Did not find matching record in %s for %s"
659 % (base.keys(), info_dict)
660 )
661
662 def _add_missing_parent(self, base, parent_id, cur_children):
663 """Add a new feature that is missing from the GFF file."""
664 base_rec_id = list(set(c[0] for c in cur_children))
665 child_strands = list(
666 set(c[1].location.strand for c in cur_children)
667 )
668 inferred_strand = (
669 child_strands[0] if len(child_strands) == 1 else None
670 )
671 assert len(base_rec_id) > 0
672 feature_dict = dict(
673 id=parent_id,
674 strand=inferred_strand,
675 type="inferred_parent",
676 quals=dict(ID=[parent_id]),
677 rec_id=base_rec_id[0],
678 )
679 coords = [
680 (c.location.start, c.location.end)
681 for r, c in cur_children
682 ]
683 feature_dict["location"] = (
684 min([c[0] for c in coords]),
685 max([c[1] for c in coords]),
686 )
687 return self._add_toplevel_feature(base, feature_dict)
688
689 def _add_toplevel_feature(self, base, feature_dict):
690 """Add a toplevel non-nested feature to the appropriate record."""
691 new_feature = self._get_feature(feature_dict)
692 rec, base = self._get_rec(base, feature_dict)
693 rec.features.append(new_feature)
694 return new_feature, base
695
696 def _get_feature(self, feature_dict):
697 """Retrieve a Biopython feature from our dictionary representation."""
698 # location = SeqFeature.FeatureLocation(*feature_dict['location'])
699 rstart, rend = feature_dict["location"]
700 new_feature = SeqFeature.SeqFeature(
701 SeqFeature.SimpleLocation(
702 start=rstart, end=rend, strand=feature_dict["strand"]
703 ),
704 feature_dict["type"],
705 id=feature_dict["id"],
706 )
707 # Support for Biopython 1.68 and above, which removed sub_features
708 if not hasattr(new_feature, "sub_features"):
709 new_feature.sub_features = []
710 new_feature.qualifiers = feature_dict["quals"]
711 return new_feature
712
713 def _parse_fasta(self, in_handle):
714 """Parse FASTA sequence information contained in the GFF3 file."""
715 return list(SeqIO.parse(in_handle, "fasta"))
716
717
718 class _GFFParserLocalOut:
719 """Provide a collector for local GFF MapReduce file parsing."""
720
721 def __init__(self, smart_breaks=False):
722 self._items = dict()
723 self._smart_breaks = smart_breaks
724 self._missing_keys = collections.defaultdict(int)
725 self._last_parent = None
726 self.can_break = True
727 self.num_lines = 0
728
729 def add(self, key, vals):
730 if self._smart_breaks:
731 # if this is not GFF2 we expect explicit parents; only break
732 # once no referenced parents are still missing
733 if key == "directive":
734 if vals[0] == "#":
735 self.can_break = True
736 self._last_parent = None
737 elif not vals[0].get("is_gff2", False):
738 self._update_missing_parents(key, vals)
739 self.can_break = len(self._missing_keys) == 0
740 # break when we are done with stretches of child features
741 elif key != "child":
742 self.can_break = True
743 self._last_parent = None
744 # break when we have lots of child features in a row
745 # and change between parents
746 else:
747 cur_parent = vals[0]["quals"]["Parent"][0]
748 if self._last_parent:
749 self.can_break = cur_parent != self._last_parent
750 self._last_parent = cur_parent
751 self.num_lines += 1
752 try:
753 self._items[key].extend(vals)
754 except KeyError:
755 self._items[key] = vals
756
757 def _update_missing_parents(self, key, vals):
758 # smart way of deciding if we can break this.
759 # if this is too much, can go back to not breaking in the
760 # middle of children
761 if key in ["child"]:
762 for val in vals:
763 for p_id in val["quals"]["Parent"]:
764 self._missing_keys[p_id] += 1
765 for val in vals:
766 try:
767 del self._missing_keys[val["quals"]["ID"][0]]
768 except KeyError:
769 pass
770
771 def has_items(self):
772 return len(self._items) > 0
773
774 def get_results(self):
775 self._last_parent = None
776 return self._items
777
778
779 class GFFParser(_AbstractMapReduceGFF):
780 """Local GFF parser providing standardized parsing of GFF files."""
781
782 def __init__(self, line_adjust_fn=None, create_missing=True):
783 _AbstractMapReduceGFF.__init__(
784 self, create_missing=create_missing
785 )
786 self._line_adjust_fn = line_adjust_fn
787
788 def _gff_process(self, gff_files, limit_info, target_lines):
789 """Process GFF addition without any parallelization.
790
791 In addition to limit filtering, this accepts a target_lines attribute
792 which provides a number of lines to parse before returning results.
793 This allows partial parsing of a file to prevent memory issues.
794 """
795 line_gen = self._file_line_generator(gff_files)
796 for out in self._lines_to_out_info(
797 line_gen, limit_info, target_lines
798 ):
799 yield out
800
801 def _file_line_generator(self, gff_files):
802 """Generate single lines from a set of GFF files."""
803 for gff_file in gff_files:
804 if hasattr(gff_file, "read"):
805 need_close = False
806 in_handle = gff_file
807 else:
808 need_close = True
809 in_handle = open(gff_file)
810 while 1:
811 line = in_handle.readline()
812 if not line:
813 break
814 yield line
815 if need_close:
816 in_handle.close()
817
818 def _lines_to_out_info(self, line_iter, limit_info=None, target_lines=None):
819 """Generate SeqRecord and SeqFeatures from GFF file lines."""
820 params = self._examiner._get_local_params(limit_info)
821 out_info = _GFFParserLocalOut(
822 (target_lines is not None and target_lines > 1)
823 )
824 found_seqs = False
825 for line in line_iter:
826 results = self._map_fn(line, params)
827 if self._line_adjust_fn and results:
828 if results[0][0] not in ["directive"]:
829 results = [
830 (
831 results[0][0],
832 self._line_adjust_fn(results[0][1]),
833 )
834 ]
835 self._reduce_fn(results, out_info, params)
836 if (
837 target_lines
838 and out_info.num_lines >= target_lines
839 and out_info.can_break
840 ):
841 yield out_info.get_results()
842 out_info = _GFFParserLocalOut(
843 (target_lines is not None and target_lines > 1)
844 )
845 if (
846 results
847 and results[0][0] == "directive"
848 and results[0][1] == "FASTA"
849 ):
850 found_seqs = True
851 break
852
853 class FakeHandle:
854 def __init__(self, line_iter):
855 self._iter = line_iter
856
857 def __iter__(self):
858 return self
859
860 def __next__(self):
861 return next(self._iter)
862
863 next = __next__
864
865 def read(self, size=-1):
866 if size < 0:
867 return "".join(x for x in self._iter)
868 elif size == 0:
869 return "" # Used by Biopython to sniff unicode vs bytes
870 else:
871 raise NotImplementedError
872
873 def readline(self):
874 try:
875 return next(self._iter)
876 except StopIteration:
877 return ""
878
879 if found_seqs:
880 fasta_recs = self._parse_fasta(FakeHandle(line_iter))
881 out_info.add("fasta", fasta_recs)
882 if out_info.has_items():
883 yield out_info.get_results()
884
885
886 class DiscoGFFParser(_AbstractMapReduceGFF):
887 """GFF Parser with parallelization (http://discoproject.org."""
888
889 def __init__(self, disco_host, create_missing=True):
890 """Initialize parser.
891
892 disco_host - Web reference to a Disco host which will be used for
893 parallelizing the GFF reading job.
894 """
895 _AbstractMapReduceGFF.__init__(
896 self, create_missing=create_missing
897 )
898 self._disco_host = disco_host
899
900 def _gff_process(self, gff_files, limit_info, target_lines=None):
901 """Process GFF addition, using Disco to parallelize the process."""
902 assert target_lines is None, "Cannot split parallelized jobs"
903 # make these imports local; only need them when using disco
import disco
904 # absolute path names unless they are special disco files
905 full_files = []
906 for f in gff_files:
907 if f.split(":")[0] != "disco":
908 full_files.append(os.path.abspath(f))
909 else:
910 full_files.append(f)
911 results = disco.job(
912 self._disco_host,
913 name="gff_reader",
914 input=full_files,
915 params=disco.Params(
916 limit_info=limit_info,
917 jsonify=True,
918 filter_info=self._examiner._filter_info,
919 ),
920 required_modules=["json", "collections", "re"],
921 map=self._map_fn,
922 reduce=self._reduce_fn,
923 )
924 processed = dict()
925 for out_key, out_val in disco.result_iterator(results):
926 processed[out_key] = json.loads(out_val)
927 yield processed
928
929
930 def parse(
931 gff_files, base_dict=None, limit_info=None, target_lines=None
932 ):
933 """parse GFF files into SeqRecords and SeqFeatures."""
934 parser = GFFParser()
935 for rec in parser.parse_in_parts(
936 gff_files, base_dict, limit_info, target_lines
937 ):
938 yield rec
939
940
941 def parse_simple(gff_files, limit_info=None):
942 """Parse GFF files as line by line dictionary of parts."""
943 parser = GFFParser()
944 for rec in parser.parse_simple(gff_files, limit_info=limit_info):
945 if "child" in rec:
946 assert "parent" not in rec
947 yield rec["child"][0]
948 elif "parent" in rec:
949 yield rec["parent"][0]
950 elif "feature" in rec:
951 yield rec["feature"][0]
952 # ignore directive lines
953 else:
954 assert "directive" in rec
955
956
957 def _file_or_handle(fn):
958 """Decorator to handle either an input handle or a file."""
959
960 def _file_or_handle_inside(*args, **kwargs):
961 in_file = args[1]
962 if hasattr(in_file, "read"):
963 need_close = False
964 in_handle = in_file
965 if six.PY3 and not isinstance(in_handle, io.TextIOBase):
966 raise TypeError(
967 "input handle must be opened in text mode"
968 )
969 else:
970 need_close = True
971 in_handle = open(in_file)
972 args = (args[0], in_handle) + args[2:]
973 out = fn(*args, **kwargs)
974 if need_close:
975 in_handle.close()
976 return out
977
978 return _file_or_handle_inside
979
980
981 class GFFExaminer:
982 """Provide high level details about a GFF file to refine parsing.
983
984 GFF is a spec and is provided by many different centers. Real life files
985 will present the same information in slightly different ways. Becoming
986 familiar with the file you are dealing with is the best way to extract the
987 information you need. This class provides high level summary details to
988 help in learning.
989 """
990
991 def __init__(self):
992 self._filter_info = dict(
993 gff_id=[0],
994 gff_source_type=[1, 2],
995 gff_source=[1],
996 gff_type=[2],
997 )
998
999 def _get_local_params(self, limit_info=None):
1000 class _LocalParams:
1001 def __init__(self):
1002 self.jsonify = False
1003
1004 params = _LocalParams()
1005 params.limit_info = limit_info
1006 params.filter_info = self._filter_info
1007 return params
1008
1009 @_file_or_handle
1010 def available_limits(self, gff_handle):
1011 """Return dictionary information on possible limits for this file.
1012
1013 This returns a nested dictionary with the following structure:
1014
1015 keys -- names of items to filter by
1016 values -- dictionary with:
1017 keys -- filter choice
1018 value -- counts of that filter in this file
1019
1020 Not a parallelized map-reduce implementation.
1021 """
1022 cur_limits = dict()
1023 for filter_key in self._filter_info.keys():
1024 cur_limits[filter_key] = collections.defaultdict(int)
1025 for line in gff_handle:
1026 # when we hit FASTA sequences, we are done with annotations
1027 if line.startswith("##FASTA"):
1028 break
1029 # ignore empty and comment lines
1030 if line.strip() and line.strip()[0] != "#":
1031 parts = [p.strip() for p in line.split("\t")]
1032 assert len(parts) >= 8, line
1033 parts = parts[:9]
1034 for (
1035 filter_key,
1036 cur_indexes,
1037 ) in self._filter_info.items():
1038 cur_id = tuple([parts[i] for i in cur_indexes])
1039 cur_limits[filter_key][cur_id] += 1
1040 # get rid of the default dicts
1041 final_dict = dict()
1042 for key, value_dict in cur_limits.items():
1043 if len(key) == 1:
1044 key = key[0]
1045 final_dict[key] = dict(value_dict)
1046 gff_handle.close()
1047 return final_dict
1048
1049 @_file_or_handle
1050 def parent_child_map(self, gff_handle):
1051 """Provide a mapping of parent to child relationships in the file.
1052
1053 Returns a dictionary of parent child relationships:
1054
1055 keys -- tuple of (source, type) for each parent
1056 values -- tuple of (source, type) as children of that parent
1057
1058 Not a parallelized map-reduce implementation.
1059 """
1060 # collect all of the parent and child types mapped to IDs
1061 parent_sts = dict()
1062 child_sts = collections.defaultdict(list)
1063 for line in gff_handle:
1064 # when we hit FASTA sequences, we are done with annotations
1065 if line.startswith("##FASTA"):
1066 break
1067 if line.strip() and not line.startswith("#"):
1068 line_type, line_info = _gff_line_map(
1069 line, self._get_local_params()
1070 )[0]
1071 if line_type == "parent" or (
1072 line_type == "child" and line_info["id"]
1073 ):
1074 parent_sts[line_info["id"]] = (
1075 line_info["quals"].get("source", [""])[0],
1076 line_info["type"],
1077 )
1078 if line_type == "child":
1079 for parent_id in line_info["quals"]["Parent"]:
1080 child_sts[parent_id].append(
1081 (
1082 line_info["quals"].get(
1083 "source", [""]
1084 )[0],
1085 line_info["type"],
1086 )
1087 )
1088 # print parent_sts, child_sts
1089 # generate a dictionary of the unique final type relationships
1090 pc_map = collections.defaultdict(list)
1091 for parent_id, parent_type in parent_sts.items():
1092 for child_type in child_sts[parent_id]:
1093 pc_map[parent_type].append(child_type)
1094 pc_final_map = dict()
1095 for ptype, ctypes in pc_map.items():
1096 unique_ctypes = list(set(ctypes))
1097 unique_ctypes.sort()
1098 pc_final_map[ptype] = unique_ctypes
1099 return pc_final_map