Mercurial > repos > greg > vsnp_get_snps
comparison vsnp_get_snps.py @ 8:5e4595b9f63c draft
"planemo upload for repository https://github.com/gregvonkuster/galaxy_tools/tree/master/tools/sequence_analysis/vsnp/vsnp_get_snps commit 7423e5bb852a786195c095b9f663aac0ec9c8fd9"
author | greg |
---|---|
date | Thu, 29 Jul 2021 12:50:01 +0000 |
parents | 14285a94fb13 |
children | 0fe292b20b9d |
comparison
equal
deleted
inserted
replaced
7:2286f3a13e4d | 8:5e4595b9f63c |
---|---|
2 | 2 |
3 # Collect quality parsimonious SNPs from vcf files | 3 # Collect quality parsimonious SNPs from vcf files |
4 # and output alignment files in fasta format. | 4 # and output alignment files in fasta format. |
5 | 5 |
6 import argparse | 6 import argparse |
7 import multiprocessing | |
8 import os | 7 import os |
9 import queue | |
10 import shutil | 8 import shutil |
11 import sys | 9 import sys |
12 import time | 10 import time |
13 from collections import OrderedDict | 11 from collections import OrderedDict |
14 from datetime import datetime | 12 from datetime import datetime |
17 import vcf | 15 import vcf |
18 | 16 |
19 | 17 |
20 def get_time_stamp(): | 18 def get_time_stamp(): |
21 return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S') | 19 return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S') |
22 | |
23 | |
24 def set_num_cpus(num_files, processes): | |
25 num_cpus = int(multiprocessing.cpu_count()) | |
26 if num_files < num_cpus and num_files < processes: | |
27 return num_files | |
28 if num_cpus < processes: | |
29 half_cpus = int(num_cpus / 2) | |
30 if num_files < half_cpus: | |
31 return num_files | |
32 return half_cpus | |
33 return processes | |
34 | 20 |
35 | 21 |
36 def setup_all_vcfs(vcf_files, vcf_dirs): | 22 def setup_all_vcfs(vcf_files, vcf_dirs): |
37 # Create the all_vcfs directory and link | 23 # Create the all_vcfs directory and link |
38 # all input vcf files into it for processing. | 24 # all input vcf files into it for processing. |
338 exclusion_list.append(chrom + ":" + str(position)) | 324 exclusion_list.append(chrom + ":" + str(position)) |
339 return exclusion_list | 325 return exclusion_list |
340 except ValueError: | 326 except ValueError: |
341 return [] | 327 return [] |
342 | 328 |
343 def get_snps(self, task_queue, timeout): | 329 def get_snps(self, group_dir): |
344 while True: | 330 # Parse all vcf files to accumulate |
345 try: | 331 # the SNPs into a data frame. |
346 group_dir = task_queue.get(block=True, timeout=timeout) | 332 positions_dict = {} |
347 except queue.Empty: | 333 group_files = [] |
348 break | 334 for file_name in os.listdir(os.path.abspath(group_dir)): |
349 # Parse all vcf files to accumulate | 335 file_path = os.path.abspath(os.path.join(group_dir, file_name)) |
350 # the SNPs into a data frame. | 336 group_files.append(file_path) |
351 positions_dict = {} | 337 for file_name in group_files: |
352 group_files = [] | 338 found_positions, found_positions_mix = self.find_initial_positions(file_name) |
353 for file_name in os.listdir(os.path.abspath(group_dir)): | 339 positions_dict.update(found_positions) |
354 file_path = os.path.abspath(os.path.join(group_dir, file_name)) | 340 # Order before adding to file to match |
355 group_files.append(file_path) | 341 # with ordering of individual samples. |
356 for file_name in group_files: | 342 # all_positions is abs_pos:REF |
357 found_positions, found_positions_mix = self.find_initial_positions(file_name) | 343 self.all_positions = OrderedDict(sorted(positions_dict.items())) |
358 positions_dict.update(found_positions) | 344 ref_positions_df = pandas.DataFrame(self.all_positions, index=['root']) |
359 # Order before adding to file to match | 345 all_map_qualities = {} |
360 # with ordering of individual samples. | 346 df_list = [] |
361 # all_positions is abs_pos:REF | 347 for file_name in group_files: |
362 self.all_positions = OrderedDict(sorted(positions_dict.items())) | 348 sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name) |
363 ref_positions_df = pandas.DataFrame(self.all_positions, index=['root']) | 349 df_list.append(sample_df) |
364 all_map_qualities = {} | 350 all_map_qualities.update({file_name_base: sample_map_qualities}) |
365 df_list = [] | 351 all_sample_df = pandas.concat(df_list) |
366 for file_name in group_files: | 352 # All positions have now been selected for each sample, |
367 sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name) | 353 # so select parisomony informative SNPs. This removes |
368 df_list.append(sample_df) | 354 # columns where all fields are the same. |
369 all_map_qualities.update({file_name_base: sample_map_qualities}) | 355 # Add reference to top row. |
370 all_sample_df = pandas.concat(df_list) | 356 prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner') |
371 # All positions have now been selected for each sample, | 357 all_mq_df = pandas.DataFrame.from_dict(all_map_qualities) |
372 # so select parisomony informative SNPs. This removes | 358 mq_averages = all_mq_df.mean(axis=1).astype(int) |
373 # columns where all fields are the same. | 359 self.gather_and_filter(prefilter_df, mq_averages, group_dir) |
374 # Add reference to top row. | |
375 prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner') | |
376 all_mq_df = pandas.DataFrame.from_dict(all_map_qualities) | |
377 mq_averages = all_mq_df.mean(axis=1).astype(int) | |
378 self.gather_and_filter(prefilter_df, mq_averages, group_dir) | |
379 task_queue.task_done() | |
380 | 360 |
381 def group_vcfs(self, vcf_files): | 361 def group_vcfs(self, vcf_files): |
382 # Parse an excel file to produce a | 362 # Parse an excel file to produce a |
383 # grouping dictionary for SNPs. | 363 # grouping dictionary for SNPs. |
384 xl = pandas.ExcelFile(self.input_excel) | 364 xl = pandas.ExcelFile(self.input_excel) |
459 # files, both the samples and the "database". | 439 # files, both the samples and the "database". |
460 vcf_files = [] | 440 vcf_files = [] |
461 for file_name in os.listdir(args.input_vcf_dir): | 441 for file_name in os.listdir(args.input_vcf_dir): |
462 file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name)) | 442 file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name)) |
463 vcf_files.append(file_path) | 443 vcf_files.append(file_path) |
464 | |
465 multiprocessing.set_start_method('spawn') | |
466 queue1 = multiprocessing.JoinableQueue() | |
467 num_files = len(vcf_files) | 444 num_files = len(vcf_files) |
468 cpus = set_num_cpus(num_files, args.processes) | |
469 # Set a timeout for get()s in the queue. | |
470 timeout = 0.05 | |
471 | 445 |
472 # Initialize the snp_finder object. | 446 # Initialize the snp_finder object. |
473 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary) | 447 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary) |
474 | 448 |
475 # Define and make the set of directories into which the input_zc_vcf | 449 # Define and make the set of directories into which the input_zc_vcf |
488 # the above call to the set of directories containing | 462 # the above call to the set of directories containing |
489 # vcf files for analysis. | 463 # vcf files for analysis. |
490 group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups] | 464 group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups] |
491 vcf_dirs.extend(group_dirs) | 465 vcf_dirs.extend(group_dirs) |
492 | 466 |
493 # Populate the queue for job splitting. | |
494 for vcf_dir in vcf_dirs: | 467 for vcf_dir in vcf_dirs: |
495 queue1.put(vcf_dir) | 468 snp_finder.get_snps(vcf_dir) |
496 | |
497 # Complete the get_snps task. | |
498 processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout, )) for _ in range(cpus)] | |
499 for p in processes: | |
500 p.start() | |
501 for p in processes: | |
502 p.join() | |
503 queue1.join() | |
504 | 469 |
505 # Finish summary log. | 470 # Finish summary log. |
506 snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp()) | 471 snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp()) |
507 total_run_time = datetime.now() - snp_finder.timer_start | 472 total_run_time = datetime.now() - snp_finder.timer_start |
508 snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time)) | 473 snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time)) |