diff prep_bams_parallel @ 0:6b673ffd9e38 default tip

initial commit
author Yusuf Ali <ali@yusuf.email>
date Wed, 25 Mar 2015 13:29:45 -0600
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/prep_bams_parallel	Wed Mar 25 13:29:45 2015 -0600
@@ -0,0 +1,142 @@
+#!/usr/bin/env perl
+
+use File::Basename;
+use strict;
+use warnings;
+
+@ARGV > 5 or die "Usage: $0 <num parallel processes> <output log> <reference.fa> <temporary directory> <deduped_realigned_bam_prefix> <input1.bam> [input2.bam] ...\n";
+
+my $num_procs = shift @ARGV;
+my $log = shift @ARGV;
+my $ref = shift @ARGV;
+my $tmpdir = shift @ARGV;
+my $outfile_prefix = shift @ARGV;
+
+my $progdir = dirname($0);
+
+# Split the input data into separate files for each chromosome, so we can process them in parallel
+my $chr_files_hashref = &generate_chr_split_bams($tmpdir, @ARGV);
+
+my (@cmds, @logs);
+for my $chr (keys %$chr_files_hashref){
+  push @cmds, "$progdir/prep_bams $log.$chr $ref $tmpdir $outfile_prefix.$chr.bam ".$chr_files_hashref->{$chr};
+  push @logs, "$log.$chr";
+}
+
+# Run the chrs in parallel to the desrired degree
+&run_cmds($num_procs, @cmds);
+
+# Remove the split input files, as they were only needed temporarily
+for my $chr_file (values %$chr_files_hashref){
+  unlink $chr_file or warn "Cannot remove temporary file $chr_file: $!\n";
+}
+
+# Merge the logs
+open(LOG, ">$log")
+  or die "Could not open output log $log for writing: $!\n";
+for my $l (@logs){
+  open(L, $l)
+    or warn "Log file $l was expected but not found, data processing may have been incomplete.\n";
+  print LOG <L>;
+  close(L);
+  unlink $l;
+}
+close(LOG);
+
+# Merge the input BAMsinto one file, then split them into per contig BAM files for downstream parallelization
+sub generate_chr_split_bams{
+  my ($tempdir, @input_bams) = @_;
+
+  my $temp_input_bam = $input_bams[0];
+  if(@input_bams > 1){
+    $temp_input_bam = "$tempdir/prep_bams_parallel_$$.bam";
+    system ("samtools merge $temp_input_bam ".join(" ", @input_bams)) >> 8 
+      and die "Could not successfully run samtools merge: exit status was $?\n";
+  }
+  if(not -e "$temp_input_bam.bai" or -z "$temp_input_bam.bai"){
+    system ("samtools index $temp_input_bam") >> 8
+      and die "Could not successfully run samtools index: exit status was $?\n";
+  }
+  my @output_contigs;
+  open(SAM, "samtools view -H $temp_input_bam |")
+    or die "Cannot run samtools view on $temp_input_bam: $!\n";
+  while(<SAM>){
+    if(/^\@SQ\tSN:(\S+)/){
+      push @output_contigs, $1;
+    }
+  }
+  close(SAM);
+
+  my @split_bamfiles;
+  my %contig2outfile;
+  for my $output_contig (@output_contigs){
+    open(SAM, "samtools view -h $temp_input_bam '$output_contig'|")
+      and die "Cannot execute samtools view successfully: $!\n";
+    my @headers;
+    # read through the headers, then if we get at least one mapped read, open a file to print it
+    while(<SAM>){
+      last if not /^\@/; # first real record
+      push @headers, $_;
+    }
+    next if not defined $_; # got to EOF before a real record
+
+    my $outfile = "$tempdir/prep_bams_parallel_$$.".quotemeta($output_contig).".bam";
+    open(OUT, "| samtools view -b -S - > $outfile")
+      or die "Cannot open $outfile for writing: $!\n";
+    print OUT @headers, $_;
+    while(<SAM>){ # Copy over all lines.  Not using list context as this may be a huge dataset and would needlessly take up memory
+      print OUT $_;
+    }
+    close(SAM);
+    close(OUT);
+  }
+  unlink $temp_input_bam or warn "Could not remove $temp_input_bam: $!\n";
+}
+
+sub run_cmds {
+  my ($max_cmds, @cmd) = @_;
+  
+  my ($num_children, $pid); 
+  
+        for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){
+        # Initialize the number of child processes at 1, and increment it by one
+        # While it is less than $max_cmds
+
+                my $cmd = shift (@cmds);
+                if($pid = fork) {
+                        # do nothing if parent
+                } elsif(defined $pid) { # $pid is zero here if defined
+                        print STDERR $cmd, "\n";
+                        system $cmd;
+                        exit;
+                } else {
+                        #weird fork error
+                        die "Can't fork: $!\n";
+                }
+        }
+
+        while(@cmds) {
+                undef $pid;
+                FORK: {
+                        my $cmd = shift (@cmds);
+                        if($pid = fork) {
+                                # parent here
+                                $num_children++;
+                                wait;
+                                $num_children--;
+                                next;
+
+                        } elsif(defined $pid) { # $pid is zero here if defined
+                                print STDERR $cmd, "\n";
+                                system $cmd;
+                                exit;
+
+                        } else {
+                                #weird fork error
+                                die "Can't fork: $!\n";
+                        }
+                }
+        }
+        wait while $num_children--;
+}
+