Mercurial > repos > ulfschaefer > filter_vcf
diff phe/variant_filters/GTFilter.py @ 0:834a312c0114 draft
Uploaded
author | ulfschaefer |
---|---|
date | Thu, 10 Dec 2015 09:22:39 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/phe/variant_filters/GTFilter.py Thu Dec 10 09:22:39 2015 -0500 @@ -0,0 +1,72 @@ +'''Filter VCF on GT filter parameter. + +Created on 24 Sep 2015 + +@author: alex +''' + +import argparse +import logging + +from phe.variant_filters import PHEFilterBase + + +class UncallableGTFilter(PHEFilterBase): + '''Filter uncallable genotypes''' + + name = "UncallGT" + _default_threshold = None + parameter = "uncall_gt" + + @classmethod + def customize_parser(self, parser): + arg_name = self.parameter.replace("_", "-") + parser.add_argument("--%s" % arg_name, type=str, default=self._default_threshold, + help="Filter sites below given GQ score (default: %s)" % self._default_threshold) + + def __init__(self, args): + """Min Depth constructor.""" + # This needs to happen first, because threshold is initialised here. + super(UncallableGTFilter, self).__init__(args) + + # Change the threshold to custom gq value. + self.threshold = self._default_threshold + if isinstance(args, argparse.Namespace): + self.threshold = args.gq_score + elif isinstance(args, dict): + try: + self.threshold = str(args.get(self.parameter)) + except TypeError: + logging.error("Could not retrieve threshold from %s", args.get(self.parameter)) + self.threshold = None + + def __call__(self, record): + """Filter a :py:class:`vcf.model._Record`.""" + + if len(record.samples) > 1: + logging.warn("More than 1 sample detected. Only first is considered.") + + try: + record_gt = record.samples[0].data.GT + except AttributeError: + logging.error("Could not retrieve GQ score POS %i", record.POS) + record_gt = None + + if record_gt is None: + return "./." + else: + return None + + def short_desc(self): + short_desc = self.__doc__ or '' + + if short_desc: + short_desc = "%s (GT != ./. )" % (short_desc) + + return short_desc + + def is_gap(self): + return True + + def is_n(self): + return False