comparison vsnp_get_snps.py @ 9:0fe292b20b9d draft

"planemo upload for repository https://github.com/gregvonkuster/galaxy_tools/tree/master/tools/sequence_analysis/vsnp/vsnp_get_snps commit 3b7fef2d17fec96647345e89c774d4af417d23d7"
author greg
date Thu, 29 Jul 2021 13:16:03 +0000
parents 5e4595b9f63c
children be5875f29ea4
comparison
equal deleted inserted replaced
8:5e4595b9f63c 9:0fe292b20b9d
2 2
3 # Collect quality parsimonious SNPs from vcf files 3 # Collect quality parsimonious SNPs from vcf files
4 # and output alignment files in fasta format. 4 # and output alignment files in fasta format.
5 5
6 import argparse 6 import argparse
7 import multiprocessing
7 import os 8 import os
9 import queue
8 import shutil 10 import shutil
9 import sys 11 import sys
10 import time 12 import time
11 from collections import OrderedDict 13 from collections import OrderedDict
12 from datetime import datetime 14 from datetime import datetime
15 import vcf 17 import vcf
16 18
17 19
18 def get_time_stamp(): 20 def get_time_stamp():
19 return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S') 21 return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S')
22
23
24 def set_num_cpus(num_files, processes):
25 num_cpus = int(multiprocessing.cpu_count())
26 if num_files < num_cpus and num_files < processes:
27 return num_files
28 if num_cpus < processes:
29 half_cpus = int(num_cpus / 2)
30 if num_files < half_cpus:
31 return num_files
32 return half_cpus
33 return processes
20 34
21 35
22 def setup_all_vcfs(vcf_files, vcf_dirs): 36 def setup_all_vcfs(vcf_files, vcf_dirs):
23 # Create the all_vcfs directory and link 37 # Create the all_vcfs directory and link
24 # all input vcf files into it for processing. 38 # all input vcf files into it for processing.
324 exclusion_list.append(chrom + ":" + str(position)) 338 exclusion_list.append(chrom + ":" + str(position))
325 return exclusion_list 339 return exclusion_list
326 except ValueError: 340 except ValueError:
327 return [] 341 return []
328 342
329 def get_snps(self, group_dir): 343 def get_snps(self, task_queue, timeout):
330 # Parse all vcf files to accumulate 344 while True:
331 # the SNPs into a data frame. 345 try:
332 positions_dict = {} 346 group_dir = task_queue.get(block=True, timeout=timeout)
333 group_files = [] 347 except queue.Empty:
334 for file_name in os.listdir(os.path.abspath(group_dir)): 348 break
335 file_path = os.path.abspath(os.path.join(group_dir, file_name)) 349 # Parse all vcf files to accumulate
336 group_files.append(file_path) 350 # the SNPs into a data frame.
337 for file_name in group_files: 351 positions_dict = {}
338 found_positions, found_positions_mix = self.find_initial_positions(file_name) 352 group_files = []
339 positions_dict.update(found_positions) 353 for file_name in os.listdir(os.path.abspath(group_dir)):
340 # Order before adding to file to match 354 file_path = os.path.abspath(os.path.join(group_dir, file_name))
341 # with ordering of individual samples. 355 group_files.append(file_path)
342 # all_positions is abs_pos:REF 356 for file_name in group_files:
343 self.all_positions = OrderedDict(sorted(positions_dict.items())) 357 found_positions, found_positions_mix = self.find_initial_positions(file_name)
344 ref_positions_df = pandas.DataFrame(self.all_positions, index=['root']) 358 positions_dict.update(found_positions)
345 all_map_qualities = {} 359 # Order before adding to file to match
346 df_list = [] 360 # with ordering of individual samples.
347 for file_name in group_files: 361 # all_positions is abs_pos:REF
348 sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name) 362 self.all_positions = OrderedDict(sorted(positions_dict.items()))
349 df_list.append(sample_df) 363 ref_positions_df = pandas.DataFrame(self.all_positions, index=['root'])
350 all_map_qualities.update({file_name_base: sample_map_qualities}) 364 all_map_qualities = {}
351 all_sample_df = pandas.concat(df_list) 365 df_list = []
352 # All positions have now been selected for each sample, 366 for file_name in group_files:
353 # so select parisomony informative SNPs. This removes 367 sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name)
354 # columns where all fields are the same. 368 df_list.append(sample_df)
355 # Add reference to top row. 369 all_map_qualities.update({file_name_base: sample_map_qualities})
356 prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner') 370 all_sample_df = pandas.concat(df_list)
357 all_mq_df = pandas.DataFrame.from_dict(all_map_qualities) 371 # All positions have now been selected for each sample,
358 mq_averages = all_mq_df.mean(axis=1).astype(int) 372 # so select parisomony informative SNPs. This removes
359 self.gather_and_filter(prefilter_df, mq_averages, group_dir) 373 # columns where all fields are the same.
374 # Add reference to top row.
375 prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner')
376 all_mq_df = pandas.DataFrame.from_dict(all_map_qualities)
377 mq_averages = all_mq_df.mean(axis=1).astype(int)
378 self.gather_and_filter(prefilter_df, mq_averages, group_dir)
379 task_queue.task_done()
360 380
361 def group_vcfs(self, vcf_files): 381 def group_vcfs(self, vcf_files):
362 # Parse an excel file to produce a 382 # Parse an excel file to produce a
363 # grouping dictionary for SNPs. 383 # grouping dictionary for SNPs.
364 xl = pandas.ExcelFile(self.input_excel) 384 xl = pandas.ExcelFile(self.input_excel)
439 # files, both the samples and the "database". 459 # files, both the samples and the "database".
440 vcf_files = [] 460 vcf_files = []
441 for file_name in os.listdir(args.input_vcf_dir): 461 for file_name in os.listdir(args.input_vcf_dir):
442 file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name)) 462 file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name))
443 vcf_files.append(file_path) 463 vcf_files.append(file_path)
464
465 multiprocessing.set_start_method('spawn')
466 queue1 = multiprocessing.JoinableQueue()
444 num_files = len(vcf_files) 467 num_files = len(vcf_files)
468 cpus = set_num_cpus(num_files, args.processes)
469 # Set a timeout for get()s in the queue.
470 timeout = 0.05
445 471
446 # Initialize the snp_finder object. 472 # Initialize the snp_finder object.
447 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary) 473 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary)
448 474
449 # Define and make the set of directories into which the input_zc_vcf 475 # Define and make the set of directories into which the input_zc_vcf
462 # the above call to the set of directories containing 488 # the above call to the set of directories containing
463 # vcf files for analysis. 489 # vcf files for analysis.
464 group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups] 490 group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups]
465 vcf_dirs.extend(group_dirs) 491 vcf_dirs.extend(group_dirs)
466 492
493 # Populate the queue for job splitting.
467 for vcf_dir in vcf_dirs: 494 for vcf_dir in vcf_dirs:
468 snp_finder.get_snps(vcf_dir) 495 queue1.put(vcf_dir)
496
497 # Complete the get_snps task.
498 processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout, )) for _ in range(cpus)]
499 for p in processes:
500 p.start()
501 for p in processes:
502 p.join()
503 queue1.join()
469 504
470 # Finish summary log. 505 # Finish summary log.
471 snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp()) 506 snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp())
472 total_run_time = datetime.now() - snp_finder.timer_start 507 total_run_time = datetime.now() - snp_finder.timer_start
473 snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time)) 508 snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time))