Mercurial > repos > yusuf > dedup_realign_bam
view prep_bams_parallel @ 0:6b673ffd9e38 default tip
initial commit
author | Yusuf Ali <ali@yusuf.email> |
---|---|
date | Wed, 25 Mar 2015 13:29:45 -0600 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env perl use File::Basename; use strict; use warnings; @ARGV > 5 or die "Usage: $0 <num parallel processes> <output log> <reference.fa> <temporary directory> <deduped_realigned_bam_prefix> <input1.bam> [input2.bam] ...\n"; my $num_procs = shift @ARGV; my $log = shift @ARGV; my $ref = shift @ARGV; my $tmpdir = shift @ARGV; my $outfile_prefix = shift @ARGV; my $progdir = dirname($0); # Split the input data into separate files for each chromosome, so we can process them in parallel my $chr_files_hashref = &generate_chr_split_bams($tmpdir, @ARGV); my (@cmds, @logs); for my $chr (keys %$chr_files_hashref){ push @cmds, "$progdir/prep_bams $log.$chr $ref $tmpdir $outfile_prefix.$chr.bam ".$chr_files_hashref->{$chr}; push @logs, "$log.$chr"; } # Run the chrs in parallel to the desrired degree &run_cmds($num_procs, @cmds); # Remove the split input files, as they were only needed temporarily for my $chr_file (values %$chr_files_hashref){ unlink $chr_file or warn "Cannot remove temporary file $chr_file: $!\n"; } # Merge the logs open(LOG, ">$log") or die "Could not open output log $log for writing: $!\n"; for my $l (@logs){ open(L, $l) or warn "Log file $l was expected but not found, data processing may have been incomplete.\n"; print LOG <L>; close(L); unlink $l; } close(LOG); # Merge the input BAMsinto one file, then split them into per contig BAM files for downstream parallelization sub generate_chr_split_bams{ my ($tempdir, @input_bams) = @_; my $temp_input_bam = $input_bams[0]; if(@input_bams > 1){ $temp_input_bam = "$tempdir/prep_bams_parallel_$$.bam"; system ("samtools merge $temp_input_bam ".join(" ", @input_bams)) >> 8 and die "Could not successfully run samtools merge: exit status was $?\n"; } if(not -e "$temp_input_bam.bai" or -z "$temp_input_bam.bai"){ system ("samtools index $temp_input_bam") >> 8 and die "Could not successfully run samtools index: exit status was $?\n"; } my @output_contigs; open(SAM, "samtools view -H $temp_input_bam |") or die "Cannot run samtools view on $temp_input_bam: $!\n"; while(<SAM>){ if(/^\@SQ\tSN:(\S+)/){ push @output_contigs, $1; } } close(SAM); my @split_bamfiles; my %contig2outfile; for my $output_contig (@output_contigs){ open(SAM, "samtools view -h $temp_input_bam '$output_contig'|") and die "Cannot execute samtools view successfully: $!\n"; my @headers; # read through the headers, then if we get at least one mapped read, open a file to print it while(<SAM>){ last if not /^\@/; # first real record push @headers, $_; } next if not defined $_; # got to EOF before a real record my $outfile = "$tempdir/prep_bams_parallel_$$.".quotemeta($output_contig).".bam"; open(OUT, "| samtools view -b -S - > $outfile") or die "Cannot open $outfile for writing: $!\n"; print OUT @headers, $_; while(<SAM>){ # Copy over all lines. Not using list context as this may be a huge dataset and would needlessly take up memory print OUT $_; } close(SAM); close(OUT); } unlink $temp_input_bam or warn "Could not remove $temp_input_bam: $!\n"; } sub run_cmds { my ($max_cmds, @cmd) = @_; my ($num_children, $pid); for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){ # Initialize the number of child processes at 1, and increment it by one # While it is less than $max_cmds my $cmd = shift (@cmds); if($pid = fork) { # do nothing if parent } elsif(defined $pid) { # $pid is zero here if defined print STDERR $cmd, "\n"; system $cmd; exit; } else { #weird fork error die "Can't fork: $!\n"; } } while(@cmds) { undef $pid; FORK: { my $cmd = shift (@cmds); if($pid = fork) { # parent here $num_children++; wait; $num_children--; next; } elsif(defined $pid) { # $pid is zero here if defined print STDERR $cmd, "\n"; system $cmd; exit; } else { #weird fork error die "Can't fork: $!\n"; } } } wait while $num_children--; }