diff adjacent_features.py @ 1:e29c36ee61e0 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:42:35 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/adjacent_features.py	Mon Jun 05 02:42:35 2023 +0000
@@ -0,0 +1,444 @@
+#!/usr/bin/env python
+from Bio import SeqIO
+from Bio.Seq import Seq
+from Bio.Data import CodonTable
+from Bio.SeqRecord import SeqRecord
+from Bio.SeqFeature import SeqFeature, FeatureLocation
+from Bio.Alphabet import generic_dna, generic_protein
+import argparse
+import logging
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger()
+
+
+def extract_features(
+    genbankFiles=None,
+    fastaFiles=None,
+    upOut=None,
+    downOut=None,
+    genesOnly=False,
+    cdsOnly=True,
+    forceSeqID=False,
+    forward=1,
+    behind=1,
+    outProt=True,
+    tTable=11,
+    fTable=11,
+):
+
+    genList = []
+    fastaList = []
+
+    for fileX in genbankFiles:
+        opener = SeqIO.parse(fileX, "genbank")
+        for (
+            openRec
+        ) in (
+            opener
+        ):  # To turn the generator into objects (or else we end up with a list of generators)
+            genList.append(openRec)
+
+    for fileX in fastaFiles:
+        opener = SeqIO.parse(fileX, "fasta")
+        for openRec in opener:  # Technically flattens multifastas too
+            fastaList.append(openRec)
+
+    for seqMatch in fastaList:
+        longOut = seqMatch.description
+        protID = seqMatch.id
+        if fTable != 0:
+            fSeq = seqMatch.seq.translate(table=fTable, cds=False)
+        else:
+            fSeq = seqMatch.seq
+
+        for gbk in genList:
+            sourceOut = gbk.id
+            num = -1
+            for feat in gbk.features:
+                num += 1
+
+                if (genesOnly and feat.type != "gene") or (
+                    cdsOnly and feat.type != "CDS"
+                ):
+                    continue
+
+                if "codon_start" in feat.qualifiers:
+                    offset = 1 - int(feat.qualifiers["codon_start"][0])
+                else:
+                    offset = 0
+
+                temp = gbk.seq[feat.location.start : feat.location.end]
+                if feat.location.strand == -1:
+                    temp = gbk.seq[feat.location.start : feat.location.end - offset]
+                    temp = temp.reverse_complement()
+                else:
+                    temp = gbk.seq[feat.location.start + offset : feat.location.end]
+
+                if tTable != 0:
+                    try:
+                        gSeq = temp.translate(table=tTable, cds=True)
+                    except CodonTable.TranslationError as cte:
+                        # log.info("Translation issue at %s", cte)
+                        gSeq = temp.translate(table=tTable, cds=False)
+                else:
+                    gSeq = temp
+
+                if not ("protein_id" in feat.qualifiers):
+                    feat.qualifiers["protein_id"] = [
+                        "++++++++"
+                    ]  # Junk value for genesOnly flag
+
+                if (gSeq == fSeq) and (
+                    protID == feat.qualifiers["protein_id"][0] or forceSeqID == False
+                ):
+                    goBack = num - 1
+                    goAhead = num + 1
+                    numBack = behind
+                    numAhead = forward
+                    backList = []
+                    aheadList = []
+
+                    while numBack != 0 and goBack >= 0:
+                        if (genesOnly and gbk.features[goBack].type != "gene") or (
+                            cdsOnly and gbk.features[goBack].type != "CDS"
+                        ):
+                            goBack -= 1
+                            continue
+                        backList.append(gbk.features[goBack])
+                        numBack -= 1
+                        goBack -= 1
+
+                    while numAhead != 0 and goAhead < len(gbk.features):
+                        if (genesOnly and gbk.features[goAhead].type != "gene") or (
+                            cdsOnly and gbk.features[goAhead].type != "CDS"
+                        ):
+                            goAhead += 1
+                            continue
+                        aheadList.append(gbk.features[goAhead])
+                        numAhead -= 1
+                        goAhead += 1
+
+                    backList.reverse()
+                    if feat.location.strand == -1:
+                        tmpList = aheadList
+                        aheadList = backList
+                        backList = tmpList
+
+                    for item in backList:
+                        addition = ""
+                        header = ""
+                        if "product" in item.qualifiers:
+                            addition = " -" + str(item.qualifiers["product"][0]) + "-"
+                        if "protein_id" in item.qualifiers:
+                            header = (
+                                ">"
+                                + (item.qualifiers["protein_id"][0])
+                                + addition
+                                + " (5' of "
+                                + longOut
+                                + " found within "
+                                + sourceOut
+                                + ")\n"
+                            )
+                        else:
+                            header = (
+                                ">"
+                                + (item.qualifiers["locus_tag"][0])
+                                + addition
+                                + " (5' of "
+                                + longOut
+                                + " found within "
+                                + sourceOut
+                                + ")\n"
+                            )
+                        if outProt == True:
+                            if "translation" in item.qualifiers:
+                                upOut.write(header)
+                                upOut.write(
+                                    str(item.qualifiers["translation"][0]) + "\n\n"
+                                )
+                            else:
+                                modS = 0
+                                modE = 0
+                                if "codon_start" in item.qualifiers:
+                                    if item.location.strand > 0:
+                                        modS = (
+                                            int(item.qualifiers["codon_start"][0]) - 1
+                                        )
+                                    else:
+                                        modE = (
+                                            int(item.qualifiers["codon_start"][0]) - 1
+                                        )
+
+                                seqHold = gbk.seq[
+                                    item.location.start
+                                    + modS : item.location.end
+                                    - modE
+                                ]
+                                if item.location.strand == -1:
+                                    seqHold = seqHold.reverse_complement()
+                                if cdsOnly:
+                                    try:
+                                        finalSeq = ""
+                                        if tTable != 0:
+                                            finalSeq = (
+                                                str(
+                                                    seqHold.translate(
+                                                        table=tTable, cds=True
+                                                    )
+                                                )
+                                                + "\n\n"
+                                            )
+                                        else:
+                                            finalSeq = str(seqHold) + "\n\n"
+                                        # upOut.write(header)
+                                        # upOut.write(finalSeq)
+                                    except Exception as bdct:
+                                        log.warn(
+                                            "ERROR %s %s",
+                                            item.qualifiers["locus_tag"][0],
+                                            bdct,
+                                        )
+                                        finalSeq = ""
+                                        if tTable != 0:
+                                            finalSeq = (
+                                                str(
+                                                    seqHold.translate(
+                                                        table=tTable, cds=False
+                                                    )
+                                                )
+                                                + "\n\n"
+                                            )
+                                        else:
+                                            finalSeq = str(seqHold) + "\n\n"
+                                        header = (
+                                            ">"
+                                            + (item.qualifiers["locus_tag"][0])
+                                            + addition
+                                            + " [INCOMPLETE] (5' of "
+                                            + longOut
+                                            + " found within "
+                                            + sourceOut
+                                            + ")\n"
+                                        )
+                                    upOut.write(header)
+                                    upOut.write(finalSeq)
+                                else:
+
+                                    if tTable != 0:
+                                        upOut.write(header)
+                                        upOut.write(
+                                            str(
+                                                seqHold.translate(
+                                                    table=tTable, cds=False
+                                                )
+                                            )
+                                            + "\n\n"
+                                        )
+                                    else:
+                                        upOut.write(header)
+                                        upOut.write(str(seqHold) + "\n\n")
+                        else:
+                            upOut.write(header)
+                            upOut.write(
+                                str(gbk.seq[item.location.start : item.location.end])
+                                + "\n\n"
+                            )
+
+                    for item in aheadList:
+                        addition = ""
+                        header = ""
+                        if "product" in item.qualifiers:
+                            addition = " -" + str(item.qualifiers["product"][0]) + "-"
+                        if "protein_id" in item.qualifiers:
+                            header = (
+                                ">"
+                                + (item.qualifiers["protein_id"][0])
+                                + addition
+                                + " (3' of "
+                                + longOut
+                                + " found within "
+                                + sourceOut
+                                + ")\n"
+                            )
+                        else:
+                            header = (
+                                ">"
+                                + (item.qualifiers["locus_tag"][0])
+                                + addition
+                                + " (3' of "
+                                + longOut
+                                + " found within "
+                                + sourceOut
+                                + ")\n"
+                            )
+                        if outProt == True:
+                            if "translation" in item.qualifiers:
+                                downOut.write(header)
+                                downOut.write(
+                                    str(item.qualifiers["translation"][0]) + "\n\n"
+                                )
+                            else:
+                                modS = 0
+                                modE = 0
+                                if "codon_start" in item.qualifiers:
+                                    if item.location.strand > 0:
+                                        modS = (
+                                            int(item.qualifiers["codon_start"][0]) - 1
+                                        )
+                                    else:
+                                        modE = (
+                                            int(item.qualifiers["codon_start"][0]) - 1
+                                        )
+
+                                seqHold = gbk.seq[
+                                    item.location.start
+                                    + modS : item.location.end
+                                    - modE
+                                ]
+                                if item.location.strand == -1:
+                                    seqHold = seqHold.reverse_complement()
+                                if cdsOnly:
+                                    try:
+                                        finalSeq = ""
+                                        if tTable != 0:
+                                            finalSeq = (
+                                                str(
+                                                    seqHold.translate(
+                                                        table=tTable, cds=True
+                                                    )
+                                                )
+                                                + "\n\n"
+                                            )
+                                        else:
+                                            finalSeq = str(seqHold) + "\n\n"
+                                        # downOut.write(header)
+                                        # downOut.write(finalSeq)
+                                    except Exception as bdct:
+                                        log.warn(
+                                            "ERROR %s %s",
+                                            item.qualifiers["locus_tag"][0],
+                                            bdct,
+                                        )
+                                        finalSeq = ""
+                                        if tTable != 0:
+                                            finalSeq = (
+                                                str(
+                                                    seqHold.translate(
+                                                        table=tTable, cds=False
+                                                    )
+                                                )
+                                                + "\n\n"
+                                            )
+                                        else:
+                                            finalSeq = str(seqHold) + "\n\n"
+                                        header = (
+                                            ">"
+                                            + (item.qualifiers["locus_tag"][0])
+                                            + addition
+                                            + " [INCOMPLETE] (3' of "
+                                            + longOut
+                                            + " found within "
+                                            + sourceOut
+                                            + ")\n"
+                                        )
+                                    downOut.write(header)
+                                    downOut.write(finalSeq)
+                                else:
+
+                                    if tTable != 0:
+                                        downOut.write(header)
+                                        downOut.write(
+                                            str(
+                                                seqHold.translate(
+                                                    table=tTable, cds=False
+                                                )
+                                            )
+                                            + "\n\n"
+                                        )
+                                    else:
+                                        downOut.write(header)
+                                        downOut.write(str(seqHold) + "\n\n")
+                        else:
+                            downOut.write(header)
+                            downOut.write(
+                                str(gbk.seq[item.location.start : item.location.end])
+                                + "\n\n"
+                            )
+            # print(longOut)
+
+    return
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Export a subset of features from a Genbank file", epilog=""
+    )
+    parser.add_argument(
+        "-genbankFiles", nargs="+", type=argparse.FileType("r"), help="Genbank file"
+    )
+    parser.add_argument(
+        "-fastaFiles",
+        nargs="+",
+        type=argparse.FileType("r"),
+        help="Fasta file to match against",
+    )
+    parser.add_argument(
+        "-tTable",
+        type=int,
+        default=11,
+        help="Translation table to use",
+        choices=range(0, 23),
+    )
+    parser.add_argument(
+        "-fTable",
+        type=int,
+        default=11,
+        help="Translation table to use",
+        choices=range(0, 23),
+    )
+    parser.add_argument(
+        "-upOut",
+        type=argparse.FileType("w"),
+        help="Upstream Fasta output",
+        default="test-data/upOut.fa",
+    )
+    parser.add_argument(
+        "-downOut",
+        type=argparse.FileType("w"),
+        help="Downstream Fasta output",
+        default="test-data/downOut.fa",
+    )
+    parser.add_argument(
+        "--genesOnly",
+        action="store_true",
+        help="Search and return only Gene type features",
+    )
+    parser.add_argument(
+        "--cdsOnly",
+        action="store_true",
+        help="Search and return only CDS type features",
+    )
+    parser.add_argument(
+        "--forceSeqID",
+        action="store_true",
+        help="Search and return only CDS type features",
+    )
+    parser.add_argument(
+        "--outProt", action="store_true", help="Output the translated sequence"
+    )
+    parser.add_argument(
+        "--forward",
+        type=int,
+        default=1,
+        help="Number of features upstream from the hit to return",
+    )
+    parser.add_argument(
+        "--behind",
+        type=int,
+        default=1,
+        help="Number of features downstream from the hit to return",
+    )
+    args = parser.parse_args()
+    extract_features(**vars(args))