Mercurial > repos > cpt > cpt_convert_mga
comparison cpt_convert_mga_to_gff3.py @ 10:4100ee965124 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:40:41 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
9:06645c14adc0 | 10:4100ee965124 |
---|---|
1 #!/usr/bin/env python | |
2 import sys | |
3 import argparse | |
4 from Bio import SeqIO | |
5 from Bio.SeqFeature import SeqFeature | |
6 from Bio.SeqFeature import FeatureLocation | |
7 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
8 import logging | |
9 | |
10 logging.basicConfig(level=logging.INFO) | |
11 | |
12 | |
13 def mga_to_gff3(mga_output, genome): | |
14 seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta")) | |
15 | |
16 current_record = None | |
17 for line in mga_output: | |
18 if line.startswith("#"): | |
19 if line.startswith("# gc = ") or line.startswith("# self:"): | |
20 continue | |
21 chromId = line.strip().replace("# ", "") | |
22 | |
23 if " " in chromId: | |
24 chromId = chromId[0 : chromId.index(" ")] | |
25 | |
26 if chromId in seq_dict: | |
27 if current_record is not None: | |
28 yield current_record | |
29 current_record = seq_dict[chromId] | |
30 else: | |
31 raise Exception( | |
32 "Found results for sequence %s which was not in fasta file sequences (%s)" | |
33 % (chromId, ", ".join(seq_dict.keys())) | |
34 ) | |
35 | |
36 else: | |
37 ( | |
38 gene_id, | |
39 start, | |
40 end, | |
41 strand, | |
42 phase, | |
43 complete, | |
44 score, | |
45 model, | |
46 rbs_start, | |
47 rbs_end, | |
48 rbs_score, | |
49 ) = line.strip().split("\t") | |
50 start = int(start) | |
51 end = int(end) | |
52 strand = +1 if strand == "+" else -1 | |
53 | |
54 # Correct for gff3 | |
55 start -= 1 | |
56 | |
57 rbs_feat = None | |
58 if rbs_start != "-": | |
59 rbs_start = int(rbs_start) | |
60 rbs_end = int(rbs_end) | |
61 rbs_feat = gffSeqFeature( | |
62 FeatureLocation(rbs_start, rbs_end), | |
63 type="Shine_Dalgarno_sequence", | |
64 strand=strand, | |
65 qualifiers={ | |
66 "ID": "%s.rbs_%s" % (current_record.id, gene_id), | |
67 "Source": "MGA", | |
68 }, | |
69 phase=phase, | |
70 source="MGA", | |
71 ) | |
72 | |
73 cds_feat = gffSeqFeature( | |
74 FeatureLocation(start, end), | |
75 type="CDS", | |
76 strand=strand, | |
77 qualifiers={ | |
78 "Source": "MGA", | |
79 "ID": "%s.cds_%s" % (current_record.id, gene_id), | |
80 }, | |
81 phase=phase, | |
82 source="MGA", | |
83 ) | |
84 | |
85 if rbs_feat is not None: | |
86 if strand > 0: | |
87 gene_start = rbs_start | |
88 gene_end = end | |
89 else: | |
90 gene_start = start | |
91 gene_end = rbs_end | |
92 else: | |
93 gene_start = start | |
94 gene_end = end | |
95 | |
96 gene = gffSeqFeature( | |
97 FeatureLocation(gene_start, gene_end), | |
98 type="gene", | |
99 strand=strand, | |
100 id="%s.%s" % (current_record.id, gene_id), | |
101 qualifiers={ | |
102 "Source": "MGA", | |
103 "ID": "%s.%s" % (current_record.id, gene_id), | |
104 }, | |
105 phase=phase, | |
106 source="MGA", | |
107 ) | |
108 | |
109 gene.sub_features = [cds_feat] | |
110 if rbs_feat is not None: | |
111 gene.sub_features.append(rbs_feat) | |
112 current_record.features.append(gene) | |
113 yield current_record | |
114 | |
115 | |
116 if __name__ == "__main__": | |
117 parser = argparse.ArgumentParser(description="Convert MGA to GFF3", epilog="") | |
118 parser.add_argument( | |
119 "mga_output", type=argparse.FileType("r"), help="MetaGeneAnnotator Output" | |
120 ) | |
121 parser.add_argument("genome", type=argparse.FileType("r"), help="Fasta Genome") | |
122 args = parser.parse_args() | |
123 | |
124 for result in mga_to_gff3(**vars(args)): | |
125 gffWrite([result], sys.stdout) |