Mercurial > repos > ulfschaefer > vcfs2fasta
comparison phe/variant/__init__.py @ 14:f72039c5faa4 draft
Uploaded
| author | ulfschaefer | 
|---|---|
| date | Wed, 16 Dec 2015 07:29:05 -0500 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| 13:2e69ce9dca65 | 14:f72039c5faa4 | 
|---|---|
| 1 """Classes and methods to work with variants and such.""" | |
| 2 import abc | |
| 3 #ulf | |
| 4 # from collections import OrderedDict | |
| 5 try: | |
| 6 from collections import OrderedDict | |
| 7 except ImportError: | |
| 8 from ordereddict import OrderedDict | |
| 9 | |
| 10 import logging | |
| 11 import pickle | |
| 12 | |
| 13 from vcf import filters | |
| 14 import vcf | |
| 15 from vcf.parser import _Filter | |
| 16 | |
| 17 from phe.metadata import PHEMetaData | |
| 18 from phe.variant_filters import make_filters, PHEFilterBase, str_to_filters | |
| 19 | |
| 20 | |
| 21 class VCFTemplate(object): | |
| 22 """This is a small hack class for the Template used in generating | |
| 23 VCF file.""" | |
| 24 | |
| 25 def __init__(self, vcf_reader): | |
| 26 self.infos = vcf_reader.infos | |
| 27 self.formats = vcf_reader.formats | |
| 28 self.filters = vcf_reader.filters | |
| 29 self.alts = vcf_reader.alts | |
| 30 self.contigs = vcf_reader.contigs | |
| 31 self.metadata = vcf_reader.metadata | |
| 32 self._column_headers = vcf_reader._column_headers | |
| 33 self.samples = vcf_reader.samples | |
| 34 | |
| 35 class VariantSet(object): | |
| 36 """A convenient representation of set of variants. | |
| 37 TODO: Implement iterator and generator for the variant set. | |
| 38 """ | |
| 39 | |
| 40 _reader = None | |
| 41 | |
| 42 def __init__(self, vcf_in, filters=None): | |
| 43 """Constructor of variant set. | |
| 44 | |
| 45 Parameters: | |
| 46 ----------- | |
| 47 vcf_in: str | |
| 48 Path to the VCF file for loading information. | |
| 49 filters: str or dict, optional | |
| 50 Dictionary or string of the filter:threshold key value pairs. | |
| 51 """ | |
| 52 self.vcf_in = vcf_in | |
| 53 self._reader = vcf.Reader(filename=vcf_in) | |
| 54 self.out_template = VCFTemplate(self._reader) | |
| 55 | |
| 56 self.filters = [] | |
| 57 if filters is not None: | |
| 58 if isinstance(filters, str): | |
| 59 self.filters = str_to_filters(filters) | |
| 60 elif isinstance(filters, dict): | |
| 61 self.filters = make_filters(config=filters) | |
| 62 elif isinstance(filters, list): | |
| 63 self.filters = filters | |
| 64 else: | |
| 65 logging.warn("Could not create filters from %s", filters) | |
| 66 else: | |
| 67 reader = vcf.Reader(filename=self.vcf_in) | |
| 68 filters = {} | |
| 69 for filter_id in reader.filters: | |
| 70 filters.update(PHEFilterBase.decode(filter_id)) | |
| 71 | |
| 72 if filters: | |
| 73 self.filters = make_filters(config=filters) | |
| 74 | |
| 75 self.variants = [] | |
| 76 | |
| 77 def filter_variants(self, keep_only_snps=True): | |
| 78 """Create a variant """ | |
| 79 | |
| 80 if self._reader is None: | |
| 81 # Create a reader class from input VCF. | |
| 82 self._reader = vcf.Reader(filename=self.vcf_in) | |
| 83 | |
| 84 # get list of existing filters. | |
| 85 existing_filters = {} | |
| 86 removed_filters = [] | |
| 87 | |
| 88 for filter_id in self._reader.filters: | |
| 89 conf = PHEFilterBase.decode(filter_id) | |
| 90 tuple(conf.keys()) | |
| 91 existing_filters.update({tuple(conf.keys()):filter_id}) | |
| 92 | |
| 93 # Add each filter we are going to use to the record. | |
| 94 # This is needed for writing out proper #FILTER header in VCF. | |
| 95 for record_filter in self.filters: | |
| 96 # We know that each filter has short description method. | |
| 97 short_doc = record_filter.short_desc() | |
| 98 short_doc = short_doc.split('\n')[0].lstrip() | |
| 99 | |
| 100 filter_name = PHEFilterBase.decode(record_filter.filter_name()) | |
| 101 | |
| 102 # Check if the sample has been filtered for this type of filter | |
| 103 # in the past. If so remove is, because it is going to be refiltered. | |
| 104 if tuple(filter_name) in existing_filters: | |
| 105 logging.info("Removing existing filter: %s", existing_filters[tuple(filter_name)]) | |
| 106 removed_filters.append(existing_filters[tuple(filter_name)]) | |
| 107 del self._reader.filters[existing_filters[tuple(filter_name)]] | |
| 108 | |
| 109 self._reader.filters[record_filter.filter_name()] = _Filter(record_filter.filter_name(), short_doc) | |
| 110 | |
| 111 # For each record (POSITION) apply set of filters. | |
| 112 for record in self._reader: | |
| 113 | |
| 114 # If this record failed filters and we removed some, | |
| 115 # check is they need to be removed from record. | |
| 116 if isinstance(record.FILTER, list) and len(record.FILTER) > 0: | |
| 117 for filter_id in removed_filters: | |
| 118 if filter_id in record.FILTER: | |
| 119 record.FILTER.remove(filter_id) | |
| 120 | |
| 121 for record_filter in self.filters: | |
| 122 | |
| 123 # Call to __call__ method in each filter. | |
| 124 result = record_filter(record) | |
| 125 | |
| 126 # Record is KEPT if filter returns None | |
| 127 if result == None: | |
| 128 continue | |
| 129 | |
| 130 # If we got this far, then record is filtered OUT. | |
| 131 record.add_filter(record_filter.filter_name()) | |
| 132 | |
| 133 # After applying all filters, check if FILTER is None. | |
| 134 # If it is, then record PASSED all filters. | |
| 135 if record.FILTER is None or record.FILTER == []: | |
| 136 record.FILTER = 'PASS' | |
| 137 # FIXME: Does this work for indels? | |
| 138 if keep_only_snps and record.is_snp: | |
| 139 self.variants.append(record) | |
| 140 else: | |
| 141 self.variants.append(record) | |
| 142 | |
| 143 self.update_filters(self._reader.filters) | |
| 144 | |
| 145 return [ variant for variant in self.variants if variant.FILTER == "PASS"] | |
| 146 | |
| 147 def add_metadata(self, info): | |
| 148 """Add metadata to the variant set. | |
| 149 | |
| 150 Parameters: | |
| 151 ----------- | |
| 152 info: dict | |
| 153 Dictionary of key value pairs to be inserted into metadata. | |
| 154 """ | |
| 155 for info_key, metadata in info.items(): | |
| 156 self.out_template.metadata[info_key] = metadata | |
| 157 | |
| 158 def write_variants(self, vcf_out, only_snps=False, only_good=False): | |
| 159 """Write variants to a VCF file. | |
| 160 | |
| 161 Parameters: | |
| 162 ----------- | |
| 163 vcf_out: str | |
| 164 Path to the file where VCF data is written. | |
| 165 only_snps: bool, optional | |
| 166 True is *only* SNP are to be written to the output (default: False). | |
| 167 only_good: bool, optional | |
| 168 True if only those records that PASS all filters should be written | |
| 169 (default: False). | |
| 170 | |
| 171 Returns: | |
| 172 int: | |
| 173 Number of records written. | |
| 174 """ | |
| 175 written_variants = 0 | |
| 176 with open(vcf_out, "w") as out_vcf: | |
| 177 writer = vcf.Writer(out_vcf, self.out_template) | |
| 178 for record in self.variants: | |
| 179 | |
| 180 if only_snps and not record.is_snp: | |
| 181 continue | |
| 182 | |
| 183 if only_good and record.FILTER != "PASS" or record.FILTER is None: | |
| 184 continue | |
| 185 | |
| 186 writer.write_record(record) | |
| 187 written_variants += 1 | |
| 188 | |
| 189 return written_variants | |
| 190 | |
| 191 def _write_bad_variants(self, vcf_out): | |
| 192 """**PRIVATE:** Write only those records that **haven't** passed.""" | |
| 193 written_variants = 0 | |
| 194 with open(vcf_out, "w") as out_vcf: | |
| 195 writer = vcf.Writer(out_vcf, self.out_template) | |
| 196 for record in self.variants: | |
| 197 if record.FILTER != "PASS" and record.FILTER is not None: | |
| 198 writer.write_record(record) | |
| 199 written_variants += 1 | |
| 200 return written_variants | |
| 201 | |
| 202 def serialise(self, out_file): | |
| 203 """Save the data in this class to a file for future use/reload. | |
| 204 | |
| 205 Parameters: | |
| 206 ----------- | |
| 207 out_file: str | |
| 208 path to file where the data should be written to. | |
| 209 | |
| 210 Returns: | |
| 211 -------- | |
| 212 int: | |
| 213 Number of variants written. | |
| 214 """ | |
| 215 written_variants = 0 | |
| 216 with open(out_file, "w") as out_vcf: | |
| 217 writer = vcf.Writer(out_vcf, self.out_template) | |
| 218 for record in self.variants: | |
| 219 writer.write_record(record) | |
| 220 written_variants += 1 | |
| 221 | |
| 222 return written_variants | |
| 223 | |
| 224 def update_filters(self, new_filters): | |
| 225 """Update internal filters in the output template.""" | |
| 226 for new_filter, filter_data in new_filters.items(): | |
| 227 self.out_template.filters[new_filter] = filter_data | |
| 228 | |
| 229 | |
| 230 class VariantCaller(PHEMetaData): | |
| 231 """Abstract class used for access to the implemented variant callers.""" | |
| 232 | |
| 233 __metaclass__ = abc.ABCMeta | |
| 234 | |
| 235 def __init__(self, cmd_options=None): | |
| 236 """Constructor for variant caller. | |
| 237 | |
| 238 Parameters: | |
| 239 ----------- | |
| 240 cmd_options: str, optional | |
| 241 Command options to pass to the variant command. | |
| 242 """ | |
| 243 self.cmd_options = cmd_options | |
| 244 | |
| 245 super(VariantCaller, self).__init__() | |
| 246 | |
| 247 @abc.abstractmethod | |
| 248 def make_vcf(self, *args, **kwargs): | |
| 249 """Create a VCF from **BAM** file. | |
| 250 | |
| 251 Parameters: | |
| 252 ----------- | |
| 253 ref: str | |
| 254 Path to the reference file. | |
| 255 bam: str | |
| 256 Path to the indexed **BAM** file for calling variants. | |
| 257 vcf_file: str | |
| 258 path to the VCF file where data will be written to. | |
| 259 | |
| 260 Returns: | |
| 261 -------- | |
| 262 bool: | |
| 263 True if variant calling was successful, False otherwise. | |
| 264 """ | |
| 265 raise NotImplementedError("make_vcf is not implemented yet.") | |
| 266 | |
| 267 @abc.abstractmethod | |
| 268 def create_aux_files(self, ref): | |
| 269 """Create needed (if any) auxiliary files. | |
| 270 These files are required for proper functioning of the variant caller. | |
| 271 """ | |
| 272 raise NotImplementedError("create_aux_files is not implemeted.") | |
| 273 | |
| 274 @abc.abstractmethod | |
| 275 def get_info(self, plain=False): | |
| 276 """Get information about this variant caller.""" | |
| 277 raise NotImplementedError("Get info has not been implemented yet." | |
| 278 ) | |
| 279 def get_meta(self): | |
| 280 """Get the metadata about this variant caller.""" | |
| 281 od = self.get_info() | |
| 282 od["ID"] = "VariantCaller" | |
| 283 return OrderedDict({"PHEVariantMetaData": [od]}) | |
| 284 | |
| 285 @abc.abstractmethod | |
| 286 def get_version(self): | |
| 287 """Get the version of the underlying command used.""" | |
| 288 raise NotImplementedError("Get version has not been implemented yet.") | 
