Mercurial > repos > greg > vsnp_determine_ref_from_data
comparison vsnp_determine_ref_from_data.py @ 4:36bdf8b439ed draft
Uploaded
| author | greg |
|---|---|
| date | Sun, 03 Jan 2021 16:13:22 +0000 |
| parents | ebc08e5ce646 |
| children |
comparison
equal
deleted
inserted
replaced
| 3:6116deacb2c7 | 4:36bdf8b439ed |
|---|---|
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 | 2 |
| 3 import argparse | 3 import argparse |
| 4 import gzip | 4 import gzip |
| 5 import multiprocessing | |
| 6 import os | 5 import os |
| 7 import queue | 6 from collections import OrderedDict |
| 7 | |
| 8 import yaml | 8 import yaml |
| 9 from Bio.SeqIO.QualityIO import FastqGeneralIterator | 9 from Bio.SeqIO.QualityIO import FastqGeneralIterator |
| 10 from collections import OrderedDict | 10 |
| 11 | |
| 12 INPUT_READS_DIR = 'input_reads' | |
| 13 OUTPUT_DBKEY_DIR = 'output_dbkey' | 11 OUTPUT_DBKEY_DIR = 'output_dbkey' |
| 14 OUTPUT_METRICS_DIR = 'output_metrics' | 12 OUTPUT_METRICS_DIR = 'output_metrics' |
| 15 | 13 |
| 16 | 14 |
| 17 def get_base_file_name(file_path): | 15 def get_sample_name(file_path): |
| 18 base_file_name = os.path.basename(file_path) | 16 base_file_name = os.path.basename(file_path) |
| 19 if base_file_name.find(".") > 0: | 17 if base_file_name.find(".") > 0: |
| 20 # Eliminate the extension. | 18 # Eliminate the extension. |
| 21 return os.path.splitext(base_file_name)[0] | 19 return os.path.splitext(base_file_name)[0] |
| 22 elif base_file_name.find("_") > 0: | |
| 23 # The dot extension was likely changed to | |
| 24 # the "_" character. | |
| 25 items = base_file_name.split("_") | |
| 26 no_ext = "_".join(items[0:-2]) | |
| 27 if len(no_ext) > 0: | |
| 28 return no_ext | |
| 29 return base_file_name | 20 return base_file_name |
| 30 | 21 |
| 31 | 22 |
| 32 def get_dbkey(dnaprints_dict, key, s): | 23 def get_dbkey(dnaprints_dict, key, s): |
| 33 # dnaprints_dict looks something like this: | 24 # dnaprints_dict looks something like this: |
| 89 group = "" | 80 group = "" |
| 90 dbkey = "" | 81 dbkey = "" |
| 91 return group, dbkey | 82 return group, dbkey |
| 92 | 83 |
| 93 | 84 |
| 94 def get_group_and_dbkey_for_collection(task_queue, finished_queue, dnaprints_dict, timeout): | |
| 95 while True: | |
| 96 try: | |
| 97 tup = task_queue.get(block=True, timeout=timeout) | |
| 98 except queue.Empty: | |
| 99 break | |
| 100 fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum = tup | |
| 101 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum) | |
| 102 finished_queue.put((fastq_file, count_list, group, dbkey)) | |
| 103 task_queue.task_done() | |
| 104 | |
| 105 | |
| 106 def get_oligo_dict(): | 85 def get_oligo_dict(): |
| 107 oligo_dict = {} | 86 oligo_dict = {} |
| 108 oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC" | 87 oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC" |
| 109 oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC" | 88 oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC" |
| 110 oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT" | 89 oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT" |
| 136 | 115 |
| 137 | 116 |
| 138 def get_seq_counts(value, fastq_list, gzipped): | 117 def get_seq_counts(value, fastq_list, gzipped): |
| 139 count = 0 | 118 count = 0 |
| 140 for fastq_file in fastq_list: | 119 for fastq_file in fastq_list: |
| 141 if gzipped == "true": | 120 if gzipped: |
| 142 with gzip.open(fastq_file, 'rt') as fh: | 121 with gzip.open(fastq_file, 'rt') as fh: |
| 143 for title, seq, qual in FastqGeneralIterator(fh): | 122 for title, seq, qual in FastqGeneralIterator(fh): |
| 144 count += seq.count(value) | 123 count += seq.count(value) |
| 145 else: | 124 else: |
| 146 with open(fastq_file, 'r') as fh: | 125 with open(fastq_file, 'r') as fh: |
| 162 count_list.append(v) | 141 count_list.append(v) |
| 163 brucella_sum = sum(count_list[:16]) | 142 brucella_sum = sum(count_list[:16]) |
| 164 bovis_sum = sum(count_list[16:24]) | 143 bovis_sum = sum(count_list[16:24]) |
| 165 para_sum = sum(count_list[24:]) | 144 para_sum = sum(count_list[24:]) |
| 166 return count_summary, count_list, brucella_sum, bovis_sum, para_sum | 145 return count_summary, count_list, brucella_sum, bovis_sum, para_sum |
| 167 | |
| 168 | |
| 169 def get_species_counts_for_collection(task_queue, finished_queue, gzipped, timeout): | |
| 170 while True: | |
| 171 try: | |
| 172 fastq_file = task_queue.get(block=True, timeout=timeout) | |
| 173 except queue.Empty: | |
| 174 break | |
| 175 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts([fastq_file], gzipped) | |
| 176 finished_queue.put((fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum)) | |
| 177 task_queue.task_done() | |
| 178 | 146 |
| 179 | 147 |
| 180 def get_species_strings(count_summary): | 148 def get_species_strings(count_summary): |
| 181 binary_dictionary = {} | 149 binary_dictionary = {} |
| 182 for k, v in count_summary.items(): | 150 for k, v in count_summary.items(): |
| 195 para_binary = binary_list[24:] | 163 para_binary = binary_list[24:] |
| 196 para_string = ''.join(str(e) for e in para_binary) | 164 para_string = ''.join(str(e) for e in para_binary) |
| 197 return brucella_string, bovis_string, para_string | 165 return brucella_string, bovis_string, para_string |
| 198 | 166 |
| 199 | 167 |
| 200 def get_species_strings_for_collection(task_queue, finished_queue, timeout): | 168 def output_dbkey(file_name, dbkey, output_file): |
| 201 while True: | |
| 202 try: | |
| 203 tup = task_queue.get(block=True, timeout=timeout) | |
| 204 except queue.Empty: | |
| 205 break | |
| 206 fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum = tup | |
| 207 brucella_string, bovis_string, para_string = get_species_strings(count_summary) | |
| 208 finished_queue.put((fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)) | |
| 209 task_queue.task_done() | |
| 210 | |
| 211 | |
| 212 def output_dbkey(file_name, dbkey, output_file=None): | |
| 213 # Output the dbkey. | 169 # Output the dbkey. |
| 214 if output_file is None: | |
| 215 # We're producing a dataset collection. | |
| 216 output_file = os.path.join(OUTPUT_DBKEY_DIR, "%s.txt" % file_name) | |
| 217 with open(output_file, "w") as fh: | 170 with open(output_file, "w") as fh: |
| 218 fh.write("%s" % dbkey) | 171 fh.write("%s" % dbkey) |
| 219 | 172 |
| 220 | 173 |
| 221 def output_files(fastq_file, count_list, group, dbkey, dbkey_file=None, metrics_file=None): | 174 def output_files(fastq_file, count_list, group, dbkey, dbkey_file, metrics_file): |
| 222 base_file_name = get_base_file_name(fastq_file) | 175 base_file_name = get_sample_name(fastq_file) |
| 223 if dbkey_file is not None: | |
| 224 # We're dealing with a single read or | |
| 225 # a set of paired reads. If the latter, | |
| 226 # the following will hopefully produce a | |
| 227 # good sample string. | |
| 228 if base_file_name.find("_") > 0: | |
| 229 base_file_name = base_file_name.split("_")[0] | |
| 230 output_dbkey(base_file_name, dbkey, dbkey_file) | 176 output_dbkey(base_file_name, dbkey, dbkey_file) |
| 231 output_metrics(base_file_name, count_list, group, dbkey, metrics_file) | 177 output_metrics(base_file_name, count_list, group, dbkey, metrics_file) |
| 232 | 178 |
| 233 | 179 |
| 234 def output_files_for_collection(task_queue, timeout): | 180 def output_metrics(file_name, count_list, group, dbkey, output_file): |
| 235 while True: | |
| 236 try: | |
| 237 tup = task_queue.get(block=True, timeout=timeout) | |
| 238 except queue.Empty: | |
| 239 break | |
| 240 fastq_file, count_list, group, dbkey = tup | |
| 241 output_files(fastq_file, count_list, group, dbkey) | |
| 242 task_queue.task_done() | |
| 243 | |
| 244 | |
| 245 def output_metrics(file_name, count_list, group, dbkey, output_file=None): | |
| 246 # Output the metrics. | 181 # Output the metrics. |
| 247 if output_file is None: | |
| 248 # We're producing a dataset collection. | |
| 249 output_file = os.path.join(OUTPUT_METRICS_DIR, "%s.txt" % file_name) | |
| 250 with open(output_file, "w") as fh: | 182 with open(output_file, "w") as fh: |
| 251 fh.write("Sample: %s\n" % file_name) | 183 fh.write("Sample: %s\n" % file_name) |
| 252 fh.write("Brucella counts: ") | 184 fh.write("Brucella counts: ") |
| 253 for i in count_list[:16]: | 185 for i in count_list[:16]: |
| 254 fh.write("%d," % i) | 186 fh.write("%d," % i) |
| 260 fh.write("%d," % i) | 192 fh.write("%d," % i) |
| 261 fh.write("\nGroup: %s" % group) | 193 fh.write("\nGroup: %s" % group) |
| 262 fh.write("\ndbkey: %s\n" % dbkey) | 194 fh.write("\ndbkey: %s\n" % dbkey) |
| 263 | 195 |
| 264 | 196 |
| 265 def set_num_cpus(num_files, processes): | |
| 266 num_cpus = int(multiprocessing.cpu_count()) | |
| 267 if num_files < num_cpus and num_files < processes: | |
| 268 return num_files | |
| 269 if num_cpus < processes: | |
| 270 half_cpus = int(num_cpus / 2) | |
| 271 if num_files < half_cpus: | |
| 272 return num_files | |
| 273 return half_cpus | |
| 274 return processes | |
| 275 | |
| 276 | |
| 277 if __name__ == '__main__': | 197 if __name__ == '__main__': |
| 278 parser = argparse.ArgumentParser() | 198 parser = argparse.ArgumentParser() |
| 279 | 199 |
| 280 parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help="List of dnaprints data table value, name and path fields") | 200 parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help="List of dnaprints data table value, name and path fields") |
| 281 parser.add_argument('--read1', action='store', dest='read1', required=False, default=None, help='Required: single read') | 201 parser.add_argument('--read1', action='store', dest='read1', help='Required: single read') |
| 282 parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read') | 202 parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read') |
| 283 parser.add_argument('--gzipped', action='store', dest='gzipped', help='Input files are gzipped') | 203 parser.add_argument('--gzipped', action='store_true', dest='gzipped', help='Input files are gzipped') |
| 284 parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', required=False, default=None, help='Output reference file') | 204 parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', help='Output reference file') |
| 285 parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics file') | 205 parser.add_argument('--output_metrics', action='store', dest='output_metrics', help='Output metrics file') |
| 286 parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting') | |
| 287 | 206 |
| 288 args = parser.parse_args() | 207 args = parser.parse_args() |
| 289 | 208 |
| 290 collection = False | 209 fastq_list = [args.read1] |
| 291 fastq_list = [] | 210 if args.read2 is not None: |
| 292 if args.read1 is not None: | 211 fastq_list.append(args.read2) |
| 293 fastq_list.append(args.read1) | |
| 294 if args.read2 is not None: | |
| 295 fastq_list.append(args.read2) | |
| 296 else: | |
| 297 collection = True | |
| 298 for file_name in sorted(os.listdir(INPUT_READS_DIR)): | |
| 299 file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name)) | |
| 300 fastq_list.append(file_path) | |
| 301 | 212 |
| 302 # The value of dnaprint_fields is a list of lists, where each list is | 213 # The value of dnaprint_fields is a list of lists, where each list is |
| 303 # the [value, name, path] components of the vsnp_dnaprints data table. | 214 # the [value, name, path] components of the vsnp_dnaprints data table. |
| 304 # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the | 215 # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the |
| 305 # all_fasta data table to the value column in the vsnp_dnaprints data | 216 # all_fasta data table to the value column in the vsnp_dnaprints data |
| 306 # table to ensure a proper mapping for discovering the dbkey. | 217 # table to ensure a proper mapping for discovering the dbkey. |
| 307 dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields) | 218 dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields) |
| 308 | 219 |
| 309 if collection: | 220 # Here fastq_list consists of either a single read |
| 310 # Here fastq_list consists of any number of | 221 # or a set of paired reads, producing single outputs. |
| 311 # reads, so each file will be processed and | 222 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped) |
| 312 # dataset collections will be produced as outputs. | 223 brucella_string, bovis_string, para_string = get_species_strings(count_summary) |
| 313 multiprocessing.set_start_method('spawn') | 224 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum) |
| 314 queue1 = multiprocessing.JoinableQueue() | 225 output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics) |
| 315 queue2 = multiprocessing.JoinableQueue() | |
| 316 num_files = len(fastq_list) | |
| 317 cpus = set_num_cpus(num_files, args.processes) | |
| 318 # Set a timeout for get()s in the queue. | |
| 319 timeout = 0.05 | |
| 320 | |
| 321 for fastq_file in fastq_list: | |
| 322 queue1.put(fastq_file) | |
| 323 | |
| 324 # Complete the get_species_counts task. | |
| 325 processes = [multiprocessing.Process(target=get_species_counts_for_collection, args=(queue1, queue2, args.gzipped, timeout, )) for _ in range(cpus)] | |
| 326 for p in processes: | |
| 327 p.start() | |
| 328 for p in processes: | |
| 329 p.join() | |
| 330 queue1.join() | |
| 331 | |
| 332 # Complete the get_species_strings task. | |
| 333 processes = [multiprocessing.Process(target=get_species_strings_for_collection, args=(queue2, queue1, timeout, )) for _ in range(cpus)] | |
| 334 for p in processes: | |
| 335 p.start() | |
| 336 for p in processes: | |
| 337 p.join() | |
| 338 queue2.join() | |
| 339 | |
| 340 # Complete the get_group_and_dbkey task. | |
| 341 processes = [multiprocessing.Process(target=get_group_and_dbkey_for_collection, args=(queue1, queue2, dnaprints_dict, timeout, )) for _ in range(cpus)] | |
| 342 for p in processes: | |
| 343 p.start() | |
| 344 for p in processes: | |
| 345 p.join() | |
| 346 queue1.join() | |
| 347 | |
| 348 # Complete the output_files task. | |
| 349 processes = [multiprocessing.Process(target=output_files_for_collection, args=(queue2, timeout, )) for _ in range(cpus)] | |
| 350 for p in processes: | |
| 351 p.start() | |
| 352 for p in processes: | |
| 353 p.join() | |
| 354 queue2.join() | |
| 355 | |
| 356 if queue1.empty() and queue2.empty(): | |
| 357 queue1.close() | |
| 358 queue1.join_thread() | |
| 359 queue2.close() | |
| 360 queue2.join_thread() | |
| 361 else: | |
| 362 # Here fastq_list consists of either a single read | |
| 363 # or a set of paired reads, producing single outputs. | |
| 364 count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped) | |
| 365 brucella_string, bovis_string, para_string = get_species_strings(count_summary) | |
| 366 group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum) | |
| 367 output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics) |
