comparison phe/variant_filters/__init__.py @ 10:c2f8e7580133 draft

Uploaded
author ulfschaefer
date Mon, 21 Dec 2015 10:50:17 -0500
parents
children
comparison
equal deleted inserted replaced
9:2e3115b4df74 10:c2f8e7580133
1 """Classes and functions for working with variant filters."""
2
3 from __builtin__ import __import__
4 from abc import abstractproperty
5 import abc
6 import argparse
7 import glob
8 import inspect
9 import logging
10 import os
11 import re
12 import sys
13
14 import vcf
15 import vcf.filters
16 from vcf.parser import _Filter
17
18 IUPAC_CODES = {frozenset(["A"]): "A",
19 frozenset(["C"]): "C",
20 frozenset(["G"]): "G",
21 frozenset(["T"]): "T",
22 frozenset(["A", "G"]): "R",
23 frozenset(["C", "T"]): "Y",
24 frozenset(["G", "C"]): "S",
25 frozenset(["A", "T"]): "W",
26 frozenset(["G", "T"]): "K",
27 frozenset(["A", "C"]): "M",
28 frozenset(["C", "G", "T"]): "B",
29 frozenset(["A", "G", "T"]): "D",
30 frozenset(["A", "C", "T"]): "H",
31 frozenset(["A", "C", "G"]): "V"
32 }
33
34 class PHEFilterBase(vcf.filters.Base):
35 """Base class for VCF filters."""
36 __meta__ = abc.ABCMeta
37
38 magic_sep = ":"
39 decoder_pattern = re.compile(magic_sep)
40
41 @abc.abstractproperty
42 def parameter(self):
43 """Short name of parameter being filtered."""
44 return self.parameter
45
46 @abc.abstractproperty
47 def _default_threshold(self):
48 """Default threshold for filtering."""
49 return self._default_threshold
50
51 def __init__(self, args):
52 super(PHEFilterBase, self).__init__(args)
53
54 # Change the threshold to custom gq value.
55 self.threshold = self._default_threshold
56
57 if isinstance(args, dict):
58 self.threshold = args.get(self.parameter)
59
60 def __str__(self):
61 return self.filter_name()
62
63 def _check_record(self, record):
64 if self.is_uncallable(record):
65 return False
66 elif record.is_monomorphic:
67 return None
68 else:
69 return True
70
71 @abc.abstractmethod
72 def short_desc(self):
73 """Short description of the filter (included in VCF)."""
74 raise NotImplementedError("Get short description is not implemented.")
75
76 def get_config(self):
77 """This is used for reconstructing filter."""
78 return {self.parameter: self.threshold}
79
80 def filter_name(self):
81 """Create filter names by their parameter separated by magic.
82 E.g. if filter parameter is ad_ratio and threshold is 0.9 then
83 ad_ratio:0.9 if the filter name.
84 """
85 return "%s%s%s" % (self.parameter, self.magic_sep, self.threshold)
86
87 @staticmethod
88 def decode(filter_id):
89 """Decode name of filter."""
90 conf = {}
91
92 if PHEFilterBase.magic_sep in filter_id:
93 info = PHEFilterBase.decoder_pattern.split(filter_id)
94 assert len(info) == 2
95 conf[info[0]] = info[1]
96 return conf
97
98 def is_gap(self):
99 return False
100
101 def is_n(self):
102 return True
103
104 def is_uncallable(self, record):
105 """Filter a :py:class:`vcf.model._Record`."""
106
107 if len(record.samples) > 1:
108 logging.warn("More than 1 sample detected. Only first is considered.")
109
110 try:
111 record_gt = record.samples[0].data.GT
112 except AttributeError:
113 logging.warn("Could not retrieve GQ score POS %i", record.POS)
114 record_gt = "./."
115
116 if record_gt == "./.":
117 return True
118 else:
119 return False
120
121 @staticmethod
122 def call_concensus(record):
123 if not record.FILTER:
124 sample_ad = frozenset([str(c).upper() for c in record.ALT])
125 return IUPAC_CODES.get(sample_ad, "N")
126
127 else:
128 sample_ad = frozenset([str(c).upper() for c in record.ALT] + [record.REF])
129
130 return IUPAC_CODES.get(sample_ad, "N")
131
132 def dynamic_filter_loader():
133 """Fancy way of dynamically importing existing filters.
134
135 Returns
136 -------
137 dict:
138 Available filters dictionary. Keys are parameters that
139 can be supplied to the filters.
140 """
141
142 # We assume the filters are in the same directory as THIS file.
143 filter_dir = os.path.dirname(__file__)
144 filter_dir = os.path.abspath(filter_dir)
145
146 # This is populated when the module is first imported.
147 avail_filters = {}
148
149 # Add this directory to the syspath.
150 sys.path.insert(0, filter_dir)
151
152 # Find all "py" files.
153 for filter_mod in glob.glob(os.path.join(filter_dir, "*.py")):
154
155 # Derive name of the module where filter is.
156 filter_mod_file = os.path.basename(filter_mod)
157
158 # Ignore this file, obviously.
159 if filter_mod_file.startswith("__init__"):
160 continue
161
162 # Import the module with a filter.
163 mod = __import__(filter_mod_file.replace(".pyc", "").replace(".py", ""))
164
165 # Find all the classes contained in this module.
166 classes = inspect.getmembers(mod, inspect.isclass)
167 for cls_name, cls in classes:
168 # For each class, if it is a sublass of PHEFilterBase, add it.
169 if cls_name != "PHEFilterBase" and issubclass(cls, PHEFilterBase):
170 # The parameters are inherited and defined within each filter.
171 avail_filters[cls.parameter] = cls
172
173 sys.path.remove(filter_dir)
174
175 return avail_filters
176
177 _avail_filters = dynamic_filter_loader()
178
179 def available_filters():
180 """Return list of available filters."""
181 return _avail_filters.keys()
182
183 def str_to_filters(filters):
184 """Convert from filter string to array of filters.
185 E.g. ad_ration:0.9,min_depth:5
186
187 Parameters:
188 -----------
189 filters: str
190 String version of filters, separated by comma.
191
192 Returns:
193 --------
194 list:
195 List of :py:class:`phe.variant_filters.PHEFilterBase` instances.
196 """
197
198 config = {}
199 for kv_pair in filters.split(","):
200 pair = kv_pair.split(":")
201 assert len(pair) == 2, "Filters should be separated by ':' %s" % kv_pair
202
203 # We don't care about casting them to correct type because Filters
204 # will do it for us.
205 config[pair[0]] = pair[1]
206
207 return make_filters(config)
208
209 def make_filters(config):
210 """Create a list of filters from *config*.
211
212 Parameters:
213 -----------
214 config: dict, optional
215 Dictionary with parameter: value pairs. For each parameter, an
216 appropriate Filter will be found and instanciated.
217
218 Returns:
219 --------
220 list:
221 List of :py:class:`PHEFilterBase` filters.
222 """
223 filters = []
224
225 if config:
226 for custom_filter in config:
227 if custom_filter in _avail_filters:
228 filters.append(_avail_filters[custom_filter](config))
229 else:
230 logging.error("Could not find appropriate filter for %s",
231 custom_filter)
232 raise Exception("Filter %s could not be created. Please check your filter arguments." % custom_filter)
233
234 return filters