comparison jb2_GFF/GFFParser.py @ 17:4c201a3d4755 draft
planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author   | fubar
date     | Sun, 28 Jan 2024 06:48:52 +0000
parents  |
children |
comparison: 16:1fe91657bfd6 | 17:4c201a3d4755
1 """Parse GFF files into features attached to Biopython SeqRecord objects. | |
2 | |
3 This deals with GFF3 formatted files, a tab delimited format for storing | |
4 sequence features and annotations: | |
5 | |
6 http://www.sequenceontology.org/gff3.shtml | |
7 | |
8 It will also deal with older GFF versions (GTF/GFF2): | |
9 | |
10 http://www.sanger.ac.uk/Software/formats/GFF/GFF_Spec.shtml | |
11 http://mblab.wustl.edu/GTF22.html | |
12 | |
13 The implementation utilizes map/reduce parsing of GFF using Disco. Disco | |
14 (http://discoproject.org) is a Map-Reduce framework for Python utilizing | |
15 Erlang for parallelization. The code works on a single processor without | |
16 Disco using the same architecture. | |
17 """ | |
import os
import copy
import json
import re
import collections
import io
import itertools
import warnings
import six
from six.moves import urllib

from Bio.SeqRecord import SeqRecord
from Bio import SeqFeature
from Bio import SeqIO
from Bio import BiopythonDeprecationWarning

# Make defaultdict compatible with versions of python older than 2.5
try:
    collections.defaultdict
except AttributeError:
    import _utils

    collections.defaultdict = _utils.defaultdict

unknown_seq_avail = False
try:
    from Bio.Seq import UnknownSeq

    unknown_seq_avail = True
except ImportError:
    # UnknownSeq was removed starting with Biopython 1.81
    from Bio.Seq import _UndefinedSequenceData
    from Bio.Seq import Seq


warnings.simplefilter("ignore", BiopythonDeprecationWarning)


def _gff_line_map(line, params):
    """Map part of Map-Reduce; parses a line of GFF into a dictionary.

    Given an input line from a GFF file, this:
    - decides if the file passes our filtering limits
    - if so:
        - breaks it into component elements
        - determines the type of attribute (flat, parent, child or annotation)
        - generates a dictionary of GFF info which can be serialized as JSON
    """

    def _merge_keyvals(parts):
        """Merge key-values escaped by quotes
        that are improperly split at semicolons."""
        out = []
        for i, p in enumerate(parts):
            if (
                i > 0
                and len(p) == 1
                and p[0].endswith('"')
                and not p[0].startswith('"')
            ):
                if out[-1][-1].startswith('"'):
                    prev_p = out.pop(-1)
                    to_merge = prev_p[-1]
                    prev_p[-1] = "%s; %s" % (to_merge, p[0])
                    out.append(prev_p)
            else:
                out.append(p)
        return out

    gff3_kw_pat = re.compile(r"\w+=")

    def _split_keyvals(keyval_str):
        """Split key-value pairs in a GFF2, GTF and GFF3 compatible way.

        GFF3 has key value pairs like:
          count=9;gene=amx-2;sequence=SAGE:aacggagccg
        GFF2 and GTF have:
          Sequence "Y74C9A" ; Note "Clone Y74C9A; Genbank AC024206"
          name "fgenesh1_pg.C_chr_1000003"; transcriptId 869
        """
        quals = collections.defaultdict(list)
        if keyval_str is None:
            return quals
        # ensembl GTF has a stray semi-colon at the end
        if keyval_str[-1] == ";":
            keyval_str = keyval_str[:-1]
        # GFF2/GTF has a semi-colon with at least one space after it.
        # It can have spaces on both sides; wormbase does this.
        # GFF3 works with no spaces.
        # Split at the first one we can recognize as working
        parts = keyval_str.split(" ; ")
        if len(parts) == 1:
            parts = [x.strip() for x in keyval_str.split(";")]
        # check if we have GFF3 style key-vals (with =)
        is_gff2 = True
        if gff3_kw_pat.match(parts[0]):
            is_gff2 = False
            key_vals = _merge_keyvals([p.split("=") for p in parts])
        # otherwise, we are separated by a space with a key as the first item
        else:
            pieces = []
            for p in parts:
                # fix misplaced semi-colons in keys in some GFF2 files
                if p and p[0] == ";":
                    p = p[1:]
                pieces.append(p.strip().split(" "))
            key_vals = [(p[0], " ".join(p[1:])) for p in pieces]
        for item in key_vals:
            # standard in-spec items are key=value
            if len(item) == 2:
                key, val = item
            # out-of-spec files can have just key values. We set an empty value
            # which will be changed to true later to standardize.
            else:
                assert len(item) == 1, item
                key = item[0]
                val = ""
            # remove quotes in GFF2 files
            quoted = False
            if len(val) > 0 and val[0] == '"' and val[-1] == '"':
                quoted = True
                val = val[1:-1]
            if val:
                if quoted:
                    quals[key].append(val)
                else:
                    quals[key].extend(
                        [v for v in val.split(",") if v]
                    )
            # if we don't have a value, make this a key=True/False style
            # attribute
            else:
                quals[key].append("true")
        for key, vals in quals.items():
            quals[key] = [urllib.parse.unquote(v) for v in vals]
        return quals, is_gff2

    def _nest_gff2_features(gff_parts):
        """Provide nesting of GFF2 transcript parts with transcript IDs.

        exons and coding sequences are mapped to a parent with a transcript_id
        in GFF2. This is implemented differently at different genome centers
        and this function attempts to resolve that and map things to the GFF3
        way of doing them.
        """
        # map protein or transcript ids to a parent
        for transcript_id in [
            "transcript_id",
            "transcriptId",
            "proteinId",
        ]:
            try:
                gff_parts["quals"]["Parent"] = gff_parts["quals"][
                    transcript_id
                ]
                break
            except KeyError:
                pass
        # case for WormBase GFF -- everything labelled as Transcript or CDS
        for flat_name in ["Transcript", "CDS"]:
            if flat_name in gff_parts["quals"]:
                # parent types
                if gff_parts["type"] in [flat_name]:
                    if not gff_parts["id"]:
                        gff_parts["id"] = gff_parts["quals"][
                            flat_name
                        ][0]
                        gff_parts["quals"]["ID"] = [gff_parts["id"]]
                # children types
                elif gff_parts["type"] in [
                    "intron",
                    "exon",
                    "three_prime_UTR",
                    "coding_exon",
                    "five_prime_UTR",
                    "CDS",
                    "stop_codon",
                    "start_codon",
                ]:
                    gff_parts["quals"]["Parent"] = gff_parts["quals"][
                        flat_name
                    ]
                break

        return gff_parts

    strand_map = {"+": 1, "-": -1, "?": None, None: None}
    line = line.strip()
    if line[:2] == "##":
        return [("directive", line[2:])]
    elif line and line[0] != "#":
        parts = line.split("\t")
        should_do = True
        if params.limit_info:
            for limit_name, limit_values in params.limit_info.items():
                cur_id = tuple(
                    [parts[i] for i in params.filter_info[limit_name]]
                )
                if cur_id not in limit_values:
                    should_do = False
                    break
        if should_do:
            assert len(parts) >= 8, line
            # not python2.4 compatible but easier to understand
            # gff_parts = [(None if p == '.' else p) for p in parts]
            gff_parts = []
            for p in parts:
                if p == ".":
                    gff_parts.append(None)
                else:
                    gff_parts.append(p)
            gff_info = dict()
            # collect all of the base qualifiers for this item
            if len(parts) > 8:
                quals, is_gff2 = _split_keyvals(gff_parts[8])
            else:
                quals, is_gff2 = collections.defaultdict(list), False
            gff_info["is_gff2"] = is_gff2
            if gff_parts[1]:
                quals["source"].append(gff_parts[1])
            if gff_parts[5]:
                quals["score"].append(gff_parts[5])
            if gff_parts[7]:
                quals["phase"].append(gff_parts[7])
            gff_info["quals"] = dict(quals)
            gff_info["rec_id"] = gff_parts[0]
            # if we are describing a location, then we are a feature
            if gff_parts[3] and gff_parts[4]:
                gff_info["location"] = [
                    int(gff_parts[3]) - 1,
                    int(gff_parts[4]),
                ]
                gff_info["type"] = gff_parts[2]
                gff_info["id"] = quals.get("ID", [""])[0]
                gff_info["strand"] = strand_map.get(
                    gff_parts[6], None
                )
                if is_gff2:
                    gff_info = _nest_gff2_features(gff_info)
                # features that have parents need to link so we can pick up
                # the relationship
                if "Parent" in gff_info["quals"]:
                    # check for self referential parent/child relationships
                    # remove the ID, which is not useful
                    for p in gff_info["quals"]["Parent"]:
                        if p == gff_info["id"]:
                            gff_info["id"] = ""
                            del gff_info["quals"]["ID"]
                            break
                    final_key = "child"
                elif gff_info["id"]:
                    final_key = "parent"
                # Handle flat features
                else:
                    final_key = "feature"
            # otherwise, associate these annotations with the full record
            else:
                final_key = "annotation"
            if params.jsonify:
                return [(final_key, json.dumps(gff_info))]
            else:
                return [(final_key, gff_info)]
    return []

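# Rough sketch of the map output for a single, hypothetical GFF3 line
#   "chr1\texample\tgene\t1000\t2000\t.\t+\t.\tID=gene1;Name=demo"
# _gff_line_map returns [("parent", info)] where info is approximately:
#   {"rec_id": "chr1", "type": "gene", "id": "gene1", "strand": 1,
#    "location": [999, 2000], "is_gff2": False,
#    "quals": {"ID": ["gene1"], "Name": ["demo"], "source": ["example"]}}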

def _gff_line_reduce(map_results, out, params):
    """Reduce part of Map-Reduce; combines results of parsed features."""
    final_items = dict()
    for gff_type, final_val in map_results:
        if params.jsonify and gff_type not in ["directive"]:
            final_val = json.loads(final_val)
        try:
            final_items[gff_type].append(final_val)
        except KeyError:
            final_items[gff_type] = [final_val]
    for key, vals in final_items.items():
        if params.jsonify:
            vals = json.dumps(vals)
        out.add(key, vals)


class _MultiIDRemapper:
    """Provide an ID remapping for cases where a parent has a non-unique ID.

    Real life GFF3 cases have non-unique ID attributes, which we fix here
    by using the unique sequence region to assign children to the right
    parent.
    """

    def __init__(self, base_id, all_parents):
        self._base_id = base_id
        self._parents = all_parents

    def remap_id(self, feature_dict):
        rstart, rend = feature_dict["location"]
        for index, parent in enumerate(self._parents):
            pstart, pend = parent["location"]
            if rstart >= pstart and rend <= pend:
                if index > 0:
                    return "%s_%s" % (self._base_id, index + 1)
                else:
                    return self._base_id
        # if we haven't found a location match but parents are unambiguous,
        # return that
        if len(self._parents) == 1:
            return self._base_id
        raise ValueError(
            "Did not find remapped ID location: %s, %s, %s"
            % (
                self._base_id,
                [p["location"] for p in self._parents],
                feature_dict["location"],
            )
        )


class _AbstractMapReduceGFF:
    """Base class providing general GFF parsing for local and remote classes.

    This class should be subclassed to provide a concrete class to parse
    GFF under specific conditions. These classes need to implement
    the _gff_process function, which returns a dictionary of SeqRecord
    information.
    """

    def __init__(self, create_missing=True):
        """Initialize GFF parser

        create_missing - If True, create blank records for GFF ids not in
        the base_dict. If False, an error will be raised.
        """
        self._create_missing = create_missing
        self._map_fn = _gff_line_map
        self._reduce_fn = _gff_line_reduce
        self._examiner = GFFExaminer()

    def _gff_process(self, gff_files, limit_info, target_lines=None):
        raise NotImplementedError("Derived class must define")

    def parse(self, gff_files, base_dict=None, limit_info=None):
        """Parse a GFF file, returning an iterator of SeqRecords.

        limit_info - A dictionary specifying the regions of the GFF file
        which should be extracted. This allows only relevant portions of a file
        to be parsed.

        base_dict - A base dictionary of SeqRecord objects which may be
        pre-populated with sequences and other features. The new features from
        the GFF file will be added to this dictionary.
        """
        for rec in self.parse_in_parts(
            gff_files, base_dict, limit_info
        ):
            yield rec

    def parse_in_parts(
        self,
        gff_files,
        base_dict=None,
        limit_info=None,
        target_lines=None,
    ):
        """Parse a region of a GFF file specified, returning info as generated.

        target_lines -- The number of lines in the file which should be used
        for each partial parse. This should be determined based on available
        memory.
        """
        for results in self.parse_simple(
            gff_files, limit_info, target_lines
        ):
            if base_dict is None:
                cur_dict = dict()
            else:
                cur_dict = copy.deepcopy(base_dict)
            cur_dict = self._results_to_features(cur_dict, results)
            all_ids = list(cur_dict.keys())
            all_ids.sort()
            for cur_id in all_ids:
                yield cur_dict[cur_id]

    def parse_simple(
        self, gff_files, limit_info=None, target_lines=1
    ):
        """Simple parse which does not build or nest features.

        This returns a simple dictionary representation of each line in the
        GFF file.
        """
        # gracefully handle a single file passed
        if not isinstance(gff_files, (list, tuple)):
            gff_files = [gff_files]
        limit_info = self._normalize_limit_info(limit_info)
        for results in self._gff_process(
            gff_files, limit_info, target_lines
        ):
            yield results

    def _normalize_limit_info(self, limit_info):
        """Turn all limit information into tuples for identical comparisons."""
        final_limit_info = {}
        if limit_info:
            for key, values in limit_info.items():
                final_limit_info[key] = []
                for v in values:
                    if isinstance(v, str):
                        final_limit_info[key].append((v,))
                    else:
                        final_limit_info[key].append(tuple(v))
        return final_limit_info

    def _results_to_features(self, base, results):
        """Add parsed dictionaries of results to Biopython SeqFeatures."""
        base = self._add_annotations(
            base, results.get("annotation", [])
        )
        for feature in results.get("feature", []):
            (_, base) = self._add_toplevel_feature(base, feature)
        base = self._add_parent_child_features(
            base, results.get("parent", []), results.get("child", [])
        )
        base = self._add_seqs(base, results.get("fasta", []))
        base = self._add_directives(
            base, results.get("directive", [])
        )
        return base

    def _add_directives(self, base, directives):
        """Handle any directives or meta-data in the GFF file.

        Relevant items are added as annotation meta-data to each record.
        """
        dir_keyvals = collections.defaultdict(list)
        for directive in directives:
            parts = directive.split()
            if len(parts) > 1:
                key = parts[0]
                if len(parts) == 2:
                    val = parts[1]
                else:
                    val = tuple(parts[1:])
                # specific directives that need special handling
                if (
                    key == "sequence-region"
                ):  # convert to Python 0-based coordinates
                    if len(val) == 2:  # handle regions missing contig
                        val = (int(val[0]) - 1, int(val[1]))
                    elif len(val) == 3:
                        val = (val[0], int(val[1]) - 1, int(val[2]))
                dir_keyvals[key].append(val)
        for key, vals in dir_keyvals.items():
            for rec in base.values():
                self._add_ann_to_rec(rec, key, vals)
        return base

    def _get_matching_record_id(self, base, find_id):
        """Find a matching base record with the test identifier, handling tricky cases.

        NCBI IDs https://en.wikipedia.org/wiki/FASTA_format#NCBI_identifiers
        """
        # Straight matches for identifiers
        if find_id in base:
            return find_id
        # NCBI style IDs in find_id
        elif find_id and find_id.find("|") > 0:
            for test_id in [
                x.strip() for x in find_id.split("|")[1:]
            ]:
                if test_id and test_id in base:
                    return test_id
        # NCBI style IDs in base IDs
        else:
            for base_id in base.keys():
                if base_id.find("|") > 0:
                    for test_id in [
                        x.strip() for x in base_id.split("|")[1:]
                    ]:
                        if test_id and test_id == find_id:
                            return base_id
        return None

    def _add_seqs(self, base, recs):
        """Add sequence information contained in the GFF3 to records."""
        for rec in recs:
            match_id = self._get_matching_record_id(base, rec.id)
            if match_id:
                base[match_id].seq = rec.seq
            else:
                base[rec.id] = rec
        return base

    def _add_parent_child_features(self, base, parents, children):
        """Add nested features with parent child relationships."""
        multi_remap = self._identify_dup_ids(parents)
        # add children features
        children_prep = collections.defaultdict(list)
        for child_dict in children:
            child_feature = self._get_feature(child_dict)
            for pindex, pid in enumerate(
                child_feature.qualifiers["Parent"]
            ):
                if pid in multi_remap:
                    pid = multi_remap[pid].remap_id(child_dict)
                    child_feature.qualifiers["Parent"][pindex] = pid
                children_prep[pid].append(
                    (child_dict["rec_id"], child_feature)
                )
        children = dict(children_prep)
        # add children to parents that exist
        for cur_parent_dict in parents:
            cur_id = cur_parent_dict["id"]
            if cur_id in multi_remap:
                cur_parent_dict["id"] = multi_remap[cur_id].remap_id(
                    cur_parent_dict
                )
            cur_parent, base = self._add_toplevel_feature(
                base, cur_parent_dict
            )
            cur_parent, children = self._add_children_to_parent(
                cur_parent, children
            )
        # create parents for children without them (GFF2 or split/bad files)
        while len(children) > 0:
            parent_id, cur_children = next(
                itertools.islice(children.items(), 1)
            )
            # one child, do not nest it
            if len(cur_children) == 1:
                rec_id, child = cur_children[0]
                loc = (child.location.start, child.location.end)
                rec, base = self._get_rec(
                    base, dict(rec_id=rec_id, location=loc)
                )
                rec.features.append(child)
                del children[parent_id]
            else:
                cur_parent, base = self._add_missing_parent(
                    base, parent_id, cur_children
                )
                cur_parent, children = self._add_children_to_parent(
                    cur_parent, children
                )
        return base

    def _identify_dup_ids(self, parents):
        """Identify duplicated ID attributes in potential nested parents.

        According to the GFF3 spec ID attributes are supposed to be unique
        for a file, but this is not always true in practice. This looks
        for duplicates, and provides unique IDs sorted by locations.
        """
        multi_ids = collections.defaultdict(list)
        for parent in parents:
            multi_ids[parent["id"]].append(parent)
        multi_ids = [
            (mid, ps)
            for (mid, ps) in multi_ids.items()
            if len(ps) > 1
        ]
        multi_remap = dict()
        for mid, parents in multi_ids:
            multi_remap[mid] = _MultiIDRemapper(mid, parents)
        return multi_remap

    def _add_children_to_parent(self, cur_parent, children):
        """Recursively add children to parent features."""
        if cur_parent.id in children:
            cur_children = children[cur_parent.id]
            ready_children = []
            for _, cur_child in cur_children:
                cur_child, _ = self._add_children_to_parent(
                    cur_child, children
                )
                ready_children.append(cur_child)
            # Support Biopython features for 1.62+
            # CompoundLocations and pre-1.62
            if not hasattr(SeqFeature, "CompoundLocation"):
                cur_parent.location_operator = "join"
            for cur_child in ready_children:
                cur_parent.sub_features.append(cur_child)
            del children[cur_parent.id]
        return cur_parent, children

    def _add_annotations(self, base, anns):
        """Add annotation data from the GFF file to records."""
        # add these as a list of annotations, checking not to overwrite
        # current values
        for ann in anns:
            rec, base = self._get_rec(base, ann)
            for key, vals in ann["quals"].items():
                self._add_ann_to_rec(rec, key, vals)
        return base

    def _add_ann_to_rec(self, rec, key, vals):
        """Add a key/value annotation to the given SeqRecord."""
        if key in rec.annotations:
            try:
                rec.annotations[key].extend(vals)
            except AttributeError:
                rec.annotations[key] = [rec.annotations[key]] + vals
        else:
            rec.annotations[key] = vals

    def _get_rec(self, base, info_dict):
        """Retrieve a record to add features to."""
        max_loc = info_dict.get("location", (0, 1))[1]
        match_id = self._get_matching_record_id(
            base, info_dict["rec_id"]
        )
        if match_id:
            cur_rec = base[match_id]
            # update generated unknown sequences
            if unknown_seq_avail and isinstance(
                cur_rec.seq, UnknownSeq
            ):
                cur_rec.seq._length = max(
                    [max_loc, cur_rec.seq._length]
                )
            elif not unknown_seq_avail and isinstance(
                cur_rec.seq._data, _UndefinedSequenceData
            ):
                cur_rec.seq._data._length = max(
                    [max_loc, cur_rec.seq._data._length]
                )
            return cur_rec, base
        elif self._create_missing:
            if unknown_seq_avail:
                new_rec = SeqRecord(
                    UnknownSeq(max_loc), info_dict["rec_id"]
                )
            else:
                new_rec = SeqRecord(
                    Seq(None, length=max_loc), info_dict["rec_id"]
                )
            base[info_dict["rec_id"]] = new_rec
            return new_rec, base
        else:
            raise KeyError(
                "Did not find matching record in %s for %s"
                % (base.keys(), info_dict)
            )

    def _add_missing_parent(self, base, parent_id, cur_children):
        """Add a new feature that is missing from the GFF file."""
        base_rec_id = list(set(c[0] for c in cur_children))
        child_strands = list(
            set(c[1].location.strand for c in cur_children)
        )
        inferred_strand = (
            child_strands[0] if len(child_strands) == 1 else None
        )
        assert len(base_rec_id) > 0
        feature_dict = dict(
            id=parent_id,
            strand=inferred_strand,
            type="inferred_parent",
            quals=dict(ID=[parent_id]),
            rec_id=base_rec_id[0],
        )
        coords = [
            (c.location.start, c.location.end)
            for r, c in cur_children
        ]
        feature_dict["location"] = (
            min([c[0] for c in coords]),
            max([c[1] for c in coords]),
        )
        return self._add_toplevel_feature(base, feature_dict)

    def _add_toplevel_feature(self, base, feature_dict):
        """Add a toplevel non-nested feature to the appropriate record."""
        new_feature = self._get_feature(feature_dict)
        rec, base = self._get_rec(base, feature_dict)
        rec.features.append(new_feature)
        return new_feature, base

    def _get_feature(self, feature_dict):
        """Retrieve a Biopython feature from our dictionary representation."""
        # location = SeqFeature.FeatureLocation(*feature_dict['location'])
        rstart, rend = feature_dict["location"]
        new_feature = SeqFeature.SeqFeature(
            SeqFeature.SimpleLocation(
                start=rstart, end=rend, strand=feature_dict["strand"]
            ),
            feature_dict["type"],
            id=feature_dict["id"],
        )
        # Support for Biopython 1.68 and above, which removed sub_features
        if not hasattr(new_feature, "sub_features"):
            new_feature.sub_features = []
        new_feature.qualifiers = feature_dict["quals"]
        return new_feature

    def _parse_fasta(self, in_handle):
        """Parse FASTA sequence information contained in the GFF3 file."""
        return list(SeqIO.parse(in_handle, "fasta"))


class _GFFParserLocalOut:
    """Provide a collector for local GFF MapReduce file parsing."""

    def __init__(self, smart_breaks=False):
        self._items = dict()
        self._smart_breaks = smart_breaks
        self._missing_keys = collections.defaultdict(int)
        self._last_parent = None
        self.can_break = True
        self.num_lines = 0

    def add(self, key, vals):
        if self._smart_breaks:
            # if we are not GFF2 we expect parents and break
            # based on not having missing ones
            if key == "directive":
                if vals[0] == "#":
                    self.can_break = True
                self._last_parent = None
            elif not vals[0].get("is_gff2", False):
                self._update_missing_parents(key, vals)
                self.can_break = len(self._missing_keys) == 0
            # break when we are done with stretches of child features
            elif key != "child":
                self.can_break = True
                self._last_parent = None
            # break when we have lots of child features in a row
            # and change between parents
            else:
                cur_parent = vals[0]["quals"]["Parent"][0]
                if self._last_parent:
                    self.can_break = cur_parent != self._last_parent
                self._last_parent = cur_parent
        self.num_lines += 1
        try:
            self._items[key].extend(vals)
        except KeyError:
            self._items[key] = vals

    def _update_missing_parents(self, key, vals):
        # smart way of deciding if we can break this.
        # if this is too much, can go back to not breaking in the
        # middle of children
        if key in ["child"]:
            for val in vals:
                for p_id in val["quals"]["Parent"]:
                    self._missing_keys[p_id] += 1
        for val in vals:
            try:
                del self._missing_keys[val["quals"]["ID"][0]]
            except KeyError:
                pass

    def has_items(self):
        return len(self._items) > 0

    def get_results(self):
        self._last_parent = None
        return self._items


class GFFParser(_AbstractMapReduceGFF):
    """Local GFF parser providing standardized parsing of GFF files."""

    def __init__(self, line_adjust_fn=None, create_missing=True):
        _AbstractMapReduceGFF.__init__(
            self, create_missing=create_missing
        )
        self._line_adjust_fn = line_adjust_fn

    def _gff_process(self, gff_files, limit_info, target_lines):
        """Process GFF addition without any parallelization.

        In addition to limit filtering, this accepts a target_lines attribute
        which provides a number of lines to parse before returning results.
        This allows partial parsing of a file to prevent memory issues.
        """
        line_gen = self._file_line_generator(gff_files)
        for out in self._lines_to_out_info(
            line_gen, limit_info, target_lines
        ):
            yield out

    def _file_line_generator(self, gff_files):
        """Generate single lines from a set of GFF files."""
        for gff_file in gff_files:
            if hasattr(gff_file, "read"):
                need_close = False
                in_handle = gff_file
            else:
                need_close = True
                in_handle = open(gff_file)
            while 1:
                line = in_handle.readline()
                if not line:
                    break
                yield line
            if need_close:
                in_handle.close()

    def _lines_to_out_info(self, line_iter, limit_info=None, target_lines=None):
        """Generate SeqRecord and SeqFeatures from GFF file lines."""
        params = self._examiner._get_local_params(limit_info)
        out_info = _GFFParserLocalOut(
            (target_lines is not None and target_lines > 1)
        )
        found_seqs = False
        for line in line_iter:
            results = self._map_fn(line, params)
            if self._line_adjust_fn and results:
                if results[0][0] not in ["directive"]:
                    results = [
                        (
                            results[0][0],
                            self._line_adjust_fn(results[0][1]),
                        )
                    ]
            self._reduce_fn(results, out_info, params)
            if (
                target_lines
                and out_info.num_lines >= target_lines
                and out_info.can_break
            ):
                yield out_info.get_results()
                out_info = _GFFParserLocalOut(
                    (target_lines is not None and target_lines > 1)
                )
            if (
                results
                and results[0][0] == "directive"
                and results[0][1] == "FASTA"
            ):
                found_seqs = True
                break

        class FakeHandle:
            def __init__(self, line_iter):
                self._iter = line_iter

            def __iter__(self):
                return self

            def __next__(self):
                return next(self._iter)

            next = __next__

            def read(self, size=-1):
                if size < 0:
                    return "".join(x for x in self._iter)
                elif size == 0:
                    return ""  # Used by Biopython to sniff unicode vs bytes
                else:
                    raise NotImplementedError

            def readline(self):
                try:
                    return next(self._iter)
                except StopIteration:
                    return ""

        if found_seqs:
            fasta_recs = self._parse_fasta(FakeHandle(line_iter))
            out_info.add("fasta", fasta_recs)
        if out_info.has_items():
            yield out_info.get_results()


class DiscoGFFParser(_AbstractMapReduceGFF):
    """GFF Parser with parallelization (http://discoproject.org)."""

    def __init__(self, disco_host, create_missing=True):
        """Initialize parser.

        disco_host - Web reference to a Disco host which will be used for
        parallelizing the GFF reading job.
        """
        _AbstractMapReduceGFF.__init__(
            self, create_missing=create_missing
        )
        self._disco_host = disco_host

    def _gff_process(self, gff_files, limit_info, target_lines=None):
        """Process GFF addition, using Disco to parallelize the process."""
        assert target_lines is None, "Cannot split parallelized jobs"
        # make these imports local; only need them when using disco
        import disco

        # absolute path names unless they are special disco files
        full_files = []
        for f in gff_files:
            if f.split(":")[0] != "disco":
                full_files.append(os.path.abspath(f))
            else:
                full_files.append(f)
        results = disco.job(
            self._disco_host,
            name="gff_reader",
            input=full_files,
            params=disco.Params(
                limit_info=limit_info,
                jsonify=True,
                filter_info=self._examiner._filter_info,
            ),
            required_modules=["json", "collections", "re"],
            map=self._map_fn,
            reduce=self._reduce_fn,
        )
        processed = dict()
        for out_key, out_val in disco.result_iterator(results):
            processed[out_key] = json.loads(out_val)
        yield processed


def parse(
    gff_files, base_dict=None, limit_info=None, target_lines=None
):
    """Parse GFF files into SeqRecords and SeqFeatures."""
    parser = GFFParser()
    for rec in parser.parse_in_parts(
        gff_files, base_dict, limit_info, target_lines
    ):
        yield rec


def parse_simple(gff_files, limit_info=None):
    """Parse GFF files as line by line dictionary of parts."""
    parser = GFFParser()
    for rec in parser.parse_simple(gff_files, limit_info=limit_info):
        if "child" in rec:
            assert "parent" not in rec
            yield rec["child"][0]
        elif "parent" in rec:
            yield rec["parent"][0]
        elif "feature" in rec:
            yield rec["feature"][0]
        # ignore directive lines
        else:
            assert "directive" in rec

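# A hedged sketch of restricting parsing to one reference sequence and
# bounding memory use with target_lines; "chr1" and the file name are
# hypothetical:
#
#     parser = GFFParser()
#     limits = dict(gff_id=[("chr1",)])
#     for rec in parser.parse_in_parts(
#         "annotations.gff3", limit_info=limits, target_lines=1000
#     ):
#         print(rec.id, len(rec.features))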

def _file_or_handle(fn):
    """Decorator to handle either an input handle or a file."""

    def _file_or_handle_inside(*args, **kwargs):
        in_file = args[1]
        if hasattr(in_file, "read"):
            need_close = False
            in_handle = in_file
            if six.PY3 and not isinstance(in_handle, io.TextIOBase):
                raise TypeError(
                    "input handle must be opened in text mode"
                )
        else:
            need_close = True
            in_handle = open(in_file)
        args = (args[0], in_handle) + args[2:]
        out = fn(*args, **kwargs)
        if need_close:
            in_handle.close()
        return out

    return _file_or_handle_inside


class GFFExaminer:
    """Provide high level details about a GFF file to refine parsing.

    GFF is a spec and is provided by many different centers. Real life files
    will present the same information in slightly different ways. Becoming
    familiar with the file you are dealing with is the best way to extract the
    information you need. This class provides high level summary details to
    help in learning.
    """

    def __init__(self):
        self._filter_info = dict(
            gff_id=[0],
            gff_source_type=[1, 2],
            gff_source=[1],
            gff_type=[2],
        )

    def _get_local_params(self, limit_info=None):
        class _LocalParams:
            def __init__(self):
                self.jsonify = False

        params = _LocalParams()
        params.limit_info = limit_info
        params.filter_info = self._filter_info
        return params

    @_file_or_handle
    def available_limits(self, gff_handle):
        """Return dictionary information on possible limits for this file.

        This returns a nested dictionary with the following structure:

        keys -- names of items to filter by
        values -- dictionary with:
            keys -- filter choice
            value -- counts of that filter in this file

        Not a parallelized map-reduce implementation.
        """
        cur_limits = dict()
        for filter_key in self._filter_info.keys():
            cur_limits[filter_key] = collections.defaultdict(int)
        for line in gff_handle:
            # when we hit FASTA sequences, we are done with annotations
            if line.startswith("##FASTA"):
                break
            # ignore empty and comment lines
            if line.strip() and line.strip()[0] != "#":
                parts = [p.strip() for p in line.split("\t")]
                assert len(parts) >= 8, line
                parts = parts[:9]
                for (
                    filter_key,
                    cur_indexes,
                ) in self._filter_info.items():
                    cur_id = tuple([parts[i] for i in cur_indexes])
                    cur_limits[filter_key][cur_id] += 1
        # get rid of the default dicts
        final_dict = dict()
        for key, value_dict in cur_limits.items():
            if len(key) == 1:
                key = key[0]
            final_dict[key] = dict(value_dict)
        gff_handle.close()
        return final_dict

    @_file_or_handle
    def parent_child_map(self, gff_handle):
        """Provide a mapping of parent to child relationships in the file.

        Returns a dictionary of parent child relationships:

        keys -- tuple of (source, type) for each parent
        values -- tuple of (source, type) as children of that parent

        Not a parallelized map-reduce implementation.
        """
        # collect all of the parent and child types mapped to IDs
        parent_sts = dict()
        child_sts = collections.defaultdict(list)
        for line in gff_handle:
            # when we hit FASTA sequences, we are done with annotations
            if line.startswith("##FASTA"):
                break
            if line.strip() and not line.startswith("#"):
                line_type, line_info = _gff_line_map(
                    line, self._get_local_params()
                )[0]
                if line_type == "parent" or (
                    line_type == "child" and line_info["id"]
                ):
                    parent_sts[line_info["id"]] = (
                        line_info["quals"].get("source", [""])[0],
                        line_info["type"],
                    )
                if line_type == "child":
                    for parent_id in line_info["quals"]["Parent"]:
                        child_sts[parent_id].append(
                            (
                                line_info["quals"].get(
                                    "source", [""]
                                )[0],
                                line_info["type"],
                            )
                        )
        # print parent_sts, child_sts
        # generate a dictionary of the unique final type relationships
        pc_map = collections.defaultdict(list)
        for parent_id, parent_type in parent_sts.items():
            for child_type in child_sts[parent_id]:
                pc_map[parent_type].append(child_type)
        pc_final_map = dict()
        for ptype, ctypes in pc_map.items():
            unique_ctypes = list(set(ctypes))
            unique_ctypes.sort()
            pc_final_map[ptype] = unique_ctypes
        return pc_final_map
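# GFFExaminer usage sketch (hypothetical file name and counts):
# available_limits() returns per-column counts keyed by tuples, roughly
#   {"gff_type": {("gene",): 120, ("mRNA",): 150, ...}, "gff_id": {...}, ...},
# and parent_child_map() returns {(source, type): [(source, type), ...], ...}:
#
#     examiner = GFFExaminer()
#     print(examiner.available_limits("annotations.gff3"))
#     print(examiner.parent_child_map("annotations.gff3"))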