Mercurial > repos > cpt > cpt_gff_to_gbk
view gff2gb.py @ 4:35a6c466e270 draft
planemo upload commit 852ac96ca53a2ffa0947e6df5e24671866b642f5
author | cpt |
---|---|
date | Sun, 23 Jul 2023 01:38:52 +0000 |
parents | c8fcb7246ac3 |
children |
line wrap: on
line source
#!/usr/bin/env python """Convert a GFF and associated FASTA file into GenBank format. Usage: gff_to_genbank.py <GFF annotation file> <FASTA sequence file> """ import argparse import sys import re import copy import itertools import logging from Bio import SeqIO # from Bio.Alphabet import generic_dna from Bio.SeqFeature import CompoundLocation, FeatureLocation from CPT_GFFParser import gffParse, gffWrite from gff3 import ( feature_lambda, wa_unified_product_name, is_uuid, feature_test_type, fsort, feature_test_true, feature_test_quals, ) default_name = re.compile(r"^gene_(\d+)$") logging.basicConfig(level=logging.INFO) def rename_key(ds, k_f, k_t): """Rename a key in a dictionary and return it, FP style""" # If they key is not in the dictionary, just return immediately if k_f not in ds: return ds # Otherwise, we check if the target key is in there if k_t in ds: # If it is, we need to append ds[k_t] += ds[k_f] else: # if not, we can just set. ds[k_t] = ds[k_f] # Remove source del ds[k_f] return ds def gff3_to_genbank(gff_file, fasta_file, transltbl): fasta_input = SeqIO.to_dict(SeqIO.parse(fasta_file, "fasta")) # , generic_dna)) gff_iter = gffParse(gff_file, fasta_input) for record in gff_iter: yield handle_record(record, transltbl) def handle_non_gene_features(features): # These are NON-GENE features (maybe terminators? etc?) for feature in feature_lambda( features, feature_test_type, {"type": "gene"}, subfeatures=False, invert=True, recurse=True, # used to catch RBS from new apollo runs (used to be False) ): if feature.type in ( "terminator", "tRNA", "Shine_Dalgarno_sequence", "sequence_feature", "recombination_feature", "sequence_alteration", "binding_site", ): yield feature elif feature.type in ("CDS",): pass else: yield feature def fminmax(feature): fmin = None fmax = None for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True): if fmin is None: fmin = sf.location.start fmax = sf.location.end if sf.location.start < fmin: fmin = sf.location.start if sf.location.end > fmax: fmax = sf.location.end return fmin, fmax def fix_gene_boundaries(feature): # There is a frustrating bug in apollo whereby we have created gene # features which are LARGER than expected, but we cannot see this. # We only see a perfect sized gene + great SD together. # # So, we have this awful hack to clamp the location of the gene # feature to the contained mRNAs. This is good enough for now. fmin, fmax = fminmax(feature) if feature.location.strand > 0: feature.location = FeatureLocation(fmin, fmax, strand=1) else: feature.location = FeatureLocation(fmin, fmax, strand=-1) return feature def fix_gene_qualifiers(name, feature, fid): for mRNA in feature.sub_features: mRNA.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid) # And some exons below that sf_replacement = [] for sf in mRNA.sub_features: # We set a locus_tag on ALL sub features sf.qualifiers["locus_tag"] = "CPT_%s_%03d" % (name, fid) # Remove Names which are UUIDs # NOT GOOD PRACTICE try: if is_uuid(sf.qualifiers["Name"][0]): del sf.qualifiers["Name"] except KeyError: continue # might should go back to pass, I have not put thought into this still # If it is the RBS exon (mis-labelled by apollo as 'exon') if sf.type == "exon" and len(sf) < 10: sf.type = "Shine_Dalgarno_sequence" sf_replacement.append(sf) # and if it is the CDS elif sf.type == "CDS": # Update CDS qualifiers with all info that was on parent sf.qualifiers.update(feature.qualifiers) sf_replacement.append(sf) else: sf_replacement.append(sf) if mRNA.type == "tRNA": mRNA.qualifiers["product"] = mRNA.qualifiers["Name"] # Handle multiple child CDS features by merging them. # Replace the subfeatures on the mRNA mRNA.sub_features = merge_multi_cds(sf_replacement) return feature def fix_frameshifted(features): logging.info("Fixing Frameshifted group: [%s]", str(features)) genes = features # Find all mRNAs (plus reduce nested list into flattened one) mRNAs = sum([f.sub_features for f in genes], []) # Find all CDSs (plus reduce nested list into flattened one) cdss = sum([m.sub_features for m in mRNAs], []) # List to store the RBSs which we'll break apart + re-attach later. rbss = [] # List to store all of the CDSs (i.e. cdss - rbss) cdss2 = [] # Copy genes + clean out subfeatures. We'll re-use these constructs. fixed_features = copy.deepcopy(genes) for f in fixed_features: f.sub_features = [] # Copy / empty out mRNAs fixed_mrnas = copy.deepcopy(mRNAs) for f in fixed_mrnas: f.sub_features = [] f.qualifiers = {} # Fill rbss + cdss2 for cds in cdss: if "frameshift" in cds.qualifiers: del cds.qualifiers["frameshift"] # Ignore short features, as those are RBSs if len(cds) < 15: rbss.append(cds) continue # Otherwise cdss. else: cdss2.append(cds) # Ok, now have cdss2 to deal with. other = [] # Find the two with least value for distance between end / start (strand aware). # For every possible pair, we'll check their distance match_data = {} for (a, b) in itertools.permutations(cdss2, 2): if a.location.start < b.location.start: # A is downstream of B match_data[(a, b)] = b.location.start - a.location.end else: match_data[(a, b)] = a.location.start - b.location.end # Now we'll find the features which are closest in terms of start/end ((merge_a, merge_b), value) = max(match_data.items(), key=lambda kv: kv[1]) # And get the non-matching features into other for f in cdss2: if f != merge_a and f != merge_b: other.append(f) # Back to the merge_a/b # With those, we'll merge them into one feature, and discard the other. merge_a.location = CompoundLocation([merge_a.location, merge_b.location]) # The gene + RBSs should be identical and two/two. assert len(fixed_features) == 2 # If not, we can just duplicate the RBS, doesn't matter. noRBS = len(rbss) == 0 if len(rbss) != 2 and not noRBS: rbss = [rbss[0], copy.deepcopy(rbss[0])] # Now re-construct. gene_0 = fixed_features[0] gene_1 = fixed_features[1] mRNA_0 = fixed_mrnas[0] mRNA_1 = fixed_mrnas[1] if not noRBS: mRNA_0.sub_features = [rbss[0], merge_a] mRNA_1.sub_features = other + [rbss[1]] else: mRNA_0.sub_features = [merge_a] mRNA_1.sub_features = other mRNA_0 = fix_gene_boundaries(mRNA_0) mRNA_1 = fix_gene_boundaries(mRNA_1) gene_0.sub_features = [mRNA_0] gene_1.sub_features = [mRNA_1] gene_0 = fix_gene_boundaries(gene_0) gene_1 = fix_gene_boundaries(gene_1) return fixed_features def fix_frameshifts(features): # Collect all gene features where at least one subfeature has a # frameshift=??? annotation. def has_frameshift_qual(f): return ( len( list( feature_lambda( f.sub_features, feature_test_quals, {"frameshift": None} ) ) ) > 0 ) def has_frameshift_qual_val(f, val): return ( len( list( feature_lambda( f.sub_features, feature_test_quals, {"frameshift": val} ) ) ) > 0 ) def get_frameshift_qual(f): for f in feature_lambda( f.sub_features, feature_test_quals, {"frameshift": None} ): return f.qualifiers["frameshift"] to_frameshift = [x for x in features if x.type == "gene" and has_frameshift_qual(x)] fixed = [x for x in features if x not in to_frameshift] frameshift_keys = set(sum(map(get_frameshift_qual, to_frameshift), [])) for key in frameshift_keys: # Get features matching that key current = [x for x in to_frameshift if has_frameshift_qual_val(x, key)] # Fix them and append them fixed += fix_frameshifted(current) return fixed def remove_useless_features(features): # Drop mRNAs, apollo crap, useless CDSs for f in features: if f.type in ( "non_canonical_three_prime_splice_site", "non_canonical_five_prime_splice_site", "stop_codon_read_through", "mRNA", "exon", ): continue else: if f.type == "CDS" and len(f) < 10: # Another RBS mistake continue # We use the full GO term, but it should be less than that. if f.type == "Shine_Dalgarno_sequence": f.type = "RBS" if f.type == "sequence_feature": f.type = "misc_feature" if f.type == "recombination_feature": f.type = "misc_recomb" if f.type == "sequence_alteration": f.type = "variation" if f.type == "binding_site": f.type = "misc_binding" yield f def merge_multi_cds(mRNA_sf): cdss = [x for x in mRNA_sf if x.type == "CDS"] non_cdss = [x for x in mRNA_sf if x.type != "CDS"] if len(cdss) <= 1: return non_cdss + cdss else: # Grab all locations, and sort them so we can work with them rationally. locations = sorted([x.location for x in cdss], key=lambda y: y.start) # Pick randomly a main CDS main_cds = cdss[0] # We'll merge the other CDSs into this one. main_cds.location = CompoundLocation(locations) return non_cdss + [main_cds] def handle_record(record, transltbl): full_feats = [] for feature in fsort(record.features): if ( feature.type == "region" and "source" in feature.qualifiers and "GenBank" in feature.qualifiers["source"] ): feature.type = "source" if "comment1" in feature.qualifiers: del feature.qualifiers["comment1"] if "Note" in feature.qualifiers: record.annotations = feature.qualifiers if len(feature.qualifiers["Note"]) > 1: record.annotations["comment"] = feature.qualifiers["Note"][1] del feature.qualifiers["Note"] if "comment" in feature.qualifiers: del feature.qualifiers["comment"] # We'll work on a separate copy of features to avoid modifying a list # we're iterating over replacement_feats = [] replacement_feats += list(handle_non_gene_features(record.features)) # Renumbering requires sorting fid = 0 for feature in fsort( feature_lambda( record.features, feature_test_type, {"type": "gene"}, subfeatures=True ) ): # Our modifications only involve genes fid += 1 feature = fix_gene_boundaries(feature) # Which have mRNAs we'll drop later feature = fix_gene_qualifiers(record.id, feature, fid) # Wipe out the parent gene's data, leaving only a locus_tag feature.qualifiers = {"locus_tag": "CPT_%s_%03d" % (record.id, fid)} # Patch our features back in (even if they're non-gene features) replacement_feats.append(feature) replacement_feats = fix_frameshifts(replacement_feats) # exit(0) flat_features = feature_lambda( replacement_feats, lambda x: True, {}, subfeatures=True ) flat_features = remove_useless_features(flat_features) # Meat of our modifications for flat_feat in flat_features: # Try and figure out a name. We gave conflicting instructions, so # this isn't as trivial as it should be. protein_product = wa_unified_product_name(flat_feat) for x in ( "source", "phase", "Parent", "ID", "owner", "date_creation", "date_last_modified", "datasetSource", ): if x in flat_feat.qualifiers: if x == "ID": flat_feat._ID = flat_feat.qualifiers["ID"] del flat_feat.qualifiers[x] # Add product tag if flat_feat.type == "CDS": flat_feat.qualifiers["product"] = [protein_product] flat_feat.qualifiers["transl_table"] = [transltbl] if "Product" in flat_feat.qualifiers: del flat_feat.qualifiers["Product"] elif flat_feat.type == "RBS": if "locus_tag" not in flat_feat.qualifiers.keys(): continue elif flat_feat.type == "terminator": flat_feat.type = "regulatory" flat_feat.qualifiers = {"regulatory_class": "terminator"} # In genbank format, note is lower case. flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Note", "note") flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "description", "note") flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "protein", "note") flat_feat.qualifiers = rename_key(flat_feat.qualifiers, "Dbxref", "db_xref") if "Name" in flat_feat.qualifiers: del flat_feat.qualifiers["Name"] # more apollo nonsense if "Manually set translation start" in flat_feat.qualifiers.get("note", []): flat_feat.qualifiers["note"].remove("Manually set translation start") # Append the feature full_feats.append(flat_feat) # Update our features record.features = fsort(full_feats) # Strip off record names that would cause crashes. record.name = record.name[0:16] return record if __name__ == "__main__": # Grab all of the filters from our plugin loader parser = argparse.ArgumentParser(description="Convert gff3 to gbk") parser.add_argument("gff_file", type=argparse.FileType("r"), help="GFF3 file") parser.add_argument("fasta_file", type=argparse.FileType("r"), help="Fasta Input") parser.add_argument( "--transltbl", type=int, default=11, help="Translation Table choice for CDS tag, default 11", ) args = parser.parse_args() for record in gff3_to_genbank(**vars(args)): record.annotations["molecule_type"] = "DNA" # record.seq.alphabet = generic_dna SeqIO.write([record], sys.stdout, "genbank")