0
|
1 #!/usr/bin/env perl
|
|
2
|
|
3 use strict;
|
|
4 use warnings;
|
|
5 use File::Basename;
|
|
6 use File::Temp qw(:POSIX);
|
|
7 use File::Spec qw(tmpdir);
|
|
8
|
|
9 @ARGV == 6 or die "Usage: $0 <# threads> <ref.fasta> <read1.fq[.gz]> <read2.fq[.gz]> <out bam prefix>\nMerges, then pair aligns reads using BWA 0.6.2\n";
|
|
10
|
|
11 my %config;
|
|
12 my $dirname = dirname(__FILE__);
|
|
13 my $tool_dir = shift @ARGV;
|
|
14 #read config file
|
|
15 if(not -e "$tool_dir/miseq_fastq_bam.loc"){
|
|
16 system("cp $dirname/tool-data/miseq_fastq_bam.loc $tool_dir/miseq_fastq_bam.loc");
|
|
17 }
|
|
18 open CONFIG, '<', "$tool_dir/miseq_fastq_bam.loc";
|
|
19 while(<CONFIG>){
|
|
20 (my $key, my $value) = split(/\s+/,$_);
|
|
21 $config{$key} = $value;
|
|
22 }
|
|
23 close CONFIG;
|
|
24
|
|
25 my $threads = shift @ARGV;
|
|
26 my $ref = $config{"ref_genome_dir"} . (shift @ARGV);
|
|
27 my $read1 = shift @ARGV;
|
|
28 my $read2 = shift @ARGV;
|
|
29 my $outbam = shift @ARGV;
|
|
30 my $log = "$outbam.align.log";
|
|
31 $SIG{__DIE__} = $SIG{INT} = $SIG{HUP} = $SIG{TERM} = $SIG{ABRT} = \&cleanup;
|
|
32
|
|
33 open(STDERR, ">$log");
|
|
34 #my $tmpdir = File::Spec->tmpdir();
|
|
35 my $tmpdir = ".";
|
|
36 $ENV{"TMP_DIR"} = $tmpdir;
|
|
37 #my ($label, $dirs, $suffix) = fileparse($read1, ".fastq", ".fq");
|
|
38
|
|
39 if(not -e "$ref.sa"){
|
|
40 # TODO: Use NFS lock to avoid concurrent indexing of the same file?
|
|
41 system("bwa index $ref &>> $log") >> 8 and die "bwa index had non-zero ($?) exit status for $ref, aborting\n";
|
|
42 }
|
|
43
|
|
44 # Split the input into temporary chunks for parallel processing
|
|
45 my $tot_read1_size = -s $read1;
|
|
46 # Estimate expansion of compressed FASTQ files. About 3.5 from sample data we have
|
|
47 my $compressed_reads = 0;
|
|
48 if(`file $read1` =~ /gzip/i){
|
|
49 $compressed_reads = 1;
|
|
50 $tot_read1_size *= 3.5;
|
|
51 }
|
|
52 my $chunk_read_size = $tot_read1_size/$threads;
|
|
53
|
|
54 my (@read1_chunk_tmpfiles, @read2_chunk_tmpfiles);
|
|
55 # run initial alignment of each end in parallel
|
|
56 my $pid;
|
|
57 my $read_so_far = 0;
|
|
58 my $read_chunk_tmpfile_prefix = $$; #tmpnam();
|
|
59 my $tmpbam = basename($read_chunk_tmpfile_prefix);
|
|
60
|
|
61 # Check the average num of lines in a chunk to ensure chunks are lined up between forward and reverse
|
|
62 my $lines_per_chunk = 0;
|
|
63 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1))
|
|
64 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n";
|
|
65 while(<READS1>){
|
|
66 $read_so_far += length($_);
|
|
67 # FASTQ records have 4 lines...only split if at the end of a record
|
|
68 if($.%4 == 1 and $read_so_far > $chunk_read_size){
|
|
69 last;
|
|
70 }
|
|
71 $lines_per_chunk++;
|
|
72 }
|
|
73 close(READS1);
|
|
74 my $chunk_count = 0;
|
|
75 my @align_cmds;
|
|
76 if($pid = fork) {
|
|
77 # parent here
|
|
78 my $read1_chunk_tmpfile;
|
|
79 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1))
|
|
80 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n";
|
|
81 while(<READS1>){
|
|
82 # FASTQ records have 4 lines...only split if at the end of a record
|
|
83 if($. % $lines_per_chunk == 1){
|
|
84 if(defined $read1_chunk_tmpfile){
|
|
85 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log";
|
|
86 }
|
|
87 $read1_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".forward";
|
|
88 push @read1_chunk_tmpfiles, $read1_chunk_tmpfile;
|
|
89 open(FASTQ_CHUNK, ">$read1_chunk_tmpfile")
|
|
90 or die "Cannot open $read1_chunk_tmpfile for writing: $!\n";
|
|
91 }
|
|
92 print FASTQ_CHUNK $_;
|
|
93 }
|
|
94 close(FASTQ_CHUNK);
|
|
95 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log";
|
|
96 wait;
|
|
97 }
|
|
98 elsif(defined $pid) { # $pid is zero in child process if defined
|
|
99 my $read2_chunk_tmpfile;
|
|
100 open(READS2, ($compressed_reads ? "gzip -cd $read2 |" : $read2))
|
|
101 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read2: $!\n";
|
|
102 while(<READS2>){
|
|
103 # FASTQ records have 4 lines...only split if at the end of a record
|
|
104 if($. % $lines_per_chunk == 1){
|
|
105 $read2_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".reverse";
|
|
106 open(FASTQ_CHUNK, ">$read2_chunk_tmpfile")
|
|
107 or die "Cannot open $read2_chunk_tmpfile for writing: $!\n";
|
|
108 }
|
|
109 print FASTQ_CHUNK $_;
|
|
110 }
|
|
111 close(FASTQ_CHUNK);
|
|
112 exit(0);
|
|
113 }
|
|
114 else {
|
|
115 die "Can't fork: $!\n";
|
|
116 }
|
|
117
|
|
118 # repeat the commands for the reverse reads
|
|
119 my @final_align_cmds = (@align_cmds, map {my $r = $_; $r =~ s/\.forward/.reverse/g; $r} @align_cmds);
|
|
120 &run_cmds($threads, 5, @final_align_cmds);
|
|
121
|
|
122 my @pair_cmds;
|
|
123 my @bam_chunk_sorted_tmpfiles;
|
|
124 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){
|
|
125 my $read2_chunk_tmpfile = $read1_chunk_tmpfile;
|
|
126 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/;
|
|
127 my $bam_chunk_tmpfile = $read1_chunk_tmpfile;
|
|
128 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/;
|
|
129 my $bam_chunk_sorted_tmpfile_prefix = $bam_chunk_tmpfile;
|
|
130 $bam_chunk_sorted_tmpfile_prefix =~ s/\.bam$/.sorted/;
|
|
131 push @bam_chunk_sorted_tmpfiles, "$bam_chunk_sorted_tmpfile_prefix.bam";
|
|
132 #push @cmds, "/export/achri_data/programs/bwa-0.6.2/bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - | samtools sort - $bam_chunk_sorted_tmpfile_prefix 2>> $log";
|
|
133 push @pair_cmds, "bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - > $bam_chunk_tmpfile; samtools sort $bam_chunk_tmpfile $bam_chunk_sorted_tmpfile_prefix &>> $log";
|
|
134 }
|
|
135 &run_cmds($threads, 5, @pair_cmds);
|
|
136
|
|
137 system("samtools merge $tmpdir/$tmpbam.bam ".join(" ", @bam_chunk_sorted_tmpfiles)."&>> $log");
|
|
138 system("samtools index $tmpdir/$tmpbam.bam &>> $log") >> 8 and die "samtools index had non-zero ($?) exit status for $tmpdir/$tmpbam.bam, aborting\n";
|
|
139
|
|
140 # post-process the alignments
|
|
141 if($pid = fork){
|
|
142 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log");
|
|
143 wait(); # for rmdup to finish
|
|
144 }
|
|
145 elsif(defined $pid) {
|
|
146 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n";
|
|
147 exit(0);
|
|
148 }
|
|
149 else{ # Can't fork, do in serial
|
|
150 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log");
|
|
151 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n";
|
|
152 }
|
|
153
|
|
154 die "Samtools rmdup did not generate the expected output file" if not -e "$tmpdir/$tmpbam.rmdup.bam";
|
|
155 die "Samtools generated a blank output file" if -z "$tmpdir/$tmpbam.rmdup.bam";
|
|
156 system "samtools index $tmpdir/$tmpbam.rmdup.bam &>> $log";
|
|
157 # Peak into the BAM to see if it's got Illumina encoded quality values, which will require an extra flag for GATK
|
|
158 my $is_illumina_encoded_qual = 1;
|
|
159 my $num_mapped = 0;
|
|
160 if(open(SAMTOOLS, "samtools view $tmpdir/$tmpbam.rmdup.bam |")){
|
|
161 while(<SAMTOOLS>){
|
|
162 my @F = split /\t/, $_;
|
|
163 next unless $F[2] ne "*"; # ignore unmapped reads
|
|
164 my @quals = map {ord($_)} split(//, $F[10]);
|
|
165 if(grep {$_ < 64} @quals){
|
|
166 $is_illumina_encoded_qual = 0; last;
|
|
167 }
|
|
168 last if ++$num_mapped > 100; # only look at the first 100 mapped reads
|
|
169 }
|
|
170 } # take our chances if samtools call failed that it's not illumina encoded, which will fail quickly in GATK anyways
|
|
171 close(SAMTOOLS);
|
|
172
|
|
173 system "java -Xmx4G -jar $diname/GenomeAnalysisTK.jar -nt $threads -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T RealignerTargetCreator -o $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals &>> $log";
|
|
174 die "GATK did not generate the expected intervals file\n" if not -e "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals";
|
|
175 system "java -Xmx4G -jar $dirname/GenomeAnalysisTK.jar -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T IndelRealigner -targetIntervals $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals -o $outbam.bam ".
|
|
176 ($is_illumina_encoded_qual?"--fix_misencoded_quality_scores":"")." &>> $log";
|
|
177 die "GATK did not generate the expected realigned BAM output file\n" if not -e "$outbam.bam";
|
|
178 die "GATK generated a blank output file\n" if -z "$outbam.bam";
|
|
179 my $outfile_bai = "$outbam.bai";
|
|
180 if(not -e $outfile_bai){
|
|
181 if(not -e "$outbam.bam.bai"){
|
|
182 system "samtools index $outbam.bam &>> $log";
|
|
183 }
|
|
184 system "cp $outbam.bam.bai $outfile_bai";
|
|
185 }
|
|
186 else{
|
|
187 system "cp $outfile_bai $outbam.bam.bai"; # some tools expect name.bai, some expect name.bam.bai, so provide both
|
|
188 }
|
|
189 &cleanup;
|
|
190
|
|
191 sub cleanup{
|
|
192 print STDERR "Cleaning up temp files for aborted bwa run: @_" if @_;
|
|
193 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){
|
|
194 my $read2_chunk_tmpfile = $read1_chunk_tmpfile;
|
|
195 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/;
|
|
196 my $bam_chunk_tmpfile = $read1_chunk_tmpfile;
|
|
197 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/;
|
|
198 unlink $read1_chunk_tmpfile, "$read1_chunk_tmpfile.sai", $read2_chunk_tmpfile, "$read2_chunk_tmpfile.sai", $bam_chunk_tmpfile;
|
|
199 }
|
|
200 unlink @bam_chunk_sorted_tmpfiles;
|
|
201 unlink "$tmpdir/$tmpbam.bam";
|
|
202 unlink "$tmpdir/$tmpbam.bam.bai";
|
|
203 unlink "$tmpdir/$tmpbam.rmdup.bam";
|
|
204 unlink "$tmpdir/$tmpbam.rmdup.bam.bai";
|
|
205 unlink "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals";
|
|
206 }
|
|
207
|
|
208 sub run_cmds{
|
|
209
|
|
210 my ($max_cmds, $sleep_time, @cmds) = @_;
|
|
211
|
|
212 my ($num_children, $pid);
|
|
213
|
|
214 for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){
|
|
215 my $cmd = shift (@cmds);
|
|
216 if($pid = fork) {
|
|
217 sleep $sleep_time; # seems to be a major system time spike if all commands launched in quick succession.
|
|
218 } elsif(defined $pid) {
|
|
219 #print STDERR "$cmd\n";
|
|
220 system $cmd;
|
|
221 #print STDERR "Finished $cmd\n";
|
|
222 exit;
|
|
223 } else {
|
|
224 die "Can't fork: $!\n";
|
|
225 }
|
|
226 }
|
|
227
|
|
228 while(@cmds) {
|
|
229 undef $pid;
|
|
230 FORK: {
|
|
231 my $cmd = shift (@cmds);
|
|
232 if($pid = fork) {
|
|
233 $num_children++;
|
|
234 wait;
|
|
235 $num_children--;
|
|
236 next;
|
|
237
|
|
238 } elsif(defined $pid) {
|
|
239 system $cmd;
|
|
240 exit;
|
|
241
|
|
242 } else {
|
|
243 die "Can't fork: $!\n";
|
|
244 }
|
|
245 }
|
|
246 }
|
|
247 wait while $num_children--;
|
|
248 }
|
|
249
|