Mercurial > repos > greg > vsnp_get_snps
comparison vsnp_get_snps.py @ 9:0fe292b20b9d draft
"planemo upload for repository https://github.com/gregvonkuster/galaxy_tools/tree/master/tools/sequence_analysis/vsnp/vsnp_get_snps commit 3b7fef2d17fec96647345e89c774d4af417d23d7"
author | greg |
---|---|
date | Thu, 29 Jul 2021 13:16:03 +0000 |
parents | 5e4595b9f63c |
children | be5875f29ea4 |
comparison
equal
deleted
inserted
replaced
8:5e4595b9f63c | 9:0fe292b20b9d |
---|---|
2 | 2 |
3 # Collect quality parsimonious SNPs from vcf files | 3 # Collect quality parsimonious SNPs from vcf files |
4 # and output alignment files in fasta format. | 4 # and output alignment files in fasta format. |
5 | 5 |
6 import argparse | 6 import argparse |
7 import multiprocessing | |
7 import os | 8 import os |
9 import queue | |
8 import shutil | 10 import shutil |
9 import sys | 11 import sys |
10 import time | 12 import time |
11 from collections import OrderedDict | 13 from collections import OrderedDict |
12 from datetime import datetime | 14 from datetime import datetime |
15 import vcf | 17 import vcf |
16 | 18 |
17 | 19 |
def get_time_stamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH-MM-SS'."""
    # datetime.now() is the direct equivalent of the former
    # datetime.fromtimestamp(time.time()) round trip.
    return datetime.now().strftime('%Y-%m-%d %H-%M-%S')
22 | |
23 | |
def set_num_cpus(num_files, processes):
    """Return the number of worker processes to spawn.

    Never spawns more processes than there are input files.  When the
    host has fewer CPUs than the requested ``processes`` value, the
    count is capped at half of the available CPUs so the machine is
    not saturated.

    :param num_files: number of input vcf directories/files to process
    :param processes: requested maximum number of processes
    :return: number of worker processes to start
    """
    # cpu_count() already returns an int; no conversion needed.
    num_cpus = multiprocessing.cpu_count()
    if num_files < num_cpus and num_files < processes:
        return num_files
    if num_cpus < processes:
        # Host cannot satisfy the request; use at most half the CPUs.
        half_cpus = num_cpus // 2
        if num_files < half_cpus:
            return num_files
        return half_cpus
    return processes
20 | 34 |
21 | 35 |
22 def setup_all_vcfs(vcf_files, vcf_dirs): | 36 def setup_all_vcfs(vcf_files, vcf_dirs): |
23 # Create the all_vcfs directory and link | 37 # Create the all_vcfs directory and link |
24 # all input vcf files into it for processing. | 38 # all input vcf files into it for processing. |
324 exclusion_list.append(chrom + ":" + str(position)) | 338 exclusion_list.append(chrom + ":" + str(position)) |
325 return exclusion_list | 339 return exclusion_list |
326 except ValueError: | 340 except ValueError: |
327 return [] | 341 return [] |
328 | 342 |
def get_snps(self, task_queue, timeout):
    """Worker loop: consume group directories from ``task_queue`` and
    build a SNP alignment for each one.

    The loop ends once the queue stays empty for ``timeout`` seconds.
    """
    while True:
        try:
            group_dir = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            # No work arrived within the timeout window; stop this worker.
            break
        # Absolute paths of every vcf file in this group directory.
        group_files = [os.path.abspath(os.path.join(group_dir, entry))
                       for entry in os.listdir(os.path.abspath(group_dir))]
        # Parse all vcf files, accumulating the SNP positions.
        accumulated_positions = {}
        for vcf_path in group_files:
            found_positions, found_positions_mix = self.find_initial_positions(vcf_path)
            accumulated_positions.update(found_positions)
        # Order before adding to file to match the ordering of the
        # individual samples.  all_positions maps abs_pos -> REF.
        self.all_positions = OrderedDict(sorted(accumulated_positions.items()))
        ref_positions_df = pandas.DataFrame(self.all_positions, index=['root'])
        all_map_qualities = {}
        sample_frames = []
        for vcf_path in group_files:
            sample_df, file_name_base, sample_map_qualities = self.decide_snps(vcf_path)
            sample_frames.append(sample_df)
            all_map_qualities[file_name_base] = sample_map_qualities
        all_sample_df = pandas.concat(sample_frames)
        # All positions have now been selected for each sample, so keep
        # parsimony-informative SNPs (inner join drops columns where all
        # fields are the same) with the reference row on top.
        prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner')
        all_mq_df = pandas.DataFrame.from_dict(all_map_qualities)
        mq_averages = all_mq_df.mean(axis=1).astype(int)
        self.gather_and_filter(prefilter_df, mq_averages, group_dir)
        task_queue.task_done()
360 | 380 |
361 def group_vcfs(self, vcf_files): | 381 def group_vcfs(self, vcf_files): |
362 # Parse an excel file to produce a | 382 # Parse an excel file to produce a |
363 # grouping dictionary for SNPs. | 383 # grouping dictionary for SNPs. |
364 xl = pandas.ExcelFile(self.input_excel) | 384 xl = pandas.ExcelFile(self.input_excel) |
439 # files, both the samples and the "database". | 459 # files, both the samples and the "database". |
440 vcf_files = [] | 460 vcf_files = [] |
441 for file_name in os.listdir(args.input_vcf_dir): | 461 for file_name in os.listdir(args.input_vcf_dir): |
442 file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name)) | 462 file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name)) |
443 vcf_files.append(file_path) | 463 vcf_files.append(file_path) |
464 | |
465 multiprocessing.set_start_method('spawn') | |
466 queue1 = multiprocessing.JoinableQueue() | |
444 num_files = len(vcf_files) | 467 num_files = len(vcf_files) |
468 cpus = set_num_cpus(num_files, args.processes) | |
469 # Set a timeout for get()s in the queue. | |
470 timeout = 0.05 | |
445 | 471 |
446 # Initialize the snp_finder object. | 472 # Initialize the snp_finder object. |
447 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary) | 473 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary) |
448 | 474 |
449 # Define and make the set of directories into which the input_zc_vcf | 475 # Define and make the set of directories into which the input_zc_vcf |
462 # the above call to the set of directories containing | 488 # the above call to the set of directories containing |
463 # vcf files for analysis. | 489 # vcf files for analysis. |
464 group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups] | 490 group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups] |
465 vcf_dirs.extend(group_dirs) | 491 vcf_dirs.extend(group_dirs) |
466 | 492 |
493 # Populate the queue for job splitting. | |
467 for vcf_dir in vcf_dirs: | 494 for vcf_dir in vcf_dirs: |
468 snp_finder.get_snps(vcf_dir) | 495 queue1.put(vcf_dir) |
496 | |
497 # Complete the get_snps task. | |
498 processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout, )) for _ in range(cpus)] | |
499 for p in processes: | |
500 p.start() | |
501 for p in processes: | |
502 p.join() | |
503 queue1.join() | |
469 | 504 |
470 # Finish summary log. | 505 # Finish summary log. |
471 snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp()) | 506 snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp()) |
472 total_run_time = datetime.now() - snp_finder.timer_start | 507 total_run_time = datetime.now() - snp_finder.timer_start |
473 snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time)) | 508 snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time)) |