Mercurial > repos > pmac > iterativepca
view pedify.py @ 0:64e75e21466e draft default tip
Uploaded
author | pmac |
---|---|
date | Wed, 01 Jun 2016 03:38:39 -0400 |
parents | |
children |
line wrap: on
line source
import sys import csv import argparse DEBUG = 0 REQ_KEYS = ['genotype_column', 'reference_column', 'alternate_column', 'sample_id_column', 'chromosome_column', 'position_column', 'variant_id_column'] GENOTYPE_DICT = { "'0/0": "hom_ref", "'0/1": "het", "'1/1": "hom_alt", "'1/2": "tri_allelic" } GENOTYPE_TO_NUMERIC = { "'0/0": 0, "'0/1": 1, "'1/1": 2, "'1/2": 2 } class PedConverter: def __init__(self): self.cfg = None self.samples = {} self.sites = {} self.xsamples = [] def verify_column_names(self, datafile_header): # check all the config column names actually exist in the data file for col in self.cfg.col_names.values(): if col not in datafile_header: print "The '{}' column was not found in the datafile! Double check your config file is correct. Exiting...".format( col) sys.exit(1) def verify_filters(self, datafile_header): # print warning messages if filters invalid all_filters = self.cfg.nfilters.copy() all_filters.update(self.cfg.sfilters) for key in all_filters: col_name = all_filters[key]["col_name"] if col_name not in datafile_header: print "Warning! The '{}' filter was not applied as the datafile does not contain the column '{}'".format( key, col_name) def read_config_file(self, cfname): self.cfg = ConfigSettings() rc = self.cfg.parse_config_file(cfname) return rc def read_data_file(self, dfname): if (self.cfg == None) or (not self.cfg.is_valid()): return 1 datafile = open(dfname, 'r') dreader = csv.DictReader(datafile, delimiter='\t') # verify datafile data matches config file self.verify_column_names(dreader.fieldnames) self.verify_filters(dreader.fieldnames) all_sample_ids = set() i = 0 for row in dreader: failed_filters = self.filter_all(row) sample_key = row[self.cfg.col_names['sample_id_column']] all_sample_ids.add(sample_key) if not failed_filters: # add to sample dict # key is a tuple made up of which chromosome the snp is found on # and the position on the chromosome itself SNP_key = (row[self.cfg.col_names['chromosome_column']], int(row[self.cfg.col_names['position_column']])) genotype = row[self.cfg.col_names['genotype_column']] # create a dictionary for each sample (person); each person is associated # with another dictionary of all the SNPs found in that person if sample_key not in self.samples: self.samples[sample_key] = {SNP_key: genotype} else: self.samples[sample_key][SNP_key] = genotype # create a dict of all the sites where SNPs exist if SNP_key not in self.sites: # generate arbitrary ID's if there is no pre-existing ID for the SNP if row[self.cfg.col_names['variant_id_column']] == '.': SNP_id = "SNP_" + str(i) else: SNP_id = row[self.cfg.col_names['variant_id_column']] SNP_data = {'ref_col': row[self.cfg.col_names['reference_column']], 'alt_col': row[self.cfg.col_names['alternate_column']], 'SNP_id': SNP_id} self.sites[SNP_key] = SNP_data i += 1 # make sure every sample contains a genotype for every SNP found for sample_key in self.samples: this_sample = self.samples[sample_key] for SNP_key in self.sites: if SNP_key not in this_sample: this_sample[SNP_key] = "'0/0" datafile.close() # get list of samples which were filtered out self.xsamples = list(all_sample_ids.difference(set(self.samples.keys()))) return 0 # returns key of the specific filter/s that failed def filter_numeric(self, row): failed_filters = set() for key in self.cfg.nfilters.keys(): nfilter = self.cfg.nfilters[key] cutoff = float(nfilter["cutoff"]) op = nfilter["op"] col_name = nfilter["col_name"] if col_name in row.keys(): cv = float(row[col_name]) if not string_as_operator(cv, cutoff, op): failed_filters.add(key) return failed_filters # returns key of the specific filter/s that failed def filter_string(self, row): failed_filters = set() for key in self.cfg.sfilters.keys(): sfilter = self.cfg.sfilters[key] col_name = sfilter["col_name"] if col_name in row.keys(): cs = row[col_name] af = sfilter['accept_flag'] ef = sfilter['exact_flag'] patterns = sfilter['patterns'] if ef: if af: passed = False for p in patterns: if p == cs: passed = True break if passed == False: failed_filters.add(key) else: for p in patterns: if p == cs: failed_filters.add(key) break else: if af: passed = False for p in patterns: if p in cs: passed = True break if passed == False: failed_filters.add(key) else: for p in patterns: if p in cs: failed_filters.add(key) break return failed_filters def filter_all(self, row): return self.filter_numeric(row).union(self.filter_string(row)) def create_ped_file(self, filename, numeric=False): output = "" sorted_sample_keys = sorted(self.samples.keys()) for sample_key in sorted_sample_keys: this_sample = self.samples[sample_key] sorted_site_keys = sorted(this_sample.keys()) variants = [] if numeric: pef = sample_key else: pef = self.create_ped_entry_front(sample_key) for SNP_key in sorted_site_keys: genotype = this_sample[SNP_key] if numeric == True: variants.append(str(GENOTYPE_TO_NUMERIC[genotype])) else: variants.append(genotype_to_bases(genotype, self.sites[SNP_key])) output += "{}\t{}\n".format(pef, '\t'.join(variants)) pedfile = open(filename, 'w') pedfile.write(output) pedfile.close() def create_ped_entry_front(self, sample_id): if self.cfg.control_info["control_tag"]["tag"] in sample_id: group = 2 elif self.cfg.control_info["cases_tag"]["tag"] in sample_id: group = 1 else: group = 1 entry = "{0}\t{1}\t{2}\t{3}\t{4}\t{5}".format( sample_id, sample_id, sample_id + "_F", sample_id + "_M", 2, group) return entry def create_map_file(self, filename): output = "" for SNP_key in sorted(self.sites.keys()): chrom = SNP_key[0] SNP_id = self.sites[SNP_key]['SNP_id'] posn = SNP_key[1] output += "{}\t{}\t{}\n".format(chrom, SNP_id, str(posn)) mapfile = open(filename, 'w') mapfile.write(output) mapfile.close() def create_excluded_samples_file(self, filename): xsfile = open(filename, 'w') xsfile.write('\n'.join(self.xsamples)) xsfile.close() class ConfigSettings: SECTIONS = [ "#control", "#column_names", "#numeric_filters", "#string_filters" ] def __init__(self): self.control_info = {} self.col_names = {} self.nfilters = {} self.sfilters = {} def parse_config_file(self, cfname): cffile = open(cfname, 'r') section = None rc = 0 for line in cffile: # clean trailing/leading whitespace/newlines line = line.strip() # set section flag if line[0] == '#': if line in ConfigSettings.SECTIONS: section = line else: continue else: # fill up config dicts if section == "#control": (key, col_name, tag) = line.split(',') self.control_info[key] = {'col_name': col_name, 'tag': tag} elif section == "#column_names": (key, col_name) = line.split(',') self.col_names[key] = col_name elif section == "#numeric_filters": (key, col_name, op, cutoff) = line.split(',') self.add_numeric_filter(key, col_name, op, float(cutoff)) elif section == "#string_filters": (key, col_name, exact_flag, accept_flag) = line.split(',') patterns = next(cffile).strip().split(',') self.add_string_filter(key, col_name, exact_flag, accept_flag, patterns) else: rc = 2 break cffile.close() if rc != 0: return rc if not self.is_valid(): rc = 3 return rc def is_valid(self): for k in REQ_KEYS: if k not in self.col_names.keys(): return False return True def add_numeric_filter(self, key, col_name, op, cutoff): self.nfilters[key] = { 'col_name': col_name, 'op': op, 'cutoff': cutoff } def add_string_filter(self, key, col_name, exact_flag, accept_flag, patterns): if exact_flag == "exact": ef = True elif exact_flag == "not_exact": ef = False else: return False if accept_flag == "accept": af = True elif accept_flag == "reject": af = False else: return False self.sfilters[key] = { 'col_name': col_name, 'exact_flag': ef, 'accept_flag': af, 'patterns': patterns } def __str__(self): rv = "is Valid: {} || control info: {} || col names: {} || numeric filters: {} || string filters: {}".format( self.is_valid(), self.control_info, self.col_names, self.nfilters, self.sfilters) return rv ### Utility ### def string_as_operator(arg1, arg2, op): if op == "==": return arg1 == arg2 elif op == ">": return arg1 > arg2 elif op == "<": return arg1 < arg2 elif op == "<=": return arg1 <= arg2 elif op == ">=": return arg1 >= arg2 def genotype_to_bases(genotype, SNPdata): bases = "" if genotype in GENOTYPE_DICT: gtype = GENOTYPE_DICT[genotype] if gtype == "hom_ref": bases = "{} {}".format(SNPdata['ref_col'], SNPdata['ref_col']) elif gtype == "hom_alt": bases = "{} {}".format(SNPdata['alt_col'], SNPdata['alt_col']) elif gtype == "het": bases = "{} {}".format(SNPdata['ref_col'], SNPdata['alt_col']) elif gtype == "tri_allelic": aa_col = SNPdata['alt_col'] if len(aa_col) > 1: # arbitrarily choose the first one alt_allele = aa_col[0] else: alt_allele = aa_col bases = "{} {}".format(alt_allele, alt_allele) else: print genotype print "Unrecognized genotype!" sys.exit(1) return bases ### Main ### def main(): # argument parsing parser = argparse.ArgumentParser() parser.add_argument("dfname", help="name of input data file") parser.add_argument("cfname", help="name of input configuration file") parser.add_argument("pfname", help="name of output ped file") parser.add_argument("mfname", help="name of output map file") parser.add_argument("xsname", help="name of output file containing exact IDs of samples who were excluded") args = parser.parse_args() pc = PedConverter() # read in config file rc = pc.read_config_file(args.cfname) if rc == 0: print 'config file read successfully' else: print 'failed to read in config file successfully. Error code: {}'.format(rc) # read in data file rc = pc.read_data_file(args.dfname) if rc == 0: print 'data file read successfully' else: print 'failed to read in data file successfully. Error code: {}'.format(rc) pc.create_ped_file(args.pfname, numeric=True) pc.create_map_file(args.mfname) pc.create_excluded_samples_file(args.xsname) if __name__ == "__main__": main()