comparison vsnp_get_snps.py @ 8:5e4595b9f63c draft

"planemo upload for repository https://github.com/gregvonkuster/galaxy_tools/tree/master/tools/sequence_analysis/vsnp/vsnp_get_snps commit 7423e5bb852a786195c095b9f663aac0ec9c8fd9"
author greg
date Thu, 29 Jul 2021 12:50:01 +0000
parents 14285a94fb13
children 0fe292b20b9d
comparison of 7:2286f3a13e4d and 8:5e4595b9f63c (lines removed in the new revision are prefixed with "-", lines added with "+"; "..." marks elided unchanged code)
# Collect quality parsimonious SNPs from vcf files
# and output alignment files in fasta format.

import argparse
-import multiprocessing
import os
-import queue
import shutil
import sys
import time
from collections import OrderedDict
from datetime import datetime
...
import vcf


def get_time_stamp():
    return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S')
-
-
-def set_num_cpus(num_files, processes):
-    num_cpus = int(multiprocessing.cpu_count())
-    if num_files < num_cpus and num_files < processes:
-        return num_files
-    if num_cpus < processes:
-        half_cpus = int(num_cpus / 2)
-        if num_files < half_cpus:
-            return num_files
-        return half_cpus
-    return processes


def setup_all_vcfs(vcf_files, vcf_dirs):
    # Create the all_vcfs directory and link
    # all input vcf files into it for processing.
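For reference, the deleted set_num_cpus() heuristic roughly chose the number of input files when that was the smallest quantity, half the machine's CPUs when more processes were requested than CPUs exist, and the requested process count otherwise. A minimal sketch that exercises that logic; cpu_count() is replaced by a num_cpus parameter purely so the example is deterministic, and the value 8 is an assumption for illustration, not something from the tool:

    def set_num_cpus(num_files, processes, num_cpus=8):
        # Same branching as the deleted function, with cpu_count() parameterized.
        if num_files < num_cpus and num_files < processes:
            return num_files
        if num_cpus < processes:
            half_cpus = int(num_cpus / 2)
            if num_files < half_cpus:
                return num_files
            return half_cpus
        return processes

    print(set_num_cpus(2, 8))    # 2: fewer files than CPUs and than requested processes
    print(set_num_cpus(20, 16))  # 4: request exceeds the 8 assumed CPUs, so half the CPUs
    print(set_num_cpus(20, 4))   # 4: otherwise the requested process count is used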
...
                exclusion_list.append(chrom + ":" + str(position))
            return exclusion_list
        except ValueError:
            return []

-    def get_snps(self, task_queue, timeout):
-        while True:
-            try:
-                group_dir = task_queue.get(block=True, timeout=timeout)
-            except queue.Empty:
-                break
-            # Parse all vcf files to accumulate
-            # the SNPs into a data frame.
-            positions_dict = {}
-            group_files = []
-            for file_name in os.listdir(os.path.abspath(group_dir)):
-                file_path = os.path.abspath(os.path.join(group_dir, file_name))
-                group_files.append(file_path)
-            for file_name in group_files:
-                found_positions, found_positions_mix = self.find_initial_positions(file_name)
-                positions_dict.update(found_positions)
-            # Order before adding to file to match
-            # with ordering of individual samples.
-            # all_positions is abs_pos:REF
-            self.all_positions = OrderedDict(sorted(positions_dict.items()))
-            ref_positions_df = pandas.DataFrame(self.all_positions, index=['root'])
-            all_map_qualities = {}
-            df_list = []
-            for file_name in group_files:
-                sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name)
-                df_list.append(sample_df)
-                all_map_qualities.update({file_name_base: sample_map_qualities})
-            all_sample_df = pandas.concat(df_list)
-            # All positions have now been selected for each sample,
-            # so select parsimony informative SNPs. This removes
-            # columns where all fields are the same.
-            # Add reference to top row.
-            prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner')
-            all_mq_df = pandas.DataFrame.from_dict(all_map_qualities)
-            mq_averages = all_mq_df.mean(axis=1).astype(int)
-            self.gather_and_filter(prefilter_df, mq_averages, group_dir)
-            task_queue.task_done()
+    def get_snps(self, group_dir):
+        # Parse all vcf files to accumulate
+        # the SNPs into a data frame.
+        positions_dict = {}
+        group_files = []
+        for file_name in os.listdir(os.path.abspath(group_dir)):
+            file_path = os.path.abspath(os.path.join(group_dir, file_name))
+            group_files.append(file_path)
+        for file_name in group_files:
+            found_positions, found_positions_mix = self.find_initial_positions(file_name)
+            positions_dict.update(found_positions)
+        # Order before adding to file to match
+        # with ordering of individual samples.
+        # all_positions is abs_pos:REF
+        self.all_positions = OrderedDict(sorted(positions_dict.items()))
+        ref_positions_df = pandas.DataFrame(self.all_positions, index=['root'])
+        all_map_qualities = {}
+        df_list = []
+        for file_name in group_files:
+            sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name)
+            df_list.append(sample_df)
+            all_map_qualities.update({file_name_base: sample_map_qualities})
+        all_sample_df = pandas.concat(df_list)
+        # All positions have now been selected for each sample,
+        # so select parsimony informative SNPs. This removes
+        # columns where all fields are the same.
+        # Add reference to top row.
+        prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner')
+        all_mq_df = pandas.DataFrame.from_dict(all_map_qualities)
+        mq_averages = all_mq_df.mean(axis=1).astype(int)
+        self.gather_and_filter(prefilter_df, mq_averages, group_dir)
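Two of the pandas calls at the tail of get_snps() are easy to misread, and they are identical in both revisions: pandas.concat(..., join='inner') keeps only the columns (positions) common to every frame, with the reference row on top, and all_mq_df.mean(axis=1).astype(int) averages map qualities per position across samples. A self-contained sketch with toy values; the position labels and quality numbers below are made up for illustration:

    import pandas

    # Stand-ins for ref_positions_df and a single per-sample frame.
    ref_positions_df = pandas.DataFrame({'chrom:10': ['A'], 'chrom:25': ['G']}, index=['root'])
    sample_df = pandas.DataFrame({'chrom:10': ['T'], 'chrom:99': ['C']}, index=['sample1'])

    # join='inner' keeps only columns present in every frame, with the
    # reference as the top row, mirroring the prefilter_df step above.
    prefilter_df = pandas.concat([ref_positions_df, sample_df], join='inner')
    print(prefilter_df)  # only chrom:10 survives, rows 'root' then 'sample1'

    # Per-position averaging of map qualities, mirroring the mq_averages step.
    all_mq_df = pandas.DataFrame({'sample1': {'chrom:10': 58.0}, 'sample2': {'chrom:10': 61.0}})
    print(all_mq_df.mean(axis=1).astype(int))  # chrom:10 -> 59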
    def group_vcfs(self, vcf_files):
        # Parse an excel file to produce a
        # grouping dictionary for SNPs.
        xl = pandas.ExcelFile(self.input_excel)
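group_vcfs() appears here only as unchanged context; the grouping logic itself sits outside the displayed hunk. For readers unfamiliar with the API it starts from, pandas.ExcelFile simply wraps a workbook so individual sheets can be parsed into data frames. A generic usage sketch, not the tool's actual parsing code; the file name is hypothetical and an Excel engine such as openpyxl must be installed:

    import pandas

    # Hypothetical workbook; not the tool's actual grouping file.
    xl = pandas.ExcelFile('groupings.xlsx')
    first_sheet = xl.parse(xl.sheet_names[0], header=None)
    print(xl.sheet_names)
    print(first_sheet.shape)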
...
# files, both the samples and the "database".
vcf_files = []
for file_name in os.listdir(args.input_vcf_dir):
    file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name))
    vcf_files.append(file_path)
-
-multiprocessing.set_start_method('spawn')
-queue1 = multiprocessing.JoinableQueue()
num_files = len(vcf_files)
-cpus = set_num_cpus(num_files, args.processes)
-# Set a timeout for get()s in the queue.
-timeout = 0.05

# Initialize the snp_finder object.
snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary)

# Define and make the set of directories into which the input_zc_vcf
...
# the above call to the set of directories containing
# vcf files for analysis.
group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups]
vcf_dirs.extend(group_dirs)

-# Populate the queue for job splitting.
for vcf_dir in vcf_dirs:
-    queue1.put(vcf_dir)
+    snp_finder.get_snps(vcf_dir)
-
-# Complete the get_snps task.
-processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout, )) for _ in range(cpus)]
-for p in processes:
-    p.start()
-for p in processes:
-    p.join()
-queue1.join()

# Finish summary log.
snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp())
total_run_time = datetime.now() - snp_finder.timer_start
snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time))
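For readers tracing the machinery removed above: group directories were pushed onto a multiprocessing.JoinableQueue and a set of spawned worker processes drained it, each worker running the old two-argument get_snps() loop until a queue.Empty timeout. A stripped-down, self-contained sketch of that pattern with a generic worker function standing in for SnpFinder; the directory names and the worker count are illustrative only:

    import multiprocessing
    import queue

    def worker(task_queue, timeout):
        # Mirrors the old get_snps() loop: pull directories until the queue
        # is drained, then exit on queue.Empty.
        while True:
            try:
                group_dir = task_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                break
            print('processing %s' % group_dir)  # stand-in for the real per-directory work
            task_queue.task_done()

    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')
        task_queue = multiprocessing.JoinableQueue()
        for vcf_dir in ['group1', 'group2', 'all_vcf']:  # hypothetical directory names
            task_queue.put(vcf_dir)
        processes = [multiprocessing.Process(target=worker, args=(task_queue, 0.05)) for _ in range(2)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        task_queue.join()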