diff vsnp_get_snps.py @ 3:14285a94fb13 draft

Uploaded
author greg
date Sun, 03 Jan 2021 16:06:33 +0000
parents ee4ef1fc23c6
children 5e4595b9f63c
--- a/vsnp_get_snps.py	Sat Nov 14 09:16:04 2020 +0000
+++ b/vsnp_get_snps.py	Sun Jan 03 16:06:33 2021 +0000
@@ -1,24 +1,20 @@
 #!/usr/bin/env python
 
-# Collect quality parsimonious SNPs from vcf files and output alignment files in fasta format.
+# Collect quality parsimonious SNPs from vcf files
+# and output alignment files in fasta format.
 
 import argparse
 import multiprocessing
 import os
-import pandas
 import queue
 import shutil
 import sys
 import time
-import vcf
 from collections import OrderedDict
 from datetime import datetime
 
-ALL_VCFS_DIR = 'all_vcf'
-INPUT_VCF_DIR = 'input_vcf_dir'
-OUTPUT_JSON_AVG_MQ_DIR = 'output_json_avg_mq_dir'
-OUTPUT_JSON_SNPS_DIR = 'output_json_snps_dir'
-OUTPUT_SNPS_DIR = 'output_snps_dir'
+import pandas
+import vcf
 
 
 def get_time_stamp():
@@ -40,74 +36,87 @@
 def setup_all_vcfs(vcf_files, vcf_dirs):
     # Create the all_vcfs directory and link
     # all input vcf files into it for processing.
-    os.makedirs(ALL_VCFS_DIR)
-    vcf_dirs.append(ALL_VCFS_DIR)
+    all_vcfs_dir = 'all_vcf'
+    os.makedirs(all_vcfs_dir)
+    vcf_dirs.append(all_vcfs_dir)
     for vcf_file in vcf_files:
         file_name_base = os.path.basename(vcf_file)
-        dst_file = os.path.join(ALL_VCFS_DIR, file_name_base)
+        dst_file = os.path.join(all_vcfs_dir, file_name_base)
         os.symlink(vcf_file, dst_file)
     return vcf_dirs
 
 
 class SnpFinder:
 
-    def __init__(self, num_files, reference, excel_grouper_file,
-                 all_isolates, ac, mq_val, n_threshold, qual_threshold, output_summary):
+    def __init__(self, num_files, dbkey, input_excel, all_isolates, ac, min_mq,
+                 quality_score_n_threshold, min_quality_score, input_vcf_dir,
+                 output_json_avg_mq_dir, output_json_snps_dir, output_snps_dir,
+                 output_summary):
+        # Allele count
         self.ac = ac
+        # Create a group that will contain all isolates.
         self.all_isolates = all_isolates
+        # Evolving positions dictionary.
         self.all_positions = None
-        # Filter based on the contents of an Excel file.
-        self.excel_grouper_file = excel_grouper_file
-        # Use Genbank file
+        # Isolate groups.
         self.groups = []
-        # This will be populated from the columns
-        # in the Excel filter file if it is used.
-        self.mq_val = mq_val
-        self.n_threshold = n_threshold
-        self.qual_threshold = qual_threshold
-        self.reference = reference
+        # Excel file for grouping.
+        self.input_excel = input_excel
+        # Directory of input zero coverage vcf files.
+        self.input_vcf_dir = input_vcf_dir
+        # Minimum map quality value.
+        self.min_mq = min_mq
+        # Minimum quality score value.
+        self.min_quality_score = min_quality_score
+        # Number of input zero coverage vcf files.
+        self.num_files = num_files
+        # Output directory for json average mq files.
+        self.output_json_avg_mq_dir = output_json_avg_mq_dir
+        # Output directory for json snps files.
+        self.output_json_snps_dir = output_json_snps_dir
+        # Output directory for snps files.
+        self.output_snps_dir = output_snps_dir
+        # Quality score N threshold value.
+        self.quality_score_n_threshold = quality_score_n_threshold
+        self.dbkey = dbkey
         self.start_time = get_time_stamp()
         self.summary_str = ""
         self.timer_start = datetime.now()
-        self.num_files = num_files
         self.initiate_summary(output_summary)
 
     def append_to_summary(self, html_str):
+        # Append a string to the accumulated html summary.
         self.summary_str = "%s%s" % (self.summary_str, html_str)
 
     def bin_input_files(self, filename, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix):
+        # Categorize input files into closely related
+        # isolate groups based on discovered SNPs, and
+        # return a group dictionary.
         sample_groups_list = []
-        table_name = self.get_base_file_name(filename)
-        try:
-            defining_snp = False
-            # Absolute positions in set union of two lists.
-            for abs_position in list(defining_snps.keys() & (found_positions.keys() | found_positions_mix.keys())):
-                group = defining_snps[abs_position]
+        table_name = self.get_sample_name(filename)
+        defining_snp = False
+        # Absolute positions in set union of two lists.
+        for abs_position in list(defining_snps.keys() & (found_positions.keys() | found_positions_mix.keys())):
+            group = defining_snps[abs_position]
+            sample_groups_list.append(group)
+            self.check_add_group(group)
+            if len(list(defining_snps.keys() & found_positions_mix.keys())) > 0:
+                table_name = self.get_sample_name(filename)
+                table_name = '%s<font color="red">[[MIXED]]</font>' % table_name
+            self.copy_file(filename, group)
+            defining_snp = True
+        if not set(inverted_defining_snps.keys()).intersection(found_positions.keys() | found_positions_mix.keys()):
+            for abs_position in list(inverted_defining_snps.keys()):
+                group = inverted_defining_snps[abs_position]
                 sample_groups_list.append(group)
                 self.check_add_group(group)
-                if len(list(defining_snps.keys() & found_positions_mix.keys())) > 0:
-                    table_name = self.get_base_file_name(filename)
-                    table_name = '%s<font color="red">[[MIXED]]</font>' % table_name
                 self.copy_file(filename, group)
                 defining_snp = True
-            if not set(inverted_defining_snps.keys()).intersection(found_positions.keys() | found_positions_mix.keys()):
-                for abs_position in list(inverted_defining_snps.keys()):
-                    group = inverted_defining_snps[abs_position]
-                    sample_groups_list.append(group)
-                    self.check_add_group(group)
-                    self.copy_file(filename, group)
-                    defining_snp = True
-            if defining_snp:
-                samples_groups_dict[table_name] = sorted(sample_groups_list)
-            else:
-                samples_groups_dict[table_name] = ['<font color="red">No defining SNP</font>']
-        except TypeError as e:
-            msg = "<br/>Error processing file %s to generate samples_groups_dict: %s<br/>" % (filename, str(e))
-            self.append_to_summary(msg)
-            samples_groups_dict[table_name] = [msg]
+        if defining_snp:
+            samples_groups_dict[table_name] = sorted(sample_groups_list)
+        else:
+            samples_groups_dict[table_name] = ['<font color="red">No defining SNP</font>']
         return samples_groups_dict
 
     def check_add_group(self, group):
+        # Add a group if it is not already in the list.
         if group not in self.groups:
             self.groups.append(group)
 
@@ -117,12 +126,12 @@
         shutil.copy(filename, dir)
 
     def decide_snps(self, filename):
-        positions_dict = self.all_positions
         # Find the SNPs in a vcf file to produce a pandas data
         # frame and a dictionary containing sample map qualities.
+        positions_dict = self.all_positions
         sample_map_qualities = {}
         # Eliminate the path.
-        file_name_base = self.get_base_file_name(filename)
+        file_name_base = self.get_sample_name(filename)
         vcf_reader = vcf.Reader(open(filename, 'r'))
         sample_dict = {}
         for record in vcf_reader:
@@ -132,26 +141,20 @@
                 if alt == "None":
                     sample_dict.update({record_position: "-"})
                 else:
-                    # Not sure this is the best place to capture MQM average
-                    # may be faster after parsimony SNPs are decided, but
-                    # then it will require opening the files again.
                     # On rare occasions MQM gets called "NaN", thus passing
                     # a string when a number is expected for the average calculation.
                     mq_val = self.get_mq_val(record.INFO, filename)
                     if str(mq_val).lower() not in ["nan"]:
                         sample_map_qualities.update({record_position: mq_val})
-                    # Add parameters here to change what each vcf represents.
-                    # SNP is represented in table, now how will the vcf represent
-                    # the called position alt != "None", which means a deletion
-                    # as alt is not record.FILTER, or rather passed.
-                    len_alt = len(alt)
-                    if len_alt == 1:
+                    if len(alt) == 1:
                         qual_val = self.val_as_int(record.QUAL)
                         ac = record.INFO['AC'][0]
                         ref = str(record.REF[0])
-                        if ac == 2 and qual_val > self.n_threshold:
+                        if ac == 2 and qual_val > self.quality_score_n_threshold:
+                            # Add the SNP to a group.
                             sample_dict.update({record_position: alt})
-                        elif ac == 1 and qual_val > self.n_threshold:
+                        elif ac == 1 and qual_val > self.quality_score_n_threshold:
+                            # The position is ambiguous.
                             alt_ref = "%s%s" % (alt, ref)
                             if alt_ref == "AG":
                                 sample_dict.update({record_position: "R"})
@@ -181,28 +184,27 @@
                                 sample_dict.update({record_position: "N"})
                             # Poor calls
                         elif qual_val <= 50:
+                            # Call the reference allele.
                             # Do not coerce record.REF[0] to a string!
                             sample_dict.update({record_position: record.REF[0]})
-                        elif qual_val <= self.n_threshold:
+                        elif qual_val <= self.quality_score_n_threshold:
                             sample_dict.update({record_position: "N"})
                         else:
                             # Insurance -- Will still report on a possible
-                            # SNP even if missed with above statement
+                            # SNP even if missed with above statements.
                             # Do not coerce record.REF[0] to a string!
                             sample_dict.update({record_position: record.REF[0]})
         # Merge dictionaries and order
         merge_dict = {}
-        # abs_pos:REF
         merge_dict.update(positions_dict)
-        # abs_pos:ALT replacing all_positions, because keys must be unique
         merge_dict.update(sample_dict)
         sample_df = pandas.DataFrame(merge_dict, index=[file_name_base])
         return sample_df, file_name_base, sample_map_qualities
 
     def df_to_fasta(self, parsimonious_df, group):
-        # Generate SNP alignment file from the parsimonious_df
-        # data frame.
-        snps_file = os.path.join(OUTPUT_SNPS_DIR, "%s.fasta" % group)
+        # Generate SNP alignment file from
+        # the parsimonious_df data frame.
+        snps_file = os.path.join(self.output_snps_dir, "%s.fasta" % group)
         test_duplicates = []
         has_sequence_data = False
         for index, row in parsimonious_df.iterrows():
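
Editor's note: the ac == 1 branch above encodes mixed calls with IUPAC ambiguity codes (A/G becomes R, C/T becomes Y, and so on), spelled out in the original as a chain of string comparisons. A table-driven sketch of the same mapping, assuming the standard two-base IUPAC codes:

    # A sketch of the IUPAC ambiguity encoding applied in decide_snps; the
    # alt+ref concatenation mirrors the alt_ref string built in the original.
    IUPAC = {
        "AG": "R", "GA": "R",
        "CT": "Y", "TC": "Y",
        "GC": "S", "CG": "S",
        "AT": "W", "TA": "W",
        "GT": "K", "TG": "K",
        "AC": "M", "CA": "M",
    }

    def ambiguity_code(alt, ref):
        # Fall back to N for any pair the table does not recognize,
        # matching the final "N" branch in the original conditionals.
        return IUPAC.get("%s%s" % (alt, ref), "N")

    print(ambiguity_code("A", "G"))  # R
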
@@ -225,39 +227,30 @@
         # Find SNP positions in a vcf file.
         found_positions = {}
         found_positions_mix = {}
-        try:
-            vcf_reader = vcf.Reader(open(filename, 'r'))
-            try:
-                for record in vcf_reader:
-                    qual_val = self.val_as_int(record.QUAL)
-                    chrom = record.CHROM
-                    position = record.POS
-                    absolute_position = "%s:%s" % (str(chrom), str(position))
-                    alt = str(record.ALT[0])
-                    if alt != "None":
-                        mq_val = self.get_mq_val(record.INFO, filename)
-                        ac = record.INFO['AC'][0]
-                        len_ref = len(record.REF)
-                        if ac == self.ac and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
-                            found_positions.update({absolute_position: record.REF})
-                        if ac == 1 and len_ref == 1 and qual_val > self.qual_threshold and mq_val > self.mq_val:
-                            found_positions_mix.update({absolute_position: record.REF})
-                return found_positions, found_positions_mix
-            except (ZeroDivisionError, ValueError, UnboundLocalError, TypeError) as e:
-                self.append_to_summar("<br/>Error parsing record in file %s: %s<br/>" % (filename, str(e)))
-                return {'': ''}, {'': ''}
-        except (SyntaxError, AttributeError) as e:
-            self.append_to_summary("<br/>Error attempting to read file %s: %s<br/>" % (filename, str(e)))
-            return {'': ''}, {'': ''}
+        vcf_reader = vcf.Reader(open(filename, 'r'))
+        for record in vcf_reader:
+            qual_val = self.val_as_int(record.QUAL)
+            chrom = record.CHROM
+            position = record.POS
+            absolute_position = "%s:%s" % (str(chrom), str(position))
+            alt = str(record.ALT[0])
+            if alt != "None":
+                mq_val = self.get_mq_val(record.INFO, filename)
+                ac = record.INFO['AC'][0]
+                if ac == self.ac and len(record.REF) == 1 and qual_val > self.min_quality_score and mq_val > self.min_mq:
+                    found_positions.update({absolute_position: record.REF})
+                if ac == 1 and len(record.REF) == 1 and qual_val > self.min_quality_score and mq_val > self.min_mq:
+                    found_positions_mix.update({absolute_position: record.REF})
+        return found_positions, found_positions_mix
 
     def gather_and_filter(self, prefilter_df, mq_averages, group_dir):
         # Group a data frame of SNPs.
-        if self.excel_grouper_file is None:
+        if self.input_excel is None:
             filtered_all_df = prefilter_df
             sheet_names = None
         else:
             # Filter positions to be removed from all.
-            xl = pandas.ExcelFile(self.excel_grouper_file)
+            xl = pandas.ExcelFile(self.input_excel)
             sheet_names = xl.sheet_names
             # Use the first column to filter "all" positions.
             exclusion_list_all = self.get_position_list(sheet_names, 0)
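
Editor's note: the thresholds applied in find_initial_positions correspond to the defaults that were previously hard coded (see the removed initializations near the end of this diff: allele count 2, map quality 56, quality score 150). A self-contained sketch of the same per-record filter, assuming a PyVCF-readable file at the hypothetical path sample.vcf and freebayes-style AC/MQM INFO fields:

    # A sketch of the per-record filtering in find_initial_positions; the
    # file path is hypothetical and MQM handling is simplified (the tool's
    # get_mq_val averages MQ/MQM values and tolerates missing keys).
    import vcf

    min_quality_score = 150  # previously the hard-coded qual_threshold
    min_mq = 56              # previously the hard-coded mq_val

    found_positions = {}
    for record in vcf.Reader(open("sample.vcf", "r")):
        alt = str(record.ALT[0])
        if alt == "None":
            continue
        qual_val = int(record.QUAL or 0)
        mq_val = record.INFO["MQM"][0]
        if record.INFO["AC"][0] == 2 and len(record.REF) == 1 \
                and qual_val > min_quality_score and mq_val > min_mq:
            found_positions["%s:%s" % (record.CHROM, record.POS)] = record.REF
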
@@ -265,13 +258,15 @@
             exclusion_list = exclusion_list_all + exclusion_list_group
             # Filters for all applied.
             filtered_all_df = prefilter_df.drop(columns=exclusion_list, errors='ignore')
-        json_snps_file = os.path.join(OUTPUT_JSON_SNPS_DIR, "%s.json" % group_dir)
+        json_snps_file = os.path.join(self.output_json_snps_dir, "%s.json" % group_dir)
         parsimonious_df = self.get_parsimonious_df(filtered_all_df)
         samples_number, columns = parsimonious_df.shape
         if samples_number >= 4:
+            # Sufficient samples have been found
+            # to build a phylogenetic tree.
             has_sequence_data = self.df_to_fasta(parsimonious_df, group_dir)
             if has_sequence_data:
-                json_avg_mq_file = os.path.join(OUTPUT_JSON_AVG_MQ_DIR, "%s.json" % group_dir)
+                json_avg_mq_file = os.path.join(self.output_json_avg_mq_dir, "%s.json" % group_dir)
                 mq_averages.to_json(json_avg_mq_file, orient='split')
                 parsimonious_df.to_json(json_snps_file, orient='split')
             else:
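
Editor's note: the SNP matrix and the average map qualities are both written with pandas' orient='split' JSON layout, which keeps columns, index, and data as separate arrays and round-trips cleanly. A small sketch with hypothetical positions and sample name:

    # A sketch of the orient='split' layout used for the json outputs.
    import pandas

    df = pandas.DataFrame({"chrom:100": ["A"], "chrom:200": ["G"]}, index=["sample1"])
    print(df.to_json(orient="split"))
    # {"columns":["chrom:100","chrom:200"],"index":["sample1"],"data":[["A","G"]]}

    # Reading it back requires the matching orient.
    df2 = pandas.read_json(df.to_json(orient="split"), orient="split")
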
@@ -285,18 +280,13 @@
                 msg = "%s for group: %s" % (msg, group_dir)
             self.append_to_summary("%s<br/>\n" % msg)
 
-    def get_base_file_name(self, file_path):
+    def get_sample_name(self, file_path):
+        # Return the sample part of a file name.
         base_file_name = os.path.basename(file_path)
         if base_file_name.find(".") > 0:
             # Eliminate the extension.
             return os.path.splitext(base_file_name)[0]
-        elif base_file_name.find("_") > 0:
-            # The dot extension was likely changed to
-            # the "_" character.
-            items = base_file_name.split("_")
-            return "_".join(items[0:-1])
-        else:
-            return base_file_name
+        return base_file_name
 
     def get_mq_val(self, record_info, filename):
         # Get the MQ (gatk) or MQM (freebayes) value
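
Editor's note: the simplified get_sample_name above now strips only a single extension via os.path.splitext, dropping the old underscore heuristic. A quick illustration of the behavior with hypothetical file names:

    # Behavior of the simplified get_sample_name; file names are hypothetical.
    import os

    def get_sample_name(file_path):
        base_file_name = os.path.basename(file_path)
        if base_file_name.find(".") > 0:
            # Eliminate the extension.
            return os.path.splitext(base_file_name)[0]
        return base_file_name

    print(get_sample_name("/tmp/sample1.vcf"))        # sample1
    print(get_sample_name("/tmp/sample1.final.vcf"))  # sample1.final (only the last extension is removed)
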
@@ -333,7 +323,7 @@
         # Get a list of positions defined by an excel file.
         exclusion_list = []
         try:
-            filter_to_all = pandas.read_excel(self.excel_grouper_file, header=1, usecols=[group])
+            filter_to_all = pandas.read_excel(self.input_excel, header=1, usecols=[group])
             for value in filter_to_all.values:
                 value = str(value[0])
                 if "-" not in value.split(":")[-1]:
@@ -348,8 +338,7 @@
                         exclusion_list.append(chrom + ":" + str(position))
             return exclusion_list
         except ValueError:
-            exclusion_list = []
-            return exclusion_list
+            return []
 
     def get_snps(self, task_queue, timeout):
         while True:
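
Editor's note: the values read from the Excel sheets are either single positions (chrom:pos) or ranges (chrom:start-end) expanded into individual positions, as the appends at the end of get_position_list suggest. A sketch of that expansion; the inclusive range semantics are an assumption, since the top of the loop sits outside this hunk:

    # A sketch of expanding "chrom:start-end" filter values into individual
    # "chrom:position" strings; the inclusive range handling is assumed.
    def expand_position(value):
        positions = []
        chrom, tail = value.split(":")
        if "-" not in tail:
            positions.append(value)
        else:
            start, end = (int(x) for x in tail.split("-"))
            for position in range(start, end + 1):
                positions.append(chrom + ":" + str(position))
        return positions

    print(expand_position("chrom:10-12"))  # ['chrom:10', 'chrom:11', 'chrom:12']
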
@@ -357,19 +346,16 @@
                 group_dir = task_queue.get(block=True, timeout=timeout)
             except queue.Empty:
                 break
-            # Parse all vcf files to accumulate SNPs into a
-            # data frame.
+            # Parse all vcf files to accumulate
+            # the SNPs into a data frame.
             positions_dict = {}
             group_files = []
             for file_name in os.listdir(os.path.abspath(group_dir)):
                 file_path = os.path.abspath(os.path.join(group_dir, file_name))
                 group_files.append(file_path)
             for file_name in group_files:
-                try:
-                    found_positions, found_positions_mix = self.find_initial_positions(file_name)
-                    positions_dict.update(found_positions)
-                except Exception as e:
-                    self.append_to_summary("Error updating the positions_dict dictionary when processing file %s:\n%s\n" % (file_name, str(e)))
+                found_positions, found_positions_mix = self.find_initial_positions(file_name)
+                positions_dict.update(found_positions)
             # Order before adding to file to match
             # with ordering of individual samples.
             # all_positions is abs_pos:REF
@@ -394,10 +380,10 @@
 
     def group_vcfs(self, vcf_files):
         # Parse an excel file to produce a
-        # grouping dictionary for filtering SNPs.
-        xl = pandas.ExcelFile(self.excel_grouper_file)
+        # grouping dictionary for SNPs.
+        xl = pandas.ExcelFile(self.input_excel)
         sheet_names = xl.sheet_names
-        ws = pandas.read_excel(self.excel_grouper_file, sheet_name=sheet_names[0])
+        ws = pandas.read_excel(self.input_excel, sheet_name=sheet_names[0])
         defining_snps = ws.iloc[0]
         defsnp_iterator = iter(defining_snps.iteritems())
         next(defsnp_iterator)
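
Editor's note: Series.iteritems() is deprecated (and removed in pandas 2.0) in favor of Series.items(); the next() call skips the sheet's first column. A minimal sketch of the iteration pattern, with hypothetical column labels:

    # A sketch of the row iteration in group_vcfs; labels are hypothetical.
    # Series.items() is the modern spelling of the deprecated iteritems().
    import pandas

    first_row = pandas.Series({"label": "defining", "group_A": "chrom:100"})
    row_iterator = iter(first_row.items())
    next(row_iterator)  # skip the first column, as group_vcfs does
    for name, value in row_iterator:
        print(name, value)  # group_A chrom:100
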
@@ -429,9 +415,9 @@
         self.append_to_summary('<html>\n')
         self.append_to_summary('<head></head>\n')
         self.append_to_summary('<body style=\"font-size:12px;">')
-        self.append_to_summary("<b>Time started:</b> %s<br/>" % str(get_time_stamp()))
+        self.append_to_summary("<b>Time started:</b> %s<br/>" % get_time_stamp())
         self.append_to_summary("<b>Number of VCF inputs:</b> %d<br/>" % self.num_files)
-        self.append_to_summary("<b>Reference:</b> %s<br/>" % str(self.reference))
+        self.append_to_summary("<b>Reference:</b> %s<br/>" % self.dbkey)
         self.append_to_summary("<b>All isolates:</b> %s<br/>" % str(self.all_isolates))
 
     def return_val(self, val, index=0):
@@ -450,26 +436,30 @@
 
 
 if __name__ == '__main__':
+
     parser = argparse.ArgumentParser()
 
-    parser.add_argument('--all_isolates', action='store', dest='all_isolates', required=False, default="No", help='Create table with all isolates'),
-    parser.add_argument('--excel_grouper_file', action='store', dest='excel_grouper_file', required=False, default=None, help='Optional Excel filter file'),
+    parser.add_argument('--ac', action='store', dest='ac', type=int, help='Allele count value'),
+    parser.add_argument('--all_isolates', action='store_true', dest='all_isolates', required=False, default=False, help='Create table with all isolates'),
+    parser.add_argument('--input_excel', action='store', dest='input_excel', required=False, default=None, help='Optional Excel filter file'),
+    parser.add_argument('--input_vcf_dir', action='store', dest='input_vcf_dir', help='Input vcf directory'),
+    parser.add_argument('--min_mq', action='store', dest='min_mq', type=int, help='Minimum map quality value'),
+    parser.add_argument('--min_quality_score', action='store', dest='min_quality_score', type=int, help='Minimum quality score value'),
+    parser.add_argument('--output_json_avg_mq_dir', action='store', dest='output_json_avg_mq_dir', help='Output json average mq directory'),
+    parser.add_argument('--output_json_snps_dir', action='store', dest='output_json_snps_dir', help='Output json snps directory'),
+    parser.add_argument('--output_snps_dir', action='store', dest='output_snps_dir', help='Output snps directory'),
     parser.add_argument('--output_summary', action='store', dest='output_summary', help='Output summary html file'),
-    parser.add_argument('--reference', action='store', dest='reference', help='Reference file'),
-    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
+    parser.add_argument('--processes', action='store', dest='processes', type=int, help='Configured processes for job'),
+    parser.add_argument('--quality_score_n_threshold', action='store', dest='quality_score_n_threshold', type=int, help='Minimum quality score N value for alleles'),
+    parser.add_argument('--dbkey', action='store', dest='dbkey', help='Galaxy genome build dbkey'),
 
     args = parser.parse_args()
 
-    # Initializations - TODO: should these be passed in as command line args?
-    ac = 2
-    mq_val = 56
-    n_threshold = 50
-    qual_threshold = 150
-
-    # Build the list of sample vcf files for the current run.
+    # Build the list of all input zero coverage vcf
+    # files, both the samples and the "database".
     vcf_files = []
-    for file_name in os.listdir(INPUT_VCF_DIR):
-        file_path = os.path.abspath(os.path.join(INPUT_VCF_DIR, file_name))
+    for file_name in os.listdir(args.input_vcf_dir):
+        file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name))
         vcf_files.append(file_path)
 
     multiprocessing.set_start_method('spawn')
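
Editor's note: with the previously hard-coded constants promoted to command-line options, a run now supplies them explicitly. A hypothetical invocation reproducing the old defaults (allele count 2, map quality 56, N threshold 50, quality score 150); the dbkey value is a placeholder:

    python vsnp_get_snps.py \
        --ac 2 \
        --min_mq 56 \
        --quality_score_n_threshold 50 \
        --min_quality_score 150 \
        --input_vcf_dir input_vcf_dir \
        --output_json_avg_mq_dir output_json_avg_mq_dir \
        --output_json_snps_dir output_json_snps_dir \
        --output_snps_dir output_snps_dir \
        --output_summary summary.html \
        --processes 4 \
        --dbkey some_dbkey
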
@@ -480,21 +470,23 @@
     timeout = 0.05
 
     # Initialize the snp_finder object.
-    snp_finder = SnpFinder(num_files, args.reference, args.excel_grouper_file, args.all_isolates,
-                           ac, mq_val, n_threshold, qual_threshold, args.output_summary)
+    snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates,
+                           args.ac, args.min_mq, args.quality_score_n_threshold,
+                           args.min_quality_score, args.input_vcf_dir,
+                           args.output_json_avg_mq_dir, args.output_json_snps_dir,
+                           args.output_snps_dir, args.output_summary)
 
-    # Initialize the set of directories containiing vcf files for analysis.
+    # Define and make the set of directories into which the input_zc_vcf
+    # files will be placed.  Selected input values (e.g., the use of
+    # an Excel file for grouping and filtering, creating a group with
+    # all isolates) are used to define the directories.
     vcf_dirs = []
-    if args.excel_grouper_file is None:
+    if args.input_excel is None:
         vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
     else:
-        if args.all_isolates.lower() == "yes":
+        if args.all_isolates:
             vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
         # Parse the Excel file to determine groups for filtering.
         snp_finder.group_vcfs(vcf_files)
         # Append the list of group directories created by
         # the above call to the set of directories containing
-        # vcf files for analysis
+        # vcf files for analysis.
         group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups]
         vcf_dirs.extend(group_dirs)
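
Editor's note: beyond this hunk, the accumulated vcf_dirs list feeds a task queue that get_snps workers drain under the spawn start method set above (note the 0.05 second timeout and the queue.Empty break in get_snps). A self-contained sketch of that queue-draining pattern; the worker body and directory names are hypothetical stand-ins:

    # A sketch of the queue-draining worker pattern get_snps implements;
    # directory names and the worker body are hypothetical stand-ins.
    import multiprocessing
    import queue

    def worker(task_queue, timeout):
        while True:
            try:
                group_dir = task_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                break
            print("processing", group_dir)

    if __name__ == "__main__":
        multiprocessing.set_start_method("spawn")
        task_queue = multiprocessing.Queue()
        for vcf_dir in ["all_vcf", "group_A"]:
            task_queue.put(vcf_dir)
        processes = [multiprocessing.Process(target=worker, args=(task_queue, 0.05)) for _ in range(2)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
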