comparison phe/variant/__init__.py @ 0:834a312c0114 draft

Uploaded
author ulfschaefer
date Thu, 10 Dec 2015 09:22:39 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:834a312c0114
1 """Classes and methods to work with variants and such."""
2 import abc
3 #ulf
4 # from collections import OrderedDict
5 try:
6 from collections import OrderedDict
7 except ImportError:
8 from ordereddict import OrderedDict
9
10 import logging
11 import pickle
12
13 from vcf import filters
14 import vcf
15 from vcf.parser import _Filter
16
17 from phe.metadata import PHEMetaData
18 from phe.variant_filters import make_filters, PHEFilterBase, str_to_filters
19
20
21 class VCFTemplate(object):
22 """This is a small hack class for the Template used in generating
23 VCF file."""
24
25 def __init__(self, vcf_reader):
26 self.infos = vcf_reader.infos
27 self.formats = vcf_reader.formats
28 self.filters = vcf_reader.filters
29 self.alts = vcf_reader.alts
30 self.contigs = vcf_reader.contigs
31 self.metadata = vcf_reader.metadata
32 self._column_headers = vcf_reader._column_headers
33 self.samples = vcf_reader.samples
34
35 class VariantSet(object):
36 """A convenient representation of set of variants.
37 TODO: Implement iterator and generator for the variant set.
38 """
39
40 _reader = None
41
42 def __init__(self, vcf_in, filters=None):
43 """Constructor of variant set.
44
45 Parameters:
46 -----------
47 vcf_in: str
48 Path to the VCF file for loading information.
49 filters: str or dict, optional
50 Dictionary or string of the filter:threshold key value pairs.
51 """
52 self.vcf_in = vcf_in
53 self._reader = vcf.Reader(filename=vcf_in)
54 self.out_template = VCFTemplate(self._reader)
55
56 self.filters = []
57 if filters is not None:
58 if isinstance(filters, str):
59 self.filters = str_to_filters(filters)
60 elif isinstance(filters, dict):
61 self.filters = make_filters(config=filters)
62 elif isinstance(filters, list):
63 self.filters = filters
64 else:
65 logging.warn("Could not create filters from %s", filters)
66 else:
67 reader = vcf.Reader(filename=self.vcf_in)
68 filters = {}
69 for filter_id in reader.filters:
70 filters.update(PHEFilterBase.decode(filter_id))
71
72 if filters:
73 self.filters = make_filters(config=filters)
74
75 self.variants = []
76
77 def filter_variants(self, keep_only_snps=True):
78 """Create a variant """
79
80 if self._reader is None:
81 # Create a reader class from input VCF.
82 self._reader = vcf.Reader(filename=self.vcf_in)
83
84 # get list of existing filters.
85 existing_filters = {}
86 removed_filters = []
87
88 for filter_id in self._reader.filters:
89 conf = PHEFilterBase.decode(filter_id)
90 tuple(conf.keys())
91 existing_filters.update({tuple(conf.keys()):filter_id})
92
93 # Add each filter we are going to use to the record.
94 # This is needed for writing out proper #FILTER header in VCF.
95 for record_filter in self.filters:
96 # We know that each filter has short description method.
97 short_doc = record_filter.short_desc()
98 short_doc = short_doc.split('\n')[0].lstrip()
99
100 filter_name = PHEFilterBase.decode(record_filter.filter_name())
101
102 # Check if the sample has been filtered for this type of filter
103 # in the past. If so remove is, because it is going to be refiltered.
104 if tuple(filter_name) in existing_filters:
105 logging.info("Removing existing filter: %s", existing_filters[tuple(filter_name)])
106 removed_filters.append(existing_filters[tuple(filter_name)])
107 del self._reader.filters[existing_filters[tuple(filter_name)]]
108
109 self._reader.filters[record_filter.filter_name()] = _Filter(record_filter.filter_name(), short_doc)
110
111 # For each record (POSITION) apply set of filters.
112 for record in self._reader:
113
114 # If this record failed filters and we removed some,
115 # check is they need to be removed from record.
116 if isinstance(record.FILTER, list) and len(record.FILTER) > 0:
117 for filter_id in removed_filters:
118 if filter_id in record.FILTER:
119 record.FILTER.remove(filter_id)
120
121 for record_filter in self.filters:
122
123 # Call to __call__ method in each filter.
124 result = record_filter(record)
125
126 # Record is KEPT if filter returns None
127 if result == None:
128 continue
129
130 # If we got this far, then record is filtered OUT.
131 record.add_filter(record_filter.filter_name())
132
133 # After applying all filters, check if FILTER is None.
134 # If it is, then record PASSED all filters.
135 if record.FILTER is None or record.FILTER == []:
136 record.FILTER = 'PASS'
137 # FIXME: Does this work for indels?
138 if keep_only_snps and record.is_snp:
139 self.variants.append(record)
140 else:
141 self.variants.append(record)
142
143 self.update_filters(self._reader.filters)
144
145 return [ variant for variant in self.variants if variant.FILTER == "PASS"]
146
147 def add_metadata(self, info):
148 """Add metadata to the variant set.
149
150 Parameters:
151 -----------
152 info: dict
153 Dictionary of key value pairs to be inserted into metadata.
154 """
155 for info_key, metadata in info.items():
156 self.out_template.metadata[info_key] = metadata
157
158 def write_variants(self, vcf_out, only_snps=False, only_good=False):
159 """Write variants to a VCF file.
160
161 Parameters:
162 -----------
163 vcf_out: str
164 Path to the file where VCF data is written.
165 only_snps: bool, optional
166 True is *only* SNP are to be written to the output (default: False).
167 only_good: bool, optional
168 True if only those records that PASS all filters should be written
169 (default: False).
170
171 Returns:
172 int:
173 Number of records written.
174 """
175 written_variants = 0
176 with open(vcf_out, "w") as out_vcf:
177 writer = vcf.Writer(out_vcf, self.out_template)
178 for record in self.variants:
179
180 if only_snps and not record.is_snp:
181 continue
182
183 if only_good and record.FILTER != "PASS" or record.FILTER is None:
184 continue
185
186 writer.write_record(record)
187 written_variants += 1
188
189 return written_variants
190
191 def _write_bad_variants(self, vcf_out):
192 """**PRIVATE:** Write only those records that **haven't** passed."""
193 written_variants = 0
194 with open(vcf_out, "w") as out_vcf:
195 writer = vcf.Writer(out_vcf, self.out_template)
196 for record in self.variants:
197 if record.FILTER != "PASS" and record.FILTER is not None:
198 writer.write_record(record)
199 written_variants += 1
200 return written_variants
201
202 def serialise(self, out_file):
203 """Save the data in this class to a file for future use/reload.
204
205 Parameters:
206 -----------
207 out_file: str
208 path to file where the data should be written to.
209
210 Returns:
211 --------
212 int:
213 Number of variants written.
214 """
215 written_variants = 0
216 with open(out_file, "w") as out_vcf:
217 writer = vcf.Writer(out_vcf, self.out_template)
218 for record in self.variants:
219 writer.write_record(record)
220 written_variants += 1
221
222 return written_variants
223
224 def update_filters(self, new_filters):
225 """Update internal filters in the output template."""
226 for new_filter, filter_data in new_filters.items():
227 self.out_template.filters[new_filter] = filter_data
228
229
230 class VariantCaller(PHEMetaData):
231 """Abstract class used for access to the implemented variant callers."""
232
233 __metaclass__ = abc.ABCMeta
234
235 def __init__(self, cmd_options=None):
236 """Constructor for variant caller.
237
238 Parameters:
239 -----------
240 cmd_options: str, optional
241 Command options to pass to the variant command.
242 """
243 self.cmd_options = cmd_options
244
245 super(VariantCaller, self).__init__()
246
247 @abc.abstractmethod
248 def make_vcf(self, *args, **kwargs):
249 """Create a VCF from **BAM** file.
250
251 Parameters:
252 -----------
253 ref: str
254 Path to the reference file.
255 bam: str
256 Path to the indexed **BAM** file for calling variants.
257 vcf_file: str
258 path to the VCF file where data will be written to.
259
260 Returns:
261 --------
262 bool:
263 True if variant calling was successful, False otherwise.
264 """
265 raise NotImplementedError("make_vcf is not implemented yet.")
266
267 @abc.abstractmethod
268 def create_aux_files(self, ref):
269 """Create needed (if any) auxiliary files.
270 These files are required for proper functioning of the variant caller.
271 """
272 raise NotImplementedError("create_aux_files is not implemeted.")
273
274 @abc.abstractmethod
275 def get_info(self, plain=False):
276 """Get information about this variant caller."""
277 raise NotImplementedError("Get info has not been implemented yet."
278 )
279 def get_meta(self):
280 """Get the metadata about this variant caller."""
281 od = self.get_info()
282 od["ID"] = "VariantCaller"
283 return OrderedDict({"PHEVariantMetaData": [od]})
284
285 @abc.abstractmethod
286 def get_version(self):
287 """Get the version of the underlying command used."""
288 raise NotImplementedError("Get version has not been implemented yet.")