annotate fastq_groomer_parallel.py @ 2:cac848910bd8

Uploaded
author kevyin
date Fri, 14 Sep 2012 03:17:38 -0400
parents 18a08d476d5e
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
1 # Kenneth Sabir
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
2 # Garvan Institute
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
3 import sys
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
4 import time
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
5 import os
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
6 import math
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
7 import subprocess
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
8 import string
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
9 import shutil
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
10 import pickle
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
11 import io
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
12 from multiprocessing import Process
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
13 from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
14
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
15
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
16
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
17 def main():
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
18 split_program = "split"
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
19 cat_program = "cat"
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
20 input_filename = sys.argv[1]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
21 output_filename = sys.argv[3]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
22 number_of_processes = 1;
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
23 if (len(sys.argv) > 7):
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
24 number_of_processes = int(sys.argv[7])
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
25 file_prefix = "temp_groomer_part_"
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
26
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
27 t1 = time.time()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
28 old_path = os.getcwd()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
29
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
30 lines_per_process,number_of_lines = calculate_lines_per_process(input_filename, number_of_processes)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
31 temp_dir_name = move_to_temp_dir()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
32 sequences = number_of_lines/4;
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
33 args = [split_program, "-l"+str(lines_per_process), input_filename, file_prefix]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
34 # print "The args are: " , args
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
35 subprocess.call(args)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
36 # print "Finished"
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
37 file_count = 0;
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
38 keep_checking = True
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
39 processes = []
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
40 output_filenames = []
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
41 while keep_checking:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
42
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
43 # only need to support 26x26 different processes, so do it brute force (ie not in a loop) for 2 chars.
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
44 lastchar = string.letters[file_count % len(string.letters)]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
45 firstchar = string.letters[(file_count / len(string.letters)) % len(string.letters)]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
46 temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
47
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
48 # print 'looking for ' + temp_input_filename
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
49 if os.path.exists(temp_input_filename):
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
50 # print 'found ' + temp_input_filename
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
51 temp_output_filename = temp_input_filename + "_output"
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
52 output_filenames.append(temp_output_filename)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
53 p = Process(target=partition, args=([temp_input_filename, temp_output_filename, file_count]))
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
54 p.start()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
55 processes.append(p)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
56 file_count = file_count + 1
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
57 else:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
58 break
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
59 for p in processes :
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
60 p.join()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
61 cat_params = [cat_program]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
62 cat_params.extend(output_filenames)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
63 with open(output_filename, 'w') as catOutputFile:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
64 subprocess.call(cat_params, stdout=catOutputFile)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
65 summarize_input = sys.argv[6] == 'summarize_input'
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
66 input_type = sys.argv[2]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
67 output_type = sys.argv[4]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
68 print "Groomed %i %s reads into %s reads." % ( sequences, input_type, output_type )
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
69
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
70 aggregators = []
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
71 if summarize_input:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
72 for temp_output_filename in output_filenames :
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
73 with open(temp_output_filename + "_summary", 'r') as summaryLogFile:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
74 temp_aggregator = pickle.load(summaryLogFile)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
75 aggregators.append(temp_aggregator)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
76
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
77 print_aggregators(aggregators)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
78 os.chdir(old_path)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
79 shutil.rmtree(temp_dir_name)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
80 time2 = time.time()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
81 print 'Groomer took: %0.3f ms using %d processes' % (((time2 - t1)*1000.0), number_of_processes)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
82
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
83 def calculate_lines_per_process(input_filename, number_of_processes):
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
84 wc_program = "wc"
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
85 p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
86 out, err = p.communicate()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
87 number_of_lines = int(string.split(string.lstrip(out), ' ', 1)[0])
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
88 exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
89 lines_per_process = int(math.ceil((exact_lines_per_process / 4.0))) * 4
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
90 return lines_per_process,number_of_lines
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
91
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
92 def move_to_temp_dir():
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
93 dirExists = False;
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
94 dir_name = None
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
95
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
96 while not dirExists:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
97 dir_name = "temp_groomer_part_" + str(time.time())
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
98 if not os.path.exists(dir_name):
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
99 os.makedirs(dir_name)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
100 break;
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
101 os.chdir(dir_name)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
102 return dir_name
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
103
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
104 def print_aggregators(aggregators):
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
105 total_ascii_range = [None, None]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
106 total_decimal_range = [None, None]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
107 total_valid_formats = set()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
108 for aggregator in aggregators:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
109 # print "This aggregators valid formats are: " + str(aggregator.get_valid_formats())
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
110 total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
111 ascii_range = aggregator.get_ascii_range()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
112 decimal_range = aggregator.get_decimal_range()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
113
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
114 if total_ascii_range[0] is None:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
115 total_ascii_range[0] = ascii_range[0]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
116 else:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
117 total_ascii_range[0] = min (total_ascii_range[0], ascii_range[0])
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
118
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
119 # max of None and a value is the value
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
120 total_ascii_range[1] = max (total_ascii_range[1], ascii_range[1])
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
121 if total_decimal_range[0] is None:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
122 total_decimal_range[0] = decimal_range[0]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
123 else:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
124 total_decimal_range[0] = min (total_decimal_range[0], decimal_range[0])
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
125 # max of None and a value is the value
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
126 total_decimal_range[1] = max (total_decimal_range[1], decimal_range[1])
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
127 print "total_valid_formats= " + str(total_valid_formats)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
128 print "Based upon quality and sequence, the input data is valid for: %s" % ( ", ".join( total_valid_formats ) or "None" )
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
129 print "Input ASCII range: %s(%i) - %s(%i)" % ( repr( total_ascii_range[0] ), ord( total_ascii_range[0] ), repr( total_ascii_range[1] ), ord( total_ascii_range[1] ) ) #print using repr, since \x00 (null) causes info truncation in galaxy when printed
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
130 print "Input decimal range: %i - %i" % ( total_decimal_range[0], total_decimal_range[1] )
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
131
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
132
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
133 def partition(input_filename, temp_output_filename, fileCount):
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
134 # print 'Starting Thread: ' + str(fileCount)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
135 # input_filename = sys.argv[1]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
136 input_type = sys.argv[2]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
137 output_type = sys.argv[4]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
138 force_quality_encoding = sys.argv[5]
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
139 summarize_input = sys.argv[6] == 'summarize_input'
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
140 if force_quality_encoding == 'None':
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
141 force_quality_encoding = None
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
142 aggregator = fastqAggregator()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
143 temp_process_file = fastqWriter( open( temp_output_filename, 'wb'), format = output_type, force_quality_encoding = force_quality_encoding )
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
144 read_count = None
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
145 if summarize_input:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
146 reader = fastqVerboseErrorReader
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
147 else:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
148 reader = fastqReader
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
149 for read_count, fastq_read in enumerate( reader( open(input_filename, 'rb'), format = input_type, apply_galaxy_conventions = True ) ):
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
150 if summarize_input:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
151 aggregator.consume_read( fastq_read )
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
152 temp_process_file.write( fastq_read )
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
153 # print "Just wrote (%d): " % read_count + str(fastq_read)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
154 temp_process_file.close()
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
155 if read_count is not None:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
156 if input_type != output_type and 'solexa' in [ input_type, output_type ]:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
157 print "Converted between Solexa and PHRED scores."
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
158 if summarize_input:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
159 with open(temp_output_filename + "_summary", 'w') as summaryLogFile :
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
160 pickle.dump(aggregator, summaryLogFile)
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
161 else:
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
162 print "No valid FASTQ reads were provided."
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
163
18a08d476d5e First upload 0.3.0
kevyin
parents:
diff changeset
164 if __name__ == "__main__": main()