diff vsnp_get_snps.py @ 3:14285a94fb13 draft
Uploaded
| author   | greg                            |
| -------- | ------------------------------- |
| date     | Sun, 03 Jan 2021 16:06:33 +0000 |
| parents  | ee4ef1fc23c6                    |
| children | 5e4595b9f63c                    |
--- a/vsnp_get_snps.py	Sat Nov 14 09:16:04 2020 +0000
+++ b/vsnp_get_snps.py	Sun Jan 03 16:06:33 2021 +0000
@@ -1,24 +1,20 @@
 #!/usr/bin/env python
-# Collect quality parsimonious SNPs from vcf files and output alignment files in fasta format.
+# Collect quality parsimonious SNPs from vcf files
+# and output alignment files in fasta format.
 
 import argparse
 import multiprocessing
 import os
-import pandas
 import queue
 import shutil
 import sys
 import time
-import vcf
 from collections import OrderedDict
 from datetime import datetime
 
-ALL_VCFS_DIR = 'all_vcf'
-INPUT_VCF_DIR = 'input_vcf_dir'
-OUTPUT_JSON_AVG_MQ_DIR = 'output_json_avg_mq_dir'
-OUTPUT_JSON_SNPS_DIR = 'output_json_snps_dir'
-OUTPUT_SNPS_DIR = 'output_snps_dir'
+import pandas
+import vcf
 
 
 def get_time_stamp():
@@ -40,74 +36,87 @@
 def setup_all_vcfs(vcf_files, vcf_dirs):
     # Create the all_vcfs directory and link
     # all input vcf files into it for processing.
-    os.makedirs(ALL_VCFS_DIR)
-    vcf_dirs.append(ALL_VCFS_DIR)
+    all_vcfs_dir = 'all_vcf'
+    os.makedirs(all_vcfs_dir)
+    vcf_dirs.append(all_vcfs_dir)
     for vcf_file in vcf_files:
         file_name_base = os.path.basename(vcf_file)
-        dst_file = os.path.join(ALL_VCFS_DIR, file_name_base)
+        dst_file = os.path.join(all_vcfs_dir, file_name_base)
         os.symlink(vcf_file, dst_file)
     return vcf_dirs
 
 
 class SnpFinder:
 
-    def __init__(self, num_files, reference, excel_grouper_file,
-                 all_isolates, ac, mq_val, n_threshold, qual_threshold, output_summary):
+    def __init__(self, num_files, dbkey, input_excel, all_isolates, ac, min_mq, quality_score_n_threshold, min_quality_score, input_vcf_dir, output_json_avg_mq_dir, output_json_snps_dir, output_snps_dir, output_summary):
+        # Allele count
         self.ac = ac
+        # Create a group that will contain all isolates.
         self.all_isolates = all_isolates
+        # Evolving positions dictionary.
         self.all_positions = None
-        # Filter based on the contents of an Excel file.
-        self.excel_grouper_file = excel_grouper_file
-        # Use Genbank file
+        # Isolate groups.
         self.groups = []
-        # This will be populated from the columns
-        # in the Excel filter file if it is used.
-        self.mq_val = mq_val
-        self.n_threshold = n_threshold
-        self.qual_threshold = qual_threshold
-        self.reference = reference
+        # Excel file for grouping.
+        self.input_excel = input_excel
+        # Directory of input zero coverage vcf files.
+        self.input_vcf_dir = input_vcf_dir
+        # Minimum map quality value.
+        self.min_mq = min_mq
+        # Minimum quality score value.
+        self.min_quality_score = min_quality_score
+        # Number of input zero coverage vcf files.
+        self.num_files = num_files
+        # Output directory for json average mq files.
+        self.output_json_avg_mq_dir = output_json_avg_mq_dir
+        # Output directory for json snps files.
+        self.output_json_snps_dir = output_json_snps_dir
+        # Output directory for snps files.
+        self.output_snps_dir = output_snps_dir
+        # Quality score N threshold value.
+        self.quality_score_n_threshold = quality_score_n_threshold
+        self.dbkey = dbkey
         self.start_time = get_time_stamp()
         self.summary_str = ""
         self.timer_start = datetime.now()
-        self.num_files = num_files
         self.initiate_summary(output_summary)
 
     def append_to_summary(self, html_str):
+        # Append a string to the html summary output file.
         self.summary_str = "%s%s" % (self.summary_str, html_str)
 
     def bin_input_files(self, filename, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix):
+        # Categorize input files into closely related
+        # isolate groups based on discovered SNPs, and
+        # return a group dictionary.
         sample_groups_list = []
-        table_name = self.get_base_file_name(filename)
-        try:
-            defining_snp = False
-            # Absolute positions in set union of two lists.
-            for abs_position in list(defining_snps.keys() & (found_positions.keys() | found_positions_mix.keys())):
-                group = defining_snps[abs_position]
+        table_name = self.get_sample_name(filename)
+        defining_snp = False
+        # Absolute positions in set union of two lists.
+        for abs_position in list(defining_snps.keys() & (found_positions.keys() | found_positions_mix.keys())):
+            group = defining_snps[abs_position]
+            sample_groups_list.append(group)
+            self.check_add_group(group)
+            if len(list(defining_snps.keys() & found_positions_mix.keys())) > 0:
+                table_name = self.get_sample_name(filename)
+                table_name = '%s<font color="red">[[MIXED]]</font>' % table_name
+            self.copy_file(filename, group)
+            defining_snp = True
+        if not set(inverted_defining_snps.keys()).intersection(found_positions.keys() | found_positions_mix.keys()):
+            for abs_position in list(inverted_defining_snps.keys()):
+                group = inverted_defining_snps[abs_position]
                 sample_groups_list.append(group)
                 self.check_add_group(group)
-                if len(list(defining_snps.keys() & found_positions_mix.keys())) > 0:
-                    table_name = self.get_base_file_name(filename)
-                    table_name = '%s<font color="red">[[MIXED]]</font>' % table_name
                 self.copy_file(filename, group)
                 defining_snp = True
-            if not set(inverted_defining_snps.keys()).intersection(found_positions.keys() | found_positions_mix.keys()):
-                for abs_position in list(inverted_defining_snps.keys()):
-                    group = inverted_defining_snps[abs_position]
-                    sample_groups_list.append(group)
-                    self.check_add_group(group)
-                    self.copy_file(filename, group)
-                    defining_snp = True
-            if defining_snp:
-                samples_groups_dict[table_name] = sorted(sample_groups_list)
-            else:
-                samples_groups_dict[table_name] = ['<font color="red">No defining SNP</font>']
-        except TypeError as e:
-            msg = "<br/>Error processing file %s to generate samples_groups_dict: %s<br/>" % (filename, str(e))
-            self.append_to_summary(msg)
-            samples_groups_dict[table_name] = [msg]
+        if defining_snp:
+            samples_groups_dict[table_name] = sorted(sample_groups_list)
+        else:
+            samples_groups_dict[table_name] = ['<font color="red">No defining SNP</font>']
         return samples_groups_dict
 
     def check_add_group(self, group):
+        # Add a group if it is not already in the list.
         if group not in self.groups:
             self.groups.append(group)
@@ -117,12 +126,12 @@
         shutil.copy(filename, dir)
 
     def decide_snps(self, filename):
-        positions_dict = self.all_positions
         # Find the SNPs in a vcf file to produce a pandas data
         # frame and a dictionary containing sample map qualities.
+        positions_dict = self.all_positions
         sample_map_qualities = {}
         # Eliminate the path.
-        file_name_base = self.get_base_file_name(filename)
+        file_name_base = self.get_sample_name(filename)
         vcf_reader = vcf.Reader(open(filename, 'r'))
         sample_dict = {}
         for record in vcf_reader:
@@ -132,26 +141,20 @@
             if alt == "None":
                 sample_dict.update({record_position: "-"})
             else:
-                # Not sure this is the best place to capture MQM average
-                # may be faster after parsimony SNPs are decided, but
-                # then it will require opening the files again.
                 # On rare occasions MQM gets called "NaN", thus passing
                 # a string when a number is expected when calculating average.
                 mq_val = self.get_mq_val(record.INFO, filename)
                 if str(mq_val).lower() not in ["nan"]:
                     sample_map_qualities.update({record_position: mq_val})
-                # Add parameters here to change what each vcf represents.
-                # SNP is represented in table, now how will the vcf represent
-                # the called position alt != "None", which means a deletion
-                # as alt is not record.FILTER, or rather passed.
-                len_alt = len(alt)
-                if len_alt == 1:
+                if len(alt) == 1:
                     qual_val = self.val_as_int(record.QUAL)
                     ac = record.INFO['AC'][0]
                     ref = str(record.REF[0])
-                    if ac == 2 and qual_val > self.n_threshold:
+                    if ac == 2 and qual_val > self.quality_score_n_threshold:
+                        # Add the SNP to a group.
                         sample_dict.update({record_position: alt})
-                    elif ac == 1 and qual_val > self.n_threshold:
+                    elif ac == 1 and qual_val > self.quality_score_n_threshold:
+                        # The position is ambiguous.
                         alt_ref = "%s%s" % (alt, ref)
                         if alt_ref == "AG":
                             sample_dict.update({record_position: "R"})
@@ -181,28 +184,27 @@
                             sample_dict.update({record_position: "N"})
                     # Poor calls
                     elif qual_val <= 50:
+                        # Call the reference allele.
                         # Do not coerce record.REF[0] to a string!
                         sample_dict.update({record_position: record.REF[0]})
-                    elif qual_val <= self.n_threshold:
+                    elif qual_val <= self.quality_score_n_threshold:
                         sample_dict.update({record_position: "N"})
                     else:
                         # Insurance -- Will still report on a possible
-                        # SNP even if missed with above statement
+                        # SNP even if missed with above statements.
                         # Do not coerce record.REF[0] to a string!
                         sample_dict.update({record_position: record.REF[0]})
         # Merge dictionaries and order
         merge_dict = {}
-        # abs_pos:REF
         merge_dict.update(positions_dict)
-        # abs_pos:ALT replacing all_positions, because keys must be unique
         merge_dict.update(sample_dict)
         sample_df = pandas.DataFrame(merge_dict, index=[file_name_base])
         return sample_df, file_name_base, sample_map_qualities
 
     def df_to_fasta(self, parsimonious_df, group):
-        # Generate SNP alignment file from the parsimonious_df
-        # data frame.
-        snps_file = os.path.join(OUTPUT_SNPS_DIR, "%s.fasta" % group)
+        # Generate SNP alignment file from
+        # the parsimonious_df data frame.
+        snps_file = os.path.join(self.output_snps_dir, "%s.fasta" % group)
         test_duplicates = []
         has_sequence_data = False
         for index, row in parsimonious_df.iterrows():
@@ -225,39 +227,30 @@
         # Find SNP positions in a vcf file.
         found_positions = {}
         found_positions_mix = {}
-        try:
-            vcf_reader = vcf.Reader(open(filename, 'r'))
-            try:
-                for record in vcf_reader:
-                    qual_val = self.val_as_int(record.QUAL)
-                    chrom = record.CHROM
-                    position = record.POS
-                    absolute_position = "%s:%s" % (str(chrom), str(position))
-                    alt = str(record.ALT[0])
-                    if alt != "None":
-                        mq_val = self.get_mq_val(record.INFO, filename)
-                        ac = record.INFO['AC'][0]
-                        len_ref = len(record.REF)
-                        if ac == self.ac and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
-                            found_positions.update({absolute_position: record.REF})
-                        if ac == 1 and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
-                            found_positions_mix.update({absolute_position: record.REF})
-                return found_positions, found_positions_mix
-            except (ZeroDivisionError, ValueError, UnboundLocalError, TypeError) as e:
-                self.append_to_summar("<br/>Error parsing record in file %s: %s<br/>" % (filename, str(e)))
-                return {'': ''}, {'': ''}
-        except (SyntaxError, AttributeError) as e:
-            self.append_to_summary("<br/>Error attempting to read file %s: %s<br/>" % (filename, str(e)))
-            return {'': ''}, {'': ''}
+        vcf_reader = vcf.Reader(open(filename, 'r'))
+        for record in vcf_reader:
+            qual_val = self.val_as_int(record.QUAL)
+            chrom = record.CHROM
+            position = record.POS
+            absolute_position = "%s:%s" % (str(chrom), str(position))
+            alt = str(record.ALT[0])
+            if alt != "None":
+                mq_val = self.get_mq_val(record.INFO, filename)
+                ac = record.INFO['AC'][0]
+                if ac == self.ac and len(record.REF) == 1 and qual_val > self.min_quality_score and mq_val > self.min_mq:
+                    found_positions.update({absolute_position: record.REF})
+                if ac == 1 and len(record.REF) == 1 and qual_val > self.min_quality_score and mq_val > self.min_mq:
+                    found_positions_mix.update({absolute_position: record.REF})
+        return found_positions, found_positions_mix
 
     def gather_and_filter(self, prefilter_df, mq_averages, group_dir):
         # Group a data frame of SNPs.
-        if self.excel_grouper_file is None:
+        if self.input_excel is None:
             filtered_all_df = prefilter_df
             sheet_names = None
         else:
             # Filter positions to be removed from all.
-            xl = pandas.ExcelFile(self.excel_grouper_file)
+            xl = pandas.ExcelFile(self.input_excel)
             sheet_names = xl.sheet_names
             # Use the first column to filter "all" positions.
             exclusion_list_all = self.get_position_list(sheet_names, 0)
@@ -265,13 +258,15 @@
             exclusion_list = exclusion_list_all + exclusion_list_group
             # Filters for all applied.
             filtered_all_df = prefilter_df.drop(columns=exclusion_list, errors='ignore')
-        json_snps_file = os.path.join(OUTPUT_JSON_SNPS_DIR, "%s.json" % group_dir)
+        json_snps_file = os.path.join(self.output_json_snps_dir, "%s.json" % group_dir)
        parsimonious_df = self.get_parsimonious_df(filtered_all_df)
         samples_number, columns = parsimonious_df.shape
         if samples_number >= 4:
+            # Sufficient samples have been found
+            # to build a phylogenetic tree.
             has_sequence_data = self.df_to_fasta(parsimonious_df, group_dir)
             if has_sequence_data:
-                json_avg_mq_file = os.path.join(OUTPUT_JSON_AVG_MQ_DIR, "%s.json" % group_dir)
+                json_avg_mq_file = os.path.join(self.output_json_avg_mq_dir, "%s.json" % group_dir)
                 mq_averages.to_json(json_avg_mq_file, orient='split')
                 parsimonious_df.to_json(json_snps_file, orient='split')
             else:
@@ -285,18 +280,13 @@
             msg = "%s for group: %s" % (msg, group_dir)
         self.append_to_summary("%s<br/>\n" % msg)
 
-    def get_base_file_name(self, file_path):
+    def get_sample_name(self, file_path):
+        # Return the sample part of a file name.
         base_file_name = os.path.basename(file_path)
         if base_file_name.find(".") > 0:
             # Eliminate the extension.
             return os.path.splitext(base_file_name)[0]
-        elif base_file_name.find("_") > 0:
-            # The dot extension was likely changed to
-            # the "_" character.
-            items = base_file_name.split("_")
-            return "_".join(items[0:-1])
-        else:
-            return base_file_name
+        return base_file_name
 
     def get_mq_val(self, record_info, filename):
         # Get the MQ (gatk) or MQM (freebayes) value
@@ -333,7 +323,7 @@
         # Get a list of positions defined by an excel file.
         exclusion_list = []
         try:
-            filter_to_all = pandas.read_excel(self.excel_grouper_file, header=1, usecols=[group])
+            filter_to_all = pandas.read_excel(self.input_excel, header=1, usecols=[group])
             for value in filter_to_all.values:
                 value = str(value[0])
                 if "-" not in value.split(":")[-1]:
@@ -348,8 +338,7 @@
                         exclusion_list.append(chrom + ":" + str(position))
             return exclusion_list
         except ValueError:
-            exclusion_list = []
-            return exclusion_list
+            return []
 
     def get_snps(self, task_queue, timeout):
         while True:
             try:
                 group_dir = task_queue.get(block=True, timeout=timeout)
             except queue.Empty:
                 break
-            # Parse all vcf files to accumulate SNPs into a
-            # data frame.
+            # Parse all vcf files to accumulate
+            # the SNPs into a data frame.
             positions_dict = {}
             group_files = []
             for file_name in os.listdir(os.path.abspath(group_dir)):
                 file_path = os.path.abspath(os.path.join(group_dir, file_name))
                 group_files.append(file_path)
             for file_name in group_files:
-                try:
-                    found_positions, found_positions_mix = self.find_initial_positions(file_name)
-                    positions_dict.update(found_positions)
-                except Exception as e:
-                    self.append_to_summary("Error updating the positions_dict dictionary when processing file %s:\n%s\n" % (file_name, str(e)))
+                found_positions, found_positions_mix = self.find_initial_positions(file_name)
+                positions_dict.update(found_positions)
             # Order before adding to file to match
             # with ordering of individual samples.
             # all_positions is abs_pos:REF
@@ -394,10 +380,10 @@
 
     def group_vcfs(self, vcf_files):
         # Parse an excel file to produce a
-        # grouping dictionary for filtering SNPs.
-        xl = pandas.ExcelFile(self.excel_grouper_file)
+        # grouping dictionary for SNPs.
+        xl = pandas.ExcelFile(self.input_excel)
         sheet_names = xl.sheet_names
-        ws = pandas.read_excel(self.excel_grouper_file, sheet_name=sheet_names[0])
+        ws = pandas.read_excel(self.input_excel, sheet_name=sheet_names[0])
         defining_snps = ws.iloc[0]
         defsnp_iterator = iter(defining_snps.iteritems())
         next(defsnp_iterator)
@@ -429,9 +415,9 @@
         self.append_to_summary('<html>\n')
         self.append_to_summary('<head></head>\n')
         self.append_to_summary('<body style=\"font-size:12px;">')
-        self.append_to_summary("<b>Time started:</b> %s<br/>" % str(get_time_stamp()))
+        self.append_to_summary("<b>Time started:</b> %s<br/>" % get_time_stamp())
         self.append_to_summary("<b>Number of VCF inputs:</b> %d<br/>" % self.num_files)
-        self.append_to_summary("<b>Reference:</b> %s<br/>" % str(self.reference))
+        self.append_to_summary("<b>Reference:</b> %s<br/>" % self.dbkey)
         self.append_to_summary("<b>All isolates:</b> %s<br/>" % str(self.all_isolates))
 
     def return_val(self, val, index=0):
@@ -450,26 +436,30 @@
 
 if __name__ == '__main__':
+
     parser = argparse.ArgumentParser()
-    parser.add_argument('--all_isolates', action='store', dest='all_isolates', required=False, default="No", help='Create table with all isolates'),
-    parser.add_argument('--excel_grouper_file', action='store', dest='excel_grouper_file', required=False, default=None, help='Optional Excel filter file'),
+    parser.add_argument('--ac', action='store', dest='ac', type=int, help='Allele count value'),
+    parser.add_argument('--all_isolates', action='store_true', dest='all_isolates', required=False, default=False, help='Create table with all isolates'),
+    parser.add_argument('--input_excel', action='store', dest='input_excel', required=False, default=None, help='Optional Excel filter file'),
+    parser.add_argument('--input_vcf_dir', action='store', dest='input_vcf_dir', help='Input vcf directory'),
+    parser.add_argument('--min_mq', action='store', dest='min_mq', type=int, help='Minimum map quality value'),
+    parser.add_argument('--min_quality_score', action='store', dest='min_quality_score', type=int, help='Minimum quality score value'),
+    parser.add_argument('--output_json_avg_mq_dir', action='store', dest='output_json_avg_mq_dir', help='Output json average mq directory'),
+    parser.add_argument('--output_json_snps_dir', action='store', dest='output_json_snps_dir', help='Output json snps directory'),
+    parser.add_argument('--output_snps_dir', action='store', dest='output_snps_dir', help='Output snps directory'),
     parser.add_argument('--output_summary', action='store', dest='output_summary', help='Output summary html file'),
-    parser.add_argument('--reference', action='store', dest='reference', help='Reference file'),
-    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
+    parser.add_argument('--processes', action='store', dest='processes', type=int, help='Configured processes for job'),
+    parser.add_argument('--quality_score_n_threshold', action='store', dest='quality_score_n_threshold', type=int, help='Minimum quality score N value for alleles'),
+    parser.add_argument('--dbkey', action='store', dest='dbkey', help='Galaxy genome build dbkey'),
 
     args = parser.parse_args()
 
-    # Initializations - TODO: should these be passed in as command line args?
-    ac = 2
-    mq_val = 56
-    n_threshold = 50
-    qual_threshold = 150
-
-    # Build the list of sample vcf files for the current run.
+    # Build the list of all input zero coverage vcf
+    # files, both the samples and the "database".
     vcf_files = []
-    for file_name in os.listdir(INPUT_VCF_DIR):
-        file_path = os.path.abspath(os.path.join(INPUT_VCF_DIR, file_name))
+    for file_name in os.listdir(args.input_vcf_dir):
+        file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name))
         vcf_files.append(file_path)
 
     multiprocessing.set_start_method('spawn')
@@ -480,21 +470,23 @@
     timeout = 0.05
 
     # Initialize the snp_finder object.
-    snp_finder = SnpFinder(num_files, args.reference, args.excel_grouper_file, args.all_isolates,
-                           ac, mq_val, n_threshold, qual_threshold, args.output_summary)
+    snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary)
 
-    # Initialize the set of directories containiing vcf files for analysis.
+    # Define and make the set of directories into which the input_zc_vcf
+    # files will be placed. Selected input values (e.g., the use of
+    # an Excel file for grouping and filtering, creating a group with
+    # all isolates) are used to define the directories.
     vcf_dirs = []
-    if args.excel_grouper_file is None:
+    if args.input_excel is None:
         vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
     else:
-        if args.all_isolates.lower() == "yes":
+        if args.all_isolates:
             vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
         # Parse the Excel file to determine groups for filtering.
        snp_finder.group_vcfs(vcf_files)
         # Append the list of group directories created by
         # the above call to the set of directories containing
-        # vcf files for analysis
+        # vcf files for analysis.
         group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups]
         vcf_dirs.extend(group_dirs)
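
A note on the ambiguity handling in `decide_snps()` above: when a call has an allele count of 1 and a sufficient quality score, the alt/ref base pair is recorded as an IUPAC ambiguity code (only the `"AG"` → `"R"` branch of the if/elif chain is visible in this view). The sketch below is not part of this changeset; it is a hypothetical table-driven equivalent of that chain, assuming the elided branches follow the standard IUPAC codes.

```python
# Hypothetical table-driven equivalent of the alt_ref if/elif chain in
# decide_snps(). Only the "AG" -> "R" branch appears in the diff above;
# the remaining pairs here are the standard IUPAC ambiguity codes.
IUPAC_CODES = {
    frozenset("AG"): "R",
    frozenset("CT"): "Y",
    frozenset("GC"): "S",
    frozenset("AT"): "W",
    frozenset("GT"): "K",
    frozenset("AC"): "M",
}


def ambiguity_code(alt, ref):
    # Pairs with no defined code (e.g., alt == ref) fall back to "N",
    # mirroring the final else branch of the original chain.
    return IUPAC_CODES.get(frozenset((alt, ref)), "N")


assert ambiguity_code("A", "G") == "R"
assert ambiguity_code("T", "C") == "Y"
```

Using unordered `frozenset` keys means `("A", "G")` and `("G", "A")` resolve to the same code, which is what the symmetric alt/ref comparisons in the original chain accomplish with paired string literals.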