Mercurial > repos > cpt > cpt_convert_mga
diff cpt_convert_mga_to_gff3.py @ 10:4100ee965124 draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author | cpt |
---|---|
date | Mon, 05 Jun 2023 02:40:41 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt_convert_mga_to_gff3.py Mon Jun 05 02:40:41 2023 +0000 @@ -0,0 +1,125 @@ +#!/usr/bin/env python +import sys +import argparse +from Bio import SeqIO +from Bio.SeqFeature import SeqFeature +from Bio.SeqFeature import FeatureLocation +from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature +import logging + +logging.basicConfig(level=logging.INFO) + + +def mga_to_gff3(mga_output, genome): + seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta")) + + current_record = None + for line in mga_output: + if line.startswith("#"): + if line.startswith("# gc = ") or line.startswith("# self:"): + continue + chromId = line.strip().replace("# ", "") + + if " " in chromId: + chromId = chromId[0 : chromId.index(" ")] + + if chromId in seq_dict: + if current_record is not None: + yield current_record + current_record = seq_dict[chromId] + else: + raise Exception( + "Found results for sequence %s which was not in fasta file sequences (%s)" + % (chromId, ", ".join(seq_dict.keys())) + ) + + else: + ( + gene_id, + start, + end, + strand, + phase, + complete, + score, + model, + rbs_start, + rbs_end, + rbs_score, + ) = line.strip().split("\t") + start = int(start) + end = int(end) + strand = +1 if strand == "+" else -1 + + # Correct for gff3 + start -= 1 + + rbs_feat = None + if rbs_start != "-": + rbs_start = int(rbs_start) + rbs_end = int(rbs_end) + rbs_feat = gffSeqFeature( + FeatureLocation(rbs_start, rbs_end), + type="Shine_Dalgarno_sequence", + strand=strand, + qualifiers={ + "ID": "%s.rbs_%s" % (current_record.id, gene_id), + "Source": "MGA", + }, + phase=phase, + source="MGA", + ) + + cds_feat = gffSeqFeature( + FeatureLocation(start, end), + type="CDS", + strand=strand, + qualifiers={ + "Source": "MGA", + "ID": "%s.cds_%s" % (current_record.id, gene_id), + }, + phase=phase, + source="MGA", + ) + + if rbs_feat is not None: + if strand > 0: + gene_start = rbs_start + gene_end = end + else: + gene_start = start + gene_end = rbs_end + else: + gene_start = start + gene_end = end + + gene = gffSeqFeature( + FeatureLocation(gene_start, gene_end), + type="gene", + strand=strand, + id="%s.%s" % (current_record.id, gene_id), + qualifiers={ + "Source": "MGA", + "ID": "%s.%s" % (current_record.id, gene_id), + }, + phase=phase, + source="MGA", + ) + + gene.sub_features = [cds_feat] + if rbs_feat is not None: + gene.sub_features.append(rbs_feat) + current_record.features.append(gene) + yield current_record + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Convert MGA to GFF3", epilog="") + parser.add_argument( + "mga_output", type=argparse.FileType("r"), help="MetaGeneAnnotator Output" + ) + parser.add_argument("genome", type=argparse.FileType("r"), help="Fasta Genome") + args = parser.parse_args() + + for result in mga_to_gff3(**vars(args)): + gffWrite([result], sys.stdout)