view prep_bams_parallel @ 0:6b673ffd9e38 default tip

initial commit
author Yusuf Ali <ali@yusuf.email>
date Wed, 25 Mar 2015 13:29:45 -0600
parents
children
line wrap: on
line source

#!/usr/bin/env perl

use File::Basename;
use strict;
use warnings;

@ARGV > 5 or die "Usage: $0 <num parallel processes> <output log> <reference.fa> <temporary directory> <deduped_realigned_bam_prefix> <input1.bam> [input2.bam] ...\n";

my $num_procs = shift @ARGV;
my $log = shift @ARGV;
my $ref = shift @ARGV;
my $tmpdir = shift @ARGV;
my $outfile_prefix = shift @ARGV;

my $progdir = dirname($0);

# Split the input data into separate files for each chromosome, so we can process them in parallel
my $chr_files_hashref = &generate_chr_split_bams($tmpdir, @ARGV);

my (@cmds, @logs);
for my $chr (keys %$chr_files_hashref){
  push @cmds, "$progdir/prep_bams $log.$chr $ref $tmpdir $outfile_prefix.$chr.bam ".$chr_files_hashref->{$chr};
  push @logs, "$log.$chr";
}

# Run the chrs in parallel to the desrired degree
&run_cmds($num_procs, @cmds);

# Remove the split input files, as they were only needed temporarily
for my $chr_file (values %$chr_files_hashref){
  unlink $chr_file or warn "Cannot remove temporary file $chr_file: $!\n";
}

# Merge the logs
open(LOG, ">$log")
  or die "Could not open output log $log for writing: $!\n";
for my $l (@logs){
  open(L, $l)
    or warn "Log file $l was expected but not found, data processing may have been incomplete.\n";
  print LOG <L>;
  close(L);
  unlink $l;
}
close(LOG);

# Merge the input BAMsinto one file, then split them into per contig BAM files for downstream parallelization
sub generate_chr_split_bams{
  my ($tempdir, @input_bams) = @_;

  my $temp_input_bam = $input_bams[0];
  if(@input_bams > 1){
    $temp_input_bam = "$tempdir/prep_bams_parallel_$$.bam";
    system ("samtools merge $temp_input_bam ".join(" ", @input_bams)) >> 8 
      and die "Could not successfully run samtools merge: exit status was $?\n";
  }
  if(not -e "$temp_input_bam.bai" or -z "$temp_input_bam.bai"){
    system ("samtools index $temp_input_bam") >> 8
      and die "Could not successfully run samtools index: exit status was $?\n";
  }
  my @output_contigs;
  open(SAM, "samtools view -H $temp_input_bam |")
    or die "Cannot run samtools view on $temp_input_bam: $!\n";
  while(<SAM>){
    if(/^\@SQ\tSN:(\S+)/){
      push @output_contigs, $1;
    }
  }
  close(SAM);

  my @split_bamfiles;
  my %contig2outfile;
  for my $output_contig (@output_contigs){
    open(SAM, "samtools view -h $temp_input_bam '$output_contig'|")
      and die "Cannot execute samtools view successfully: $!\n";
    my @headers;
    # read through the headers, then if we get at least one mapped read, open a file to print it
    while(<SAM>){
      last if not /^\@/; # first real record
      push @headers, $_;
    }
    next if not defined $_; # got to EOF before a real record

    my $outfile = "$tempdir/prep_bams_parallel_$$.".quotemeta($output_contig).".bam";
    open(OUT, "| samtools view -b -S - > $outfile")
      or die "Cannot open $outfile for writing: $!\n";
    print OUT @headers, $_;
    while(<SAM>){ # Copy over all lines.  Not using list context as this may be a huge dataset and would needlessly take up memory
      print OUT $_;
    }
    close(SAM);
    close(OUT);
  }
  unlink $temp_input_bam or warn "Could not remove $temp_input_bam: $!\n";
}

sub run_cmds {
  my ($max_cmds, @cmd) = @_;
  
  my ($num_children, $pid); 
  
        for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){
        # Initialize the number of child processes at 1, and increment it by one
        # While it is less than $max_cmds

                my $cmd = shift (@cmds);
                if($pid = fork) {
                        # do nothing if parent
                } elsif(defined $pid) { # $pid is zero here if defined
                        print STDERR $cmd, "\n";
                        system $cmd;
                        exit;
                } else {
                        #weird fork error
                        die "Can't fork: $!\n";
                }
        }

        while(@cmds) {
                undef $pid;
                FORK: {
                        my $cmd = shift (@cmds);
                        if($pid = fork) {
                                # parent here
                                $num_children++;
                                wait;
                                $num_children--;
                                next;

                        } elsif(defined $pid) { # $pid is zero here if defined
                                print STDERR $cmd, "\n";
                                system $cmd;
                                exit;

                        } else {
                                #weird fork error
                                die "Can't fork: $!\n";
                        }
                }
        }
        wait while $num_children--;
}