comparison prep_bams_parallel @ 0:6b673ffd9e38 default tip

initial commit
author Yusuf Ali <ali@yusuf.email>
date Wed, 25 Mar 2015 13:29:45 -0600
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:6b673ffd9e38
1 #!/usr/bin/env perl
2
3 use File::Basename;
4 use strict;
5 use warnings;
6
7 @ARGV > 5 or die "Usage: $0 <num parallel processes> <output log> <reference.fa> <temporary directory> <deduped_realigned_bam_prefix> <input1.bam> [input2.bam] ...\n";
8
9 my $num_procs = shift @ARGV;
10 my $log = shift @ARGV;
11 my $ref = shift @ARGV;
12 my $tmpdir = shift @ARGV;
13 my $outfile_prefix = shift @ARGV;
14
15 my $progdir = dirname($0);
16
17 # Split the input data into separate files for each chromosome, so we can process them in parallel
18 my $chr_files_hashref = &generate_chr_split_bams($tmpdir, @ARGV);
19
20 my (@cmds, @logs);
21 for my $chr (keys %$chr_files_hashref){
22 push @cmds, "$progdir/prep_bams $log.$chr $ref $tmpdir $outfile_prefix.$chr.bam ".$chr_files_hashref->{$chr};
23 push @logs, "$log.$chr";
24 }
25
26 # Run the chrs in parallel to the desrired degree
27 &run_cmds($num_procs, @cmds);
28
29 # Remove the split input files, as they were only needed temporarily
30 for my $chr_file (values %$chr_files_hashref){
31 unlink $chr_file or warn "Cannot remove temporary file $chr_file: $!\n";
32 }
33
34 # Merge the logs
35 open(LOG, ">$log")
36 or die "Could not open output log $log for writing: $!\n";
37 for my $l (@logs){
38 open(L, $l)
39 or warn "Log file $l was expected but not found, data processing may have been incomplete.\n";
40 print LOG <L>;
41 close(L);
42 unlink $l;
43 }
44 close(LOG);
45
46 # Merge the input BAMsinto one file, then split them into per contig BAM files for downstream parallelization
47 sub generate_chr_split_bams{
48 my ($tempdir, @input_bams) = @_;
49
50 my $temp_input_bam = $input_bams[0];
51 if(@input_bams > 1){
52 $temp_input_bam = "$tempdir/prep_bams_parallel_$$.bam";
53 system ("samtools merge $temp_input_bam ".join(" ", @input_bams)) >> 8
54 and die "Could not successfully run samtools merge: exit status was $?\n";
55 }
56 if(not -e "$temp_input_bam.bai" or -z "$temp_input_bam.bai"){
57 system ("samtools index $temp_input_bam") >> 8
58 and die "Could not successfully run samtools index: exit status was $?\n";
59 }
60 my @output_contigs;
61 open(SAM, "samtools view -H $temp_input_bam |")
62 or die "Cannot run samtools view on $temp_input_bam: $!\n";
63 while(<SAM>){
64 if(/^\@SQ\tSN:(\S+)/){
65 push @output_contigs, $1;
66 }
67 }
68 close(SAM);
69
70 my @split_bamfiles;
71 my %contig2outfile;
72 for my $output_contig (@output_contigs){
73 open(SAM, "samtools view -h $temp_input_bam '$output_contig'|")
74 and die "Cannot execute samtools view successfully: $!\n";
75 my @headers;
76 # read through the headers, then if we get at least one mapped read, open a file to print it
77 while(<SAM>){
78 last if not /^\@/; # first real record
79 push @headers, $_;
80 }
81 next if not defined $_; # got to EOF before a real record
82
83 my $outfile = "$tempdir/prep_bams_parallel_$$.".quotemeta($output_contig).".bam";
84 open(OUT, "| samtools view -b -S - > $outfile")
85 or die "Cannot open $outfile for writing: $!\n";
86 print OUT @headers, $_;
87 while(<SAM>){ # Copy over all lines. Not using list context as this may be a huge dataset and would needlessly take up memory
88 print OUT $_;
89 }
90 close(SAM);
91 close(OUT);
92 }
93 unlink $temp_input_bam or warn "Could not remove $temp_input_bam: $!\n";
94 }
95
96 sub run_cmds {
97 my ($max_cmds, @cmd) = @_;
98
99 my ($num_children, $pid);
100
101 for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){
102 # Initialize the number of child processes at 1, and increment it by one
103 # While it is less than $max_cmds
104
105 my $cmd = shift (@cmds);
106 if($pid = fork) {
107 # do nothing if parent
108 } elsif(defined $pid) { # $pid is zero here if defined
109 print STDERR $cmd, "\n";
110 system $cmd;
111 exit;
112 } else {
113 #weird fork error
114 die "Can't fork: $!\n";
115 }
116 }
117
118 while(@cmds) {
119 undef $pid;
120 FORK: {
121 my $cmd = shift (@cmds);
122 if($pid = fork) {
123 # parent here
124 $num_children++;
125 wait;
126 $num_children--;
127 next;
128
129 } elsif(defined $pid) { # $pid is zero here if defined
130 print STDERR $cmd, "\n";
131 system $cmd;
132 exit;
133
134 } else {
135 #weird fork error
136 die "Can't fork: $!\n";
137 }
138 }
139 }
140 wait while $num_children--;
141 }
142