Mercurial > repos > ulfschaefer > filter_vcf
comparison phe/variant_filters/__init__.py @ 10:c2f8e7580133 draft
Uploaded
| author | ulfschaefer | 
|---|---|
| date | Mon, 21 Dec 2015 10:50:17 -0500 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| 9:2e3115b4df74 | 10:c2f8e7580133 | 
|---|---|
| 1 """Classes and functions for working with variant filters.""" | |
| 2 | |
| 3 from __builtin__ import __import__ | |
| 4 from abc import abstractproperty | |
| 5 import abc | |
| 6 import argparse | |
| 7 import glob | |
| 8 import inspect | |
| 9 import logging | |
| 10 import os | |
| 11 import re | |
| 12 import sys | |
| 13 | |
| 14 import vcf | |
| 15 import vcf.filters | |
| 16 from vcf.parser import _Filter | |
| 17 | |
| 18 IUPAC_CODES = {frozenset(["A"]): "A", | |
| 19 frozenset(["C"]): "C", | |
| 20 frozenset(["G"]): "G", | |
| 21 frozenset(["T"]): "T", | |
| 22 frozenset(["A", "G"]): "R", | |
| 23 frozenset(["C", "T"]): "Y", | |
| 24 frozenset(["G", "C"]): "S", | |
| 25 frozenset(["A", "T"]): "W", | |
| 26 frozenset(["G", "T"]): "K", | |
| 27 frozenset(["A", "C"]): "M", | |
| 28 frozenset(["C", "G", "T"]): "B", | |
| 29 frozenset(["A", "G", "T"]): "D", | |
| 30 frozenset(["A", "C", "T"]): "H", | |
| 31 frozenset(["A", "C", "G"]): "V" | |
| 32 } | |
| 33 | |
| 34 class PHEFilterBase(vcf.filters.Base): | |
| 35 """Base class for VCF filters.""" | |
| 36 __meta__ = abc.ABCMeta | |
| 37 | |
| 38 magic_sep = ":" | |
| 39 decoder_pattern = re.compile(magic_sep) | |
| 40 | |
| 41 @abc.abstractproperty | |
| 42 def parameter(self): | |
| 43 """Short name of parameter being filtered.""" | |
| 44 return self.parameter | |
| 45 | |
| 46 @abc.abstractproperty | |
| 47 def _default_threshold(self): | |
| 48 """Default threshold for filtering.""" | |
| 49 return self._default_threshold | |
| 50 | |
| 51 def __init__(self, args): | |
| 52 super(PHEFilterBase, self).__init__(args) | |
| 53 | |
| 54 # Change the threshold to custom gq value. | |
| 55 self.threshold = self._default_threshold | |
| 56 | |
| 57 if isinstance(args, dict): | |
| 58 self.threshold = args.get(self.parameter) | |
| 59 | |
| 60 def __str__(self): | |
| 61 return self.filter_name() | |
| 62 | |
| 63 def _check_record(self, record): | |
| 64 if self.is_uncallable(record): | |
| 65 return False | |
| 66 elif record.is_monomorphic: | |
| 67 return None | |
| 68 else: | |
| 69 return True | |
| 70 | |
| 71 @abc.abstractmethod | |
| 72 def short_desc(self): | |
| 73 """Short description of the filter (included in VCF).""" | |
| 74 raise NotImplementedError("Get short description is not implemented.") | |
| 75 | |
| 76 def get_config(self): | |
| 77 """This is used for reconstructing filter.""" | |
| 78 return {self.parameter: self.threshold} | |
| 79 | |
| 80 def filter_name(self): | |
| 81 """Create filter names by their parameter separated by magic. | |
| 82 E.g. if filter parameter is ad_ratio and threshold is 0.9 then | |
| 83 ad_ratio:0.9 if the filter name. | |
| 84 """ | |
| 85 return "%s%s%s" % (self.parameter, self.magic_sep, self.threshold) | |
| 86 | |
| 87 @staticmethod | |
| 88 def decode(filter_id): | |
| 89 """Decode name of filter.""" | |
| 90 conf = {} | |
| 91 | |
| 92 if PHEFilterBase.magic_sep in filter_id: | |
| 93 info = PHEFilterBase.decoder_pattern.split(filter_id) | |
| 94 assert len(info) == 2 | |
| 95 conf[info[0]] = info[1] | |
| 96 return conf | |
| 97 | |
| 98 def is_gap(self): | |
| 99 return False | |
| 100 | |
| 101 def is_n(self): | |
| 102 return True | |
| 103 | |
| 104 def is_uncallable(self, record): | |
| 105 """Filter a :py:class:`vcf.model._Record`.""" | |
| 106 | |
| 107 if len(record.samples) > 1: | |
| 108 logging.warn("More than 1 sample detected. Only first is considered.") | |
| 109 | |
| 110 try: | |
| 111 record_gt = record.samples[0].data.GT | |
| 112 except AttributeError: | |
| 113 logging.warn("Could not retrieve GQ score POS %i", record.POS) | |
| 114 record_gt = "./." | |
| 115 | |
| 116 if record_gt == "./.": | |
| 117 return True | |
| 118 else: | |
| 119 return False | |
| 120 | |
| 121 @staticmethod | |
| 122 def call_concensus(record): | |
| 123 if not record.FILTER: | |
| 124 sample_ad = frozenset([str(c).upper() for c in record.ALT]) | |
| 125 return IUPAC_CODES.get(sample_ad, "N") | |
| 126 | |
| 127 else: | |
| 128 sample_ad = frozenset([str(c).upper() for c in record.ALT] + [record.REF]) | |
| 129 | |
| 130 return IUPAC_CODES.get(sample_ad, "N") | |
| 131 | |
| 132 def dynamic_filter_loader(): | |
| 133 """Fancy way of dynamically importing existing filters. | |
| 134 | |
| 135 Returns | |
| 136 ------- | |
| 137 dict: | |
| 138 Available filters dictionary. Keys are parameters that | |
| 139 can be supplied to the filters. | |
| 140 """ | |
| 141 | |
| 142 # We assume the filters are in the same directory as THIS file. | |
| 143 filter_dir = os.path.dirname(__file__) | |
| 144 filter_dir = os.path.abspath(filter_dir) | |
| 145 | |
| 146 # This is populated when the module is first imported. | |
| 147 avail_filters = {} | |
| 148 | |
| 149 # Add this directory to the syspath. | |
| 150 sys.path.insert(0, filter_dir) | |
| 151 | |
| 152 # Find all "py" files. | |
| 153 for filter_mod in glob.glob(os.path.join(filter_dir, "*.py")): | |
| 154 | |
| 155 # Derive name of the module where filter is. | |
| 156 filter_mod_file = os.path.basename(filter_mod) | |
| 157 | |
| 158 # Ignore this file, obviously. | |
| 159 if filter_mod_file.startswith("__init__"): | |
| 160 continue | |
| 161 | |
| 162 # Import the module with a filter. | |
| 163 mod = __import__(filter_mod_file.replace(".pyc", "").replace(".py", "")) | |
| 164 | |
| 165 # Find all the classes contained in this module. | |
| 166 classes = inspect.getmembers(mod, inspect.isclass) | |
| 167 for cls_name, cls in classes: | |
| 168 # For each class, if it is a sublass of PHEFilterBase, add it. | |
| 169 if cls_name != "PHEFilterBase" and issubclass(cls, PHEFilterBase): | |
| 170 # The parameters are inherited and defined within each filter. | |
| 171 avail_filters[cls.parameter] = cls | |
| 172 | |
| 173 sys.path.remove(filter_dir) | |
| 174 | |
| 175 return avail_filters | |
| 176 | |
| 177 _avail_filters = dynamic_filter_loader() | |
| 178 | |
| 179 def available_filters(): | |
| 180 """Return list of available filters.""" | |
| 181 return _avail_filters.keys() | |
| 182 | |
| 183 def str_to_filters(filters): | |
| 184 """Convert from filter string to array of filters. | |
| 185 E.g. ad_ration:0.9,min_depth:5 | |
| 186 | |
| 187 Parameters: | |
| 188 ----------- | |
| 189 filters: str | |
| 190 String version of filters, separated by comma. | |
| 191 | |
| 192 Returns: | |
| 193 -------- | |
| 194 list: | |
| 195 List of :py:class:`phe.variant_filters.PHEFilterBase` instances. | |
| 196 """ | |
| 197 | |
| 198 config = {} | |
| 199 for kv_pair in filters.split(","): | |
| 200 pair = kv_pair.split(":") | |
| 201 assert len(pair) == 2, "Filters should be separated by ':' %s" % kv_pair | |
| 202 | |
| 203 # We don't care about casting them to correct type because Filters | |
| 204 # will do it for us. | |
| 205 config[pair[0]] = pair[1] | |
| 206 | |
| 207 return make_filters(config) | |
| 208 | |
| 209 def make_filters(config): | |
| 210 """Create a list of filters from *config*. | |
| 211 | |
| 212 Parameters: | |
| 213 ----------- | |
| 214 config: dict, optional | |
| 215 Dictionary with parameter: value pairs. For each parameter, an | |
| 216 appropriate Filter will be found and instanciated. | |
| 217 | |
| 218 Returns: | |
| 219 -------- | |
| 220 list: | |
| 221 List of :py:class:`PHEFilterBase` filters. | |
| 222 """ | |
| 223 filters = [] | |
| 224 | |
| 225 if config: | |
| 226 for custom_filter in config: | |
| 227 if custom_filter in _avail_filters: | |
| 228 filters.append(_avail_filters[custom_filter](config)) | |
| 229 else: | |
| 230 logging.error("Could not find appropriate filter for %s", | |
| 231 custom_filter) | |
| 232 raise Exception("Filter %s could not be created. Please check your filter arguments." % custom_filter) | |
| 233 | |
| 234 return filters | 
