Mercurial > repos > fubar > jbrowse2
view gff3_rebase.py @ 38:07849bf248e3 draft
planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a74e469a81b38c7142f63de510ae31d3754d1767
author | fubar |
---|---|
date | Fri, 01 Mar 2024 00:40:38 +0000 (10 months ago) |
parents | 39b717d934a8 |
children | b1260bca5fdc |
line wrap: on
line source
#!/usr/bin/env python import argparse import copy import logging import sys from BCBio import GFF from Bio.SeqFeature import FeatureLocation logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) __author__ = "Eric Rasche" __version__ = "0.4.0" __maintainer__ = "Eric Rasche" __email__ = "esr@tamu.edu" def feature_lambda(feature_list, test, test_kwargs, subfeatures=True): """Recursively search through features, testing each with a test function, yielding matches. GFF3 is a hierachical data structure, so we need to be able to recursively search through features. E.g. if you're looking for a feature with ID='bob.42', you can't just do a simple list comprehension with a test case. You don't know how deeply burried bob.42 will be in the feature tree. This is where feature_lambda steps in. :type feature_list: list :param feature_list: an iterable of features :type test: function reference :param test: a closure with the method signature (feature, **kwargs) where the kwargs are those passed in the next argument. This function should return True or False, True if the feature is to be yielded as part of the main feature_lambda function, or False if it is to be ignored. This function CAN mutate the features passed to it (think "apply"). :type test_kwargs: dictionary :param test_kwargs: kwargs to pass to your closure when it is called. :type subfeatures: boolean :param subfeatures: when a feature is matched, should just that feature be yielded to the caller, or should the entire sub_feature tree for that feature be included? subfeatures=True is useful in cases such as searching for a gene feature, and wanting to know what RBS/Shine_Dalgarno_sequences are in the sub_feature tree (which can be accomplished with two feature_lambda calls). subfeatures=False is useful in cases when you want to process (and possibly return) the entire feature tree, such as applying a qualifier to every single feature. :rtype: yielded list :return: Yields a list of matching features. """ # Either the top level set of [features] or the subfeature attribute for feature in feature_list: if test(feature, **test_kwargs): if not subfeatures: feature_copy = copy.deepcopy(feature) feature_copy.sub_features = [] yield feature_copy else: yield feature if hasattr(feature, "sub_features"): for x in feature_lambda( feature.sub_features, test, test_kwargs, subfeatures=subfeatures, ): yield x def feature_test_qual_value(feature, **kwargs): """Test qualifier values. For every feature, check that at least one value in feature.quailfiers(kwargs['qualifier']) is in kwargs['attribute_list'] """ for attribute_value in feature.qualifiers.get(kwargs["qualifier"], []): if attribute_value in kwargs["attribute_list"]: return True return False def __get_features(child, interpro=False): child_features = {} for rec in GFF.parse(child): # Only top level for feature in rec.features: # Get the record id as parent_feature_id (since this is how it will be during remapping) parent_feature_id = rec.id # If it's an interpro specific gff3 file if interpro: # Then we ignore polypeptide features as they're useless if feature.type == "polypeptide": continue # If there's an underscore, we strip up to that underscore? # I do not know the rationale for this, removing. # if '_' in parent_feature_id: # parent_feature_id = parent_feature_id[parent_feature_id.index('_') + 1:] try: child_features[parent_feature_id].append(feature) except KeyError: child_features[parent_feature_id] = [feature] # Keep a list of feature objects keyed by parent record id return child_features def __update_feature_location(feature, parent, protein2dna): start = feature.location.start end = feature.location.end if protein2dna: start *= 3 end *= 3 if parent.location.strand != None and parent.location.strand >= 0: ns = parent.location.start + start ne = parent.location.start + end st = +1 else: ns = parent.location.end - end ne = parent.location.end - start st = -1 # Don't let start/stops be less than zero. It's technically valid for them # to be (at least in the model I'm working with) but it causes numerous # issues. # # Instead, we'll replace with %3 to try and keep it in the same reading # frame that it should be in. if ns < 0: ns %= 3 if ne < 0: ne %= 3 if ns > ne: ne, ns = ns, ne # dunno why but sometimes happens feature.location = FeatureLocation(ns, ne, strand=st) if hasattr(feature, "sub_features"): for subfeature in feature.sub_features: __update_feature_location(subfeature, parent, protein2dna) def rebase(parent, child, interpro=False, protein2dna=False, map_by="ID"): # get all of the features we will be re-mapping in a dictionary, keyed by parent feature ID child_features = __get_features(child, interpro=interpro) for rec in GFF.parse(parent): replacement_features = [] for feature in feature_lambda( rec.features, # Filter features in the parent genome by those that are # "interesting", i.e. have results in child_features array. # Probably an unnecessary optimisation. feature_test_qual_value, { "qualifier": map_by, "attribute_list": child_features.keys(), }, subfeatures=False, ): # Features which will be re-mapped to_remap = child_features[feature.id] # TODO: update starts fixed_features = [] for x in to_remap: # Then update the location of the actual feature __update_feature_location(x, feature, protein2dna) if interpro: for y in ("status", "Target"): try: del x.qualifiers[y] except Exception: pass fixed_features.append(x) replacement_features.extend(fixed_features) # We do this so we don't include the original set of features that we # were rebasing against in our result. rec.features = replacement_features rec.annotations = {} GFF.write([rec], sys.stdout) if __name__ == "__main__": parser = argparse.ArgumentParser( description="rebase gff3 features against parent locations", epilog="" ) parser.add_argument( "parent", type=argparse.FileType("r"), help="Parent GFF3 annotations" ) parser.add_argument( "child", type=argparse.FileType("r"), help="Child GFF3 annotations to rebase against parent", ) parser.add_argument( "--interpro", action="store_true", help="Interpro specific modifications", ) parser.add_argument( "--protein2dna", action="store_true", help="Map protein translated results to original DNA data", ) parser.add_argument("--map_by", help="Map by key", default="ID") args = parser.parse_args() rebase(**vars(args))