Mercurial > repos > yusuf > miseq_fastq_bam
comparison bwape_parallel_optimized @ 0:8f4d4b4e8b04 default tip
init commit
| author | Yusuf Ali <ali@yusuf.email> |
|---|---|
| date | Wed, 25 Mar 2015 13:41:11 -0600 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:8f4d4b4e8b04 |
|---|---|
| 1 #!/usr/bin/env perl | |
| 2 | |
| 3 use strict; | |
| 4 use warnings; | |
| 5 use File::Basename; | |
| 6 use File::Temp qw(:POSIX); | |
| 7 use File::Spec qw(tmpdir); | |
| 8 | |
| 9 @ARGV == 6 or die "Usage: $0 <# threads> <ref.fasta> <read1.fq[.gz]> <read2.fq[.gz]> <out bam prefix>\nMerges, then pair aligns reads using BWA 0.6.2\n"; | |
| 10 | |
| 11 my %config; | |
| 12 my $dirname = dirname(__FILE__); | |
| 13 my $tool_dir = shift @ARGV; | |
| 14 #read config file | |
| 15 if(not -e "$tool_dir/miseq_fastq_bam.loc"){ | |
| 16 system("cp $dirname/tool-data/miseq_fastq_bam.loc $tool_dir/miseq_fastq_bam.loc"); | |
| 17 } | |
| 18 open CONFIG, '<', "$tool_dir/miseq_fastq_bam.loc"; | |
| 19 while(<CONFIG>){ | |
| 20 (my $key, my $value) = split(/\s+/,$_); | |
| 21 $config{$key} = $value; | |
| 22 } | |
| 23 close CONFIG; | |
| 24 | |
| 25 my $threads = shift @ARGV; | |
| 26 my $ref = $config{"ref_genome_dir"} . (shift @ARGV); | |
| 27 my $read1 = shift @ARGV; | |
| 28 my $read2 = shift @ARGV; | |
| 29 my $outbam = shift @ARGV; | |
| 30 my $log = "$outbam.align.log"; | |
| 31 $SIG{__DIE__} = $SIG{INT} = $SIG{HUP} = $SIG{TERM} = $SIG{ABRT} = \&cleanup; | |
| 32 | |
| 33 open(STDERR, ">$log"); | |
| 34 #my $tmpdir = File::Spec->tmpdir(); | |
| 35 my $tmpdir = "."; | |
| 36 $ENV{"TMP_DIR"} = $tmpdir; | |
| 37 #my ($label, $dirs, $suffix) = fileparse($read1, ".fastq", ".fq"); | |
| 38 | |
| 39 if(not -e "$ref.sa"){ | |
| 40 # TODO: Use NFS lock to avoid concurrent indexing of the same file? | |
| 41 system("bwa index $ref &>> $log") >> 8 and die "bwa index had non-zero ($?) exit status for $ref, aborting\n"; | |
| 42 } | |
| 43 | |
| 44 # Split the input into temporary chunks for parallel processing | |
| 45 my $tot_read1_size = -s $read1; | |
| 46 # Estimate expansion of compressed FASTQ files. About 3.5 from sample data we have | |
| 47 my $compressed_reads = 0; | |
| 48 if(`file $read1` =~ /gzip/i){ | |
| 49 $compressed_reads = 1; | |
| 50 $tot_read1_size *= 3.5; | |
| 51 } | |
| 52 my $chunk_read_size = $tot_read1_size/$threads; | |
| 53 | |
| 54 my (@read1_chunk_tmpfiles, @read2_chunk_tmpfiles); | |
| 55 # run initial alignment of each end in parallel | |
| 56 my $pid; | |
| 57 my $read_so_far = 0; | |
| 58 my $read_chunk_tmpfile_prefix = $$; #tmpnam(); | |
| 59 my $tmpbam = basename($read_chunk_tmpfile_prefix); | |
| 60 | |
| 61 # Check the average num of lines in a chunk to ensure chunks are lined up between forward and reverse | |
| 62 my $lines_per_chunk = 0; | |
| 63 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1)) | |
| 64 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n"; | |
| 65 while(<READS1>){ | |
| 66 $read_so_far += length($_); | |
| 67 # FASTQ records have 4 lines...only split if at the end of a record | |
| 68 if($.%4 == 1 and $read_so_far > $chunk_read_size){ | |
| 69 last; | |
| 70 } | |
| 71 $lines_per_chunk++; | |
| 72 } | |
| 73 close(READS1); | |
| 74 my $chunk_count = 0; | |
| 75 my @align_cmds; | |
| 76 if($pid = fork) { | |
| 77 # parent here | |
| 78 my $read1_chunk_tmpfile; | |
| 79 open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1)) | |
| 80 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n"; | |
| 81 while(<READS1>){ | |
| 82 # FASTQ records have 4 lines...only split if at the end of a record | |
| 83 if($. % $lines_per_chunk == 1){ | |
| 84 if(defined $read1_chunk_tmpfile){ | |
| 85 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log"; | |
| 86 } | |
| 87 $read1_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".forward"; | |
| 88 push @read1_chunk_tmpfiles, $read1_chunk_tmpfile; | |
| 89 open(FASTQ_CHUNK, ">$read1_chunk_tmpfile") | |
| 90 or die "Cannot open $read1_chunk_tmpfile for writing: $!\n"; | |
| 91 } | |
| 92 print FASTQ_CHUNK $_; | |
| 93 } | |
| 94 close(FASTQ_CHUNK); | |
| 95 push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log"; | |
| 96 wait; | |
| 97 } | |
| 98 elsif(defined $pid) { # $pid is zero in child process if defined | |
| 99 my $read2_chunk_tmpfile; | |
| 100 open(READS2, ($compressed_reads ? "gzip -cd $read2 |" : $read2)) | |
| 101 or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read2: $!\n"; | |
| 102 while(<READS2>){ | |
| 103 # FASTQ records have 4 lines...only split if at the end of a record | |
| 104 if($. % $lines_per_chunk == 1){ | |
| 105 $read2_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".reverse"; | |
| 106 open(FASTQ_CHUNK, ">$read2_chunk_tmpfile") | |
| 107 or die "Cannot open $read2_chunk_tmpfile for writing: $!\n"; | |
| 108 } | |
| 109 print FASTQ_CHUNK $_; | |
| 110 } | |
| 111 close(FASTQ_CHUNK); | |
| 112 exit(0); | |
| 113 } | |
| 114 else { | |
| 115 die "Can't fork: $!\n"; | |
| 116 } | |
| 117 | |
| 118 # repeat the commands for the reverse reads | |
| 119 my @final_align_cmds = (@align_cmds, map {my $r = $_; $r =~ s/\.forward/.reverse/g; $r} @align_cmds); | |
| 120 &run_cmds($threads, 5, @final_align_cmds); | |
| 121 | |
| 122 my @pair_cmds; | |
| 123 my @bam_chunk_sorted_tmpfiles; | |
| 124 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){ | |
| 125 my $read2_chunk_tmpfile = $read1_chunk_tmpfile; | |
| 126 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/; | |
| 127 my $bam_chunk_tmpfile = $read1_chunk_tmpfile; | |
| 128 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/; | |
| 129 my $bam_chunk_sorted_tmpfile_prefix = $bam_chunk_tmpfile; | |
| 130 $bam_chunk_sorted_tmpfile_prefix =~ s/\.bam$/.sorted/; | |
| 131 push @bam_chunk_sorted_tmpfiles, "$bam_chunk_sorted_tmpfile_prefix.bam"; | |
| 132 #push @cmds, "/export/achri_data/programs/bwa-0.6.2/bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - | samtools sort - $bam_chunk_sorted_tmpfile_prefix 2>> $log"; | |
| 133 push @pair_cmds, "bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - > $bam_chunk_tmpfile; samtools sort $bam_chunk_tmpfile $bam_chunk_sorted_tmpfile_prefix &>> $log"; | |
| 134 } | |
| 135 &run_cmds($threads, 5, @pair_cmds); | |
| 136 | |
| 137 system("samtools merge $tmpdir/$tmpbam.bam ".join(" ", @bam_chunk_sorted_tmpfiles)."&>> $log"); | |
| 138 system("samtools index $tmpdir/$tmpbam.bam &>> $log") >> 8 and die "samtools index had non-zero ($?) exit status for $tmpdir/$tmpbam.bam, aborting\n"; | |
| 139 | |
| 140 # post-process the alignments | |
| 141 if($pid = fork){ | |
| 142 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log"); | |
| 143 wait(); # for rmdup to finish | |
| 144 } | |
| 145 elsif(defined $pid) { | |
| 146 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n"; | |
| 147 exit(0); | |
| 148 } | |
| 149 else{ # Can't fork, do in serial | |
| 150 system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log"); | |
| 151 system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n"; | |
| 152 } | |
| 153 | |
| 154 die "Samtools rmdup did not generate the expected output file" if not -e "$tmpdir/$tmpbam.rmdup.bam"; | |
| 155 die "Samtools generated a blank output file" if -z "$tmpdir/$tmpbam.rmdup.bam"; | |
| 156 system "samtools index $tmpdir/$tmpbam.rmdup.bam &>> $log"; | |
| 157 # Peak into the BAM to see if it's got Illumina encoded quality values, which will require an extra flag for GATK | |
| 158 my $is_illumina_encoded_qual = 1; | |
| 159 my $num_mapped = 0; | |
| 160 if(open(SAMTOOLS, "samtools view $tmpdir/$tmpbam.rmdup.bam |")){ | |
| 161 while(<SAMTOOLS>){ | |
| 162 my @F = split /\t/, $_; | |
| 163 next unless $F[2] ne "*"; # ignore unmapped reads | |
| 164 my @quals = map {ord($_)} split(//, $F[10]); | |
| 165 if(grep {$_ < 64} @quals){ | |
| 166 $is_illumina_encoded_qual = 0; last; | |
| 167 } | |
| 168 last if ++$num_mapped > 100; # only look at the first 100 mapped reads | |
| 169 } | |
| 170 } # take our chances if samtools call failed that it's not illumina encoded, which will fail quickly in GATK anyways | |
| 171 close(SAMTOOLS); | |
| 172 | |
| 173 system "java -Xmx4G -jar $diname/GenomeAnalysisTK.jar -nt $threads -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T RealignerTargetCreator -o $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals &>> $log"; | |
| 174 die "GATK did not generate the expected intervals file\n" if not -e "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals"; | |
| 175 system "java -Xmx4G -jar $dirname/GenomeAnalysisTK.jar -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T IndelRealigner -targetIntervals $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals -o $outbam.bam ". | |
| 176 ($is_illumina_encoded_qual?"--fix_misencoded_quality_scores":"")." &>> $log"; | |
| 177 die "GATK did not generate the expected realigned BAM output file\n" if not -e "$outbam.bam"; | |
| 178 die "GATK generated a blank output file\n" if -z "$outbam.bam"; | |
| 179 my $outfile_bai = "$outbam.bai"; | |
| 180 if(not -e $outfile_bai){ | |
| 181 if(not -e "$outbam.bam.bai"){ | |
| 182 system "samtools index $outbam.bam &>> $log"; | |
| 183 } | |
| 184 system "cp $outbam.bam.bai $outfile_bai"; | |
| 185 } | |
| 186 else{ | |
| 187 system "cp $outfile_bai $outbam.bam.bai"; # some tools expect name.bai, some expect name.bam.bai, so provide both | |
| 188 } | |
| 189 &cleanup; | |
| 190 | |
| 191 sub cleanup{ | |
| 192 print STDERR "Cleaning up temp files for aborted bwa run: @_" if @_; | |
| 193 for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){ | |
| 194 my $read2_chunk_tmpfile = $read1_chunk_tmpfile; | |
| 195 $read2_chunk_tmpfile =~ s/\.forward$/.reverse/; | |
| 196 my $bam_chunk_tmpfile = $read1_chunk_tmpfile; | |
| 197 $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/; | |
| 198 unlink $read1_chunk_tmpfile, "$read1_chunk_tmpfile.sai", $read2_chunk_tmpfile, "$read2_chunk_tmpfile.sai", $bam_chunk_tmpfile; | |
| 199 } | |
| 200 unlink @bam_chunk_sorted_tmpfiles; | |
| 201 unlink "$tmpdir/$tmpbam.bam"; | |
| 202 unlink "$tmpdir/$tmpbam.bam.bai"; | |
| 203 unlink "$tmpdir/$tmpbam.rmdup.bam"; | |
| 204 unlink "$tmpdir/$tmpbam.rmdup.bam.bai"; | |
| 205 unlink "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals"; | |
| 206 } | |
| 207 | |
| 208 sub run_cmds{ | |
| 209 | |
| 210 my ($max_cmds, $sleep_time, @cmds) = @_; | |
| 211 | |
| 212 my ($num_children, $pid); | |
| 213 | |
| 214 for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){ | |
| 215 my $cmd = shift (@cmds); | |
| 216 if($pid = fork) { | |
| 217 sleep $sleep_time; # seems to be a major system time spike if all commands launched in quick succession. | |
| 218 } elsif(defined $pid) { | |
| 219 #print STDERR "$cmd\n"; | |
| 220 system $cmd; | |
| 221 #print STDERR "Finished $cmd\n"; | |
| 222 exit; | |
| 223 } else { | |
| 224 die "Can't fork: $!\n"; | |
| 225 } | |
| 226 } | |
| 227 | |
| 228 while(@cmds) { | |
| 229 undef $pid; | |
| 230 FORK: { | |
| 231 my $cmd = shift (@cmds); | |
| 232 if($pid = fork) { | |
| 233 $num_children++; | |
| 234 wait; | |
| 235 $num_children--; | |
| 236 next; | |
| 237 | |
| 238 } elsif(defined $pid) { | |
| 239 system $cmd; | |
| 240 exit; | |
| 241 | |
| 242 } else { | |
| 243 die "Can't fork: $!\n"; | |
| 244 } | |
| 245 } | |
| 246 } | |
| 247 wait while $num_children--; | |
| 248 } | |
| 249 |
