comparison bwape_parallel_optimized @ 0:8f4d4b4e8b04 default tip

init commit
author Yusuf Ali <ali@yusuf.email>
date Wed, 25 Mar 2015 13:41:11 -0600
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:8f4d4b4e8b04
1 #!/usr/bin/env perl
2
3 use strict;
4 use warnings;
5 use File::Basename;
6 use File::Temp qw(:POSIX);
7 use File::Spec qw(tmpdir);
8
9 @ARGV == 6 or die "Usage: $0 <# threads> <ref.fasta> <read1.fq[.gz]> <read2.fq[.gz]> <out bam prefix>\nMerges, then pair aligns reads using BWA 0.6.2\n";
10
11 my %config;
12 my $dirname = dirname(__FILE__);
13 my $tool_dir = shift @ARGV;
14 #read config file
15 if(not -e "$tool_dir/miseq_fastq_bam.loc"){
16 system("cp $dirname/tool-data/miseq_fastq_bam.loc $tool_dir/miseq_fastq_bam.loc");
17 }
18 open CONFIG, '<', "$tool_dir/miseq_fastq_bam.loc";
19 while(<CONFIG>){
20 (my $key, my $value) = split(/\s+/,$_);
21 $config{$key} = $value;
22 }
23 close CONFIG;
24
25 my $threads = shift @ARGV;
26 my $ref = $config{"ref_genome_dir"} . (shift @ARGV);
27 my $read1 = shift @ARGV;
28 my $read2 = shift @ARGV;
29 my $outbam = shift @ARGV;
30 my $log = "$outbam.align.log";
31 $SIG{__DIE__} = $SIG{INT} = $SIG{HUP} = $SIG{TERM} = $SIG{ABRT} = \&cleanup;
32
33 open(STDERR, ">$log");
34 #my $tmpdir = File::Spec->tmpdir();
35 my $tmpdir = ".";
36 $ENV{"TMP_DIR"} = $tmpdir;
37 #my ($label, $dirs, $suffix) = fileparse($read1, ".fastq", ".fq");
38
39 if(not -e "$ref.sa"){
40 # TODO: Use NFS lock to avoid concurrent indexing of the same file?
41 system("bwa index $ref &>> $log") >> 8 and die "bwa index had non-zero ($?) exit status for $ref, aborting\n";
42 }
43
44 # Split the input into temporary chunks for parallel processing
45 my $tot_read1_size = -s $read1;
46 # Estimate expansion of compressed FASTQ files. About 3.5 from sample data we have
47 my $compressed_reads = 0;
48 if(`file $read1` =~ /gzip/i){
49 $compressed_reads = 1;
50 $tot_read1_size *= 3.5;
51 }
52 my $chunk_read_size = $tot_read1_size/$threads;
53
54 my (@read1_chunk_tmpfiles, @read2_chunk_tmpfiles);
55 # run initial alignment of each end in parallel
56 my $pid;
57 my $read_so_far = 0;
58 my $read_chunk_tmpfile_prefix = $$; #tmpnam();
59 my $tmpbam = basename($read_chunk_tmpfile_prefix);
60
61 # Check the average num of lines in a chunk to ensure chunks are lined up between forward and reverse
62 my $lines_per_chunk = 0;
63 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1))
64 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n";
65 while(<READS1>){
66 $read_so_far += length($_);
67 # FASTQ records have 4 lines...only split if at the end of a record
68 if($.%4 == 1 and $read_so_far > $chunk_read_size){
69 last;
70 }
71 $lines_per_chunk++;
72 }
73 close(READS1);
74 my $chunk_count = 0;
75 my @align_cmds;
76 if($pid = fork) {
77 # parent here
78 my $read1_chunk_tmpfile;
79 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1))
80 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n";
81 while(<READS1>){
82 # FASTQ records have 4 lines...only split if at the end of a record
83 if($. % $lines_per_chunk == 1){
84 if(defined $read1_chunk_tmpfile){
85 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log";
86 }
87 $read1_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".forward";
88 push @read1_chunk_tmpfiles, $read1_chunk_tmpfile;
89 open(FASTQ_CHUNK, ">$read1_chunk_tmpfile")
90 or die "Cannot open $read1_chunk_tmpfile for writing: $!\n";
91 }
92 print FASTQ_CHUNK $_;
93 }
94 close(FASTQ_CHUNK);
95 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log";
96 wait;
97 }
98 elsif(defined $pid) { # $pid is zero in child process if defined
99 my $read2_chunk_tmpfile;
100 open(READS2, ($compressed_reads ? "gzip -cd $read2 |" : $read2))
101 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read2: $!\n";
102 while(<READS2>){
103 # FASTQ records have 4 lines...only split if at the end of a record
104 if($. % $lines_per_chunk == 1){
105 $read2_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".reverse";
106 open(FASTQ_CHUNK, ">$read2_chunk_tmpfile")
107 or die "Cannot open $read2_chunk_tmpfile for writing: $!\n";
108 }
109 print FASTQ_CHUNK $_;
110 }
111 close(FASTQ_CHUNK);
112 exit(0);
113 }
114 else {
115 die "Can't fork: $!\n";
116 }
117
118 # repeat the commands for the reverse reads
119 my @final_align_cmds = (@align_cmds, map {my $r = $_; $r =~ s/\.forward/.reverse/g; $r} @align_cmds);
120 &run_cmds($threads, 5, @final_align_cmds);
121
122 my @pair_cmds;
123 my @bam_chunk_sorted_tmpfiles;
124 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){
125 my $read2_chunk_tmpfile = $read1_chunk_tmpfile;
126 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/;
127 my $bam_chunk_tmpfile = $read1_chunk_tmpfile;
128 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/;
129 my $bam_chunk_sorted_tmpfile_prefix = $bam_chunk_tmpfile;
130 $bam_chunk_sorted_tmpfile_prefix =~ s/\.bam$/.sorted/;
131 push @bam_chunk_sorted_tmpfiles, "$bam_chunk_sorted_tmpfile_prefix.bam";
132 #push @cmds, "/export/achri_data/programs/bwa-0.6.2/bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - | samtools sort - $bam_chunk_sorted_tmpfile_prefix 2>> $log";
133 push @pair_cmds, "bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - > $bam_chunk_tmpfile; samtools sort $bam_chunk_tmpfile $bam_chunk_sorted_tmpfile_prefix &>> $log";
134 }
135 &run_cmds($threads, 5, @pair_cmds);
136
137 system("samtools merge $tmpdir/$tmpbam.bam ".join(" ", @bam_chunk_sorted_tmpfiles)."&>> $log");
138 system("samtools index $tmpdir/$tmpbam.bam &>> $log") >> 8 and die "samtools index had non-zero ($?) exit status for $tmpdir/$tmpbam.bam, aborting\n";
139
140 # post-process the alignments
141 if($pid = fork){
142 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log");
143 wait(); # for rmdup to finish
144 }
145 elsif(defined $pid) {
146 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n";
147 exit(0);
148 }
149 else{ # Can't fork, do in serial
150 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log");
151 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n";
152 }
153
154 die "Samtools rmdup did not generate the expected output file" if not -e "$tmpdir/$tmpbam.rmdup.bam";
155 die "Samtools generated a blank output file" if -z "$tmpdir/$tmpbam.rmdup.bam";
156 system "samtools index $tmpdir/$tmpbam.rmdup.bam &>> $log";
157 # Peak into the BAM to see if it's got Illumina encoded quality values, which will require an extra flag for GATK
158 my $is_illumina_encoded_qual = 1;
159 my $num_mapped = 0;
160 if(open(SAMTOOLS, "samtools view $tmpdir/$tmpbam.rmdup.bam |")){
161 while(<SAMTOOLS>){
162 my @F = split /\t/, $_;
163 next unless $F[2] ne "*"; # ignore unmapped reads
164 my @quals = map {ord($_)} split(//, $F[10]);
165 if(grep {$_ < 64} @quals){
166 $is_illumina_encoded_qual = 0; last;
167 }
168 last if ++$num_mapped > 100; # only look at the first 100 mapped reads
169 }
170 } # take our chances if samtools call failed that it's not illumina encoded, which will fail quickly in GATK anyways
171 close(SAMTOOLS);
172
173 system "java -Xmx4G -jar $diname/GenomeAnalysisTK.jar -nt $threads -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T RealignerTargetCreator -o $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals &>> $log";
174 die "GATK did not generate the expected intervals file\n" if not -e "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals";
175 system "java -Xmx4G -jar $dirname/GenomeAnalysisTK.jar -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T IndelRealigner -targetIntervals $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals -o $outbam.bam ".
176 ($is_illumina_encoded_qual?"--fix_misencoded_quality_scores":"")." &>> $log";
177 die "GATK did not generate the expected realigned BAM output file\n" if not -e "$outbam.bam";
178 die "GATK generated a blank output file\n" if -z "$outbam.bam";
179 my $outfile_bai = "$outbam.bai";
180 if(not -e $outfile_bai){
181 if(not -e "$outbam.bam.bai"){
182 system "samtools index $outbam.bam &>> $log";
183 }
184 system "cp $outbam.bam.bai $outfile_bai";
185 }
186 else{
187 system "cp $outfile_bai $outbam.bam.bai"; # some tools expect name.bai, some expect name.bam.bai, so provide both
188 }
189 &cleanup;
190
191 sub cleanup{
192 print STDERR "Cleaning up temp files for aborted bwa run: @_" if @_;
193 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){
194 my $read2_chunk_tmpfile = $read1_chunk_tmpfile;
195 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/;
196 my $bam_chunk_tmpfile = $read1_chunk_tmpfile;
197 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/;
198 unlink $read1_chunk_tmpfile, "$read1_chunk_tmpfile.sai", $read2_chunk_tmpfile, "$read2_chunk_tmpfile.sai", $bam_chunk_tmpfile;
199 }
200 unlink @bam_chunk_sorted_tmpfiles;
201 unlink "$tmpdir/$tmpbam.bam";
202 unlink "$tmpdir/$tmpbam.bam.bai";
203 unlink "$tmpdir/$tmpbam.rmdup.bam";
204 unlink "$tmpdir/$tmpbam.rmdup.bam.bai";
205 unlink "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals";
206 }
207
208 sub run_cmds{
209
210 my ($max_cmds, $sleep_time, @cmds) = @_;
211
212 my ($num_children, $pid);
213
214 for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){
215 my $cmd = shift (@cmds);
216 if($pid = fork) {
217 sleep $sleep_time; # seems to be a major system time spike if all commands launched in quick succession.
218 } elsif(defined $pid) {
219 #print STDERR "$cmd\n";
220 system $cmd;
221 #print STDERR "Finished $cmd\n";
222 exit;
223 } else {
224 die "Can't fork: $!\n";
225 }
226 }
227
228 while(@cmds) {
229 undef $pid;
230 FORK: {
231 my $cmd = shift (@cmds);
232 if($pid = fork) {
233 $num_children++;
234 wait;
235 $num_children--;
236 next;
237
238 } elsif(defined $pid) {
239 system $cmd;
240 exit;
241
242 } else {
243 die "Can't fork: $!\n";
244 }
245 }
246 }
247 wait while $num_children--;
248 }
249