Mercurial > repos > greg > vsnp_add_zero_coverage
diff vsnp_add_zero_coverage.py @ 0:3cb0bf7e1b2d draft
Uploaded
author | greg |
---|---|
date | Tue, 21 Apr 2020 09:44:38 -0400 |
parents | |
children | 01312f8a6ca9 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/vsnp_add_zero_coverage.py Tue Apr 21 09:44:38 2020 -0400 @@ -0,0 +1,191 @@ +#!/usr/bin/env python + +import argparse +import multiprocessing +import os +import pandas +import pysam +import queue +import re +import shutil +from numpy import mean +from Bio import SeqIO + +INPUT_BAM_DIR = 'input_bam_dir' +INPUT_VCF_DIR = 'input_vcf_dir' +OUTPUT_VCF_DIR = 'output_vcf_dir' +OUTPUT_METRICS_DIR = 'output_metrics_dir' + + +def get_base_file_name(file_path): + base_file_name = os.path.basename(file_path) + if base_file_name.find(".") > 0: + # Eliminate the extension. + return os.path.splitext(base_file_name)[0] + elif base_file_name.find("_") > 0: + # The dot extension was likely changed to + # the " character. + items = base_file_name.split("_") + return "_".join(items[0:-1]) + else: + return base_file_name + + +def get_coverage_and_snp_count(task_queue, reference, output_metrics, output_vcf, timeout): + while True: + try: + tup = task_queue.get(block=True, timeout=timeout) + except queue.Empty: + break + bam_file, vcf_file = tup + # Create a coverage dictionary. + coverage_dict = {} + coverage_list = pysam.depth(bam_file, split_lines=True) + for line in coverage_list: + chrom, position, depth = line.split('\t') + coverage_dict["%s-%s" % (chrom, position)] = depth + # Convert it to a data frame. + coverage_df = pandas.DataFrame.from_dict(coverage_dict, orient='index', columns=["depth"]) + # Create a zero coverage dictionary. + zero_dict = {} + for record in SeqIO.parse(reference, "fasta"): + chrom = record.id + total_len = len(record.seq) + for pos in list(range(1, total_len + 1)): + zero_dict["%s-%s" % (str(chrom), str(pos))] = 0 + # Convert it to a data frame with depth_x + # and depth_y columns - index is NaN. + zero_df = pandas.DataFrame.from_dict(zero_dict, orient='index', columns=["depth"]) + coverage_df = zero_df.merge(coverage_df, left_index=True, right_index=True, how='outer') + # depth_x "0" column no longer needed. + coverage_df = coverage_df.drop(columns=['depth_x']) + coverage_df = coverage_df.rename(columns={'depth_y': 'depth'}) + # Covert the NaN to 0 coverage and get some metrics. + coverage_df = coverage_df.fillna(0) + coverage_df['depth'] = coverage_df['depth'].apply(int) + total_length = len(coverage_df) + average_coverage = coverage_df['depth'].mean() + zero_df = coverage_df[coverage_df['depth'] == 0] + total_zero_coverage = len(zero_df) + total_coverage = total_length - total_zero_coverage + genome_coverage = "{:.2%}".format(total_coverage / total_length) + # Process the associated VCF input. + column_names = ["CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT", "Sample"] + vcf_df = pandas.read_csv(vcf_file, sep='\t', header=None, names=column_names, comment='#') + good_snp_count = len(vcf_df[(vcf_df['ALT'].str.len() == 1) & (vcf_df['REF'].str.len() == 1) & (vcf_df['QUAL'] > 150)]) + base_file_name = get_base_file_name(vcf_file) + if total_zero_coverage > 0: + header_file = "%s_header.csv" % base_file_name + with open(header_file, 'w') as outfile: + with open(vcf_file) as infile: + for line in infile: + if re.search('^#', line): + outfile.write("%s" % line) + vcf_df_snp = vcf_df[vcf_df['REF'].str.len() == 1] + vcf_df_snp = vcf_df_snp[vcf_df_snp['ALT'].str.len() == 1] + vcf_df_snp['ABS_VALUE'] = vcf_df_snp['CHROM'].map(str) + "-" + vcf_df_snp['POS'].map(str) + vcf_df_snp = vcf_df_snp.set_index('ABS_VALUE') + cat_df = pandas.concat([vcf_df_snp, zero_df], axis=1, sort=False) + cat_df = cat_df.drop(columns=['CHROM', 'POS', 'depth']) + cat_df[['ID', 'ALT', 'QUAL', 'FILTER', 'INFO']] = cat_df[['ID', 'ALT', 'QUAL', 'FILTER', 'INFO']].fillna('.') + cat_df['REF'] = cat_df['REF'].fillna('N') + cat_df['FORMAT'] = cat_df['FORMAT'].fillna('GT') + cat_df['Sample'] = cat_df['Sample'].fillna('./.') + cat_df['temp'] = cat_df.index.str.rsplit('-', n=1) + cat_df[['CHROM', 'POS']] = pandas.DataFrame(cat_df.temp.values.tolist(), index=cat_df.index) + cat_df = cat_df[['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT', 'Sample']] + cat_df['POS'] = cat_df['POS'].astype(int) + cat_df = cat_df.sort_values(['CHROM', 'POS']) + body_file = "%s_body.csv" % base_file_name + cat_df.to_csv(body_file, sep='\t', header=False, index=False) + if output_vcf is None: + output_vcf_file = os.path.join(OUTPUT_VCF_DIR, "%s.vcf" % base_file_name) + else: + output_vcf_file = output_vcf + with open(output_vcf_file, "w") as outfile: + for cf in [header_file, body_file]: + with open(cf, "r") as infile: + for line in infile: + outfile.write("%s" % line) + else: + if output_vcf is None: + output_vcf_file = os.path.join(OUTPUT_VCF_DIR, "%s.vcf" % base_file_name) + else: + output_vcf_file = output_vcf + shutil.copyfile(vcf_file, output_vcf_file) + bam_metrics = [base_file_name, "", "%4f" % average_coverage, genome_coverage] + vcf_metrics = [base_file_name, str(good_snp_count), "", ""] + if output_metrics is None: + output_metrics_file = os.path.join(OUTPUT_METRICS_DIR, "%s.tabular" % base_file_name) + else: + output_metrics_file = output_metrics + metrics_columns = ["File", "Number of Good SNPs", "Average Coverage", "Genome Coverage"] + with open(output_metrics_file, "w") as fh: + fh.write("# %s\n" % "\t".join(metrics_columns)) + fh.write("%s\n" % "\t".join(bam_metrics)) + fh.write("%s\n" % "\t".join(vcf_metrics)) + task_queue.task_done() + + +def set_num_cpus(num_files, processes): + num_cpus = int(multiprocessing.cpu_count()) + if num_files < num_cpus and num_files < processes: + return num_files + if num_cpus < processes: + half_cpus = int(num_cpus / 2) + if num_files < half_cpus: + return num_files + return half_cpus + return processes + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics text file') + parser.add_argument('--output_vcf', action='store', dest='output_vcf', required=False, default=None, help='Output VCF file') + parser.add_argument('--reference', action='store', dest='reference', help='Reference dataset') + parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting') + + args = parser.parse_args() + + # The assumption here is that the list of files + # in both INPUT_BAM_DIR and INPUT_VCF_DIR are + # equal in number and named such that they are + # properly matched if the directories contain + # more than 1 file (i.e., hopefully the bam file + # names and vcf file names will be something like + # Mbovis-01D6_* so they can be # sorted and properly + # associated with each other). + bam_files = [] + for file_name in sorted(os.listdir(INPUT_BAM_DIR)): + file_path = os.path.abspath(os.path.join(INPUT_BAM_DIR, file_name)) + bam_files.append(file_path) + vcf_files = [] + for file_name in sorted(os.listdir(INPUT_VCF_DIR)): + file_path = os.path.abspath(os.path.join(INPUT_VCF_DIR, file_name)) + vcf_files.append(file_path) + + multiprocessing.set_start_method('spawn') + queue1 = multiprocessing.JoinableQueue() + num_files = len(bam_files) + cpus = set_num_cpus(num_files, args.processes) + # Set a timeout for get()s in the queue. + timeout = 0.05 + + # Add each associated bam and vcf file pair to the queue. + for i, bam_file in enumerate(bam_files): + vcf_file = vcf_files[i] + queue1.put((bam_file, vcf_file)) + + # Complete the get_coverage_and_snp_count task. + processes = [multiprocessing.Process(target=get_coverage_and_snp_count, args=(queue1, args.reference, args.output_metrics, args.output_vcf, timeout, )) for _ in range(cpus)] + for p in processes: + p.start() + for p in processes: + p.join() + queue1.join() + + if queue1.empty(): + queue1.close() + queue1.join_thread()