0
|
1 #!/usr/bin/env perl
|
|
2
|
|
3 use File::Basename;
|
|
4 use strict;
|
|
5 use warnings;
|
|
6
|
|
7 @ARGV > 5 or die "Usage: $0 <num parallel processes> <output log> <reference.fa> <temporary directory> <deduped_realigned_bam_prefix> <input1.bam> [input2.bam] ...\n";
|
|
8
|
|
9 my $num_procs = shift @ARGV;
|
|
10 my $log = shift @ARGV;
|
|
11 my $ref = shift @ARGV;
|
|
12 my $tmpdir = shift @ARGV;
|
|
13 my $outfile_prefix = shift @ARGV;
|
|
14
|
|
15 my $progdir = dirname($0);
|
|
16
|
|
17 # Split the input data into separate files for each chromosome, so we can process them in parallel
|
|
18 my $chr_files_hashref = &generate_chr_split_bams($tmpdir, @ARGV);
|
|
19
|
|
20 my (@cmds, @logs);
|
|
21 for my $chr (keys %$chr_files_hashref){
|
|
22 push @cmds, "$progdir/prep_bams $log.$chr $ref $tmpdir $outfile_prefix.$chr.bam ".$chr_files_hashref->{$chr};
|
|
23 push @logs, "$log.$chr";
|
|
24 }
|
|
25
|
|
26 # Run the chrs in parallel to the desrired degree
|
|
27 &run_cmds($num_procs, @cmds);
|
|
28
|
|
29 # Remove the split input files, as they were only needed temporarily
|
|
30 for my $chr_file (values %$chr_files_hashref){
|
|
31 unlink $chr_file or warn "Cannot remove temporary file $chr_file: $!\n";
|
|
32 }
|
|
33
|
|
34 # Merge the logs
|
|
35 open(LOG, ">$log")
|
|
36 or die "Could not open output log $log for writing: $!\n";
|
|
37 for my $l (@logs){
|
|
38 open(L, $l)
|
|
39 or warn "Log file $l was expected but not found, data processing may have been incomplete.\n";
|
|
40 print LOG <L>;
|
|
41 close(L);
|
|
42 unlink $l;
|
|
43 }
|
|
44 close(LOG);
|
|
45
|
|
46 # Merge the input BAMsinto one file, then split them into per contig BAM files for downstream parallelization
|
|
47 sub generate_chr_split_bams{
|
|
48 my ($tempdir, @input_bams) = @_;
|
|
49
|
|
50 my $temp_input_bam = $input_bams[0];
|
|
51 if(@input_bams > 1){
|
|
52 $temp_input_bam = "$tempdir/prep_bams_parallel_$$.bam";
|
|
53 system ("samtools merge $temp_input_bam ".join(" ", @input_bams)) >> 8
|
|
54 and die "Could not successfully run samtools merge: exit status was $?\n";
|
|
55 }
|
|
56 if(not -e "$temp_input_bam.bai" or -z "$temp_input_bam.bai"){
|
|
57 system ("samtools index $temp_input_bam") >> 8
|
|
58 and die "Could not successfully run samtools index: exit status was $?\n";
|
|
59 }
|
|
60 my @output_contigs;
|
|
61 open(SAM, "samtools view -H $temp_input_bam |")
|
|
62 or die "Cannot run samtools view on $temp_input_bam: $!\n";
|
|
63 while(<SAM>){
|
|
64 if(/^\@SQ\tSN:(\S+)/){
|
|
65 push @output_contigs, $1;
|
|
66 }
|
|
67 }
|
|
68 close(SAM);
|
|
69
|
|
70 my @split_bamfiles;
|
|
71 my %contig2outfile;
|
|
72 for my $output_contig (@output_contigs){
|
|
73 open(SAM, "samtools view -h $temp_input_bam '$output_contig'|")
|
|
74 and die "Cannot execute samtools view successfully: $!\n";
|
|
75 my @headers;
|
|
76 # read through the headers, then if we get at least one mapped read, open a file to print it
|
|
77 while(<SAM>){
|
|
78 last if not /^\@/; # first real record
|
|
79 push @headers, $_;
|
|
80 }
|
|
81 next if not defined $_; # got to EOF before a real record
|
|
82
|
|
83 my $outfile = "$tempdir/prep_bams_parallel_$$.".quotemeta($output_contig).".bam";
|
|
84 open(OUT, "| samtools view -b -S - > $outfile")
|
|
85 or die "Cannot open $outfile for writing: $!\n";
|
|
86 print OUT @headers, $_;
|
|
87 while(<SAM>){ # Copy over all lines. Not using list context as this may be a huge dataset and would needlessly take up memory
|
|
88 print OUT $_;
|
|
89 }
|
|
90 close(SAM);
|
|
91 close(OUT);
|
|
92 }
|
|
93 unlink $temp_input_bam or warn "Could not remove $temp_input_bam: $!\n";
|
|
94 }
|
|
95
|
|
96 sub run_cmds {
|
|
97 my ($max_cmds, @cmd) = @_;
|
|
98
|
|
99 my ($num_children, $pid);
|
|
100
|
|
101 for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){
|
|
102 # Initialize the number of child processes at 1, and increment it by one
|
|
103 # While it is less than $max_cmds
|
|
104
|
|
105 my $cmd = shift (@cmds);
|
|
106 if($pid = fork) {
|
|
107 # do nothing if parent
|
|
108 } elsif(defined $pid) { # $pid is zero here if defined
|
|
109 print STDERR $cmd, "\n";
|
|
110 system $cmd;
|
|
111 exit;
|
|
112 } else {
|
|
113 #weird fork error
|
|
114 die "Can't fork: $!\n";
|
|
115 }
|
|
116 }
|
|
117
|
|
118 while(@cmds) {
|
|
119 undef $pid;
|
|
120 FORK: {
|
|
121 my $cmd = shift (@cmds);
|
|
122 if($pid = fork) {
|
|
123 # parent here
|
|
124 $num_children++;
|
|
125 wait;
|
|
126 $num_children--;
|
|
127 next;
|
|
128
|
|
129 } elsif(defined $pid) { # $pid is zero here if defined
|
|
130 print STDERR $cmd, "\n";
|
|
131 system $cmd;
|
|
132 exit;
|
|
133
|
|
134 } else {
|
|
135 #weird fork error
|
|
136 die "Can't fork: $!\n";
|
|
137 }
|
|
138 }
|
|
139 }
|
|
140 wait while $num_children--;
|
|
141 }
|
|
142
|