Mercurial > repos > cpt > cpt_convert_mga
comparison cpt_convert_mga_to_gff3.py @ 10:4100ee965124 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
| author | cpt |
|---|---|
| date | Mon, 05 Jun 2023 02:40:41 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 9:06645c14adc0 | 10:4100ee965124 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import sys | |
| 3 import argparse | |
| 4 from Bio import SeqIO | |
| 5 from Bio.SeqFeature import SeqFeature | |
| 6 from Bio.SeqFeature import FeatureLocation | |
| 7 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
| 8 import logging | |
| 9 | |
| 10 logging.basicConfig(level=logging.INFO) | |
| 11 | |
| 12 | |
| 13 def mga_to_gff3(mga_output, genome): | |
| 14 seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta")) | |
| 15 | |
| 16 current_record = None | |
| 17 for line in mga_output: | |
| 18 if line.startswith("#"): | |
| 19 if line.startswith("# gc = ") or line.startswith("# self:"): | |
| 20 continue | |
| 21 chromId = line.strip().replace("# ", "") | |
| 22 | |
| 23 if " " in chromId: | |
| 24 chromId = chromId[0 : chromId.index(" ")] | |
| 25 | |
| 26 if chromId in seq_dict: | |
| 27 if current_record is not None: | |
| 28 yield current_record | |
| 29 current_record = seq_dict[chromId] | |
| 30 else: | |
| 31 raise Exception( | |
| 32 "Found results for sequence %s which was not in fasta file sequences (%s)" | |
| 33 % (chromId, ", ".join(seq_dict.keys())) | |
| 34 ) | |
| 35 | |
| 36 else: | |
| 37 ( | |
| 38 gene_id, | |
| 39 start, | |
| 40 end, | |
| 41 strand, | |
| 42 phase, | |
| 43 complete, | |
| 44 score, | |
| 45 model, | |
| 46 rbs_start, | |
| 47 rbs_end, | |
| 48 rbs_score, | |
| 49 ) = line.strip().split("\t") | |
| 50 start = int(start) | |
| 51 end = int(end) | |
| 52 strand = +1 if strand == "+" else -1 | |
| 53 | |
| 54 # Correct for gff3 | |
| 55 start -= 1 | |
| 56 | |
| 57 rbs_feat = None | |
| 58 if rbs_start != "-": | |
| 59 rbs_start = int(rbs_start) | |
| 60 rbs_end = int(rbs_end) | |
| 61 rbs_feat = gffSeqFeature( | |
| 62 FeatureLocation(rbs_start, rbs_end), | |
| 63 type="Shine_Dalgarno_sequence", | |
| 64 strand=strand, | |
| 65 qualifiers={ | |
| 66 "ID": "%s.rbs_%s" % (current_record.id, gene_id), | |
| 67 "Source": "MGA", | |
| 68 }, | |
| 69 phase=phase, | |
| 70 source="MGA", | |
| 71 ) | |
| 72 | |
| 73 cds_feat = gffSeqFeature( | |
| 74 FeatureLocation(start, end), | |
| 75 type="CDS", | |
| 76 strand=strand, | |
| 77 qualifiers={ | |
| 78 "Source": "MGA", | |
| 79 "ID": "%s.cds_%s" % (current_record.id, gene_id), | |
| 80 }, | |
| 81 phase=phase, | |
| 82 source="MGA", | |
| 83 ) | |
| 84 | |
| 85 if rbs_feat is not None: | |
| 86 if strand > 0: | |
| 87 gene_start = rbs_start | |
| 88 gene_end = end | |
| 89 else: | |
| 90 gene_start = start | |
| 91 gene_end = rbs_end | |
| 92 else: | |
| 93 gene_start = start | |
| 94 gene_end = end | |
| 95 | |
| 96 gene = gffSeqFeature( | |
| 97 FeatureLocation(gene_start, gene_end), | |
| 98 type="gene", | |
| 99 strand=strand, | |
| 100 id="%s.%s" % (current_record.id, gene_id), | |
| 101 qualifiers={ | |
| 102 "Source": "MGA", | |
| 103 "ID": "%s.%s" % (current_record.id, gene_id), | |
| 104 }, | |
| 105 phase=phase, | |
| 106 source="MGA", | |
| 107 ) | |
| 108 | |
| 109 gene.sub_features = [cds_feat] | |
| 110 if rbs_feat is not None: | |
| 111 gene.sub_features.append(rbs_feat) | |
| 112 current_record.features.append(gene) | |
| 113 yield current_record | |
| 114 | |
| 115 | |
| 116 if __name__ == "__main__": | |
| 117 parser = argparse.ArgumentParser(description="Convert MGA to GFF3", epilog="") | |
| 118 parser.add_argument( | |
| 119 "mga_output", type=argparse.FileType("r"), help="MetaGeneAnnotator Output" | |
| 120 ) | |
| 121 parser.add_argument("genome", type=argparse.FileType("r"), help="Fasta Genome") | |
| 122 args = parser.parse_args() | |
| 123 | |
| 124 for result in mga_to_gff3(**vars(args)): | |
| 125 gffWrite([result], sys.stdout) |
