# HG changeset patch # User cpt # Date 1685932881 0 # Node ID ac766d7dd6413ebe312241a7f6ffab6683ab983a # Parent aaed5a0c774c0daf0c51d46995ae559aa177c5a5 planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c diff -r aaed5a0c774c -r ac766d7dd641 cpt-macros.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cpt-macros.xml Mon Jun 05 02:41:21 2023 +0000 @@ -0,0 +1,115 @@ + + + + python + biopython + requests + cpt_gffparser + + + + + + + + 10.1371/journal.pcbi.1008214 + @unpublished{galaxyTools, + author = {E. Mijalis, H. Rasche}, + title = {CPT Galaxy Tools}, + year = {2013-2017}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + + + 10.1371/journal.pcbi.1008214 + + @unpublished{galaxyTools, + author = {E. Mijalis, H. Rasche}, + title = {CPT Galaxy Tools}, + year = {2013-2017}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + + + + + 10.1371/journal.pcbi.1008214 + + @unpublished{galaxyTools, + author = {C. Ross}, + title = {CPT Galaxy Tools}, + year = {2020-}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + + + + + 10.1371/journal.pcbi.1008214 + + @unpublished{galaxyTools, + author = {E. Mijalis, H. Rasche}, + title = {CPT Galaxy Tools}, + year = {2013-2017}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + @unpublished{galaxyTools, + author = {A. Criscione}, + title = {CPT Galaxy Tools}, + year = {2019-2021}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + + + + + 10.1371/journal.pcbi.1008214 + + @unpublished{galaxyTools, + author = {A. Criscione}, + title = {CPT Galaxy Tools}, + year = {2019-2021}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + + + + + 10.1371/journal.pcbi.1008214 + + @unpublished{galaxyTools, + author = {C. Maughmer}, + title = {CPT Galaxy Tools}, + year = {2017-2020}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + + + + + @unpublished{galaxyTools, + author = {C. Maughmer}, + title = {CPT Galaxy Tools}, + year = {2017-2020}, + note = {https://github.com/tamu-cpt/galaxy-tools/} + } + + + + diff -r aaed5a0c774c -r ac766d7dd641 cpt_export_seq_unique/cpt-macros.xml --- a/cpt_export_seq_unique/cpt-macros.xml Fri Jul 01 13:43:49 2022 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,115 +0,0 @@ - - - - - python - biopython - requests - - - - - - - - 10.1371/journal.pcbi.1008214 - @unpublished{galaxyTools, - author = {E. Mijalis, H. Rasche}, - title = {CPT Galaxy Tools}, - year = {2013-2017}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - - - 10.1371/journal.pcbi.1008214 - - @unpublished{galaxyTools, - author = {E. Mijalis, H. Rasche}, - title = {CPT Galaxy Tools}, - year = {2013-2017}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - - - - - 10.1371/journal.pcbi.1008214 - - @unpublished{galaxyTools, - author = {C. Ross}, - title = {CPT Galaxy Tools}, - year = {2020-}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - - - - - 10.1371/journal.pcbi.1008214 - - @unpublished{galaxyTools, - author = {E. Mijalis, H. Rasche}, - title = {CPT Galaxy Tools}, - year = {2013-2017}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - @unpublished{galaxyTools, - author = {A. Criscione}, - title = {CPT Galaxy Tools}, - year = {2019-2021}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - - - - - 10.1371/journal.pcbi.1008214 - - @unpublished{galaxyTools, - author = {A. Criscione}, - title = {CPT Galaxy Tools}, - year = {2019-2021}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - - - - - 10.1371/journal.pcbi.1008214 - - @unpublished{galaxyTools, - author = {C. Maughmer}, - title = {CPT Galaxy Tools}, - year = {2017-2020}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - - - - - @unpublished{galaxyTools, - author = {C. Maughmer}, - title = {CPT Galaxy Tools}, - year = {2017-2020}, - note = {https://github.com/tamu-cpt/galaxy-tools/} - } - - - - diff -r aaed5a0c774c -r ac766d7dd641 cpt_export_seq_unique/gff3.py --- a/cpt_export_seq_unique/gff3.py Fri Jul 01 13:43:49 2022 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,346 +0,0 @@ -import copy -import logging - -log = logging.getLogger() -log.setLevel(logging.WARN) - - -def feature_lambda( - feature_list, - test, - test_kwargs, - subfeatures=True, - parent=None, - invert=False, - recurse=True, -): - """Recursively search through features, testing each with a test function, yielding matches. - - GFF3 is a hierachical data structure, so we need to be able to recursively - search through features. E.g. if you're looking for a feature with - ID='bob.42', you can't just do a simple list comprehension with a test - case. You don't know how deeply burried bob.42 will be in the feature tree. This is where feature_lambda steps in. - - :type feature_list: list - :param feature_list: an iterable of features - - :type test: function reference - :param test: a closure with the method signature (feature, **kwargs) where - the kwargs are those passed in the next argument. This - function should return True or False, True if the feature is - to be yielded as part of the main feature_lambda function, or - False if it is to be ignored. This function CAN mutate the - features passed to it (think "apply"). - - :type test_kwargs: dictionary - :param test_kwargs: kwargs to pass to your closure when it is called. - - :type subfeatures: boolean - :param subfeatures: when a feature is matched, should just that feature be - yielded to the caller, or should the entire sub_feature - tree for that feature be included? subfeatures=True is - useful in cases such as searching for a gene feature, - and wanting to know what RBS/Shine_Dalgarno_sequences - are in the sub_feature tree (which can be accomplished - with two feature_lambda calls). subfeatures=False is - useful in cases when you want to process (and possibly - return) the entire feature tree, such as applying a - qualifier to every single feature. - - :type invert: boolean - :param invert: Negate/invert the result of the filter. - - :rtype: yielded list - :return: Yields a list of matching features. - """ - # Either the top level set of [features] or the subfeature attribute - for feature in feature_list: - feature._parent = parent - if not parent: - # Set to self so we cannot go above root. - feature._parent = feature - test_result = test(feature, **test_kwargs) - # if (not invert and test_result) or (invert and not test_result): - if invert ^ test_result: - if not subfeatures: - feature_copy = copy.deepcopy(feature) - feature_copy.sub_features = list() - yield feature_copy - else: - yield feature - - if recurse and hasattr(feature, "sub_features"): - for x in feature_lambda( - feature.sub_features, - test, - test_kwargs, - subfeatures=subfeatures, - parent=feature, - invert=invert, - recurse=recurse, - ): - yield x - - -def fetchParent(feature): - if not hasattr(feature, "_parent") or feature._parent is None: - return feature - else: - return fetchParent(feature._parent) - - -def feature_test_true(feature, **kwargs): - return True - - -def feature_test_type(feature, **kwargs): - if "type" in kwargs: - return str(feature.type).upper() == str(kwargs["type"]).upper() - elif "types" in kwargs: - for x in kwargs["types"]: - if str(feature.type).upper() == str(x).upper(): - return True - return False - raise Exception("Incorrect feature_test_type call, need type or types") - - -def feature_test_qual_value(feature, **kwargs): - """Test qualifier values. - - For every feature, check that at least one value in - feature.quailfiers(kwargs['qualifier']) is in kwargs['attribute_list'] - """ - if isinstance(kwargs["qualifier"], list): - for qualifier in kwargs["qualifier"]: - for attribute_value in feature.qualifiers.get(qualifier, []): - if attribute_value in kwargs["attribute_list"]: - return True - else: - for attribute_value in feature.qualifiers.get(kwargs["qualifier"], []): - if attribute_value in kwargs["attribute_list"]: - return True - return False - - -def feature_test_location(feature, **kwargs): - if "strand" in kwargs: - if feature.location.strand != kwargs["strand"]: - return False - - return feature.location.start <= kwargs["loc"] <= feature.location.end - - -def feature_test_quals(feature, **kwargs): - """ - Example:: - - a = Feature(qualifiers={'Note': ['Some notes', 'Aasdf']}) - - # Check if a contains a Note - feature_test_quals(a, {'Note': None}) # Returns True - feature_test_quals(a, {'Product': None}) # Returns False - - # Check if a contains a note with specific value - feature_test_quals(a, {'Note': ['ome']}) # Returns True - - # Check if a contains a note with specific value - feature_test_quals(a, {'Note': ['other']}) # Returns False - """ - for key in kwargs: - if key not in feature.qualifiers: - return False - - # Key is present, no value specified - if kwargs[key] is None: - return True - - # Otherwise there is a key value we're looking for. - # so we make a list of matches - matches = [] - # And check all of the feature qualifier valuse - for value in feature.qualifiers[key]: - # For that kwargs[key] value - for x in kwargs[key]: - matches.append(x in value) - - # If none matched, then we return false. - if not any(matches): - return False - - return True - - -def feature_test_contains(feature, **kwargs): - if "index" in kwargs: - return feature.location.start < kwargs["index"] < feature.location.end - elif "range" in kwargs: - return ( - feature.location.start < kwargs["range"]["start"] < feature.location.end - and feature.location.start < kwargs["range"]["end"] < feature.location.end - ) - else: - raise RuntimeError("Must use index or range keyword") - - -def get_id(feature=None, parent_prefix=None): - result = "" - if parent_prefix is not None: - result += parent_prefix + "|" - if "locus_tag" in feature.qualifiers: - result += feature.qualifiers["locus_tag"][0] - elif "gene" in feature.qualifiers: - result += feature.qualifiers["gene"][0] - elif "Gene" in feature.qualifiers: - result += feature.qualifiers["Gene"][0] - elif "product" in feature.qualifiers: - result += feature.qualifiers["product"][0] - elif "Product" in feature.qualifiers: - result += feature.qualifiers["Product"][0] - elif "Name" in feature.qualifiers: - result += feature.qualifiers["Name"][0] - else: - return feature.id - # Leaving in case bad things happen. - # result += '%s_%s_%s_%s' % ( - # feature.id, - # feature.location.start, - # feature.location.end, - # feature.location.strand - # ) - return result - - -def get_gff3_id(gene): - return gene.qualifiers.get("Name", [gene.id])[0] - - -def ensure_location_in_bounds(start=0, end=0, parent_length=0): - # This prevents frameshift errors - while start < 0: - start += 3 - while end < 0: - end += 3 - while start > parent_length: - start -= 3 - while end > parent_length: - end -= 3 - return (start, end) - - -def coding_genes(feature_list): - for x in genes(feature_list): - if ( - len( - list( - feature_lambda( - x.sub_features, - feature_test_type, - {"type": "CDS"}, - subfeatures=False, - ) - ) - ) - > 0 - ): - yield x - - -def genes(feature_list, feature_type="gene", sort=False): - """ - Simple filter to extract gene features from the feature set. - """ - - if not sort: - for x in feature_lambda( - feature_list, feature_test_type, {"type": feature_type}, subfeatures=True - ): - yield x - else: - data = list(genes(feature_list, feature_type=feature_type, sort=False)) - data = sorted(data, key=lambda feature: feature.location.start) - for x in data: - yield x - - -def wa_unified_product_name(feature): - """ - Try and figure out a name. We gave conflicting instructions, so - this isn't as trivial as it should be. Sometimes it will be in - 'product' or 'Product', othertimes in 'Name' - """ - # Manually applied tags. - protein_product = feature.qualifiers.get( - "product", feature.qualifiers.get("Product", [None]) - )[0] - - # If neither of those are available ... - if protein_product is None: - # And there's a name... - if "Name" in feature.qualifiers: - if not is_uuid(feature.qualifiers["Name"][0]): - protein_product = feature.qualifiers["Name"][0] - - return protein_product - - -def is_uuid(name): - return name.count("-") == 4 and len(name) == 36 - - -def get_rbs_from(gene): - # Normal RBS annotation types - rbs_rbs = list( - feature_lambda( - gene.sub_features, feature_test_type, {"type": "RBS"}, subfeatures=False - ) - ) - rbs_sds = list( - feature_lambda( - gene.sub_features, - feature_test_type, - {"type": "Shine_Dalgarno_sequence"}, - subfeatures=False, - ) - ) - # Fraking apollo - apollo_exons = list( - feature_lambda( - gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False - ) - ) - apollo_exons = [x for x in apollo_exons if len(x) < 10] - # These are more NCBI's style - regulatory_elements = list( - feature_lambda( - gene.sub_features, - feature_test_type, - {"type": "regulatory"}, - subfeatures=False, - ) - ) - rbs_regulatory = list( - feature_lambda( - regulatory_elements, - feature_test_quals, - {"regulatory_class": ["ribosome_binding_site"]}, - subfeatures=False, - ) - ) - # Here's hoping you find just one ;) - return rbs_rbs + rbs_sds + rbs_regulatory + apollo_exons - - -def nice_name(record): - """ - get the real name rather than NCBI IDs and so on. If fails, will return record.id - """ - name = record.id - likely_parental_contig = list(genes(record.features, feature_type="contig")) - if len(likely_parental_contig) == 1: - name = likely_parental_contig[0].qualifiers.get("organism", [name])[0] - return name - - -def fsort(it): - for i in sorted(it, key=lambda x: int(x.location.start)): - yield i diff -r aaed5a0c774c -r ac766d7dd641 cpt_export_seq_unique/gff3_extract_sequence.py --- a/cpt_export_seq_unique/gff3_extract_sequence.py Fri Jul 01 13:43:49 2022 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,275 +0,0 @@ -#!/usr/bin/env python -import sys -import argparse -import logging -import uuid -from CPT_GFFParser import gffParse, gffWrite -from Bio import SeqIO -from Bio.Seq import Seq -from Bio.SeqRecord import SeqRecord -from Bio.SeqFeature import FeatureLocation, CompoundLocation -from gff3 import feature_lambda, feature_test_type, get_id - -logging.basicConfig(level=logging.INFO) -log = logging.getLogger(__name__) - - -def main(fasta, gff3, feature_filter=None, nodesc=False): - if feature_filter == "nice_cds": - from gff2gb import gff3_to_genbank as cpt_Gff2Gbk - - - for rec in cpt_Gff2Gbk(gff3, fasta, 11): - seenList = {} - if rec.seq[0] == "?": - sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") - exit(1) - for feat in sorted(rec.features, key=lambda x: x.location.start): - if feat.type != "CDS": - continue - - ind = 0 - if ( - str( - feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") - ) - in seenList.keys() - ): - seenList[ - str( - feat.qualifiers.get("locus_tag", get_id(feat)).replace( - " ", "-" - ) - ) - ] += 1 - ind = seenList[ - str( - feat.qualifiers.get("locus_tag", get_id(feat)).replace( - " ", "-" - ) - ) - ] - else: - seenList[ - str( - feat.qualifiers.get("locus_tag", get_id(feat)).replace( - " ", "-" - ) - ) - ] = 1 - append = "" - if ind != 0: - append = "_" + str(ind) - - if nodesc: - description = "" - else: - feat.qualifiers["ID"] = [feat._ID] - product = feat.qualifiers.get("product", "") - description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( - feat, product - ) - yield [ - SeqRecord( - feat.extract(rec).seq, - id=str( - feat.qualifiers.get("locus_tag", get_id(feat)).replace( - " ", "-" - ) - ) - + append, - description=description, - ) - ] - - elif feature_filter == "unique_cds": - seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) - seen_ids = {} - - for rec in gffParse(gff3, base_dict=seq_dict): - noMatch = True - if "Alias" in rec.features[0].qualifiers.keys(): - lColumn = rec.features[0].qualifiers["Alias"][0] - else: - lColumn = "" - for x in seq_dict: - if x == rec.id or x == lColumn: - noMatch = False - if noMatch: - sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") - exit(1) - newfeats = [] - for feat in sorted( - feature_lambda( - rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False - ), - key=lambda f: f.location.start, - ): - nid = rec.id + "____" + feat.id - if nid in seen_ids: - nid = nid + "__" + uuid.uuid4().hex - feat.qualifiers["ID"] = [nid] - newfeats.append(feat) - seen_ids[nid] = True - - if nodesc: - description = "" - else: - if feat.strand == -1: - important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} - else: - important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} - if "Name" in feat.qualifiers: - important_data["Name"] = feat.qualifiers.get("Name", [""])[0] - - description = "[{}]".format( - ";".join( - [ - "{key}={value}".format(key=k, value=v) - for (k, v) in important_data.items() - ] - ) - ) - #if feat.id == "CPT_Privateer_006.p01": - #print(feat) - #exit() - - if isinstance(feat.location, CompoundLocation): - finSeq = "" - if feat.strand == -1: - for x in feat.location.parts: - finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement()) - else: - for x in feat.location.parts: - finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end]) - yield [ - SeqRecord( - finSeq, - id=nid.replace(" ", "-"), - description=description, - ) - ] - elif feat.strand == -1: - yield [ - SeqRecord( - (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(), - id=nid.replace(" ", "-"), - description=description, - ) - ] - else: - yield [ - SeqRecord( - #feat.extract(rec).seq, - rec.seq[feat.location.start + feat.phase: feat.location.end], - id=nid.replace(" ", "-"), - description=description, - ) - ] - rec.features = newfeats - rec.annotations = {} - #gffWrite([rec], sys.stdout) - else: - seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) - - for rec in gffParse(gff3, base_dict=seq_dict): - noMatch = True - if "Alias" in rec.features[0].qualifiers.keys(): - lColumn = rec.features[0].qualifiers["Alias"][0] - else: - lColumn = "" - for x in seq_dict: - if x == rec.id or x == lColumn: - noMatch = False - if noMatch: - sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") - exit(1) - for feat in sorted( - feature_lambda( - rec.features, - feature_test_type, - {"type": feature_filter}, - subfeatures=True, - ), - key=lambda f: f.location.start, - ): - id = feat.id - if len(id) == 0: - id = get_id(feat) - - if nodesc: - description = "" - else: - if feat.strand == -1: - important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} - else: - important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} - if "Name" in feat.qualifiers: - important_data["Name"] = feat.qualifiers.get("Name", [""])[0] - - description = "[{}]".format( - ";".join( - [ - "{key}={value}".format(key=k, value=v) - for (k, v) in important_data.items() - ] - ) - ) - - if isinstance(feat.location, CompoundLocation): - finSeq = "" - if feat.strand == -1: - for x in feat.location.parts: - finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement()) - else: - for x in feat.location.parts: - finSeq += str(rec.seq[x.start + feat.phase: x.end]) - yield [ - SeqRecord( - Seq(finSeq), - id=id.replace(" ", "-"), - description=description, - ) - ] - - else: - - if feat.strand == -1: - yield [ - SeqRecord( - seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(), - id=id.replace(" ", "-"), - description=description, - ) - ] - else: - yield [ - SeqRecord( - #feat.extract(rec).seq, - seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])), - id=id.replace(" ", "-"), - description=description, - ) - ] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Export corresponding sequence in genome from GFF3", epilog="" - ) - parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") - parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") - parser.add_argument( - "--feature_filter", default=None, help="Filter for specific feature types" - ) - parser.add_argument( - "--nodesc", action="store_true", help="Strip description field off" - ) - args = parser.parse_args() - for seq in main(**vars(args)): - #if isinstance(seq, list): - # for x in seq: - # print(type(x.seq)) - # SeqIO.write(x, sys.stdout, "fasta") - #else: - SeqIO.write(seq, sys.stdout, "fasta") diff -r aaed5a0c774c -r ac766d7dd641 cpt_export_seq_unique/gff3_extract_sequence2.xml --- a/cpt_export_seq_unique/gff3_extract_sequence2.xml Fri Jul 01 13:43:49 2022 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,37 +0,0 @@ - - - specially modified for sending CDSs to blast - - macros.xml - cpt-macros.xml - - - $default -2> $gff3 -]]> - - - - - - - - - - diff -r aaed5a0c774c -r ac766d7dd641 cpt_export_seq_unique/macros.xml --- a/cpt_export_seq_unique/macros.xml Fri Jul 01 13:43:49 2022 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,62 +0,0 @@ - - - - - python - biopython - cpt_gffparser - - - - - "$blast_tsv" - - - - - - - "$blast_xml" - - - - - - - - - - - - - - - - - - - - "$gff3_data" - - - genomeref.fa - - - ln -s $genome_fasta genomeref.fa; - - - genomeref.fa - - - - - - - "$sequences" - - - - - diff -r aaed5a0c774c -r ac766d7dd641 gff3.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gff3.py Mon Jun 05 02:41:21 2023 +0000 @@ -0,0 +1,346 @@ +import copy +import logging + +log = logging.getLogger() +log.setLevel(logging.WARN) + + +def feature_lambda( + feature_list, + test, + test_kwargs, + subfeatures=True, + parent=None, + invert=False, + recurse=True, +): + """Recursively search through features, testing each with a test function, yielding matches. + + GFF3 is a hierachical data structure, so we need to be able to recursively + search through features. E.g. if you're looking for a feature with + ID='bob.42', you can't just do a simple list comprehension with a test + case. You don't know how deeply burried bob.42 will be in the feature tree. This is where feature_lambda steps in. + + :type feature_list: list + :param feature_list: an iterable of features + + :type test: function reference + :param test: a closure with the method signature (feature, **kwargs) where + the kwargs are those passed in the next argument. This + function should return True or False, True if the feature is + to be yielded as part of the main feature_lambda function, or + False if it is to be ignored. This function CAN mutate the + features passed to it (think "apply"). + + :type test_kwargs: dictionary + :param test_kwargs: kwargs to pass to your closure when it is called. + + :type subfeatures: boolean + :param subfeatures: when a feature is matched, should just that feature be + yielded to the caller, or should the entire sub_feature + tree for that feature be included? subfeatures=True is + useful in cases such as searching for a gene feature, + and wanting to know what RBS/Shine_Dalgarno_sequences + are in the sub_feature tree (which can be accomplished + with two feature_lambda calls). subfeatures=False is + useful in cases when you want to process (and possibly + return) the entire feature tree, such as applying a + qualifier to every single feature. + + :type invert: boolean + :param invert: Negate/invert the result of the filter. + + :rtype: yielded list + :return: Yields a list of matching features. + """ + # Either the top level set of [features] or the subfeature attribute + for feature in feature_list: + feature._parent = parent + if not parent: + # Set to self so we cannot go above root. + feature._parent = feature + test_result = test(feature, **test_kwargs) + # if (not invert and test_result) or (invert and not test_result): + if invert ^ test_result: + if not subfeatures: + feature_copy = copy.deepcopy(feature) + feature_copy.sub_features = list() + yield feature_copy + else: + yield feature + + if recurse and hasattr(feature, "sub_features"): + for x in feature_lambda( + feature.sub_features, + test, + test_kwargs, + subfeatures=subfeatures, + parent=feature, + invert=invert, + recurse=recurse, + ): + yield x + + +def fetchParent(feature): + if not hasattr(feature, "_parent") or feature._parent is None: + return feature + else: + return fetchParent(feature._parent) + + +def feature_test_true(feature, **kwargs): + return True + + +def feature_test_type(feature, **kwargs): + if "type" in kwargs: + return str(feature.type).upper() == str(kwargs["type"]).upper() + elif "types" in kwargs: + for x in kwargs["types"]: + if str(feature.type).upper() == str(x).upper(): + return True + return False + raise Exception("Incorrect feature_test_type call, need type or types") + + +def feature_test_qual_value(feature, **kwargs): + """Test qualifier values. + + For every feature, check that at least one value in + feature.quailfiers(kwargs['qualifier']) is in kwargs['attribute_list'] + """ + if isinstance(kwargs["qualifier"], list): + for qualifier in kwargs["qualifier"]: + for attribute_value in feature.qualifiers.get(qualifier, []): + if attribute_value in kwargs["attribute_list"]: + return True + else: + for attribute_value in feature.qualifiers.get(kwargs["qualifier"], []): + if attribute_value in kwargs["attribute_list"]: + return True + return False + + +def feature_test_location(feature, **kwargs): + if "strand" in kwargs: + if feature.location.strand != kwargs["strand"]: + return False + + return feature.location.start <= kwargs["loc"] <= feature.location.end + + +def feature_test_quals(feature, **kwargs): + """ + Example:: + + a = Feature(qualifiers={'Note': ['Some notes', 'Aasdf']}) + + # Check if a contains a Note + feature_test_quals(a, {'Note': None}) # Returns True + feature_test_quals(a, {'Product': None}) # Returns False + + # Check if a contains a note with specific value + feature_test_quals(a, {'Note': ['ome']}) # Returns True + + # Check if a contains a note with specific value + feature_test_quals(a, {'Note': ['other']}) # Returns False + """ + for key in kwargs: + if key not in feature.qualifiers: + return False + + # Key is present, no value specified + if kwargs[key] is None: + return True + + # Otherwise there is a key value we're looking for. + # so we make a list of matches + matches = [] + # And check all of the feature qualifier valuse + for value in feature.qualifiers[key]: + # For that kwargs[key] value + for x in kwargs[key]: + matches.append(x in value) + + # If none matched, then we return false. + if not any(matches): + return False + + return True + + +def feature_test_contains(feature, **kwargs): + if "index" in kwargs: + return feature.location.start < kwargs["index"] < feature.location.end + elif "range" in kwargs: + return ( + feature.location.start < kwargs["range"]["start"] < feature.location.end + and feature.location.start < kwargs["range"]["end"] < feature.location.end + ) + else: + raise RuntimeError("Must use index or range keyword") + + +def get_id(feature=None, parent_prefix=None): + result = "" + if parent_prefix is not None: + result += parent_prefix + "|" + if "locus_tag" in feature.qualifiers: + result += feature.qualifiers["locus_tag"][0] + elif "gene" in feature.qualifiers: + result += feature.qualifiers["gene"][0] + elif "Gene" in feature.qualifiers: + result += feature.qualifiers["Gene"][0] + elif "product" in feature.qualifiers: + result += feature.qualifiers["product"][0] + elif "Product" in feature.qualifiers: + result += feature.qualifiers["Product"][0] + elif "Name" in feature.qualifiers: + result += feature.qualifiers["Name"][0] + else: + return feature.id + # Leaving in case bad things happen. + # result += '%s_%s_%s_%s' % ( + # feature.id, + # feature.location.start, + # feature.location.end, + # feature.location.strand + # ) + return result + + +def get_gff3_id(gene): + return gene.qualifiers.get("Name", [gene.id])[0] + + +def ensure_location_in_bounds(start=0, end=0, parent_length=0): + # This prevents frameshift errors + while start < 0: + start += 3 + while end < 0: + end += 3 + while start > parent_length: + start -= 3 + while end > parent_length: + end -= 3 + return (start, end) + + +def coding_genes(feature_list): + for x in genes(feature_list): + if ( + len( + list( + feature_lambda( + x.sub_features, + feature_test_type, + {"type": "CDS"}, + subfeatures=False, + ) + ) + ) + > 0 + ): + yield x + + +def genes(feature_list, feature_type="gene", sort=False): + """ + Simple filter to extract gene features from the feature set. + """ + + if not sort: + for x in feature_lambda( + feature_list, feature_test_type, {"type": feature_type}, subfeatures=True + ): + yield x + else: + data = list(genes(feature_list, feature_type=feature_type, sort=False)) + data = sorted(data, key=lambda feature: feature.location.start) + for x in data: + yield x + + +def wa_unified_product_name(feature): + """ + Try and figure out a name. We gave conflicting instructions, so + this isn't as trivial as it should be. Sometimes it will be in + 'product' or 'Product', othertimes in 'Name' + """ + # Manually applied tags. + protein_product = feature.qualifiers.get( + "product", feature.qualifiers.get("Product", [None]) + )[0] + + # If neither of those are available ... + if protein_product is None: + # And there's a name... + if "Name" in feature.qualifiers: + if not is_uuid(feature.qualifiers["Name"][0]): + protein_product = feature.qualifiers["Name"][0] + + return protein_product + + +def is_uuid(name): + return name.count("-") == 4 and len(name) == 36 + + +def get_rbs_from(gene): + # Normal RBS annotation types + rbs_rbs = list( + feature_lambda( + gene.sub_features, feature_test_type, {"type": "RBS"}, subfeatures=False + ) + ) + rbs_sds = list( + feature_lambda( + gene.sub_features, + feature_test_type, + {"type": "Shine_Dalgarno_sequence"}, + subfeatures=False, + ) + ) + # Fraking apollo + apollo_exons = list( + feature_lambda( + gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False + ) + ) + apollo_exons = [x for x in apollo_exons if len(x) < 10] + # These are more NCBI's style + regulatory_elements = list( + feature_lambda( + gene.sub_features, + feature_test_type, + {"type": "regulatory"}, + subfeatures=False, + ) + ) + rbs_regulatory = list( + feature_lambda( + regulatory_elements, + feature_test_quals, + {"regulatory_class": ["ribosome_binding_site"]}, + subfeatures=False, + ) + ) + # Here's hoping you find just one ;) + return rbs_rbs + rbs_sds + rbs_regulatory + apollo_exons + + +def nice_name(record): + """ + get the real name rather than NCBI IDs and so on. If fails, will return record.id + """ + name = record.id + likely_parental_contig = list(genes(record.features, feature_type="contig")) + if len(likely_parental_contig) == 1: + name = likely_parental_contig[0].qualifiers.get("organism", [name])[0] + return name + + +def fsort(it): + for i in sorted(it, key=lambda x: int(x.location.start)): + yield i diff -r aaed5a0c774c -r ac766d7dd641 gff3_extract_sequence.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gff3_extract_sequence.py Mon Jun 05 02:41:21 2023 +0000 @@ -0,0 +1,335 @@ +#!/usr/bin/env python +import sys +import argparse +import logging +import uuid +from CPT_GFFParser import gffParse, gffWrite +from Bio import SeqIO +from Bio.Seq import Seq +from Bio.SeqRecord import SeqRecord +from Bio.SeqFeature import FeatureLocation, CompoundLocation +from gff3 import feature_lambda, feature_test_type, get_id + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +def main(fasta, gff3, feature_filter=None, nodesc=False): + if feature_filter == "nice_cds": + from gff2gb import gff3_to_genbank as cpt_Gff2Gbk + + for rec in cpt_Gff2Gbk(gff3, fasta, 11): + seenList = {} + if rec.seq[0] == "?": + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + for feat in sorted(rec.features, key=lambda x: x.location.start): + if feat.type != "CDS": + continue + + ind = 0 + if ( + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") + ) + in seenList.keys() + ): + seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] += 1 + ind = seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] + else: + seenList[ + str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + ] = 1 + append = "" + if ind != 0: + append = "_" + str(ind) + + if nodesc: + description = "" + else: + feat.qualifiers["ID"] = [feat._ID] + product = feat.qualifiers.get("product", "") + description = ( + "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( + feat, product + ) + ) + yield [ + SeqRecord( + feat.extract(rec).seq, + id=str( + feat.qualifiers.get("locus_tag", get_id(feat)).replace( + " ", "-" + ) + ) + + append, + description=description, + ) + ] + + elif feature_filter == "unique_cds": + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + seen_ids = {} + + for rec in gffParse(gff3, base_dict=seq_dict): + noMatch = True + if "Alias" in rec.features[0].qualifiers.keys(): + lColumn = rec.features[0].qualifiers["Alias"][0] + else: + lColumn = "" + for x in seq_dict: + if x == rec.id or x == lColumn: + noMatch = False + if noMatch: + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + newfeats = [] + for feat in sorted( + feature_lambda( + rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False + ), + key=lambda f: f.location.start, + ): + nid = rec.id + "____" + feat.id + if nid in seen_ids: + nid = nid + "__" + uuid.uuid4().hex + feat.qualifiers["ID"] = [nid] + newfeats.append(feat) + seen_ids[nid] = True + + if nodesc: + description = "" + else: + if feat.strand == -1: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1, + feat.location.end - feat.phase, + feat.strand, + ) + } + else: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1 + feat.phase, + feat.location.end, + feat.strand, + ) + } + if "Name" in feat.qualifiers: + important_data["Name"] = feat.qualifiers.get("Name", [""])[0] + + description = "[{}]".format( + ";".join( + [ + "{key}={value}".format(key=k, value=v) + for (k, v) in important_data.items() + ] + ) + ) + # if feat.id == "CPT_Privateer_006.p01": + # print(feat) + # exit() + + if isinstance(feat.location, CompoundLocation): + finSeq = "" + if feat.strand == -1: + for x in feat.location.parts: + finSeq += str( + ( + rec.seq[ + feat.location.start : feat.location.end + - feat.phase + ] + ).reverse_complement() + ) + else: + for x in feat.location.parts: + finSeq += str( + rec.seq[ + feat.location.start + feat.phase : feat.location.end + ] + ) + yield [ + SeqRecord( + finSeq, + id=nid.replace(" ", "-"), + description=description, + ) + ] + elif feat.strand == -1: + yield [ + SeqRecord( + ( + rec.seq[ + feat.location.start : feat.location.end - feat.phase + ] + ).reverse_complement(), + id=nid.replace(" ", "-"), + description=description, + ) + ] + else: + yield [ + SeqRecord( + # feat.extract(rec).seq, + rec.seq[ + feat.location.start + feat.phase : feat.location.end + ], + id=nid.replace(" ", "-"), + description=description, + ) + ] + rec.features = newfeats + rec.annotations = {} + # gffWrite([rec], sys.stdout) + else: + seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) + + for rec in gffParse(gff3, base_dict=seq_dict): + noMatch = True + if "Alias" in rec.features[0].qualifiers.keys(): + lColumn = rec.features[0].qualifiers["Alias"][0] + else: + lColumn = "" + for x in seq_dict: + if x == rec.id or x == lColumn: + noMatch = False + if noMatch: + sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") + exit(1) + for feat in sorted( + feature_lambda( + rec.features, + feature_test_type, + {"type": feature_filter}, + subfeatures=True, + ), + key=lambda f: f.location.start, + ): + id = feat.id + if len(id) == 0: + id = get_id(feat) + + if nodesc: + description = "" + else: + if feat.strand == -1: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1, + feat.location.end - feat.phase, + feat.strand, + ) + } + else: + important_data = { + "Location": FeatureLocation( + feat.location.start + 1 + feat.phase, + feat.location.end, + feat.strand, + ) + } + if "Name" in feat.qualifiers: + important_data["Name"] = feat.qualifiers.get("Name", [""])[0] + + description = "[{}]".format( + ";".join( + [ + "{key}={value}".format(key=k, value=v) + for (k, v) in important_data.items() + ] + ) + ) + + if isinstance(feat.location, CompoundLocation): + finSeq = "" + if feat.strand == -1: + for x in feat.location.parts: + finSeq += str( + ( + rec.seq[x.start : x.end - feat.phase] + ).reverse_complement() + ) + else: + for x in feat.location.parts: + finSeq += str(rec.seq[x.start + feat.phase : x.end]) + yield [ + SeqRecord( + Seq(finSeq), + id=id.replace(" ", "-"), + description=description, + ) + ] + + else: + + if feat.strand == -1: + yield [ + SeqRecord( + seq=Seq( + str( + rec.seq[ + feat.location.start : feat.location.end + - feat.phase + ] + ) + ).reverse_complement(), + id=id.replace(" ", "-"), + description=description, + ) + ] + else: + yield [ + SeqRecord( + # feat.extract(rec).seq, + seq=Seq( + str( + rec.seq[ + feat.location.start + + feat.phase : feat.location.end + ] + ) + ), + id=id.replace(" ", "-"), + description=description, + ) + ] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Export corresponding sequence in genome from GFF3", epilog="" + ) + parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") + parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") + parser.add_argument( + "--feature_filter", default=None, help="Filter for specific feature types" + ) + parser.add_argument( + "--nodesc", action="store_true", help="Strip description field off" + ) + args = parser.parse_args() + for seq in main(**vars(args)): + # if isinstance(seq, list): + # for x in seq: + # print(type(x.seq)) + # SeqIO.write(x, sys.stdout, "fasta") + # else: + SeqIO.write(seq, sys.stdout, "fasta") diff -r aaed5a0c774c -r ac766d7dd641 gff3_extract_sequence2.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gff3_extract_sequence2.xml Mon Jun 05 02:41:21 2023 +0000 @@ -0,0 +1,36 @@ + + specially modified for sending CDSs to blast + + macros.xml + cpt-macros.xml + + + '$default' +2> '$gff3' +]]> + + + + + + + + + + diff -r aaed5a0c774c -r ac766d7dd641 macros.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/macros.xml Mon Jun 05 02:41:21 2023 +0000 @@ -0,0 +1,74 @@ + + + + progressivemauve + + bcbiogff + + + + 2.4.0 + + 10.1371/journal.pone.0011147 + + + 10.1093/bioinformatics/btm039 + + + '$xmfa' + + + + + + '$sequences' + + + + + + + + + + + + + + + + + + + + + + + + + + + + + '$gff3_data' + + + #if str($reference_genome.reference_genome_source) == 'cached': + '${reference_genome.fasta_indexes.fields.path}' + #else if str($reference_genome.reference_genome_source) == 'history': + genomeref.fa + #end if + + + #if $reference_genome.reference_genome_source == 'history': + ln -s '$reference_genome.genome_fasta' genomeref.fa; + #end if + + + #if str($reference_genome.reference_genome_source) == 'cached': + '${reference_genome.fasta_indexes.fields.path}' + #else if str($reference_genome.reference_genome_source) == 'history': + genomeref.fa + #end if + +