Mercurial > repos > ulfschaefer > filter_vcf
comparison phe/variant_filters/__init__.py @ 10:c2f8e7580133 draft
Uploaded
author | ulfschaefer |
---|---|
date | Mon, 21 Dec 2015 10:50:17 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
9:2e3115b4df74 | 10:c2f8e7580133 |
---|---|
1 """Classes and functions for working with variant filters.""" | |
2 | |
3 from __builtin__ import __import__ | |
4 from abc import abstractproperty | |
5 import abc | |
6 import argparse | |
7 import glob | |
8 import inspect | |
9 import logging | |
10 import os | |
11 import re | |
12 import sys | |
13 | |
14 import vcf | |
15 import vcf.filters | |
16 from vcf.parser import _Filter | |
17 | |
18 IUPAC_CODES = {frozenset(["A"]): "A", | |
19 frozenset(["C"]): "C", | |
20 frozenset(["G"]): "G", | |
21 frozenset(["T"]): "T", | |
22 frozenset(["A", "G"]): "R", | |
23 frozenset(["C", "T"]): "Y", | |
24 frozenset(["G", "C"]): "S", | |
25 frozenset(["A", "T"]): "W", | |
26 frozenset(["G", "T"]): "K", | |
27 frozenset(["A", "C"]): "M", | |
28 frozenset(["C", "G", "T"]): "B", | |
29 frozenset(["A", "G", "T"]): "D", | |
30 frozenset(["A", "C", "T"]): "H", | |
31 frozenset(["A", "C", "G"]): "V" | |
32 } | |
33 | |
34 class PHEFilterBase(vcf.filters.Base): | |
35 """Base class for VCF filters.""" | |
36 __meta__ = abc.ABCMeta | |
37 | |
38 magic_sep = ":" | |
39 decoder_pattern = re.compile(magic_sep) | |
40 | |
41 @abc.abstractproperty | |
42 def parameter(self): | |
43 """Short name of parameter being filtered.""" | |
44 return self.parameter | |
45 | |
46 @abc.abstractproperty | |
47 def _default_threshold(self): | |
48 """Default threshold for filtering.""" | |
49 return self._default_threshold | |
50 | |
51 def __init__(self, args): | |
52 super(PHEFilterBase, self).__init__(args) | |
53 | |
54 # Change the threshold to custom gq value. | |
55 self.threshold = self._default_threshold | |
56 | |
57 if isinstance(args, dict): | |
58 self.threshold = args.get(self.parameter) | |
59 | |
60 def __str__(self): | |
61 return self.filter_name() | |
62 | |
63 def _check_record(self, record): | |
64 if self.is_uncallable(record): | |
65 return False | |
66 elif record.is_monomorphic: | |
67 return None | |
68 else: | |
69 return True | |
70 | |
71 @abc.abstractmethod | |
72 def short_desc(self): | |
73 """Short description of the filter (included in VCF).""" | |
74 raise NotImplementedError("Get short description is not implemented.") | |
75 | |
76 def get_config(self): | |
77 """This is used for reconstructing filter.""" | |
78 return {self.parameter: self.threshold} | |
79 | |
80 def filter_name(self): | |
81 """Create filter names by their parameter separated by magic. | |
82 E.g. if filter parameter is ad_ratio and threshold is 0.9 then | |
83 ad_ratio:0.9 if the filter name. | |
84 """ | |
85 return "%s%s%s" % (self.parameter, self.magic_sep, self.threshold) | |
86 | |
87 @staticmethod | |
88 def decode(filter_id): | |
89 """Decode name of filter.""" | |
90 conf = {} | |
91 | |
92 if PHEFilterBase.magic_sep in filter_id: | |
93 info = PHEFilterBase.decoder_pattern.split(filter_id) | |
94 assert len(info) == 2 | |
95 conf[info[0]] = info[1] | |
96 return conf | |
97 | |
98 def is_gap(self): | |
99 return False | |
100 | |
101 def is_n(self): | |
102 return True | |
103 | |
104 def is_uncallable(self, record): | |
105 """Filter a :py:class:`vcf.model._Record`.""" | |
106 | |
107 if len(record.samples) > 1: | |
108 logging.warn("More than 1 sample detected. Only first is considered.") | |
109 | |
110 try: | |
111 record_gt = record.samples[0].data.GT | |
112 except AttributeError: | |
113 logging.warn("Could not retrieve GQ score POS %i", record.POS) | |
114 record_gt = "./." | |
115 | |
116 if record_gt == "./.": | |
117 return True | |
118 else: | |
119 return False | |
120 | |
121 @staticmethod | |
122 def call_concensus(record): | |
123 if not record.FILTER: | |
124 sample_ad = frozenset([str(c).upper() for c in record.ALT]) | |
125 return IUPAC_CODES.get(sample_ad, "N") | |
126 | |
127 else: | |
128 sample_ad = frozenset([str(c).upper() for c in record.ALT] + [record.REF]) | |
129 | |
130 return IUPAC_CODES.get(sample_ad, "N") | |
131 | |
132 def dynamic_filter_loader(): | |
133 """Fancy way of dynamically importing existing filters. | |
134 | |
135 Returns | |
136 ------- | |
137 dict: | |
138 Available filters dictionary. Keys are parameters that | |
139 can be supplied to the filters. | |
140 """ | |
141 | |
142 # We assume the filters are in the same directory as THIS file. | |
143 filter_dir = os.path.dirname(__file__) | |
144 filter_dir = os.path.abspath(filter_dir) | |
145 | |
146 # This is populated when the module is first imported. | |
147 avail_filters = {} | |
148 | |
149 # Add this directory to the syspath. | |
150 sys.path.insert(0, filter_dir) | |
151 | |
152 # Find all "py" files. | |
153 for filter_mod in glob.glob(os.path.join(filter_dir, "*.py")): | |
154 | |
155 # Derive name of the module where filter is. | |
156 filter_mod_file = os.path.basename(filter_mod) | |
157 | |
158 # Ignore this file, obviously. | |
159 if filter_mod_file.startswith("__init__"): | |
160 continue | |
161 | |
162 # Import the module with a filter. | |
163 mod = __import__(filter_mod_file.replace(".pyc", "").replace(".py", "")) | |
164 | |
165 # Find all the classes contained in this module. | |
166 classes = inspect.getmembers(mod, inspect.isclass) | |
167 for cls_name, cls in classes: | |
168 # For each class, if it is a sublass of PHEFilterBase, add it. | |
169 if cls_name != "PHEFilterBase" and issubclass(cls, PHEFilterBase): | |
170 # The parameters are inherited and defined within each filter. | |
171 avail_filters[cls.parameter] = cls | |
172 | |
173 sys.path.remove(filter_dir) | |
174 | |
175 return avail_filters | |
176 | |
177 _avail_filters = dynamic_filter_loader() | |
178 | |
179 def available_filters(): | |
180 """Return list of available filters.""" | |
181 return _avail_filters.keys() | |
182 | |
183 def str_to_filters(filters): | |
184 """Convert from filter string to array of filters. | |
185 E.g. ad_ration:0.9,min_depth:5 | |
186 | |
187 Parameters: | |
188 ----------- | |
189 filters: str | |
190 String version of filters, separated by comma. | |
191 | |
192 Returns: | |
193 -------- | |
194 list: | |
195 List of :py:class:`phe.variant_filters.PHEFilterBase` instances. | |
196 """ | |
197 | |
198 config = {} | |
199 for kv_pair in filters.split(","): | |
200 pair = kv_pair.split(":") | |
201 assert len(pair) == 2, "Filters should be separated by ':' %s" % kv_pair | |
202 | |
203 # We don't care about casting them to correct type because Filters | |
204 # will do it for us. | |
205 config[pair[0]] = pair[1] | |
206 | |
207 return make_filters(config) | |
208 | |
209 def make_filters(config): | |
210 """Create a list of filters from *config*. | |
211 | |
212 Parameters: | |
213 ----------- | |
214 config: dict, optional | |
215 Dictionary with parameter: value pairs. For each parameter, an | |
216 appropriate Filter will be found and instanciated. | |
217 | |
218 Returns: | |
219 -------- | |
220 list: | |
221 List of :py:class:`PHEFilterBase` filters. | |
222 """ | |
223 filters = [] | |
224 | |
225 if config: | |
226 for custom_filter in config: | |
227 if custom_filter in _avail_filters: | |
228 filters.append(_avail_filters[custom_filter](config)) | |
229 else: | |
230 logging.error("Could not find appropriate filter for %s", | |
231 custom_filter) | |
232 raise Exception("Filter %s could not be created. Please check your filter arguments." % custom_filter) | |
233 | |
234 return filters |