view cpt_convert_mga_to_gff3.py @ 10:4100ee965124 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:40:41 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python
import sys
import argparse
from Bio import SeqIO
from Bio.SeqFeature import SeqFeature
from Bio.SeqFeature import FeatureLocation
from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
import logging

logging.basicConfig(level=logging.INFO)


def mga_to_gff3(mga_output, genome):
    seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta"))

    current_record = None
    for line in mga_output:
        if line.startswith("#"):
            if line.startswith("# gc = ") or line.startswith("# self:"):
                continue
            chromId = line.strip().replace("# ", "")

            if " " in chromId:
                chromId = chromId[0 : chromId.index(" ")]

            if chromId in seq_dict:
                if current_record is not None:
                    yield current_record
                current_record = seq_dict[chromId]
            else:
                raise Exception(
                    "Found results for sequence %s which was not in fasta file sequences (%s)"
                    % (chromId, ", ".join(seq_dict.keys()))
                )

        else:
            (
                gene_id,
                start,
                end,
                strand,
                phase,
                complete,
                score,
                model,
                rbs_start,
                rbs_end,
                rbs_score,
            ) = line.strip().split("\t")
            start = int(start)
            end = int(end)
            strand = +1 if strand == "+" else -1

            # Correct for gff3
            start -= 1

            rbs_feat = None
            if rbs_start != "-":
                rbs_start = int(rbs_start)
                rbs_end = int(rbs_end)
                rbs_feat = gffSeqFeature(
                    FeatureLocation(rbs_start, rbs_end),
                    type="Shine_Dalgarno_sequence",
                    strand=strand,
                    qualifiers={
                        "ID": "%s.rbs_%s" % (current_record.id, gene_id),
                        "Source": "MGA",
                    },
                    phase=phase,
                    source="MGA",
                )

            cds_feat = gffSeqFeature(
                FeatureLocation(start, end),
                type="CDS",
                strand=strand,
                qualifiers={
                    "Source": "MGA",
                    "ID": "%s.cds_%s" % (current_record.id, gene_id),
                },
                phase=phase,
                source="MGA",
            )

            if rbs_feat is not None:
                if strand > 0:
                    gene_start = rbs_start
                    gene_end = end
                else:
                    gene_start = start
                    gene_end = rbs_end
            else:
                gene_start = start
                gene_end = end

            gene = gffSeqFeature(
                FeatureLocation(gene_start, gene_end),
                type="gene",
                strand=strand,
                id="%s.%s" % (current_record.id, gene_id),
                qualifiers={
                    "Source": "MGA",
                    "ID": "%s.%s" % (current_record.id, gene_id),
                },
                phase=phase,
                source="MGA",
            )

            gene.sub_features = [cds_feat]
            if rbs_feat is not None:
                gene.sub_features.append(rbs_feat)
            current_record.features.append(gene)
    yield current_record


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert MGA to GFF3", epilog="")
    parser.add_argument(
        "mga_output", type=argparse.FileType("r"), help="MetaGeneAnnotator Output"
    )
    parser.add_argument("genome", type=argparse.FileType("r"), help="Fasta Genome")
    args = parser.parse_args()

    for result in mga_to_gff3(**vars(args)):
        gffWrite([result], sys.stdout)