Mercurial > repos > yusuf > dedup_realign_bam
comparison prep_bams_parallel @ 0:6b673ffd9e38 default tip
initial commit
| author | Yusuf Ali <ali@yusuf.email> |
|---|---|
| date | Wed, 25 Mar 2015 13:29:45 -0600 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:6b673ffd9e38 |
|---|---|
#!/usr/bin/env perl

use File::Basename;
use strict;
use warnings;

# Dedup/realign BAM files in parallel: the inputs are split per chromosome,
# one prep_bams job is run per chromosome, and the per-chromosome logs are
# merged into a single log afterwards.
@ARGV > 5 or die "Usage: $0 <num parallel processes> <output log> <reference.fa> <temporary directory> <deduped_realigned_bam_prefix> <input1.bam> [input2.bam] ...\n";

my $num_procs      = shift @ARGV;
my $log            = shift @ARGV;
my $ref            = shift @ARGV;
my $tmpdir         = shift @ARGV;
my $outfile_prefix = shift @ARGV;

my $progdir = dirname($0);

# Split the input data into separate files for each chromosome, so we can
# process them in parallel.  Returns { contig name => split BAM path }.
my $chr_files_hashref = generate_chr_split_bams($tmpdir, @ARGV);

# One prep_bams command (and one log file) per chromosome.
my (@cmds, @logs);
for my $chr (keys %$chr_files_hashref){
    push @cmds, "$progdir/prep_bams $log.$chr $ref $tmpdir $outfile_prefix.$chr.bam ".$chr_files_hashref->{$chr};
    push @logs, "$log.$chr";
}

# Run the chrs in parallel to the desired degree
run_cmds($num_procs, @cmds);

# Remove the split input files, as they were only needed temporarily
for my $chr_file (values %$chr_files_hashref){
    unlink $chr_file or warn "Cannot remove temporary file $chr_file: $!\n";
}

# Merge the per-chromosome logs into the single requested output log.
open(my $log_out, '>', $log)
    or die "Could not open output log $log for writing: $!\n";
for my $l (@logs){
    # BUGFIX: the original warned on a failed open but then read from the
    # dead handle anyway; skip a missing log instead of reading garbage.
    if(open(my $log_in, '<', $l)){
        print {$log_out} <$log_in>;
        close($log_in);
        unlink $l;
    }
    else{
        warn "Log file $l was expected but not found, data processing may have been incomplete.\n";
    }
}
close($log_out);
# Merge the input BAMs into one file, then split them into per-contig BAM
# files for downstream parallelization.
#
# Args:    $tempdir     - directory for temporary/split files
#          @input_bams  - one or more input BAM paths
# Returns: hashref mapping contig name => per-contig BAM path; contigs with
#          no mapped reads are omitted.
# Dies if any samtools invocation fails.
sub generate_chr_split_bams{
    my ($tempdir, @input_bams) = @_;

    my $temp_input_bam = $input_bams[0];
    my $made_merge_tmp = 0;   # true only if we created a temporary merged BAM
    if(@input_bams > 1){
        $temp_input_bam = "$tempdir/prep_bams_parallel_$$.bam";
        system("samtools merge $temp_input_bam ".join(" ", @input_bams)) >> 8
            and die "Could not successfully run samtools merge: exit status was $?\n";
        $made_merge_tmp = 1;
    }
    # Region queries below require a .bai index; build it if absent or empty.
    if(not -e "$temp_input_bam.bai" or -z "$temp_input_bam.bai"){
        system("samtools index $temp_input_bam") >> 8
            and die "Could not successfully run samtools index: exit status was $?\n";
    }
    # Collect the contig names from the @SQ header lines.
    my @output_contigs;
    open(my $header_fh, '-|', "samtools view -H $temp_input_bam")
        or die "Cannot run samtools view on $temp_input_bam: $!\n";
    while(<$header_fh>){
        if(/^\@SQ\tSN:(\S+)/){
            push @output_contigs, $1;
        }
    }
    close($header_fh);

    my %contig2outfile;
    for my $output_contig (@output_contigs){
        # BUGFIX: original said "open(...) and die", which died on SUCCESS.
        open(my $sam_fh, '-|', "samtools view -h $temp_input_bam '$output_contig'")
            or die "Cannot execute samtools view successfully: $!\n";
        my @headers;
        # Read through the headers; stop at the first real (non-header) record.
        while(<$sam_fh>){
            last if not /^\@/; # first real record
            push @headers, $_;
        }
        if(not defined $_){ # got to EOF before a real record: no mapped reads
            close($sam_fh);
            next;
        }

        my $outfile = "$tempdir/prep_bams_parallel_$$.".quotemeta($output_contig).".bam";
        open(my $out_fh, '|-', "samtools view -b -S - > $outfile")
            or die "Cannot open $outfile for writing: $!\n";
        print {$out_fh} @headers, $_;
        # Copy remaining lines one at a time; the dataset may be huge, so
        # avoid slurping it into memory.
        while(<$sam_fh>){
            print {$out_fh} $_;
        }
        close($sam_fh);
        close($out_fh);
        # BUGFIX: original never recorded the output file, so the caller got
        # nothing back.
        $contig2outfile{$output_contig} = $outfile;
    }
    # BUGFIX: only delete the merged BAM if WE created it; the original
    # unconditionally unlinked, destroying the user's input when only one
    # BAM was supplied.
    if($made_merge_tmp){
        unlink $temp_input_bam or warn "Could not remove $temp_input_bam: $!\n";
        unlink "$temp_input_bam.bai"; # best-effort removal of the temp index
    }
    # BUGFIX: the original had no return statement at all.
    return \%contig2outfile;
}
| 95 | |
# Run the given shell commands in parallel, keeping at most $max_cmds child
# processes alive at any time.  Blocks until every command has finished.
#
# Args: $max_cmds     - maximum number of concurrent children
#       @cmds_to_run  - shell command strings, each run via system()
# Dies if fork() fails.
#
# BUGFIX: the original unpacked its arguments into @cmd but then consumed the
# file-level lexical @cmds from the main script, so the parameter was silently
# ignored and the sub was broken for any other caller.  It also duplicated the
# fork logic across two loops and only started $max_cmds - 1 initial children.
sub run_cmds {
    my ($max_cmds, @cmds_to_run) = @_;

    my $num_children = 0;
    while (@cmds_to_run) {
        # Throttle: once we are at capacity, reap one child before forking.
        if ($num_children >= $max_cmds) {
            wait;
            $num_children--;
        }
        my $cmd = shift @cmds_to_run;
        my $pid = fork;
        if (not defined $pid) {
            # weird fork error
            die "Can't fork: $!\n";
        }
        if ($pid == 0) {
            # Child: echo the command for traceability, run it, and exit.
            print STDERR $cmd, "\n";
            system $cmd;
            exit;
        }
        $num_children++;    # parent bookkeeping
    }
    # Reap all remaining children before returning.
    while ($num_children-- > 0) {
        wait;
    }
}
| 142 |
