diff shinefind.py @ 4:5004ddb62700 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:53:31 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/shinefind.py	Mon Jun 05 02:53:31 2023 +0000
@@ -0,0 +1,433 @@
+#!/usr/bin/env python
+import re
+import sys
+import argparse
+import logging
+from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
+from Bio import SeqIO
+from Bio.SeqRecord import SeqRecord
+from Bio.SeqFeature import FeatureLocation
+from gff3 import (
+    feature_lambda,
+    feature_test_type,
+    feature_test_true,
+    feature_test_quals,
+    get_id,
+    ensure_location_in_bounds,
+)
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger()
+
+
+class NaiveSDCaller(object):
+
+    # TODO May make switch for different sequence sets
+    SD_SEQUENCES = (
+        "AGGAGGT",
+        "GGAGGT",
+        "AGGAGG",
+        "GGGGGG",
+        "AGGAG",
+        "GAGGT",
+        "GGAGG",
+        "GGGGG",
+        "AGGT",
+        "GGGT",
+        "GAGG",
+        "GGGG",
+        "AGGA",
+        "GGAG",
+        "GGA",
+        "GAG",
+        "AGG",
+        "GGT",
+        "GGG",
+    )
+
+    def __init__(self):
+        self.sd_reg = [re.compile(x, re.IGNORECASE) for x in self.SD_SEQUENCES]
+
+    def list_sds(self, sequence, sd_min=3, sd_max=17):
+        hits = []
+        for regex in self.sd_reg:
+            for match in regex.finditer(sequence):
+                spacing = len(sequence) - len(match.group()) - match.start()
+                if sd_max >= spacing + sd_min and spacing + sd_min >= sd_min:
+                    # if the spacing is within gap limits, add
+                    # (search space is [sd_max+7 .. sd_min] so actual gap is spacing+sd_min)
+                    # print('min %d max %d - adding SD with gap %d' % (sd_min, sd_max, spacing+sd_min))
+                    hits.append(
+                        {
+                            "spacing": spacing,
+                            "hit": match.group(),
+                            "start": match.start(),
+                            "end": match.end(),
+                            "len": len(match.group()),
+                        }
+                    )
+        hits = sorted(hits, key=lambda x: (-x["len"], x["spacing"]))
+        return hits
+
+    @classmethod
+    def highlight_sd(cls, sequence, start, end):
+        return " ".join(
+            [
+                sequence[0:start].lower(),
+                sequence[start:end].upper(),
+                sequence[end:].lower(),
+            ]
+        )
+
+    @classmethod
+    def to_features(
+        cls,
+        hits,
+        strand,
+        parent_start,
+        parent_end,
+        feature_id=None,
+        sd_min=3,
+        sd_max=17,
+    ):
+        results = []
+        for idx, hit in enumerate(hits):
+            # gene            complement(124..486)
+            # -1      491     501     0       5       5
+            # -1      491     501     0       4       5
+            # -1      491     501     1       4       5
+            # -1      491     501     2       3       5
+            # -1      491     501     1       3       5
+            # -1      491     501     0       3       5
+
+            qualifiers = {
+                "source": "CPT_ShineFind",
+                "ID": "%s.rbs-%s" % (feature_id, idx),
+            }
+
+            if strand > 0:
+                start = parent_end - hit["spacing"] - hit["len"]
+                end = parent_end - hit["spacing"]
+            else:
+                start = parent_start + hit["spacing"]
+                end = parent_start + hit["spacing"] + hit["len"]
+            # check that the END of the SD sequence is within the given min/max of parent start/end
+
+            # gap is either the sd_start-cds_end (neg strand) or the sd_end-cds_start (pos strand)
+            # minimum absolute value of these two will be the proper gap regardless of strand
+            tmp = gffSeqFeature(
+                FeatureLocation(min(start, end), max(start, end), strand=strand),
+                # FeatureLocation(min(start, end), max(start, end), strand=strand),
+                type="Shine_Dalgarno_sequence",
+                qualifiers=qualifiers,
+            )
+            results.append(tmp)
+        return results
+
+    def testFeatureUpstream(self, feature, record, sd_min=3, sd_max=17):
+        # Strand information necessary to getting correct upstream sequence
+        strand = feature.location.strand
+
+        # n_bases_upstream (plus/minus 7 upstream to make the min/max define the possible gap position)
+        if strand > 0:
+            start = feature.location.start - sd_max - 7
+            end = feature.location.start - sd_min
+        else:
+            start = feature.location.end + sd_min
+            end = feature.location.end + sd_max + 7
+
+        (start, end) = ensure_location_in_bounds(
+            start=start, end=end, parent_length=len(record)
+        )
+
+        # Create our temp feature used to obtain correct portion of
+        # genome
+        tmp = gffSeqFeature(
+            FeatureLocation(min(start, end), max(start, end), strand=strand),
+            type="domain",
+        )
+        seq = str(tmp.extract(record.seq))
+        return self.list_sds(seq, sd_min, sd_max), start, end, seq
+
+    def hasSd(self, feature, record, sd_min=3, sd_max=17):
+        sds, start, end, seq = self.testFeatureUpstream(
+            feature, record, sd_min=sd_min, sd_max=sd_max
+        )
+        return len(sds) > 0
+
+
+# Cycle through subfeatures, set feature's location to be equal
+# to the smallest start and largest end.
+# Remove pending bugfix for feature display in Apollo
+def fminmax(feature):
+    fmin = None
+    fmax = None
+    for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True):
+        if fmin is None:
+            fmin = sf.location.start
+            fmax = sf.location.end
+        if sf.location.start < fmin:
+            fmin = sf.location.start
+        if sf.location.end > fmax:
+            fmax = sf.location.end
+    return fmin, fmax
+
+
+def fix_gene_boundaries(feature):
+    # There is a bug in Apollo whereby we have created gene
+    # features which are larger than expected, but we cannot see this.
+    # We only see a perfect sized gene + SD together.
+    #
+    # So, we clamp the location of the gene feature to the
+    # contained mRNAs. Will remove pending Apollo upgrade.
+    fmin, fmax = fminmax(feature)
+    if feature.location.strand > 0:
+        feature.location = FeatureLocation(fmin, fmax, strand=1)
+    else:
+        feature.location = FeatureLocation(fmin, fmax, strand=-1)
+    return feature
+
+
+def shinefind(
+    fasta,
+    gff3,
+    gff3_output=None,
+    table_output=None,
+    lookahead_min=3,
+    lookahead_max=17,
+    top_only=False,
+    add=False,
+):
+    table_output.write(
+        "\t".join(
+            [
+                "ID",
+                "Name",
+                "Terminus",
+                "Terminus",
+                "Strand",
+                "Upstream Sequence",
+                "SD",
+                "Spacing",
+            ]
+        )
+        + "\n"
+    )
+
+    sd_finder = NaiveSDCaller()
+    # Load up sequence(s) for GFF3 data
+    seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
+    # Parse GFF3 records
+    for record in gffParse(gff3, base_dict=seq_dict):
+        # Shinefind's gff3_output.
+        gff3_output_record = SeqRecord(record.seq, record.id)
+        # Filter out just coding sequences
+        ignored_features = []
+        for x in record.features:
+            # If feature X does NOT contain a CDS, add to ignored_features
+            # list. This means if we have a top level gene feature with or
+            # without a CDS subfeature, we're catch it appropriately here.
+            if (
+                len(
+                    list(
+                        feature_lambda(
+                            [x], feature_test_type, {"type": "CDS"}, subfeatures=True
+                        )
+                    )
+                )
+                == 0
+            ):
+                ignored_features.append(x)
+
+        # Loop over all gene features
+        for gene in feature_lambda(
+            record.features, feature_test_type, {"type": "gene"}, subfeatures=True
+        ):
+
+            # Get the CDS from this gene.
+            feature = sorted(
+                list(
+                    feature_lambda(
+                        gene.sub_features,
+                        feature_test_type,
+                        {"type": "CDS"},
+                        subfeatures=True,
+                    )
+                ),
+                key=lambda x: x.location.start,
+            )
+            # If no CDSs are in this gene feature, then quit
+            if len(feature) == 0:
+                # We've already caught these above in our ignored_features
+                # list, so we skip out on the rest of this for loop
+                continue
+            else:
+                # Otherwise pull the first on the strand.
+                feature = feature[0]
+
+            # Three different ways RBSs can be stored that we expect.
+            rbs_rbs = list(
+                feature_lambda(
+                    gene.sub_features,
+                    feature_test_type,
+                    {"type": "RBS"},
+                    subfeatures=False,
+                )
+            )
+            rbs_sds = list(
+                feature_lambda(
+                    gene.sub_features,
+                    feature_test_type,
+                    {"type": "Shine_Dalgarno_sequence"},
+                    subfeatures=False,
+                )
+            )
+            regulatory_elements = list(
+                feature_lambda(
+                    gene.sub_features,
+                    feature_test_type,
+                    {"type": "regulatory"},
+                    subfeatures=False,
+                )
+            )
+            rbs_regulatory = list(
+                feature_lambda(
+                    regulatory_elements,
+                    feature_test_quals,
+                    {"regulatory_class": ["ribosome_binding_site"]},
+                    subfeatures=False,
+                )
+            )
+            rbss = rbs_rbs + rbs_sds + rbs_regulatory
+
+            # If someone has already annotated an RBS, we move to the next gene
+            if len(rbss) > 0:
+                log.debug("Has %s RBSs", len(rbss))
+                ignored_features.append(gene)
+                continue
+
+            sds, start, end, seq = sd_finder.testFeatureUpstream(
+                feature, record, sd_min=lookahead_min, sd_max=lookahead_max
+            )
+
+            feature_id = get_id(feature)
+            sd_features = sd_finder.to_features(
+                sds, feature.location.strand, start, end, feature_id=feature.id
+            )
+
+            human_strand = "+" if feature.location.strand == 1 else "-"
+
+            # http://book.pythontips.com/en/latest/for_-_else.html
+            log.debug("Found %s SDs", len(sds))
+            for (sd, sd_feature) in zip(sds, sd_features):
+                # If we only want the top feature, after the bulk of the
+                # forloop executes once, we append the top feature, and fake a
+                # break, because an actual break triggers the else: block
+                table_output.write(
+                    "\t".join(
+                        map(
+                            str,
+                            [
+                                feature.id,
+                                feature_id,
+                                feature.location.start,
+                                feature.location.end,
+                                human_strand,
+                                sd_finder.highlight_sd(seq, sd["start"], sd["end"]),
+                                sd["hit"],
+                                int(sd["spacing"]) + lookahead_min,
+                            ],
+                        )
+                    )
+                    + "\n"
+                )
+
+                if add:
+                    # Append the top RBS to the gene feature
+                    gene.sub_features.append(sd_feature)
+                    # Pick out start/end locations for all sub_features
+                    locations = [x.location.start for x in gene.sub_features] + [
+                        x.location.end for x in gene.sub_features
+                    ]
+                    # Update gene's start/end to be inclusive
+                    gene.location._start = min(locations)
+                    gene.location._end = max(locations)
+                # Also register the feature with the separate GFF3 output
+                sd_feature = fix_gene_boundaries(sd_feature)
+                gff3_output_record.features.append(sd_feature)
+
+                if top_only or sd == (sds[-1]):
+                    break
+            else:
+                table_output.write(
+                    "\t".join(
+                        map(
+                            str,
+                            [
+                                feature.id,
+                                feature_id,
+                                feature.location.start,
+                                feature.location.end,
+                                human_strand,
+                                seq,
+                                None,
+                                -1,
+                            ],
+                        )
+                    )
+                    + "\n"
+                )
+
+        record.annotations = {}
+        gffWrite([record], sys.stdout)
+
+        gff3_output_record.features = sorted(
+            gff3_output_record.features, key=lambda x: x.location.start
+        )
+        gff3_output_record.annotations = {}
+        gffWrite([gff3_output_record], gff3_output)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences")
+    parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
+    parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations")
+
+    parser.add_argument(
+        "--gff3_output",
+        type=argparse.FileType("w"),
+        help="GFF3 Output",
+        default="shinefind.gff3",
+    )
+    parser.add_argument(
+        "--table_output",
+        type=argparse.FileType("w"),
+        help="Tabular Output",
+        default="shinefind.tbl",
+    )
+
+    parser.add_argument(
+        "--lookahead_min",
+        nargs="?",
+        type=int,
+        help="Number of bases upstream of CDSs to end search",
+        default=3,
+    )
+    parser.add_argument(
+        "--lookahead_max",
+        nargs="?",
+        type=int,
+        help="Number of bases upstream of CDSs to begin search",
+        default=17,
+    )
+
+    parser.add_argument("--top_only", action="store_true", help="Only report best hits")
+    parser.add_argument(
+        "--add",
+        action="store_true",
+        help='Function in "addition" mode whereby the '
+        + "RBSs are added directly to the gene model.",
+    )
+
+    args = parser.parse_args()
+    shinefind(**vars(args))