comparison cpt_convert_mga_to_gff3.py @ 10:4100ee965124 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:40:41 +0000
parents
children
comparison
equal deleted inserted replaced
9:06645c14adc0 10:4100ee965124
1 #!/usr/bin/env python
2 import sys
3 import argparse
4 from Bio import SeqIO
5 from Bio.SeqFeature import SeqFeature
6 from Bio.SeqFeature import FeatureLocation
7 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
8 import logging
9
10 logging.basicConfig(level=logging.INFO)
11
12
13 def mga_to_gff3(mga_output, genome):
14 seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta"))
15
16 current_record = None
17 for line in mga_output:
18 if line.startswith("#"):
19 if line.startswith("# gc = ") or line.startswith("# self:"):
20 continue
21 chromId = line.strip().replace("# ", "")
22
23 if " " in chromId:
24 chromId = chromId[0 : chromId.index(" ")]
25
26 if chromId in seq_dict:
27 if current_record is not None:
28 yield current_record
29 current_record = seq_dict[chromId]
30 else:
31 raise Exception(
32 "Found results for sequence %s which was not in fasta file sequences (%s)"
33 % (chromId, ", ".join(seq_dict.keys()))
34 )
35
36 else:
37 (
38 gene_id,
39 start,
40 end,
41 strand,
42 phase,
43 complete,
44 score,
45 model,
46 rbs_start,
47 rbs_end,
48 rbs_score,
49 ) = line.strip().split("\t")
50 start = int(start)
51 end = int(end)
52 strand = +1 if strand == "+" else -1
53
54 # Correct for gff3
55 start -= 1
56
57 rbs_feat = None
58 if rbs_start != "-":
59 rbs_start = int(rbs_start)
60 rbs_end = int(rbs_end)
61 rbs_feat = gffSeqFeature(
62 FeatureLocation(rbs_start, rbs_end),
63 type="Shine_Dalgarno_sequence",
64 strand=strand,
65 qualifiers={
66 "ID": "%s.rbs_%s" % (current_record.id, gene_id),
67 "Source": "MGA",
68 },
69 phase=phase,
70 source="MGA",
71 )
72
73 cds_feat = gffSeqFeature(
74 FeatureLocation(start, end),
75 type="CDS",
76 strand=strand,
77 qualifiers={
78 "Source": "MGA",
79 "ID": "%s.cds_%s" % (current_record.id, gene_id),
80 },
81 phase=phase,
82 source="MGA",
83 )
84
85 if rbs_feat is not None:
86 if strand > 0:
87 gene_start = rbs_start
88 gene_end = end
89 else:
90 gene_start = start
91 gene_end = rbs_end
92 else:
93 gene_start = start
94 gene_end = end
95
96 gene = gffSeqFeature(
97 FeatureLocation(gene_start, gene_end),
98 type="gene",
99 strand=strand,
100 id="%s.%s" % (current_record.id, gene_id),
101 qualifiers={
102 "Source": "MGA",
103 "ID": "%s.%s" % (current_record.id, gene_id),
104 },
105 phase=phase,
106 source="MGA",
107 )
108
109 gene.sub_features = [cds_feat]
110 if rbs_feat is not None:
111 gene.sub_features.append(rbs_feat)
112 current_record.features.append(gene)
113 yield current_record
114
115
116 if __name__ == "__main__":
117 parser = argparse.ArgumentParser(description="Convert MGA to GFF3", epilog="")
118 parser.add_argument(
119 "mga_output", type=argparse.FileType("r"), help="MetaGeneAnnotator Output"
120 )
121 parser.add_argument("genome", type=argparse.FileType("r"), help="Fasta Genome")
122 args = parser.parse_args()
123
124 for result in mga_to_gff3(**vars(args)):
125 gffWrite([result], sys.stdout)