diff vsnp_build_tables.py @ 1:9ac0b1d5560d draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/vsnp commit 2a94c64d6c7236550bf483d2ffc4e86248c63aab"
author iuc
date Tue, 16 Nov 2021 20:11:30 +0000
parents ec6e02f4eab7
children 6fdfdee72f8a
--- a/vsnp_build_tables.py	Tue Nov 16 08:26:58 2021 +0000
+++ b/vsnp_build_tables.py	Tue Nov 16 20:11:30 2021 +0000
@@ -1,7 +1,9 @@
 #!/usr/bin/env python
 
 import argparse
+import multiprocessing
 import os
+import queue
 import re
 
 import pandas
@@ -16,6 +18,9 @@
 # to use LibreOffice for Excel spreadsheets.
 MAXCOLS = 1024
 OUTPUT_EXCEL_DIR = 'output_excel_dir'
+INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir'
+INPUT_JSON_DIR = 'input_json_dir'
+INPUT_NEWICK_DIR = 'input_newick_dir'
 
 
 def annotate_table(table_df, group, annotation_dict):
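The three new constants name the directories that hold the tool's collection inputs; later in this commit the files inside them are paired strictly by sorted filename, as the comment in the main block explains. A minimal sketch of that convention (the collect() helper and the zip() pairing are illustrative, not part of the committed script):

    import os

    INPUT_JSON_AVG_MQ_DIR = 'input_json_avg_mq_dir'
    INPUT_JSON_DIR = 'input_json_dir'
    INPUT_NEWICK_DIR = 'input_newick_dir'


    def collect(directory):
        # Sorted order is what associates, e.g., Mbovis-01D6_*.newick
        # with Mbovis-01D6_*.json across the three directories.
        return [os.path.abspath(os.path.join(directory, name))
                for name in sorted(os.listdir(directory))]


    newick_files = collect(INPUT_NEWICK_DIR)
    json_files = collect(INPUT_JSON_DIR)
    json_avg_mq_files = collect(INPUT_JSON_AVG_MQ_DIR)
    # One (newick, snps_json, avg_mq_json) task per sample group.
    tasks = list(zip(newick_files, json_files, json_avg_mq_files))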
@@ -221,74 +226,94 @@
         output_excel(df, type_str, group_str, annotation_dict)
 
 
-def preprocess_tables(newick_file, json_file, json_avg_mq_file, annotation_dict):
-    avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
-    # Map quality to dataframe.
-    mqdf = avg_mq_series.to_frame(name='MQ')
-    mqdf = mqdf.T
-    # Get the group.
-    group = get_sample_name(newick_file)
-    snps_df = pandas.read_json(json_file, orient='split')
-    with open(newick_file, 'r') as fh:
-        for line in fh:
-            line = re.sub('[:,]', '\n', line)
-            line = re.sub('[)(]', '', line)
-            line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
-            line = re.sub('root\n', '', line)
-    sample_order = line.split('\n')
-    sample_order = list([_f for _f in sample_order if _f])
-    sample_order.insert(0, 'root')
-    tree_order = snps_df.loc[sample_order]
-    # Count number of SNPs in each column.
-    snp_per_column = []
-    for column_header in tree_order:
-        count = 0
-        column = tree_order[column_header]
-        for element in column:
-            if element != column[0]:
-                count = count + 1
-        snp_per_column.append(count)
-    row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
-    # Count number of SNPs from the
-    # top of each column in the table.
-    snp_from_top = []
-    for column_header in tree_order:
-        count = 0
-        column = tree_order[column_header]
-        # For each element in the column,
-        # skip the first element.
-        for element in column[1:]:
-            if element == column[0]:
-                count = count + 1
-            else:
-                break
-        snp_from_top.append(count)
-    row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
-    tree_order = tree_order.append([row1])
-    tree_order = tree_order.append([row2])
-    # In pandas=0.18.1 even this does not work:
-    # abc = row1.to_frame()
-    # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
-    # tree_order.append(abc)
-    # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
-    tree_order = tree_order.T
-    tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
-    tree_order = tree_order.T
-    # Remove snp_per_column and snp_from_top rows.
-    cascade_order = tree_order[:-2]
-    # Output the cascade table.
-    output_cascade_table(cascade_order, mqdf, group, annotation_dict)
-    # Output the sorted table.
-    output_sort_table(cascade_order, mqdf, group, annotation_dict)
+def preprocess_tables(task_queue, annotation_dict, timeout):
+    while True:
+        try:
+            tup = task_queue.get(block=True, timeout=timeout)
+        except queue.Empty:
+            break
+        newick_file, json_file, json_avg_mq_file = tup
+        avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
+        # Map quality to dataframe.
+        mqdf = avg_mq_series.to_frame(name='MQ')
+        mqdf = mqdf.T
+        # Get the group.
+        group = get_sample_name(newick_file)
+        snps_df = pandas.read_json(json_file, orient='split')
+        with open(newick_file, 'r') as fh:
+            for line in fh:
+                line = re.sub('[:,]', '\n', line)
+                line = re.sub('[)(]', '', line)
+                line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
+                line = re.sub('root\n', '', line)
+        sample_order = line.split('\n')
+        sample_order = list([_f for _f in sample_order if _f])
+        sample_order.insert(0, 'root')
+        tree_order = snps_df.loc[sample_order]
+        # Count number of SNPs in each column.
+        snp_per_column = []
+        for column_header in tree_order:
+            count = 0
+            column = tree_order[column_header]
+            for element in column:
+                if element != column[0]:
+                    count = count + 1
+            snp_per_column.append(count)
+        row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
+        # Count number of SNPs from the
+        # top of each column in the table.
+        snp_from_top = []
+        for column_header in tree_order:
+            count = 0
+            column = tree_order[column_header]
+            # For each element in the column,
+            # skip the first element.
+            for element in column[1:]:
+                if element == column[0]:
+                    count = count + 1
+                else:
+                    break
+            snp_from_top.append(count)
+        row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
+        tree_order = tree_order.append([row1])
+        tree_order = tree_order.append([row2])
+        # In pandas=0.18.1 even this does not work:
+        # abc = row1.to_frame()
+        # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
+        # tree_order.append(abc)
+        # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
+        tree_order = tree_order.T
+        tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
+        tree_order = tree_order.T
+        # Remove snp_per_column and snp_from_top rows.
+        cascade_order = tree_order[:-2]
+        # Output the cascade table.
+        output_cascade_table(cascade_order, mqdf, group, annotation_dict)
+        # Output the sorted table.
+        output_sort_table(cascade_order, mqdf, group, annotation_dict)
+        task_queue.task_done()
+
+
+def set_num_cpus(num_files, processes):
+    num_cpus = int(multiprocessing.cpu_count())
+    if num_files < num_cpus and num_files < processes:
+        return num_files
+    if num_cpus < processes:
+        half_cpus = int(num_cpus / 2)
+        if num_files < half_cpus:
+            return num_files
+        return half_cpus
+    return processes
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
 
+    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', required=False, default=None, help='Average MQ json file')
+    parser.add_argument('--input_newick', action='store', dest='input_newick', required=False, default=None, help='Newick file')
+    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', required=False, default=None, help='SNPs json file')
     parser.add_argument('--gbk_file', action='store', dest='gbk_file', required=False, default=None, help='Optional gbk file')
-    parser.add_argument('--input_avg_mq_json', action='store', dest='input_avg_mq_json', help='Average MQ json file')
-    parser.add_argument('--input_newick', action='store', dest='input_newick', help='Newick file')
-    parser.add_argument('--input_snps_json', action='store', dest='input_snps_json', help='SNPs json file')
+    parser.add_argument('--processes', action='store', dest='processes', required=True, type=int, help='User-selected number of processes to use for job splitting')
 
     args = parser.parse_args()
 
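set_num_cpus() caps the worker count three ways: never more workers than input files, never more than the requested --processes, and at most half the machine's CPUs when the request exceeds what is available. A standalone sketch with the CPU count parameterized so the examples are deterministic (the committed function reads multiprocessing.cpu_count() directly):

    import multiprocessing


    def set_num_cpus(num_files, processes, num_cpus=None):
        # Parameterized only for testability; the script always uses
        # the live CPU count.
        if num_cpus is None:
            num_cpus = multiprocessing.cpu_count()
        # Fewer files than both limits: one worker per file suffices.
        if num_files < num_cpus and num_files < processes:
            return num_files
        # More processes requested than CPUs exist: fall back to half
        # the CPUs (or the file count, if that is smaller).
        if num_cpus < processes:
            half_cpus = num_cpus // 2
            if num_files < half_cpus:
                return num_files
            return half_cpus
        # Otherwise honor the request.
        return processes


    # On a hypothetical 8-CPU machine:
    assert set_num_cpus(num_files=2, processes=4, num_cpus=8) == 2
    assert set_num_cpus(num_files=20, processes=4, num_cpus=8) == 4
    assert set_num_cpus(num_files=20, processes=16, num_cpus=8) == 4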
@@ -299,4 +324,55 @@
     else:
         annotation_dict = None
 
-    preprocess_tables(args.input_newick, args.input_snps_json, args.input_avg_mq_json, annotation_dict)
+    # The assumption here is that the files in
+    # both INPUT_NEWICK_DIR and INPUT_JSON_DIR are
+    # named such that they are properly matched if
+    # the directories contain more than 1 file (i.e.,
+    # hopefully the newick file names and json file names
+    # will be something like Mbovis-01D6_* so they can be
+    # sorted and properly associated with each other).
+    if args.input_newick is not None:
+        newick_files = [args.input_newick]
+    else:
+        newick_files = []
+        for file_name in sorted(os.listdir(INPUT_NEWICK_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_NEWICK_DIR, file_name))
+            newick_files.append(file_path)
+    if args.input_snps_json is not None:
+        json_files = [args.input_snps_json]
+    else:
+        json_files = []
+        for file_name in sorted(os.listdir(INPUT_JSON_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_JSON_DIR, file_name))
+            json_files.append(file_path)
+    if args.input_avg_mq_json is not None:
+        json_avg_mq_files = [args.input_avg_mq_json]
+    else:
+        json_avg_mq_files = []
+        for file_name in sorted(os.listdir(INPUT_JSON_AVG_MQ_DIR)):
+            file_path = os.path.abspath(os.path.join(INPUT_JSON_AVG_MQ_DIR, file_name))
+            json_avg_mq_files.append(file_path)
+
+    multiprocessing.set_start_method('spawn')
+    queue1 = multiprocessing.JoinableQueue()
+    num_files = len(newick_files)
+    cpus = set_num_cpus(num_files, args.processes)
+    # Set a timeout for get()s in the queue.
+    timeout = 0.05
+
+    for i, newick_file in enumerate(newick_files):
+        json_file = json_files[i]
+        json_avg_mq_file = json_avg_mq_files[i]
+        queue1.put((newick_file, json_file, json_avg_mq_file))
+
+    # Start one preprocess_tables worker per allotted CPU.
+    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout)) for _ in range(cpus)]
+    for p in processes:
+        p.start()
+    for p in processes:
+        p.join()
+    queue1.join()
+
+    if queue1.empty():
+        queue1.close()
+        queue1.join_thread()
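The orchestration follows a standard producer/consumer shape: the main process enqueues one (newick, snps_json, avg_mq_json) tuple per group before any worker starts, and each preprocess_tables() worker drains the JoinableQueue until a get() times out, which doubles as the shutdown signal. A minimal self-contained sketch of the pattern (the print call stands in for the tool's real table-building work):

    import multiprocessing
    import queue


    def worker(task_queue, timeout):
        while True:
            try:
                task = task_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                # Nothing arrived within the timeout; the producer is
                # done, so the worker exits.
                break
            print('processing %s' % str(task))  # placeholder for real work
            task_queue.task_done()


    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')
        task_queue = multiprocessing.JoinableQueue()
        for task in [('a.newick', 'a.json', 'a_mq.json'),
                     ('b.newick', 'b.json', 'b_mq.json')]:
            task_queue.put(task)
        workers = [multiprocessing.Process(target=worker, args=(task_queue, 0.05))
                   for _ in range(2)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        # Every task_done() has been called by the time the workers
        # exit, so this join returns immediately, mirroring the
        # belt-and-braces queue1.join() in the commit.
        task_queue.join()

Because every put() happens before the workers start, a worker cannot hit the 0.05-second timeout while tasks are still being produced; that ordering is what makes such a short timeout a safe shutdown signal here.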