diff gff2gb.py @ 3:c8fcb7246ac3 draft

planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
author cpt
date Mon, 05 Jun 2023 02:44:32 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/gff2gb.py	Mon Jun 05 02:44:32 2023 +0000
@@ -0,0 +1,453 @@
+#!/usr/bin/env python
+"""Convert a GFF and associated FASTA file into GenBank format.
+
+Usage:
+gff_to_genbank.py <GFF annotation file> <FASTA sequence file>
+"""
+import argparse
+import sys
+import re
+import copy
+import itertools
+import logging
+from Bio import SeqIO
+
+# from Bio.Alphabet import generic_dna
+from Bio.SeqFeature import CompoundLocation, FeatureLocation
+from CPT_GFFParser import gffParse, gffWrite
+from gff3 import (
+    feature_lambda,
+    wa_unified_product_name,
+    is_uuid,
+    feature_test_type,
+    fsort,
+    feature_test_true,
+    feature_test_quals,
+)
+
+default_name = re.compile(r"^gene_(\d+)$")
+logging.basicConfig(level=logging.INFO)
+
+
+def rename_key(ds, k_f, k_t):
+    """Rename a key in a dictionary and return it, FP style"""
+    # If they key is not in the dictionary, just return immediately
+    if k_f not in ds:
+        return ds
+
+    # Otherwise, we check if the target key is in there
+    if k_t in ds:
+        # If it is, we need to append
+        ds[k_t] += ds[k_f]
+    else:
+        # if not, we can just set.
+        ds[k_t] = ds[k_f]
+
+    # Remove source
+    del ds[k_f]
+    return ds
+
+
+def gff3_to_genbank(gff_file, fasta_file, transltbl):
+    fasta_input = SeqIO.to_dict(SeqIO.parse(fasta_file, "fasta"))  # , generic_dna))
+    gff_iter = gffParse(gff_file, fasta_input)
+
+    for record in gff_iter:
+        yield handle_record(record, transltbl)
+
+
+def handle_non_gene_features(features):
+    # These are NON-GENE features (maybe terminators? etc?)
+    for feature in feature_lambda(
+        features,
+        feature_test_type,
+        {"type": "gene"},
+        subfeatures=False,
+        invert=True,
+        recurse=True,  #  used to catch RBS from new apollo runs (used to be False)
+    ):
+        if feature.type in (
+            "terminator",
+            "tRNA",
+            "Shine_Dalgarno_sequence",
+            "sequence_feature",
+            "recombination_feature",
+            "sequence_alteration",
+            "binding_site",
+        ):
+            yield feature
+        elif feature.type in ("CDS",):
+            pass
+        else:
+            yield feature
+
+
+def fminmax(feature):
+    fmin = None
+    fmax = None
+    for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True):
+        if fmin is None:
+            fmin = sf.location.start
+            fmax = sf.location.end
+        if sf.location.start < fmin:
+            fmin = sf.location.start
+        if sf.location.end > fmax:
+            fmax = sf.location.end
+    return fmin, fmax
+
+
+def fix_gene_boundaries(feature):
+    # There is a frustrating bug in apollo whereby we have created gene
+    # features which are LARGER than expected, but we cannot see this.
+    # We only see a perfect sized gene + great SD together.
+    #
+    # So, we have this awful hack to clamp the location of the gene
+    # feature to the contained mRNAs. This is good enough for now.
+    fmin, fmax = fminmax(feature)
+    if feature.location.strand > 0:
+        feature.location = FeatureLocation(fmin, fmax, strand=1)
+    else:
+        feature.location = FeatureLocation(fmin, fmax, strand=-1)
+    return feature
+
+
+def fix_gene_qualifiers(name, feature, fid):
+    for mRNA in feature.sub_features:
+        mRNA.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid)
+        # And some exons below that
+        sf_replacement = []
+        for sf in mRNA.sub_features:
+            # We set a locus_tag on ALL sub features
+            sf.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid)
+            # Remove Names which are UUIDs
+            # NOT GOOD PRACTICE
+            try:
+                if is_uuid(sf.qualifiers["Name"][0]):
+                    del sf.qualifiers["Name"]
+            except KeyError:
+                continue  # might should go back to pass, I have not put thought into this still
+
+            # If it is the RBS exon (mis-labelled by apollo as 'exon')
+            if sf.type == "exon" and len(sf) < 10:
+                sf.type = "Shine_Dalgarno_sequence"
+                sf_replacement.append(sf)
+            # and if it is the CDS
+            elif sf.type == "CDS":
+                # Update CDS qualifiers with all info that was on parent
+                sf.qualifiers.update(feature.qualifiers)
+                sf_replacement.append(sf)
+            else:
+                sf_replacement.append(sf)
+        if mRNA.type == "tRNA":
+            mRNA.qualifiers["product"] = mRNA.qualifiers["Name"]
+        # Handle multiple child CDS features by merging them.
+        # Replace the subfeatures on the mRNA
+        mRNA.sub_features = merge_multi_cds(sf_replacement)
+    return feature
+
+
+def fix_frameshifted(features):
+    logging.info("Fixing Frameshifted group: [%s]", str(features))
+    genes = features
+    # Find all mRNAs (plus reduce nested list into flattened one)
+    mRNAs = sum([f.sub_features for f in genes], [])
+    # Find all CDSs (plus reduce nested list into flattened one)
+    cdss = sum([m.sub_features for m in mRNAs], [])
+    # List to store the RBSs which we'll break apart + re-attach later.
+    rbss = []
+    # List to store all of the CDSs (i.e. cdss - rbss)
+    cdss2 = []
+    # Copy genes + clean out subfeatures. We'll re-use these constructs.
+    fixed_features = copy.deepcopy(genes)
+    for f in fixed_features:
+        f.sub_features = []
+    # Copy / empty out mRNAs
+    fixed_mrnas = copy.deepcopy(mRNAs)
+    for f in fixed_mrnas:
+        f.sub_features = []
+        f.qualifiers = {}
+    # Fill rbss + cdss2
+    for cds in cdss:
+        if "frameshift" in cds.qualifiers:
+            del cds.qualifiers["frameshift"]
+        # Ignore short features, as those are RBSs
+        if len(cds) < 15:
+            rbss.append(cds)
+            continue
+        # Otherwise cdss.
+        else:
+            cdss2.append(cds)
+    # Ok, now have cdss2 to deal with.
+    other = []
+    # Find the two with least value for distance between end / start (strand aware).
+    # For every possible pair, we'll check their distance
+    match_data = {}
+    for (a, b) in itertools.permutations(cdss2, 2):
+        if a.location.start < b.location.start:
+            # A is downstream of B
+            match_data[(a, b)] = b.location.start - a.location.end
+        else:
+            match_data[(a, b)] = a.location.start - b.location.end
+
+    # Now we'll find the features which are closest in terms of start/end
+    ((merge_a, merge_b), value) = max(match_data.items(), key=lambda kv: kv[1])
+    # And get the non-matching features into other
+    for f in cdss2:
+        if f != merge_a and f != merge_b:
+            other.append(f)
+    # Back to the merge_a/b
+    # With those, we'll merge them into one feature, and discard the other.
+    merge_a.location = CompoundLocation([merge_a.location, merge_b.location])
+    # The gene + RBSs should be identical and two/two.
+    assert len(fixed_features) == 2
+    # If not, we can just duplicate the RBS, doesn't matter.
+    noRBS = len(rbss) == 0
+    if len(rbss) != 2 and not noRBS:
+        rbss = [rbss[0], copy.deepcopy(rbss[0])]
+    # Now re-construct.
+    gene_0 = fixed_features[0]
+    gene_1 = fixed_features[1]
+    mRNA_0 = fixed_mrnas[0]
+    mRNA_1 = fixed_mrnas[1]
+
+    if not noRBS:
+        mRNA_0.sub_features = [rbss[0], merge_a]
+        mRNA_1.sub_features = other + [rbss[1]]
+    else:
+        mRNA_0.sub_features = [merge_a]
+        mRNA_1.sub_features = other
+    mRNA_0 = fix_gene_boundaries(mRNA_0)
+    mRNA_1 = fix_gene_boundaries(mRNA_1)
+
+    gene_0.sub_features = [mRNA_0]
+    gene_1.sub_features = [mRNA_1]
+    gene_0 = fix_gene_boundaries(gene_0)
+    gene_1 = fix_gene_boundaries(gene_1)
+
+    return fixed_features
+
+
+def fix_frameshifts(features):
+    # Collect all gene features where at least one subfeature has a
+    # frameshift=??? annotation.
+    def has_frameshift_qual(f):
+        return (
+            len(
+                list(
+                    feature_lambda(
+                        f.sub_features, feature_test_quals, {"frameshift": None}
+                    )
+                )
+            )
+            > 0
+        )
+
+    def has_frameshift_qual_val(f, val):
+        return (
+            len(
+                list(
+                    feature_lambda(
+                        f.sub_features, feature_test_quals, {"frameshift": val}
+                    )
+                )
+            )
+            > 0
+        )
+
+    def get_frameshift_qual(f):
+        for f in feature_lambda(
+            f.sub_features, feature_test_quals, {"frameshift": None}
+        ):
+            return f.qualifiers["frameshift"]
+
+    to_frameshift = [x for x in features if x.type == "gene" and has_frameshift_qual(x)]
+    fixed = [x for x in features if x not in to_frameshift]
+
+    frameshift_keys = set(sum(map(get_frameshift_qual, to_frameshift), []))
+    for key in frameshift_keys:
+        # Get features matching that key
+        current = [x for x in to_frameshift if has_frameshift_qual_val(x, key)]
+        # Fix them and append them
+        fixed += fix_frameshifted(current)
+
+    return fixed
+
+
+def remove_useless_features(features):
+    # Drop mRNAs, apollo crap, useless CDSs
+    for f in features:
+        if f.type in (
+            "non_canonical_three_prime_splice_site",
+            "non_canonical_five_prime_splice_site",
+            "stop_codon_read_through",
+            "mRNA",
+            "exon",
+        ):
+            continue
+        else:
+            if f.type == "CDS" and len(f) < 10:
+                # Another RBS mistake
+                continue
+            # We use the full GO term, but it should be less than that.
+            if f.type == "Shine_Dalgarno_sequence":
+                f.type = "RBS"
+
+            if f.type == "sequence_feature":
+                f.type = "misc_feature"
+
+            if f.type == "recombination_feature":
+                f.type = "misc_recomb"
+
+            if f.type == "sequence_alteration":
+                f.type = "variation"
+
+            if f.type == "binding_site":
+                f.type = "misc_binding"
+
+            yield f
+
+
+def merge_multi_cds(mRNA_sf):
+    cdss = [x for x in mRNA_sf if x.type == "CDS"]
+    non_cdss = [x for x in mRNA_sf if x.type != "CDS"]
+    if len(cdss) <= 1:
+        return non_cdss + cdss
+    else:
+        # Grab all locations, and sort them so we can work with them rationally.
+        locations = sorted([x.location for x in cdss], key=lambda y: y.start)
+        # Pick randomly a main CDS
+        main_cds = cdss[0]
+        # We'll merge the other CDSs into this one.
+        main_cds.location = CompoundLocation(locations)
+        return non_cdss + [main_cds]
+
+
+def handle_record(record, transltbl):
+    full_feats = []
+    for feature in fsort(record.features):
+        if (
+            feature.type == "region"
+            and "source" in feature.qualifiers
+            and "GenBank" in feature.qualifiers["source"]
+        ):
+            feature.type = "source"
+
+            if "comment1" in feature.qualifiers:
+                del feature.qualifiers["comment1"]
+
+            if "Note" in feature.qualifiers:
+                record.annotations = feature.qualifiers
+                if len(feature.qualifiers["Note"]) > 1:
+                    record.annotations["comment"] = feature.qualifiers["Note"][1]
+                del feature.qualifiers["Note"]
+
+            if "comment" in feature.qualifiers:
+                del feature.qualifiers["comment"]
+
+    # We'll work on a separate copy of features to avoid modifying a list
+    # we're iterating over
+    replacement_feats = []
+    replacement_feats += list(handle_non_gene_features(record.features))
+
+    # Renumbering requires sorting
+    fid = 0
+    for feature in fsort(
+        feature_lambda(
+            record.features, feature_test_type, {"type": "gene"}, subfeatures=True
+        )
+    ):
+        # Our modifications only involve genes
+        fid += 1
+
+        feature = fix_gene_boundaries(feature)
+        # Which have mRNAs we'll drop later
+        feature = fix_gene_qualifiers(record.id, feature, fid)
+
+        # Wipe out the parent gene's data, leaving only a locus_tag
+        feature.qualifiers = {"locus_tag": "CPT_%s_%03d" % (record.id, fid)}
+
+        # Patch our features back in (even if they're non-gene features)
+        replacement_feats.append(feature)
+
+    replacement_feats = fix_frameshifts(replacement_feats)
+    # exit(0)
+    flat_features = feature_lambda(
+        replacement_feats, lambda x: True, {}, subfeatures=True
+    )
+
+    flat_features = remove_useless_features(flat_features)
+
+    # Meat of our modifications
+    for flat_feat in flat_features:
+        # Try and figure out a name. We gave conflicting instructions, so
+        # this isn't as trivial as it should be.
+        protein_product = wa_unified_product_name(flat_feat)
+
+        for x in (
+            "source",
+            "phase",
+            "Parent",
+            "ID",
+            "owner",
+            "date_creation",
+            "date_last_modified",
+            "datasetSource",
+        ):
+            if x in flat_feat.qualifiers:
+                if x == "ID":
+                    flat_feat._ID = flat_feat.qualifiers["ID"]
+                del flat_feat.qualifiers[x]
+
+        # Add product tag
+        if flat_feat.type == "CDS":
+            flat_feat.qualifiers["product"] = [protein_product]
+            flat_feat.qualifiers["transl_table"] = [transltbl]
+            if "Product" in flat_feat.qualifiers:
+                del flat_feat.qualifiers["Product"]
+        elif flat_feat.type == "RBS":
+            if "locus_tag" not in flat_feat.qualifiers.keys():
+                continue
+
+        elif flat_feat.type == "terminator":
+            flat_feat.type = "regulatory"
+            flat_feat.qualifiers = {"regulatory_class": "terminator"}
+
+        # In genbank format, note is lower case.
+        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Note", "note")
+        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "description", "note")
+        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "protein", "note")
+        flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Dbxref", "db_xref")
+        if "Name" in flat_feat.qualifiers:
+            del flat_feat.qualifiers["Name"]
+
+        # more apollo nonsense
+        if "Manually set translation start" in flat_feat.qualifiers.get("note", []):
+            flat_feat.qualifiers["note"].remove("Manually set translation start")
+
+        # Append the feature
+        full_feats.append(flat_feat)
+
+    # Update our features
+    record.features = fsort(full_feats)
+    # Strip off record names that would cause crashes.
+    record.name = record.name[0:16]
+    return record
+
+
+if __name__ == "__main__":
+    # Grab all of the filters from our plugin loader
+    parser = argparse.ArgumentParser(description="Convert gff3 to gbk")
+    parser.add_argument("gff_file", type=argparse.FileType("r"), help="GFF3 file")
+    parser.add_argument("fasta_file", type=argparse.FileType("r"), help="Fasta Input")
+    parser.add_argument(
+        "--transltbl",
+        type=int,
+        default=11,
+        help="Translation Table choice for CDS tag, default 11",
+    )
+    args = parser.parse_args()
+
+    for record in gff3_to_genbank(**vars(args)):
+        record.annotations["molecule_type"] = "DNA"
+        # record.seq.alphabet = generic_dna
+        SeqIO.write([record], sys.stdout, "genbank")