diff vsnp_statistics.py @ 1:14e29f7d59ca draft

Uploaded
author greg
date Wed, 29 Apr 2020 16:56:10 -0400
parents c21d338dbdc4
children 321a8259e3f9
line wrap: on
line diff
--- a/vsnp_statistics.py	Tue Apr 21 10:19:53 2020 -0400
+++ b/vsnp_statistics.py	Wed Apr 29 16:56:10 2020 -0400
@@ -2,19 +2,22 @@
 
 import argparse
 import gzip
-import multiprocessing
 import numpy
 import os
 import pandas
-import queue
+import shutil
 
 INPUT_IDXSTATS_DIR = 'input_idxstats'
 INPUT_METRICS_DIR = 'input_metrics'
 INPUT_READS_DIR = 'input_reads'
-OUTPUT_DIR = 'output'
 QUALITYKEY = {'!':'0', '"':'1', '#':'2', '$':'3', '%':'4', '&':'5', "'":'6', '(':'7', ')':'8', '*':'9', '+':'10', ',':'11', '-':'12', '.':'13', '/':'14', '0':'15', '1':'16', '2':'17', '3':'18', '4':'19', '5':'20', '6':'21', '7':'22', '8':'23', '9':'24', ':':'25', ';':'26', '<':'27', '=':'28', '>':'29', '?':'30', '@':'31', 'A':'32', 'B':'33', 'C':'34', 'D':'35', 'E':'36', 'F':'37', 'G':'38', 'H':'39', 'I':'40', 'J':'41', 'K':'42', 'L':'43', 'M':'44', 'N':'45', 'O':'46', 'P':'47', 'Q':'48', 'R':'49', 'S':'50', 'T':'51', 'U':'52', 'V':'53', 'W':'54', 'X':'55', 'Y':'56', 'Z':'57', '_':'1', ']':'1', '[':'1', '\\':'1', '\n':'1', '`':'1', 'a':'1', 'b':'1', 'c':'1', 'd':'1', 'e':'1', 'f':'1', 'g':'1', 'h':'1', 'i':'1', 'j':'1', 'k':'1', 'l':'1', 'm':'1', 'n':'1', 'o':'1', 'p':'1', 'q':'1', 'r':'1', 's':'1', 't':'1', 'u':'1', 'v':'1', 'w':'1', 'x':'1', 'y':'1', 'z':'1', ' ':'1'}
-READCOLUMNS = ['Sample', 'Reference', 'Fastq File', 'Size', 'Total Reads', 'Mean Read Length', 'Mean Read Quality', 'Reads Passing Q30']
-SEP = "\t"
+
+
+def fastq_to_df(fastq_file, gzipped):
+    if gzipped.lower() == "true":
+        return pandas.read_csv(gzip.open(fastq_file, "r"), header=None, sep="^")
+    else:
+        return pandas.read_csv(open(fastq_file, "r"), header=None, sep="^")
 
 
 def get_base_file_name(file_path):
@@ -52,121 +55,108 @@
     return '??? bytes'
 
 
-def output_read_stats(gzipped, fastq_file, ofh, sampling_number=10000, output_sample=False, dbkey=None, collection=False):
-    file_name_base = os.path.basename(fastq_file)
-    # Output a 2-column file where column 1 is
-    # the labels and column 2 is the values.
-    if output_sample:
-        # The Sample and Reference columns should be
-        # output only once, so this block handles
-        # paired reads, where the columns are not
-        # output for Read2.
-        try:
-            # Illumina read file names are something like:
-            # 13-1941-6_S4_L001_R1_600000_fastq_gz
-            sample = file_name_base.split("_")[0]
-        except Exception:
-            sample = ""
-        # Sample
-        ofh.write("Sample%s%s\n" % (SEP, sample))
-        ofh.write("Reference%s%s\n" % (SEP, dbkey))
-        if collection:
-            read = "Read"
+def output_statistics(reads_files, idxstats_files, metrics_files, output_file, gzipped, dbkey):
+    # Produce an Excel spreadsheet that
+    # contains a row for each sample.
+    columns = ['Reference', 'File Size', 'Mean Read Length', 'Mean Read Quality', 'Reads Passing Q30',
+               'Total Reads', 'All Mapped Reads', 'Unmapped Reads', 'Unmapped Reads Percentage of Total',
+               'Reference with Coverage', 'Average Depth of Coverage', 'Good SNP Count']
+    data_frames = []
+    for i, fastq_file in enumerate(reads_files):
+        idxstats_file = idxstats_files[i]
+        metrics_file = metrics_files[i]
+        file_name_base = os.path.basename(fastq_file)
+        # Read fastq_file into a data frame.
+        fastq_df = fastq_to_df(fastq_file, gzipped)
+        total_reads = int(len(fastq_df.index) / 4)
+        current_sample_df = pandas.DataFrame(index=[file_name_base], columns=columns)
+        # Reference
+        current_sample_df.at[file_name_base, 'Reference'] = dbkey
+        # File Size
+        current_sample_df.at[file_name_base, 'File Size'] = nice_size(os.path.getsize(fastq_file))
+        # Mean Read Length
+        sampling_size = 10000
+        if sampling_size > total_reads:
+            sampling_size = total_reads
+        fastq_df = fastq_df.iloc[3::4].sample(sampling_size)
+        dict_mean = {}
+        list_length = []
+        for index, row in fastq_df.iterrows():
+            base_qualities = []
+            for base in list(row.array[0]):
+                base_qualities.append(int(QUALITYKEY[base]))
+            dict_mean[index] = numpy.mean(base_qualities)
+            list_length.append(len(row.array[0]))
+        current_sample_df.at[file_name_base, 'Mean Read Length'] = "%.1f" % numpy.mean(list_length)
+        # Mean Read Quality
+        df_mean = pandas.DataFrame.from_dict(dict_mean, orient='index', columns=['ave'])
+        current_sample_df.at[file_name_base, 'Mean Read Quality'] = "%.1f" % df_mean['ave'].mean()
+        # Reads Passing Q30
+        reads_gt_q30 = len(df_mean[df_mean['ave'] >= 30])
+        reads_passing_q30 = "{:10.2f}".format(reads_gt_q30 / sampling_size)
+        current_sample_df.at[file_name_base, 'Reads Passing Q30'] = reads_passing_q30
+        # Total Reads
+        current_sample_df.at[file_name_base, 'Total Reads'] = total_reads
+        # All Mapped Reads
+        all_mapped_reads, unmapped_reads = process_idxstats_file(idxstats_file)
+        current_sample_df.at[file_name_base, 'All Mapped Reads'] = all_mapped_reads
+        # Unmapped Reads
+        current_sample_df.at[file_name_base, 'Unmapped Reads'] = unmapped_reads
+        # Unmapped Reads Percentage of Total
+        if unmapped_reads > 0:
+            unmapped_reads_percentage = "{:10.2f}".format(unmapped_reads / total_reads)
         else:
-            read = "Read1"
-    else:
-        read = "Read2"
-    # Read
-    ofh.write("%s File%s%s\n" % (read, SEP, file_name_base))
-    # File Size
-    ofh.write("%s File Size%s%s\n" % (read, SEP, nice_size(os.path.getsize(fastq_file))))
-    if gzipped.lower() == "true":
-        df = pandas.read_csv(gzip.open(fastq_file, "r"), header=None, sep="^")
-    else:
-        df = pandas.read_csv(open(fastq_file, "r"), header=None, sep="^")
-    total_read_count = int(len(df.index) / 4)
-    # Readx Total Reads
-    ofh.write("%s Total Reads%s%s\n" % (read, SEP, total_read_count))
-    # Mean Read Length
-    sampling_size = int(sampling_number)
-    if sampling_size > total_read_count:
-        sampling_size = total_read_count
-    df = df.iloc[3::4].sample(sampling_size)
-    dict_mean = {}
-    list_length = []
-    for index, row in df.iterrows():
-        base_qualities = []
-        for base in list(row.array[0]):
-            base_qualities.append(int(QUALITYKEY[base]))
-        dict_mean[index] = numpy.mean(base_qualities)
-        list_length.append(len(row.array[0]))
-    ofh.write("%s Mean Read Length%s%s\n" % (read, SEP, "%.1f" % numpy.mean(list_length)))
-    # Mean Read Quality
-    df_mean = pandas.DataFrame.from_dict(dict_mean, orient='index', columns=['ave'])
-    ofh.write("%s Mean Read Quality%s%s\n" % (read, SEP, "%.1f" % df_mean['ave'].mean()))
-    # Reads Passing Q30
-    reads_gt_q30 = len(df_mean[df_mean['ave'] >= 30])
-    reads_passing_q30 = "{:10.2f}".format(reads_gt_q30 / sampling_size)
-    ofh.write("%s reads passing Q30%s%s\n" % (read, SEP, reads_passing_q30))
-    return total_read_count
+            unmapped_reads_percentage = 0
+        current_sample_df.at[file_name_base, 'Unmapped Reads Percentage of Total'] = unmapped_reads_percentage
+        # Reference with Coverage
+        ref_with_coverage, avg_depth_of_coverage, good_snp_count = process_metrics_file(metrics_file)
+        current_sample_df.at[file_name_base, 'Reference with Coverage'] = ref_with_coverage
+        # Average Depth of Coverage
+        current_sample_df.at[file_name_base, 'Average Depth of Coverage'] = avg_depth_of_coverage
+        # Good SNP Count
+        current_sample_df.at[file_name_base, 'Good SNP Count'] = good_snp_count
+        data_frames.append(current_sample_df)
+    excel_df = pandas.concat(data_frames)
+    excel_file_name = "output.xlsx"
+    writer = pandas.ExcelWriter(excel_file_name, engine='xlsxwriter')
+    excel_df.to_excel(writer, sheet_name='Sheet1')
+    writer.save()
+    shutil.move(excel_file_name, output_file)
 
 
-def output_statistics(task_queue, read2, collection, gzipped, dbkey, timeout):
-    while True:
-        try:
-            tup = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        read_file, idxstats_file, metrics_file, output_file = tup
-        total_reads = 0
-        with open(output_file, "w") as ofh:
-            total_reads += output_read_stats(gzipped, read_file, ofh, output_sample=True, dbkey=dbkey, collection=collection)
-            if read2 is not None:
-                total_reads += output_read_stats(gzipped, read2, ofh)
-            ofh.write("Total Reads%s%d\n" % (SEP, total_reads))
-            with open(idxstats_file, "r") as ifh:
-                unmapped_reads = 0
-                for i, line in enumerate(ifh):
-                    items = line.split("\t")
-                    if i == 0:
-                        # NC_002945.4 4349904 213570 4047
-                        ofh.write("All Mapped Reads%s%s\n" % (SEP, items[2]))
-                    elif i == 1:
-                        # * 0 0 82774
-                        unmapped_reads = int(items[3])
-                        ofh.write("Unmapped Reads%s%d\n" % (SEP, unmapped_reads))
-                percent_str = "Unmapped Reads Percentage of Total"
-                if unmapped_reads > 0:
-                    unmapped_reads_percentage = "{:10.2f}".format(unmapped_reads / total_reads)
-                    ofh.write("%s%s%s\n" % (percent_str, SEP, unmapped_reads_percentage))
-                else:
-                    ofh.write("%s%s0\n" % (percent_str, SEP))
-            with open(metrics_file, "r") as ifh:
-                for i, line in enumerate(ifh):
-                    if i == 0:
-                        # Skip comments.
-                        continue
-                    items = line.split("\t")
-                    if i == 1:
-                        # MarkDuplicates 10.338671 98.74%
-                        ofh.write("Average Depth of Coverage%s%s\n" % (SEP, items[2]))
-                        ofh.write("Reference with Coverage%s%s\n" % (SEP, items[3]))
-                    elif i == 2:
-                        # VCFfilter 611
-                        ofh.write("Good SNP Count%s%s\n" % (SEP, items[1]))
-        task_queue.task_done()
+def process_idxstats_file(idxstats_file):
+    all_mapped_reads = 0
+    unmapped_reads = 0
+    with open(idxstats_file, "r") as fh:
+        for i, line in enumerate(fh):
+            items = line.split("\t")
+            if i == 0:
+                # NC_002945.4 4349904 213570 4047
+                all_mapped_reads = int(items[2])
+            elif i == 1:
+                # * 0 0 82774
+                unmapped_reads = int(items[3])
+    return all_mapped_reads, unmapped_reads
 
 
-def set_num_cpus(num_files, processes):
-    num_cpus = int(multiprocessing.cpu_count())
-    if num_files < num_cpus and num_files < processes:
-        return num_files
-    if num_cpus < processes:
-        half_cpus = int(num_cpus / 2)
-        if num_files < half_cpus:
-            return num_files
-        return half_cpus
-    return processes
+def process_metrics_file(metrics_file):
+    ref_with_coverage = '0%'
+    avg_depth_of_coverage = 0
+    good_snp_count = 0
+    with open(metrics_file, "r") as ifh:
+        for i, line in enumerate(ifh):
+            if i == 0:
+                # Skip comments.
+                continue
+            items = line.split("\t")
+            if i == 1:
+                # MarkDuplicates 10.338671 98.74%
+                ref_with_coverage = items[3]
+                avg_depth_of_coverage = items[2]
+            elif i == 2:
+                # VCFfilter 611
+                good_snp_count = items[1]
+    return ref_with_coverage, avg_depth_of_coverage, good_snp_count
 
 
 if __name__ == '__main__':
@@ -177,60 +167,36 @@
     parser.add_argument('--dbkey', action='store', dest='dbkey', help='Reference dbkey')
     parser.add_argument('--gzipped', action='store', dest='gzipped', help='Input files are gzipped')
     parser.add_argument('--samtools_idxstats', action='store', dest='samtools_idxstats', required=False, default=None, help='Output of samtools_idxstats')
-    parser.add_argument('--output', action='store', dest='output', required=False, default=None, help='Output statisticsfile')
+    parser.add_argument('--output', action='store', dest='output', help='Output Excel statistics file')
     parser.add_argument('--vsnp_azc', action='store', dest='vsnp_azc', required=False, default=None, help='Output of vsnp_add_zero_coverage')
-    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
 
     args = parser.parse_args()
+    print("args:\n%s\n" % str(args))
 
     reads_files = []
     idxstats_files = []
     metrics_files = []
-    output_files = []
-    if args.output is not None:
-        # The inputs were not dataset collections, so
+    # Accumulate inputs.
+    if args.read1 is not None:
+        # The inputs are not dataset collections, so
         # read1, read2 (possibly) and vsnp_azc will also
         # not be None.
-        collection = False
         reads_files.append(args.read1)
         idxstats_files.append(args.samtools_idxstats)
         metrics_files.append(args.vsnp_azc)
-        output_files.append(args.output)
+        if args.read2 is not None:
+            reads_files.append(args.read2)
+            idxstats_files.append(args.samtools_idxstats)
+            metrics_files.append(args.vsnp_azc)
     else:
-        collection = True
         for file_name in sorted(os.listdir(INPUT_READS_DIR)):
             file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name))
             reads_files.append(file_path)
             base_file_name = get_base_file_name(file_path)
-            output_files.append(os.path.abspath(os.path.join(OUTPUT_DIR, "%s.tabular" % base_file_name)))
         for file_name in sorted(os.listdir(INPUT_IDXSTATS_DIR)):
             file_path = os.path.abspath(os.path.join(INPUT_IDXSTATS_DIR, file_name))
             idxstats_files.append(file_path)
         for file_name in sorted(os.listdir(INPUT_METRICS_DIR)):
             file_path = os.path.abspath(os.path.join(INPUT_METRICS_DIR, file_name))
             metrics_files.append(file_path)
-
-    multiprocessing.set_start_method('spawn')
-    queue1 = multiprocessing.JoinableQueue()
-    num_files = len(output_files)
-    cpus = set_num_cpus(num_files, args.processes)
-    # Set a timeout for get()s in the queue.
-    timeout = 0.05
-
-    for i, output_file in enumerate(output_files):
-        read_file = reads_files[i]
-        idxstats_file = idxstats_files[i]
-        metrics_file = metrics_files[i]
-        queue1.put((read_file, idxstats_file, metrics_file, output_file))
-
-    # Complete the output_statistics task.
-    processes = [multiprocessing.Process(target=output_statistics, args=(queue1, args.read2, collection, args.gzipped, args.dbkey, timeout, )) for _ in range(cpus)]
-    for p in processes:
-        p.start()
-    for p in processes:
-        p.join()
-    queue1.join()
-
-    if queue1.empty():
-        queue1.close()
-        queue1.join_thread()
+    output_statistics(reads_files, idxstats_files, metrics_files, args.output, args.gzipped, args.dbkey)