Mercurial > repos > yusuf > miseq_fastq_bam
comparison bwape_parallel_optimized @ 0:8f4d4b4e8b04 default tip
init commit
author | Yusuf Ali <ali@yusuf.email> |
---|---|
date | Wed, 25 Mar 2015 13:41:11 -0600 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:8f4d4b4e8b04 |
---|---|
1 #!/usr/bin/env perl | |
2 | |
3 use strict; | |
4 use warnings; | |
5 use File::Basename; | |
6 use File::Temp qw(:POSIX); | |
7 use File::Spec qw(tmpdir); | |
8 | |
9 @ARGV == 6 or die "Usage: $0 <# threads> <ref.fasta> <read1.fq[.gz]> <read2.fq[.gz]> <out bam prefix>\nMerges, then pair aligns reads using BWA 0.6.2\n"; | |
10 | |
11 my %config; | |
12 my $dirname = dirname(__FILE__); | |
13 my $tool_dir = shift @ARGV; | |
14 #read config file | |
15 if(not -e "$tool_dir/miseq_fastq_bam.loc"){ | |
16 system("cp $dirname/tool-data/miseq_fastq_bam.loc $tool_dir/miseq_fastq_bam.loc"); | |
17 } | |
18 open CONFIG, '<', "$tool_dir/miseq_fastq_bam.loc"; | |
19 while(<CONFIG>){ | |
20 (my $key, my $value) = split(/\s+/,$_); | |
21 $config{$key} = $value; | |
22 } | |
23 close CONFIG; | |
24 | |
25 my $threads = shift @ARGV; | |
26 my $ref = $config{"ref_genome_dir"} . (shift @ARGV); | |
27 my $read1 = shift @ARGV; | |
28 my $read2 = shift @ARGV; | |
29 my $outbam = shift @ARGV; | |
30 my $log = "$outbam.align.log"; | |
31 $SIG{__DIE__} = $SIG{INT} = $SIG{HUP} = $SIG{TERM} = $SIG{ABRT} = \&cleanup; | |
32 | |
33 open(STDERR, ">$log"); | |
34 #my $tmpdir = File::Spec->tmpdir(); | |
35 my $tmpdir = "."; | |
36 $ENV{"TMP_DIR"} = $tmpdir; | |
37 #my ($label, $dirs, $suffix) = fileparse($read1, ".fastq", ".fq"); | |
38 | |
39 if(not -e "$ref.sa"){ | |
40 # TODO: Use NFS lock to avoid concurrent indexing of the same file? | |
41 system("bwa index $ref &>> $log") >> 8 and die "bwa index had non-zero ($?) exit status for $ref, aborting\n"; | |
42 } | |
43 | |
44 # Split the input into temporary chunks for parallel processing | |
45 my $tot_read1_size = -s $read1; | |
46 # Estimate expansion of compressed FASTQ files. About 3.5 from sample data we have | |
47 my $compressed_reads = 0; | |
48 if(`file $read1` =~ /gzip/i){ | |
49 $compressed_reads = 1; | |
50 $tot_read1_size *= 3.5; | |
51 } | |
52 my $chunk_read_size = $tot_read1_size/$threads; | |
53 | |
54 my (@read1_chunk_tmpfiles, @read2_chunk_tmpfiles); | |
55 # run initial alignment of each end in parallel | |
56 my $pid; | |
57 my $read_so_far = 0; | |
58 my $read_chunk_tmpfile_prefix = $$; #tmpnam(); | |
59 my $tmpbam = basename($read_chunk_tmpfile_prefix); | |
60 | |
61 # Check the average num of lines in a chunk to ensure chunks are lined up between forward and reverse | |
62 my $lines_per_chunk = 0; | |
63 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1)) | |
64 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n"; | |
65 while(<READS1>){ | |
66 $read_so_far += length($_); | |
67 # FASTQ records have 4 lines...only split if at the end of a record | |
68 if($.%4 == 1 and $read_so_far > $chunk_read_size){ | |
69 last; | |
70 } | |
71 $lines_per_chunk++; | |
72 } | |
73 close(READS1); | |
74 my $chunk_count = 0; | |
75 my @align_cmds; | |
76 if($pid = fork) { | |
77 # parent here | |
78 my $read1_chunk_tmpfile; | |
79 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1)) | |
80 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n"; | |
81 while(<READS1>){ | |
82 # FASTQ records have 4 lines...only split if at the end of a record | |
83 if($. % $lines_per_chunk == 1){ | |
84 if(defined $read1_chunk_tmpfile){ | |
85 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log"; | |
86 } | |
87 $read1_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".forward"; | |
88 push @read1_chunk_tmpfiles, $read1_chunk_tmpfile; | |
89 open(FASTQ_CHUNK, ">$read1_chunk_tmpfile") | |
90 or die "Cannot open $read1_chunk_tmpfile for writing: $!\n"; | |
91 } | |
92 print FASTQ_CHUNK $_; | |
93 } | |
94 close(FASTQ_CHUNK); | |
95 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log"; | |
96 wait; | |
97 } | |
98 elsif(defined $pid) { # $pid is zero in child process if defined | |
99 my $read2_chunk_tmpfile; | |
100 open(READS2, ($compressed_reads ? "gzip -cd $read2 |" : $read2)) | |
101 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read2: $!\n"; | |
102 while(<READS2>){ | |
103 # FASTQ records have 4 lines...only split if at the end of a record | |
104 if($. % $lines_per_chunk == 1){ | |
105 $read2_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".reverse"; | |
106 open(FASTQ_CHUNK, ">$read2_chunk_tmpfile") | |
107 or die "Cannot open $read2_chunk_tmpfile for writing: $!\n"; | |
108 } | |
109 print FASTQ_CHUNK $_; | |
110 } | |
111 close(FASTQ_CHUNK); | |
112 exit(0); | |
113 } | |
114 else { | |
115 die "Can't fork: $!\n"; | |
116 } | |
117 | |
118 # repeat the commands for the reverse reads | |
119 my @final_align_cmds = (@align_cmds, map {my $r = $_; $r =~ s/\.forward/.reverse/g; $r} @align_cmds); | |
120 &run_cmds($threads, 5, @final_align_cmds); | |
121 | |
122 my @pair_cmds; | |
123 my @bam_chunk_sorted_tmpfiles; | |
124 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){ | |
125 my $read2_chunk_tmpfile = $read1_chunk_tmpfile; | |
126 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/; | |
127 my $bam_chunk_tmpfile = $read1_chunk_tmpfile; | |
128 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/; | |
129 my $bam_chunk_sorted_tmpfile_prefix = $bam_chunk_tmpfile; | |
130 $bam_chunk_sorted_tmpfile_prefix =~ s/\.bam$/.sorted/; | |
131 push @bam_chunk_sorted_tmpfiles, "$bam_chunk_sorted_tmpfile_prefix.bam"; | |
132 #push @cmds, "/export/achri_data/programs/bwa-0.6.2/bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - | samtools sort - $bam_chunk_sorted_tmpfile_prefix 2>> $log"; | |
133 push @pair_cmds, "bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - > $bam_chunk_tmpfile; samtools sort $bam_chunk_tmpfile $bam_chunk_sorted_tmpfile_prefix &>> $log"; | |
134 } | |
135 &run_cmds($threads, 5, @pair_cmds); | |
136 | |
137 system("samtools merge $tmpdir/$tmpbam.bam ".join(" ", @bam_chunk_sorted_tmpfiles)."&>> $log"); | |
138 system("samtools index $tmpdir/$tmpbam.bam &>> $log") >> 8 and die "samtools index had non-zero ($?) exit status for $tmpdir/$tmpbam.bam, aborting\n"; | |
139 | |
140 # post-process the alignments | |
141 if($pid = fork){ | |
142 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log"); | |
143 wait(); # for rmdup to finish | |
144 } | |
145 elsif(defined $pid) { | |
146 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n"; | |
147 exit(0); | |
148 } | |
149 else{ # Can't fork, do in serial | |
150 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log"); | |
151 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n"; | |
152 } | |
153 | |
154 die "Samtools rmdup did not generate the expected output file" if not -e "$tmpdir/$tmpbam.rmdup.bam"; | |
155 die "Samtools generated a blank output file" if -z "$tmpdir/$tmpbam.rmdup.bam"; | |
156 system "samtools index $tmpdir/$tmpbam.rmdup.bam &>> $log"; | |
157 # Peak into the BAM to see if it's got Illumina encoded quality values, which will require an extra flag for GATK | |
158 my $is_illumina_encoded_qual = 1; | |
159 my $num_mapped = 0; | |
160 if(open(SAMTOOLS, "samtools view $tmpdir/$tmpbam.rmdup.bam |")){ | |
161 while(<SAMTOOLS>){ | |
162 my @F = split /\t/, $_; | |
163 next unless $F[2] ne "*"; # ignore unmapped reads | |
164 my @quals = map {ord($_)} split(//, $F[10]); | |
165 if(grep {$_ < 64} @quals){ | |
166 $is_illumina_encoded_qual = 0; last; | |
167 } | |
168 last if ++$num_mapped > 100; # only look at the first 100 mapped reads | |
169 } | |
170 } # take our chances if samtools call failed that it's not illumina encoded, which will fail quickly in GATK anyways | |
171 close(SAMTOOLS); | |
172 | |
173 system "java -Xmx4G -jar $diname/GenomeAnalysisTK.jar -nt $threads -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T RealignerTargetCreator -o $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals &>> $log"; | |
174 die "GATK did not generate the expected intervals file\n" if not -e "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals"; | |
175 system "java -Xmx4G -jar $dirname/GenomeAnalysisTK.jar -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T IndelRealigner -targetIntervals $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals -o $outbam.bam ". | |
176 ($is_illumina_encoded_qual?"--fix_misencoded_quality_scores":"")." &>> $log"; | |
177 die "GATK did not generate the expected realigned BAM output file\n" if not -e "$outbam.bam"; | |
178 die "GATK generated a blank output file\n" if -z "$outbam.bam"; | |
179 my $outfile_bai = "$outbam.bai"; | |
180 if(not -e $outfile_bai){ | |
181 if(not -e "$outbam.bam.bai"){ | |
182 system "samtools index $outbam.bam &>> $log"; | |
183 } | |
184 system "cp $outbam.bam.bai $outfile_bai"; | |
185 } | |
186 else{ | |
187 system "cp $outfile_bai $outbam.bam.bai"; # some tools expect name.bai, some expect name.bam.bai, so provide both | |
188 } | |
189 &cleanup; | |
190 | |
191 sub cleanup{ | |
192 print STDERR "Cleaning up temp files for aborted bwa run: @_" if @_; | |
193 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){ | |
194 my $read2_chunk_tmpfile = $read1_chunk_tmpfile; | |
195 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/; | |
196 my $bam_chunk_tmpfile = $read1_chunk_tmpfile; | |
197 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/; | |
198 unlink $read1_chunk_tmpfile, "$read1_chunk_tmpfile.sai", $read2_chunk_tmpfile, "$read2_chunk_tmpfile.sai", $bam_chunk_tmpfile; | |
199 } | |
200 unlink @bam_chunk_sorted_tmpfiles; | |
201 unlink "$tmpdir/$tmpbam.bam"; | |
202 unlink "$tmpdir/$tmpbam.bam.bai"; | |
203 unlink "$tmpdir/$tmpbam.rmdup.bam"; | |
204 unlink "$tmpdir/$tmpbam.rmdup.bam.bai"; | |
205 unlink "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals"; | |
206 } | |
207 | |
208 sub run_cmds{ | |
209 | |
210 my ($max_cmds, $sleep_time, @cmds) = @_; | |
211 | |
212 my ($num_children, $pid); | |
213 | |
214 for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){ | |
215 my $cmd = shift (@cmds); | |
216 if($pid = fork) { | |
217 sleep $sleep_time; # seems to be a major system time spike if all commands launched in quick succession. | |
218 } elsif(defined $pid) { | |
219 #print STDERR "$cmd\n"; | |
220 system $cmd; | |
221 #print STDERR "Finished $cmd\n"; | |
222 exit; | |
223 } else { | |
224 die "Can't fork: $!\n"; | |
225 } | |
226 } | |
227 | |
228 while(@cmds) { | |
229 undef $pid; | |
230 FORK: { | |
231 my $cmd = shift (@cmds); | |
232 if($pid = fork) { | |
233 $num_children++; | |
234 wait; | |
235 $num_children--; | |
236 next; | |
237 | |
238 } elsif(defined $pid) { | |
239 system $cmd; | |
240 exit; | |
241 | |
242 } else { | |
243 die "Can't fork: $!\n"; | |
244 } | |
245 } | |
246 } | |
247 wait while $num_children--; | |
248 } | |
249 |