view pedify.py @ 0:64e75e21466e draft default tip

author pmac
date Wed, 01 Jun 2016 03:38:39 -0400

import sys
import csv
import argparse
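
# pedify.py: convert a tab-delimited variant/genotype data file into
# PED and MAP files (PLINK-style), driven by a config file that names the
# relevant columns and any numeric/string filters to apply. Sample IDs with
# no rows passing the filters are written to a separate exclusions file.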

DEBUG = 0

REQ_KEYS = ['genotype_column', 'reference_column', 'alternate_column', 
			'sample_id_column', 'chromosome_column', 'position_column',
			'variant_id_column']

# genotype strings in the input data appear to carry a leading apostrophe
# (e.g. "'0/1"), presumably to stop spreadsheet software mangling them;
# the keys of both maps below assume that convention
GENOTYPE_DICT = {
	"'0/0": "hom_ref",
	"'0/1": "het",
	"'1/1": "hom_alt",
	"'1/2": "tri_allelic"
}

# the number of non-reference alleles in each call; used when
# create_ped_file() is called with numeric=True
GENOTYPE_TO_NUMERIC = {
	"'0/0": 0,
	"'0/1": 1,
	"'1/1": 2,
	"'1/2": 2
}

class PedConverter:
	def __init__(self):
		self.cfg = None  # ConfigSettings instance, set by read_config_file()
		self.samples = {}  # sample_id -> {(chrom, pos): genotype string}
		self.sites = {}  # (chrom, pos) -> {'ref_col', 'alt_col', 'SNP_id'}
		self.xsamples = []  # sample IDs with no rows passing the filters

	def verify_column_names(self, datafile_header):
		# check all the config column names actually exist in the data file
		for col in self.cfg.col_names.values():
			if col not in datafile_header:
				print "The '{}' column was not found in the datafile! Double check your config file is correct. Exiting...".format(
					col)
				sys.exit(1)

	def verify_filters(self, datafile_header):
		# print warning messages if filters invalid
		all_filters = self.cfg.nfilters.copy()
		all_filters.update(self.cfg.sfilters)
		for key in all_filters:
			col_name = all_filters[key]["col_name"]
			if col_name not in datafile_header:
				print "Warning! The '{}' filter was not applied as the datafile does not contain the column '{}'".format(
					key, col_name)

	def read_config_file(self, cfname):
		self.cfg = ConfigSettings()
		rc = self.cfg.parse_config_file(cfname)
		return rc

	def read_data_file(self, dfname):
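		# returns 0 on success, or 1 if no valid configuration has been
		# loaded; rows failing any filter are skipped, and missing
		# sample/site combinations are back-filled as homozygous reference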
		if (self.cfg is None) or (not self.cfg.is_valid()):
			return 1

		datafile = open(dfname, 'r')
		dreader = csv.DictReader(datafile, delimiter='\t')
		# verify datafile data matches config file
		self.verify_column_names(dreader.fieldnames)
		self.verify_filters(dreader.fieldnames)
		all_sample_ids = set()
		i = 0

		for row in dreader:
			failed_filters = self.filter_all(row)
			sample_key = row[self.cfg.col_names['sample_id_column']]
			all_sample_ids.add(sample_key)
			if not failed_filters:
				# add to sample dict
				# key is a tuple made up of which chromosome the snp is found on
				# and the position on the chromosome itself
				SNP_key = (row[self.cfg.col_names['chromosome_column']], int(row[self.cfg.col_names['position_column']]))
				genotype = row[self.cfg.col_names['genotype_column']]

				# create a dictionary for each sample (person); each person is associated
				# with another dictionary of all the SNPs found in that person
				if sample_key not in self.samples:
					self.samples[sample_key] = {SNP_key: genotype} 
				else:
					self.samples[sample_key][SNP_key] = genotype

				# create a dict of all the sites where SNPs exist
				if SNP_key not in self.sites:
					# generate arbitrary IDs if there is no pre-existing ID for the SNP
					if row[self.cfg.col_names['variant_id_column']] == '.':
						SNP_id = "SNP_" + str(i)
					else:
						SNP_id = row[self.cfg.col_names['variant_id_column']]

					SNP_data = {'ref_col': row[self.cfg.col_names['reference_column']], 
								'alt_col': row[self.cfg.col_names['alternate_column']], 
								'SNP_id': SNP_id}
					self.sites[SNP_key] = SNP_data
			i += 1
		
		# make sure every sample contains a genotype for every SNP found
		for sample_key in self.samples:
			this_sample = self.samples[sample_key]
			for SNP_key in self.sites:
				if SNP_key not in this_sample:
					this_sample[SNP_key] = "'0/0"
		datafile.close()

		# get list of samples which were filtered out
		self.xsamples = list(all_sample_ids.difference(set(self.samples.keys())))
		return 0

	# returns the keys of any numeric filters that failed
	def filter_numeric(self, row):
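		# a row fails a numeric filter when its value does not satisfy
		# '<column value> <op> <cutoff>'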
		failed_filters = set()
		for key in self.cfg.nfilters.keys():
			nfilter = self.cfg.nfilters[key]
			cutoff = float(nfilter["cutoff"])
			op = nfilter["op"]
			col_name = nfilter["col_name"]
			if col_name in row.keys():
				cv = float(row[col_name])
				if not string_as_operator(cv, cutoff, op):
					failed_filters.add(key)

		return failed_filters

	# returns the keys of any string filters that failed
	def filter_string(self, row):
		failed_filters = set()
		for key in self.cfg.sfilters.keys():
			sfilter = self.cfg.sfilters[key]
			col_name = sfilter["col_name"]
			if col_name in row.keys():
				cs = row[col_name]
				af = sfilter['accept_flag']
				ef = sfilter['exact_flag']
				patterns = sfilter['patterns']
				# exact filters compare patterns against the whole string;
				# otherwise a substring match is enough
				if ef:
					matched = any(p == cs for p in patterns)
				else:
					matched = any(p in cs for p in patterns)
				if af:
					# accept filter: fails when no pattern matched
					if not matched:
						failed_filters.add(key)
				else:
					# reject filter: fails when any pattern matched
					if matched:
						failed_filters.add(key)

		return failed_filters

	def filter_all(self, row):
		return self.filter_numeric(row).union(self.filter_string(row))

	def create_ped_file(self, filename, numeric=False):
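		# one output row per sample: the six leading PED columns (or just
		# the sample ID when numeric=True), followed by one genotype per
		# site, with sites ordered by (chromosome, position)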
		output = ""
		
		sorted_sample_keys = sorted(self.samples.keys())
		for sample_key in sorted_sample_keys:
			this_sample = self.samples[sample_key]
			sorted_site_keys = sorted(this_sample.keys())
			variants = []
			if numeric:
				pef = sample_key
			else:
				pef = self.create_ped_entry_front(sample_key)

			for SNP_key in sorted_site_keys:
				genotype = this_sample[SNP_key]
				if numeric:
					variants.append(str(GENOTYPE_TO_NUMERIC[genotype]))
				else:
					variants.append(genotype_to_bases(genotype, self.sites[SNP_key]))
			
			output += "{}\t{}\n".format(pef, '\t'.join(variants))				

		pedfile = open(filename, 'w')
		pedfile.write(output)
		pedfile.close()

	def create_ped_entry_front(self, sample_id):
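		# builds the standard six leading PED columns: family ID, individual
		# ID, paternal ID, maternal ID, sex and phenotype group; parent IDs
		# are synthesised from the sample ID, sex is hard-coded to 2, and the
		# group is 2 when the sample ID contains the control tag, otherwise 1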
		if self.cfg.control_info["control_tag"]["tag"] in sample_id:
			group = 2
		elif self.cfg.control_info["cases_tag"]["tag"] in sample_id:
			group = 1
		else:
			group = 1

		entry = "{0}\t{1}\t{2}\t{3}\t{4}\t{5}".format(
					sample_id, 
					sample_id,
					sample_id + "_F", 
					sample_id + "_M", 
					2, 
					group)

		return entry

	def create_map_file(self, filename):
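		# writes one tab-separated line per variant site: chromosome,
		# SNP ID and base-pair position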
		output = ""
		for SNP_key in sorted(self.sites.keys()):
			chrom = SNP_key[0]
			SNP_id = self.sites[SNP_key]['SNP_id']
			posn = SNP_key[1]
			output += "{}\t{}\t{}\n".format(chrom, SNP_id, str(posn))

		mapfile = open(filename, 'w')
		mapfile.write(output)
		mapfile.close()

	def create_excluded_samples_file(self, filename):
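		# writes one excluded sample ID per line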
		xsfile = open(filename, 'w')
		xsfile.write('\n'.join(self.xsamples))
		xsfile.close()

class ConfigSettings:
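	"""Parsed configuration: the control/case tags, the required column-name
	mappings and any numeric/string filter definitions."""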

	SECTIONS = [
		"#control",
		"#column_names",
		"#numeric_filters",
		"#string_filters"
		]

	def __init__(self):
		self.control_info = {}
		self.col_names = {}
		self.nfilters = {}
		self.sfilters = {}

	def parse_config_file(self, cfname):
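		"""
		Parse the '#'-sectioned, comma-separated config format used by this
		script. Returns 0 on success, 2 if a data line appears before any
		recognised section header, or 3 if required column names are missing.

		A minimal illustrative example; the column names (GT, REF, ALT,
		CHROM, POS, ID, DP, Func) and filter names are placeholders and must
		match the data file header. Each #string_filters line is followed by
		a line of comma-separated patterns, and the control/case tags are
		matched as substrings of the sample IDs:

		#control
		control_tag,sample_id,CONTROL
		cases_tag,sample_id,CASE
		#column_names
		genotype_column,GT
		reference_column,REF
		alternate_column,ALT
		sample_id_column,sample_id
		chromosome_column,CHROM
		position_column,POS
		variant_id_column,ID
		#numeric_filters
		min_depth,DP,>=,10
		#string_filters
		func,Func,not_exact,accept
		exonic,splicing
		"""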
		cffile = open(cfname, 'r')
		section = None
		rc = 0
		
		for line in cffile:
			# clean trailing/leading whitespace/newlines
			line = line.strip()
			# skip blank lines
			if not line:
				continue
			# set section flag
			if line[0] == '#':
				if line in ConfigSettings.SECTIONS:
					section = line
				else:
					continue
			else:
				# fill up config dicts
				if section == "#control":
					(key, col_name, tag) = line.split(',')
					self.control_info[key] = {'col_name': col_name, 'tag': tag}
				elif section == "#column_names":
					(key, col_name) = line.split(',')
					self.col_names[key] = col_name
				elif section == "#numeric_filters":
					(key, col_name, op, cutoff) = line.split(',')
					self.add_numeric_filter(key, col_name, op, float(cutoff))
				elif section == "#string_filters":
					(key, col_name, exact_flag, accept_flag) = line.split(',')
					patterns = next(cffile).strip().split(',')
					self.add_string_filter(key, col_name, exact_flag, accept_flag, patterns)
				else:
					rc = 2
					break

		cffile.close()
		if rc != 0:
			return rc
		if not self.is_valid():
			rc = 3
		return rc


	def is_valid(self):
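		# a config is valid once every required column mapping is present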
		for k in REQ_KEYS:
			if k not in self.col_names.keys():
				return False
		return True

	def add_numeric_filter(self, key, col_name, op, cutoff):					
		self.nfilters[key] = {
			'col_name': col_name,
			'op': op,
			'cutoff': cutoff
			}

	def add_string_filter(self, key, col_name, exact_flag, accept_flag, patterns):
		if exact_flag == "exact":
			ef = True
		elif exact_flag == "not_exact":
			ef = False
		else:
			return False

		if accept_flag == "accept":
			af = True
		elif accept_flag == "reject":
			af = False
		else:
			return False

		self.sfilters[key] = {
			'col_name': col_name,
			'exact_flag': ef,
			'accept_flag': af,
			'patterns': patterns
			}

	def __str__(self):
		rv = "is Valid: {} || control info: {} || col names: {} || numeric filters: {} || string filters: {}".format(
			self.is_valid(), self.control_info, self.col_names, self.nfilters, self.sfilters)
		return rv 


### Utility ###
def string_as_operator(arg1, arg2, op):
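	# apply the comparison operator given as a string; an unrecognised
	# operator falls through and returns None, which filter_numeric()
	# treats as a failed check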
	if op == "==":
		return arg1 == arg2
	elif op == ">":
		return arg1 > arg2
	elif op == "<":
		return arg1 < arg2
	elif op == "<=":
		return arg1 <= arg2
	elif op == ">=":
		return arg1 >= arg2 

def genotype_to_bases(genotype, SNPdata):
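	# translate a genotype call into a space-separated pair of alleles
	# using the REF/ALT bases recorded for that site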
	bases = ""
	if genotype in GENOTYPE_DICT:
		gtype = GENOTYPE_DICT[genotype]
		if gtype == "hom_ref":
			bases = "{} {}".format(SNPdata['ref_col'], SNPdata['ref_col'])
		elif gtype == "hom_alt":
			bases = "{} {}".format(SNPdata['alt_col'], SNPdata['alt_col'])	
		elif gtype == "het":
			bases = "{} {}".format(SNPdata['ref_col'], SNPdata['alt_col'])	
		elif gtype == "tri_allelic":
			aa_col = SNPdata['alt_col']
			if len(aa_col) > 1:
				# arbitrarily take the first character of the ALT field
				alt_allele = aa_col[0]
			else:
				alt_allele = aa_col
			bases = "{} {}".format(alt_allele, alt_allele)
	else:
		print "Unrecognized genotype: '{}'. Exiting...".format(genotype)
		sys.exit(1)
	return bases

### Main ###
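# example invocation (file names are illustrative):
#   python pedify.py variants.tsv pedify.cfg out.ped out.map excluded_samples.txt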
def main():
	# argument parsing
	parser = argparse.ArgumentParser()
	parser.add_argument("dfname", help="name of input data file")
	parser.add_argument("cfname", help="name of input configuration file")
	parser.add_argument("pfname", help="name of output ped file")
	parser.add_argument("mfname", help="name of output map file")
	parser.add_argument("xsname", help="name of output file containing exact IDs of samples who were excluded")
	args = parser.parse_args()

	pc = PedConverter()
	# read in config file
	rc = pc.read_config_file(args.cfname)
	if rc == 0:
		print 'config file read successfully'
	else:
		print 'failed to read config file. Error code: {}'.format(rc)
		sys.exit(rc)
	# read in data file
	rc = pc.read_data_file(args.dfname)
	if rc == 0:
		print 'data file read successfully'
	else:
		print 'failed to read data file. Error code: {}'.format(rc)
		sys.exit(rc)
	pc.create_ped_file(args.pfname, numeric=True)
	pc.create_map_file(args.mfname)
	pc.create_excluded_samples_file(args.xsname)

if __name__ == "__main__":
	main()