Mercurial > repos > yusuf > miseq_fastq_bam
view bwape_parallel_optimized @ 0:8f4d4b4e8b04 default tip
init commit
author | Yusuf Ali <ali@yusuf.email> |
---|---|
date | Wed, 25 Mar 2015 13:41:11 -0600 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env perl use strict; use warnings; use File::Basename; use File::Temp qw(:POSIX); use File::Spec qw(tmpdir); @ARGV == 6 or die "Usage: $0 <# threads> <ref.fasta> <read1.fq[.gz]> <read2.fq[.gz]> <out bam prefix>\nMerges, then pair aligns reads using BWA 0.6.2\n"; my %config; my $dirname = dirname(__FILE__); my $tool_dir = shift @ARGV; #read config file if(not -e "$tool_dir/miseq_fastq_bam.loc"){ system("cp $dirname/tool-data/miseq_fastq_bam.loc $tool_dir/miseq_fastq_bam.loc"); } open CONFIG, '<', "$tool_dir/miseq_fastq_bam.loc"; while(<CONFIG>){ (my $key, my $value) = split(/\s+/,$_); $config{$key} = $value; } close CONFIG; my $threads = shift @ARGV; my $ref = $config{"ref_genome_dir"} . (shift @ARGV); my $read1 = shift @ARGV; my $read2 = shift @ARGV; my $outbam = shift @ARGV; my $log = "$outbam.align.log"; $SIG{__DIE__} = $SIG{INT} = $SIG{HUP} = $SIG{TERM} = $SIG{ABRT} = \&cleanup; open(STDERR, ">$log"); #my $tmpdir = File::Spec->tmpdir(); my $tmpdir = "."; $ENV{"TMP_DIR"} = $tmpdir; #my ($label, $dirs, $suffix) = fileparse($read1, ".fastq", ".fq"); if(not -e "$ref.sa"){ # TODO: Use NFS lock to avoid concurrent indexing of the same file? system("bwa index $ref &>> $log") >> 8 and die "bwa index had non-zero ($?) exit status for $ref, aborting\n"; } # Split the input into temporary chunks for parallel processing my $tot_read1_size = -s $read1; # Estimate expansion of compressed FASTQ files. About 3.5 from sample data we have my $compressed_reads = 0; if(`file $read1` =~ /gzip/i){ $compressed_reads = 1; $tot_read1_size *= 3.5; } my $chunk_read_size = $tot_read1_size/$threads; my (@read1_chunk_tmpfiles, @read2_chunk_tmpfiles); # run initial alignment of each end in parallel my $pid; my $read_so_far = 0; my $read_chunk_tmpfile_prefix = $$; #tmpnam(); my $tmpbam = basename($read_chunk_tmpfile_prefix); # Check the average num of lines in a chunk to ensure chunks are lined up between forward and reverse my $lines_per_chunk = 0; open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1)) or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n"; while(<READS1>){ $read_so_far += length($_); # FASTQ records have 4 lines...only split if at the end of a record if($.%4 == 1 and $read_so_far > $chunk_read_size){ last; } $lines_per_chunk++; } close(READS1); my $chunk_count = 0; my @align_cmds; if($pid = fork) { # parent here my $read1_chunk_tmpfile; open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1)) or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n"; while(<READS1>){ # FASTQ records have 4 lines...only split if at the end of a record if($. % $lines_per_chunk == 1){ if(defined $read1_chunk_tmpfile){ push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log"; } $read1_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".forward"; push @read1_chunk_tmpfiles, $read1_chunk_tmpfile; open(FASTQ_CHUNK, ">$read1_chunk_tmpfile") or die "Cannot open $read1_chunk_tmpfile for writing: $!\n"; } print FASTQ_CHUNK $_; } close(FASTQ_CHUNK); push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log"; wait; } elsif(defined $pid) { # $pid is zero in child process if defined my $read2_chunk_tmpfile; open(READS2, ($compressed_reads ? "gzip -cd $read2 |" : $read2)) or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read2: $!\n"; while(<READS2>){ # FASTQ records have 4 lines...only split if at the end of a record if($. % $lines_per_chunk == 1){ $read2_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".reverse"; open(FASTQ_CHUNK, ">$read2_chunk_tmpfile") or die "Cannot open $read2_chunk_tmpfile for writing: $!\n"; } print FASTQ_CHUNK $_; } close(FASTQ_CHUNK); exit(0); } else { die "Can't fork: $!\n"; } # repeat the commands for the reverse reads my @final_align_cmds = (@align_cmds, map {my $r = $_; $r =~ s/\.forward/.reverse/g; $r} @align_cmds); &run_cmds($threads, 5, @final_align_cmds); my @pair_cmds; my @bam_chunk_sorted_tmpfiles; for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){ my $read2_chunk_tmpfile = $read1_chunk_tmpfile; $read2_chunk_tmpfile =~ s/\.forward$/.reverse/; my $bam_chunk_tmpfile = $read1_chunk_tmpfile; $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/; my $bam_chunk_sorted_tmpfile_prefix = $bam_chunk_tmpfile; $bam_chunk_sorted_tmpfile_prefix =~ s/\.bam$/.sorted/; push @bam_chunk_sorted_tmpfiles, "$bam_chunk_sorted_tmpfile_prefix.bam"; #push @cmds, "/export/achri_data/programs/bwa-0.6.2/bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - | samtools sort - $bam_chunk_sorted_tmpfile_prefix 2>> $log"; push @pair_cmds, "bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - > $bam_chunk_tmpfile; samtools sort $bam_chunk_tmpfile $bam_chunk_sorted_tmpfile_prefix &>> $log"; } &run_cmds($threads, 5, @pair_cmds); system("samtools merge $tmpdir/$tmpbam.bam ".join(" ", @bam_chunk_sorted_tmpfiles)."&>> $log"); system("samtools index $tmpdir/$tmpbam.bam &>> $log") >> 8 and die "samtools index had non-zero ($?) exit status for $tmpdir/$tmpbam.bam, aborting\n"; # post-process the alignments if($pid = fork){ system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log"); wait(); # for rmdup to finish } elsif(defined $pid) { system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n"; exit(0); } else{ # Can't fork, do in serial system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log"); system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n"; } die "Samtools rmdup did not generate the expected output file" if not -e "$tmpdir/$tmpbam.rmdup.bam"; die "Samtools generated a blank output file" if -z "$tmpdir/$tmpbam.rmdup.bam"; system "samtools index $tmpdir/$tmpbam.rmdup.bam &>> $log"; # Peak into the BAM to see if it's got Illumina encoded quality values, which will require an extra flag for GATK my $is_illumina_encoded_qual = 1; my $num_mapped = 0; if(open(SAMTOOLS, "samtools view $tmpdir/$tmpbam.rmdup.bam |")){ while(<SAMTOOLS>){ my @F = split /\t/, $_; next unless $F[2] ne "*"; # ignore unmapped reads my @quals = map {ord($_)} split(//, $F[10]); if(grep {$_ < 64} @quals){ $is_illumina_encoded_qual = 0; last; } last if ++$num_mapped > 100; # only look at the first 100 mapped reads } } # take our chances if samtools call failed that it's not illumina encoded, which will fail quickly in GATK anyways close(SAMTOOLS); system "java -Xmx4G -jar $diname/GenomeAnalysisTK.jar -nt $threads -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T RealignerTargetCreator -o $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals &>> $log"; die "GATK did not generate the expected intervals file\n" if not -e "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals"; system "java -Xmx4G -jar $dirname/GenomeAnalysisTK.jar -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T IndelRealigner -targetIntervals $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals -o $outbam.bam ". ($is_illumina_encoded_qual?"--fix_misencoded_quality_scores":"")." &>> $log"; die "GATK did not generate the expected realigned BAM output file\n" if not -e "$outbam.bam"; die "GATK generated a blank output file\n" if -z "$outbam.bam"; my $outfile_bai = "$outbam.bai"; if(not -e $outfile_bai){ if(not -e "$outbam.bam.bai"){ system "samtools index $outbam.bam &>> $log"; } system "cp $outbam.bam.bai $outfile_bai"; } else{ system "cp $outfile_bai $outbam.bam.bai"; # some tools expect name.bai, some expect name.bam.bai, so provide both } &cleanup; sub cleanup{ print STDERR "Cleaning up temp files for aborted bwa run: @_" if @_; for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){ my $read2_chunk_tmpfile = $read1_chunk_tmpfile; $read2_chunk_tmpfile =~ s/\.forward$/.reverse/; my $bam_chunk_tmpfile = $read1_chunk_tmpfile; $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/; unlink $read1_chunk_tmpfile, "$read1_chunk_tmpfile.sai", $read2_chunk_tmpfile, "$read2_chunk_tmpfile.sai", $bam_chunk_tmpfile; } unlink @bam_chunk_sorted_tmpfiles; unlink "$tmpdir/$tmpbam.bam"; unlink "$tmpdir/$tmpbam.bam.bai"; unlink "$tmpdir/$tmpbam.rmdup.bam"; unlink "$tmpdir/$tmpbam.rmdup.bam.bai"; unlink "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals"; } sub run_cmds{ my ($max_cmds, $sleep_time, @cmds) = @_; my ($num_children, $pid); for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){ my $cmd = shift (@cmds); if($pid = fork) { sleep $sleep_time; # seems to be a major system time spike if all commands launched in quick succession. } elsif(defined $pid) { #print STDERR "$cmd\n"; system $cmd; #print STDERR "Finished $cmd\n"; exit; } else { die "Can't fork: $!\n"; } } while(@cmds) { undef $pid; FORK: { my $cmd = shift (@cmds); if($pid = fork) { $num_children++; wait; $num_children--; next; } elsif(defined $pid) { system $cmd; exit; } else { die "Can't fork: $!\n"; } } } wait while $num_children--; }