Mercurial > repos > yusuf > dedup_realign_bam
diff prep_bams_parallel @ 0:6b673ffd9e38 default tip
initial commit
author | Yusuf Ali <ali@yusuf.email> |
---|---|
date | Wed, 25 Mar 2015 13:29:45 -0600 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/prep_bams_parallel Wed Mar 25 13:29:45 2015 -0600 @@ -0,0 +1,142 @@ +#!/usr/bin/env perl + +use File::Basename; +use strict; +use warnings; + +@ARGV > 5 or die "Usage: $0 <num parallel processes> <output log> <reference.fa> <temporary directory> <deduped_realigned_bam_prefix> <input1.bam> [input2.bam] ...\n"; + +my $num_procs = shift @ARGV; +my $log = shift @ARGV; +my $ref = shift @ARGV; +my $tmpdir = shift @ARGV; +my $outfile_prefix = shift @ARGV; + +my $progdir = dirname($0); + +# Split the input data into separate files for each chromosome, so we can process them in parallel +my $chr_files_hashref = &generate_chr_split_bams($tmpdir, @ARGV); + +my (@cmds, @logs); +for my $chr (keys %$chr_files_hashref){ + push @cmds, "$progdir/prep_bams $log.$chr $ref $tmpdir $outfile_prefix.$chr.bam ".$chr_files_hashref->{$chr}; + push @logs, "$log.$chr"; +} + +# Run the chrs in parallel to the desrired degree +&run_cmds($num_procs, @cmds); + +# Remove the split input files, as they were only needed temporarily +for my $chr_file (values %$chr_files_hashref){ + unlink $chr_file or warn "Cannot remove temporary file $chr_file: $!\n"; +} + +# Merge the logs +open(LOG, ">$log") + or die "Could not open output log $log for writing: $!\n"; +for my $l (@logs){ + open(L, $l) + or warn "Log file $l was expected but not found, data processing may have been incomplete.\n"; + print LOG <L>; + close(L); + unlink $l; +} +close(LOG); + +# Merge the input BAMsinto one file, then split them into per contig BAM files for downstream parallelization +sub generate_chr_split_bams{ + my ($tempdir, @input_bams) = @_; + + my $temp_input_bam = $input_bams[0]; + if(@input_bams > 1){ + $temp_input_bam = "$tempdir/prep_bams_parallel_$$.bam"; + system ("samtools merge $temp_input_bam ".join(" ", @input_bams)) >> 8 + and die "Could not successfully run samtools merge: exit status was $?\n"; + } + if(not -e "$temp_input_bam.bai" or -z "$temp_input_bam.bai"){ + system ("samtools index $temp_input_bam") >> 8 + and die "Could not successfully run samtools index: exit status was $?\n"; + } + my @output_contigs; + open(SAM, "samtools view -H $temp_input_bam |") + or die "Cannot run samtools view on $temp_input_bam: $!\n"; + while(<SAM>){ + if(/^\@SQ\tSN:(\S+)/){ + push @output_contigs, $1; + } + } + close(SAM); + + my @split_bamfiles; + my %contig2outfile; + for my $output_contig (@output_contigs){ + open(SAM, "samtools view -h $temp_input_bam '$output_contig'|") + and die "Cannot execute samtools view successfully: $!\n"; + my @headers; + # read through the headers, then if we get at least one mapped read, open a file to print it + while(<SAM>){ + last if not /^\@/; # first real record + push @headers, $_; + } + next if not defined $_; # got to EOF before a real record + + my $outfile = "$tempdir/prep_bams_parallel_$$.".quotemeta($output_contig).".bam"; + open(OUT, "| samtools view -b -S - > $outfile") + or die "Cannot open $outfile for writing: $!\n"; + print OUT @headers, $_; + while(<SAM>){ # Copy over all lines. Not using list context as this may be a huge dataset and would needlessly take up memory + print OUT $_; + } + close(SAM); + close(OUT); + } + unlink $temp_input_bam or warn "Could not remove $temp_input_bam: $!\n"; +} + +sub run_cmds { + my ($max_cmds, @cmd) = @_; + + my ($num_children, $pid); + + for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){ + # Initialize the number of child processes at 1, and increment it by one + # While it is less than $max_cmds + + my $cmd = shift (@cmds); + if($pid = fork) { + # do nothing if parent + } elsif(defined $pid) { # $pid is zero here if defined + print STDERR $cmd, "\n"; + system $cmd; + exit; + } else { + #weird fork error + die "Can't fork: $!\n"; + } + } + + while(@cmds) { + undef $pid; + FORK: { + my $cmd = shift (@cmds); + if($pid = fork) { + # parent here + $num_children++; + wait; + $num_children--; + next; + + } elsif(defined $pid) { # $pid is zero here if defined + print STDERR $cmd, "\n"; + system $cmd; + exit; + + } else { + #weird fork error + die "Can't fork: $!\n"; + } + } + } + wait while $num_children--; +} +