diff vsnp_build_tables.py @ 3:2e863710a2f0 draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/vsnp commit 2e312886647244b416c64eca91e1a61dd1be939b"
author iuc
date Thu, 10 Dec 2020 15:26:25 +0000
parents aed013f6b13b
children 6dc6dd4666e3
line wrap: on
line diff
--- a/vsnp_build_tables.py	Wed Dec 02 09:10:53 2020 +0000
+++ b/vsnp_build_tables.py	Thu Dec 10 15:26:25 2020 +0000
@@ -1,18 +1,13 @@
 #!/usr/bin/env python
 
 import argparse
-import multiprocessing
 import os
-import queue
 import re
 
 import pandas
 import pandas.io.formats.excel
 from Bio import SeqIO
 
-INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir'
-INPUT_JSON_DIR = 'input_json_dir'
-INPUT_NEWICK_DIR = 'input_newick_dir'
 # Maximum columns allowed in a LibreOffice
 # spreadsheet is 1024.  Excel allows for
 # 16,384 columns, but we'll set the lower
@@ -145,18 +140,12 @@
     return annotation_dict
 
 
-def get_base_file_name(file_path):
+def get_sample_name(file_path):
     base_file_name = os.path.basename(file_path)
     if base_file_name.find(".") > 0:
         # Eliminate the extension.
         return os.path.splitext(base_file_name)[0]
-    elif base_file_name.find("_") > 0:
-        # The dot extension was likely changed to
-        # the " character.
-        items = base_file_name.split("_")
-        return "_".join(items[0:-1])
-    else:
-        return base_file_name
+    return base_file_name
 
 
 def output_cascade_table(cascade_order, mqdf, group, annotation_dict):
@@ -169,17 +158,20 @@
     # is used by the excel_formatter.
     if count is None:
         if group is None:
-            json_file_name = "%s_order_mq.json" % type_str
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_order_mq.json" % type_str)
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table.xlsx" % type_str)
         else:
-            json_file_name = "%s_%s_order_mq.json" % (group, type_str)
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq.json" % (group, type_str))
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table.xlsx" % (group, type_str))
     else:
+        # The table has more columns than is allowed by the
+        # MAXCOLS setting, so multiple files will be produced
+        # as an output collection.
         if group is None:
-            json_file_name = "%s_order_mq_%d.json" % (type_str, count)
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_order_mq_%d.json" % (type_str, count))
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_table_%d.xlsx" % (type_str, count))
         else:
-            json_file_name = "%s_%s_order_mq_%d.json" % (group, type_str, count)
+            json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq_%d.json" % (group, type_str, count))
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table_%d.xlsx" % (group, type_str, count))
     df.to_json(json_file_name, orient='split')
     # Output the Excel file.
@@ -229,94 +221,74 @@
         output_excel(df, type_str, group_str, annotation_dict)
 
 
-def preprocess_tables(task_queue, annotation_dict, timeout):
-    while True:
-        try:
-            tup = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        newick_file, json_file, json_avg_mq_file = tup
-        avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
-        # Map quality to dataframe.
-        mqdf = avg_mq_series.to_frame(name='MQ')
-        mqdf = mqdf.T
-        # Get the group.
-        group = get_base_file_name(newick_file)
-        snps_df = pandas.read_json(json_file, orient='split')
-        with open(newick_file, 'r') as fh:
-            for line in fh:
-                line = re.sub('[:,]', '\n', line)
-                line = re.sub('[)(]', '', line)
-                line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
-                line = re.sub('root\n', '', line)
-        sample_order = line.split('\n')
-        sample_order = list([_f for _f in sample_order if _f])
-        sample_order.insert(0, 'root')
-        tree_order = snps_df.loc[sample_order]
-        # Count number of SNPs in each column.
-        snp_per_column = []
-        for column_header in tree_order:
-            count = 0
-            column = tree_order[column_header]
-            for element in column:
-                if element != column[0]:
-                    count = count + 1
-            snp_per_column.append(count)
-        row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
-        # Count number of SNPS from the
-        # top of each column in the table.
-        snp_from_top = []
-        for column_header in tree_order:
-            count = 0
-            column = tree_order[column_header]
-            # for each element in the column
-            # skip the first element
-            for element in column[1:]:
-                if element == column[0]:
-                    count = count + 1
-                else:
-                    break
-            snp_from_top.append(count)
-        row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
-        tree_order = tree_order.append([row1])
-        tree_order = tree_order.append([row2])
-        # In pandas=0.18.1 even this does not work:
-        # abc = row1.to_frame()
-        # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
-        # tree_order.append(abc)
-        # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
-        tree_order = tree_order.T
-        tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
-        tree_order = tree_order.T
-        # Remove snp_per_column and snp_from_top rows.
-        cascade_order = tree_order[:-2]
-        # Output the cascade table.
-        output_cascade_table(cascade_order, mqdf, group, annotation_dict)
-        # Output the sorted table.
-        output_sort_table(cascade_order, mqdf, group, annotation_dict)
-        task_queue.task_done()
-
-
-def set_num_cpus(num_files, processes):
-    num_cpus = int(multiprocessing.cpu_count())
-    if num_files < num_cpus and num_files < processes:
-        return num_files
-    if num_cpus < processes:
-        half_cpus = int(num_cpus / 2)
-        if num_files < half_cpus:
-            return num_files
-        return half_cpus
-    return processes
+def preprocess_tables(newick_file, json_file, json_avg_mq_file, annotation_dict):
+    avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
+    # Map quality to dataframe.
+    mqdf = avg_mq_series.to_frame(name='MQ')
+    mqdf = mqdf.T
+    # Get the group.
+    group = get_sample_name(newick_file)
+    snps_df = pandas.read_json(json_file, orient='split')
+    with open(newick_file, 'r') as fh:
+        for line in fh:
+            line = re.sub('[:,]', '\n', line)
+            line = re.sub('[)(]', '', line)
+            line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
+            line = re.sub('root\n', '', line)
+    sample_order = line.split('\n')
+    sample_order = list([_f for _f in sample_order if _f])
+    sample_order.insert(0, 'root')
+    tree_order = snps_df.loc[sample_order]
+    # Count number of SNPs in each column.
+    snp_per_column = []
+    for column_header in tree_order:
+        count = 0
+        column = tree_order[column_header]
+        for element in column:
+            if element != column[0]:
+                count = count + 1
+        snp_per_column.append(count)
+    row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
+    # Count number of SNPS from the
+    # top of each column in the table.
+    snp_from_top = []
+    for column_header in tree_order:
+        count = 0
+        column = tree_order[column_header]
+        # for each element in the column
+        # skip the first element
+        for element in column[1:]:
+            if element == column[0]:
+                count = count + 1
+            else:
+                break
+        snp_from_top.append(count)
+    row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
+    tree_order = tree_order.append([row1])
+    tree_order = tree_order.append([row2])
+    # In pandas=0.18.1 even this does not work:
+    # abc = row1.to_frame()
+    # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
+    # tree_order.append(abc)
+    # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
+    tree_order = tree_order.T
+    tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
+    tree_order = tree_order.T
+    # Remove snp_per_column and snp_from_top rows.
+    cascade_order = tree_order[:-2]
+    # Output the cascade table.
+    output_cascade_table(cascade_order, mqdf, group, annotation_dict)
+    # Output the sorted table.
+    output_sort_table(cascade_order, mqdf, group, annotation_dict)
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
 
-    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file')
-    parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file')
-    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file')
     parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file'),
-    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
+    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', help='Average MQ json file')
+    parser.add_argument('--input_newick', action='store', dest='input_newick', help='Newick file')
+    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', help='SNPs json file')
 
     args = parser.parse_args()
 
@@ -327,56 +299,4 @@
     else:
         annotation_dict = None
 
-    # The assumption here is that the list of files
-    # in both INPUT_NEWICK_DIR and INPUT_JSON_DIR are
-    # named such that they are properly matched if
-    # the directories contain more than 1 file (i.e.,
-    # hopefully the newick file names and json file names
-    # will be something like Mbovis-01D6_* so they can be
-    # sorted and properly associated with each other).
-    if args.input_newick is not None:
-        newick_files = [args.input_newick]
-    else:
-        newick_files = []
-        for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)):
-            file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name))
-            newick_files.append(file_path)
-    if args.input_snps_json is not None:
-        json_files = [args.input_snps_json]
-    else:
-        json_files = []
-        for file_name in sorted(os.listdir(INPUT_JSON_DIR)):
-            file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name))
-            json_files.append(file_path)
-    if args.input_avg_mq_json is not None:
-        json_avg_mq_files = [args.input_avg_mq_json]
-    else:
-        json_avg_mq_files = []
-        for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)):
-            file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name))
-            json_avg_mq_files.append(file_path)
-
-    multiprocessing.set_start_method('spawn')
-    queue1 = multiprocessing.JoinableQueue()
-    queue2 = multiprocessing.JoinableQueue()
-    num_files = len(newick_files)
-    cpus = set_num_cpus(num_files, args.processes)
-    # Set a timeout for get()s in the queue.
-    timeout = 0.05
-
-    for i, newick_file in enumerate(newick_files):
-        json_file = json_files[i]
-        json_avg_mq_file = json_avg_mq_files[i]
-        queue1.put((newick_file, json_file, json_avg_mq_file))
-
-    # Complete the preprocess_tables task.
-    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)]
-    for p in processes:
-        p.start()
-    for p in processes:
-        p.join()
-    queue1.join()
-
-    if queue1.empty():
-        queue1.close()
-        queue1.join_thread()
+    preprocess_tables(args.input_newick, args.input_snps_json, args.input_avg_mq_json, annotation_dict)