Mercurial > repos > iuc > vsnp_add_zero_coverage
comparison vsnp_get_snps.py @ 9:40b97055bb99 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/vsnp commit c38fd63f7980c70390d104a73ba4c72b266444c3
author | iuc |
---|---|
date | Fri, 10 Jun 2022 06:08:02 +0000 |
parents | 18b59c38017e |
children |
comparison
Legend:
- equal
- deleted
- inserted
- replaced
8:18b59c38017e | 9:40b97055bb99 |
---|---|
17 import vcf | 17 import vcf |
18 | 18 |
19 | 19 |
20 def get_time_stamp(): | 20 def get_time_stamp(): |
21 return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S') | 21 return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S') |
22 | |
23 | |
24 def set_num_cpus(num_files, processes): | |
25 num_cpus = len(os.sched_getaffinity(0)) | |
26 if num_files < num_cpus and num_files < processes: | |
27 return num_files | |
28 if num_cpus < processes: | |
29 half_cpus = int(num_cpus / 2) | |
30 if num_files < half_cpus: | |
31 return num_files | |
32 return half_cpus | |
33 return processes | |
34 | 22 |
35 | 23 |
36 def setup_all_vcfs(vcf_files, vcf_dirs): | 24 def setup_all_vcfs(vcf_files, vcf_dirs): |
37 # Create the all_vcfs directory and link | 25 # Create the all_vcfs directory and link |
38 # all input vcf files into it for processing. | 26 # all input vcf files into it for processing. |
463 vcf_files.append(file_path) | 451 vcf_files.append(file_path) |
464 | 452 |
465 multiprocessing.set_start_method('spawn') | 453 multiprocessing.set_start_method('spawn') |
466 queue1 = multiprocessing.JoinableQueue() | 454 queue1 = multiprocessing.JoinableQueue() |
467 num_files = len(vcf_files) | 455 num_files = len(vcf_files) |
468 cpus = set_num_cpus(num_files, args.processes) | |
469 # Set a timeout for get()s in the queue. | 456 # Set a timeout for get()s in the queue. |
470 timeout = 0.05 | 457 timeout = 0.05 |
471 | 458 |
472 # Initialize the snp_finder object. | 459 # Initialize the snp_finder object. |
473 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary) | 460 snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary) |
493 # Populate the queue for job splitting. | 480 # Populate the queue for job splitting. |
494 for vcf_dir in vcf_dirs: | 481 for vcf_dir in vcf_dirs: |
495 queue1.put(vcf_dir) | 482 queue1.put(vcf_dir) |
496 | 483 |
497 # Complete the get_snps task. | 484 # Complete the get_snps task. |
498 processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout, )) for _ in range(cpus)] | 485 processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout, )) for _ in range(args.processes)] |
499 for p in processes: | 486 for p in processes: |
500 p.start() | 487 p.start() |
501 for p in processes: | 488 for p in processes: |
502 p.join() | 489 p.join() |
503 queue1.join() | 490 queue1.join() |