#!/usr/bin/env python

import argparse
import gzip
import multiprocessing
import os
import queue
from collections import OrderedDict

import yaml
from Bio.SeqIO.QualityIO import FastqGeneralIterator

INPUT_READS_DIR = 'input_reads'
OUTPUT_DBKEY_DIR = 'output_dbkey'
OUTPUT_METRICS_DIR = 'output_metrics'


def get_base_file_name(file_path):
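    """
    Derive a sample name from file_path, e.g. "sample" from
    "/tmp/sample.fastq" or from "sample_R1_001" (names are
    illustrative): a dot extension is stripped when present,
    otherwise the trailing underscore-separated items are dropped.
    """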
    base_file_name = os.path.basename(file_path)
    if base_file_name.find(".") > 0:
        # Eliminate the extension.
        return os.path.splitext(base_file_name)[0]
    elif base_file_name.find("_") > 0:
        # The dot extension was likely changed to
        # the "_" character.
        items = base_file_name.split("_")
        no_ext = "_".join(items[0:-2])
        if len(no_ext) > 0:
            return no_ext
    return base_file_name


def get_dbkey(dnaprints_dict, key, s):
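    """
    Return the data table value (the dbkey) whose DNA print list
    contains the binary string s, or "" if there is no match.  An
    illustrative call:
    get_dbkey({'brucella': {'NC_002945v4': ['11001110']}}, 'brucella', '11001110')
    would return 'NC_002945v4'.
    """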
    # dnaprints_dict looks something like this:
    # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']},
    #  'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
    d = dnaprints_dict.get(key, {})
    for data_table_value, v_list in d.items():
        if s in v_list:
            return data_table_value
    return ""


def get_dnaprints_dict(dnaprint_fields):
    # A dnaprint_fields entry looks something like this:
    # [['AF2122', '/galaxy/tool-data/vsnp/AF2122/dnaprints/NC_002945v4.yml']]
    dnaprints_dict = {}
    for item in dnaprint_fields:
        # Here item is a 2-element list of data
        # table components, value and path.
        value = item[0]
        path = item[1].strip()
        with open(path, "rt") as fh:
            # The format of all dnaprints yaml
            # files is something like this:
            # brucella:
            #  - 0111111111111111
            print_dict = yaml.load(fh, Loader=yaml.Loader)
        for print_dict_k, print_dict_v in print_dict.items():
            dnaprints_v_dict = dnaprints_dict.get(print_dict_k, {})
            if len(dnaprints_v_dict) > 0:
                # dnaprints_dict already contains k (e.g., 'brucella'),
                # and dnaprints_v_dict will be a dictionary that
                # looks something like this:
                # {'NC_002945v4': ['11001110', '11011110', '11001100']}
                value_list = dnaprints_v_dict.get(value, [])
                value_list = value_list + print_dict_v
                dnaprints_v_dict[value] = value_list
            else:
                # dnaprints_v_dict is an empty dictionary.
                dnaprints_v_dict[value] = print_dict_v
            dnaprints_dict[print_dict_k] = dnaprints_v_dict
    # dnaprints_dict looks something like this:
    # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']},
    #  'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
    return dnaprints_dict


def get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum):
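    """
    Classify the sample by its summed oligo counts: Brucella when
    brucella_sum > 3, TB when bovis_sum > 3, paraTB when para_sum >= 1.
    The matching dbkey is then looked up in dnaprints_dict; both values
    are empty strings when no group matches.
    """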
    if brucella_sum > 3:
        group = "Brucella"
        dbkey = get_dbkey(dnaprints_dict, "brucella", brucella_string)
    elif bovis_sum > 3:
        group = "TB"
        dbkey = get_dbkey(dnaprints_dict, "bovis", bovis_string)
    elif para_sum >= 1:
        group = "paraTB"
        dbkey = get_dbkey(dnaprints_dict, "para", para_string)
    else:
        group = ""
        dbkey = ""
    return group, dbkey


def get_group_and_dbkey_for_collection(task_queue, finished_queue, dnaprints_dict, timeout):
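    """
    Worker process: repeatedly pull result tuples from task_queue,
    compute the group and dbkey for each sample, and put the extended
    tuples on finished_queue.  The worker exits once task_queue has
    been empty for timeout seconds.
    """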
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum = tup
        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
        finished_queue.put((fastq_file, count_list, group, dbkey))
        task_queue.task_done()


def get_oligo_dict():
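    """
    Return the species-specific oligo sequences keyed by a sortable
    name.  Ordering matters: downstream slicing treats the first 16
    entries as Brucella oligos, the next 8 as TB oligos and the last
    3 as paraTB oligos.
    """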
    oligo_dict = {}
    oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC"
    oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC"
    oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT"
    oligo_dict["04_mel"] = "TGTCGCGCGTCAAGCGGCGTGAAATCTCTG"
    oligo_dict["05_suis1"] = "TGCGTTGCCGTGAAGCTTAATTCGGCTGAT"
    oligo_dict["06_suis2"] = "GGCAATCATGCGCAGGGCTTTGCATTCGTC"
    oligo_dict["07_suis3"] = "CAAGGCAGATGCACATAATCCGGCGACCCG"
    oligo_dict["08_ceti1"] = "GTGAATATAGGGTGAATTGATCTTCAGCCG"
    oligo_dict["09_ceti2"] = "TTACAAGCAGGCCTATGAGCGCGGCGTGAA"
    oligo_dict["10_canis4"] = "CTGCTACATAAAGCACCCGGCGACCGAGTT"
    oligo_dict["11_canis"] = "ATCGTTTTGCGGCATATCGCTGACCACAGC"
    oligo_dict["12_ovis"] = "CACTCAATCTTCTCTACGGGCGTGGTATCC"
    oligo_dict["13_ether2"] = "CGAAATCGTGGTGAAGGACGGGACCGAACC"
    oligo_dict["14_63B1"] = "CCTGTTTAAAAGAATCGTCGGAACCGCTCT"
    oligo_dict["15_16M0"] = "TCCCGCCGCCATGCCGCCGAAAGTCGCCGT"
    oligo_dict["16_mel1b"] = "TCTGTCCAAACCCCGTGACCGAACAATAGA"
    oligo_dict["17_tb157"] = "CTCTTCGTATACCGTTCCGTCGTCACCATGGTCCT"
    oligo_dict["18_tb7"] = "TCACGCAGCCAACGATATTCGTGTACCGCGACGGT"
    oligo_dict["19_tbbov"] = "CTGGGCGACCCGGCCGACCTGCACACCGCGCATCA"
    oligo_dict["20_tb5"] = "CCGTGGTGGCGTATCGGGCCCCTGGATCGCGCCCT"
    oligo_dict["21_tb2"] = "ATGTCTGCGTAAAGAAGTTCCATGTCCGGGAAGTA"
    oligo_dict["22_tb3"] = "GAAGACCTTGATGCCGATCTGGGTGTCGATCTTGA"
    oligo_dict["23_tb4"] = "CGGTGTTGAAGGGTCCCCCGTTCCAGAAGCCGGTG"
    oligo_dict["24_tb6"] = "ACGGTGATTCGGGTGGTCGACACCGATGGTTCAGA"
    oligo_dict["25_para"] = "CCTTTCTTGAAGGGTGTTCG"
    oligo_dict["26_para_sheep"] = "CGTGGTGGCGACGGCGGCGGGCCTGTCTAT"
    oligo_dict["27_para_cattle"] = "TCTCCTCGGTCGGTGATTCGGGGGCGCGGT"
    return oligo_dict


def get_seq_counts(value, fastq_list, gzipped):
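    """
    Return a (value, count) tuple with the total number of times the
    oligo sequence value occurs in the reads of the given fastq files.
    """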
    count = 0
    # gzip.open and the builtin open both accept text mode 'rt'.
    open_func = gzip.open if gzipped == "true" else open
    for fastq_file in fastq_list:
        with open_func(fastq_file, 'rt') as fh:
            for title, seq, qual in FastqGeneralIterator(fh):
                count += seq.count(value)
    return value, count


def get_species_counts(fastq_list, gzipped):
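    """
    Count every oligo across the fastq files and return the per-oligo
    count_summary dict, the flat count_list, and the summed counts for
    the Brucella, TB and paraTB oligo groups.
    """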
    count_summary = {}
    oligo_dict = get_oligo_dict()
    for key, oligo in oligo_dict.items():
        _, count = get_seq_counts(oligo, fastq_list, gzipped)
        count_summary[key] = count
    count_list = list(count_summary.values())
    brucella_sum = sum(count_list[:16])
    bovis_sum = sum(count_list[16:24])
    para_sum = sum(count_list[24:])
    return count_summary, count_list, brucella_sum, bovis_sum, para_sum


def get_species_counts_for_collection(task_queue, finished_queue, gzipped, timeout):
    while True:
        try:
            fastq_file = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts([fastq_file], gzipped)
        finished_queue.put((fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum))
        task_queue.task_done()


def get_species_strings(count_summary):
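    """
    Reduce count_summary to per-group binary strings: an oligo counted
    more than once scores 1, otherwise 0.  Keys are sorted so the
    string positions follow the oligo numbering in get_oligo_dict().
    """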
    binary_dictionary = {}
    for k, v in count_summary.items():
        if v > 1:
            binary_dictionary[k] = 1
        else:
            binary_dictionary[k] = 0
    binary_dictionary = OrderedDict(sorted(binary_dictionary.items()))
    binary_list = []
    for v in binary_dictionary.values():
        binary_list.append(v)
    brucella_binary = binary_list[:16]
    brucella_string = ''.join(str(e) for e in brucella_binary)
    bovis_binary = binary_list[16:24]
    bovis_string = ''.join(str(e) for e in bovis_binary)
    para_binary = binary_list[24:]
    para_string = ''.join(str(e) for e in para_binary)
    return brucella_string, bovis_string, para_string


def get_species_strings_for_collection(task_queue, finished_queue, timeout):
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum = tup
        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
        finished_queue.put((fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum))
        task_queue.task_done()


def output_dbkey(file_name, dbkey, output_file=None):
    # Output the dbkey.
    if output_file is None:
        # We're producing a dataset collection.
        output_file = os.path.join(OUTPUT_DBKEY_DIR, "%s.txt" % file_name)
    with open(output_file, "w") as fh:
        fh.write("%s" % dbkey)


def output_files(fastq_file, count_list, group, dbkey, dbkey_file=None, metrics_file=None):
    base_file_name = get_base_file_name(fastq_file)
    if dbkey_file is not None:
        # We're dealing with a single read or
        # a set of paired reads.  If the latter,
        # the following will hopefully produce a
        # good sample string.
        if base_file_name.find("_") > 0:
            base_file_name = base_file_name.split("_")[0]
    output_dbkey(base_file_name, dbkey, dbkey_file)
    output_metrics(base_file_name, count_list, group, dbkey, metrics_file)


def output_files_for_collection(task_queue, timeout):
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_list, group, dbkey = tup
        output_files(fastq_file, count_list, group, dbkey)
        task_queue.task_done()


def output_metrics(file_name, count_list, group, dbkey, output_file=None):
    # Output the metrics.
    if output_file is None:
        # We're producing a dataset collection.
        output_file = os.path.join(OUTPUT_METRICS_DIR, "%s.txt" % file_name)
    with open(output_file, "w") as fh:
        fh.write("Sample: %s\n" % file_name)
        fh.write("Brucella counts: ")
        for i in count_list[:16]:
            fh.write("%d," % i)
        fh.write("\nTB counts: ")
        for i in count_list[16:24]:
            fh.write("%d," % i)
        fh.write("\nPara counts: ")
        for i in count_list[24:]:
            fh.write("%d," % i)
        fh.write("\nGroup: %s" % group)
        fh.write("\ndbkey: %s\n" % dbkey)


def set_num_cpus(num_files, processes):
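    """
    Return the number of worker processes to use: the number of input
    files when it is smaller than both the CPU count and the requested
    number of processes; half the CPUs (or the file count if smaller)
    when more processes were requested than CPUs exist; and otherwise
    the requested number.
    """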
    num_cpus = multiprocessing.cpu_count()
    if num_files < num_cpus and num_files < processes:
        return num_files
    if num_cpus < processes:
        half_cpus = num_cpus // 2
        if num_files < half_cpus:
            return num_files
        return half_cpus
    return processes


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help='Value and path fields of a vsnp_dnaprints data table entry (may be repeated)')
    parser.add_argument('--read1', action='store', dest='read1', required=False, default=None, help='Optional: forward read (omit when processing a dataset collection in input_reads)')
    parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read')
    parser.add_argument('--gzipped', action='store', dest='gzipped', help='Whether the input files are gzipped ("true" or "false")')
    parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', required=False, default=None, help='Output reference file')
    parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics file')
    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
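
    # Example invocation (the script and file names are illustrative;
    # the dnaprints path mirrors the example in get_dnaprints_dict):
    #   python vsnp_determine_ref_from_data.py \
    #       --dnaprint_fields AF2122 /galaxy/tool-data/vsnp/AF2122/dnaprints/NC_002945v4.yml \
    #       --read1 sample_R1.fastq.gz --read2 sample_R2.fastq.gz \
    #       --gzipped true --processes 8 \
    #       --output_dbkey output_dbkey.txt --output_metrics output_metrics.txt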

    args = parser.parse_args()

    collection = False
    fastq_list = []
    if args.read1 is not None:
        fastq_list.append(args.read1)
        if args.read2 is not None:
            fastq_list.append(args.read2)
    else:
        collection = True
        for file_name in sorted(os.listdir(INPUT_READS_DIR)):
            file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name))
            fastq_list.append(file_path)

    # The value of dnaprint_fields is a list of lists, where each list
    # contains the [value, path] components of a vsnp_dnaprints data table
    # entry.  The data_manager_vsnp_dnaprints tool assigns the dbkey column
    # from the all_fasta data table to the value column in the
    # vsnp_dnaprints data table to ensure a proper mapping for discovering
    # the dbkey.
    dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields)
    if collection:
        # Here fastq_list consists of any number of
        # reads, so each file will be processed and
        # dataset collections will be produced as outputs.
        multiprocessing.set_start_method('spawn')
        queue1 = multiprocessing.JoinableQueue()
        queue2 = multiprocessing.JoinableQueue()
        num_files = len(fastq_list)
        cpus = set_num_cpus(num_files, args.processes)
        # Set a timeout for get()s in the queue.
        timeout = 0.05

        for fastq_file in fastq_list:
            queue1.put(fastq_file)

        # Complete the get_species_counts task.
        processes = [multiprocessing.Process(target=get_species_counts_for_collection, args=(queue1, queue2, args.gzipped, timeout)) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue1.join()

        # Complete the get_species_strings task.
        processes = [multiprocessing.Process(target=get_species_strings_for_collection, args=(queue2, queue1, timeout)) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue2.join()

        # Complete the get_group_and_dbkey task.
        processes = [multiprocessing.Process(target=get_group_and_dbkey_for_collection, args=(queue1, queue2, dnaprints_dict, timeout)) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue1.join()

        # Complete the output_files task.
        processes = [multiprocessing.Process(target=output_files_for_collection, args=(queue2, timeout)) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue2.join()

        if queue1.empty() and queue2.empty():
            queue1.close()
            queue1.join_thread()
            queue2.close()
            queue2.join_thread()
    else:
        # Here fastq_list consists of either a single read
        # or a set of paired reads, producing single outputs.
        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped)
        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
        output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics)