comparison vsnp_determine_ref_from_data.py @ 4:36bdf8b439ed draft

Uploaded
author greg
date Sun, 03 Jan 2021 16:13:22 +0000
parents ebc08e5ce646
children
comparison of 3:6116deacb2c7 and 4:36bdf8b439ed
@@ -1,33 +1,24 @@
 #!/usr/bin/env python
 
 import argparse
 import gzip
-import multiprocessing
 import os
-import queue
+from collections import OrderedDict
+
 import yaml
 from Bio.SeqIO.QualityIO import FastqGeneralIterator
-from collections import OrderedDict
-
-INPUT_READS_DIR = 'input_reads'
+
 OUTPUT_DBKEY_DIR = 'output_dbkey'
 OUTPUT_METRICS_DIR = 'output_metrics'
 
 
-def get_base_file_name(file_path):
+def get_sample_name(file_path):
     base_file_name = os.path.basename(file_path)
     if base_file_name.find(".") > 0:
         # Eliminate the extension.
         return os.path.splitext(base_file_name)[0]
-    elif base_file_name.find("_") > 0:
-        # The dot extension was likely changed to
-        # the "_" character.
-        items = base_file_name.split("_")
-        no_ext = "_".join(items[0:-2])
-        if len(no_ext) > 0:
-            return no_ext
     return base_file_name
 
 
 def get_dbkey(dnaprints_dict, key, s):
     # dnaprints_dict looks something like this:
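Note on the get_base_file_name() -> get_sample_name() change above: the fallback
that guessed at "_"-mangled extensions is gone, so only os.path.splitext() trims
the name, and splitext() strips just the final extension. A minimal sketch of
the new behavior (file names are hypothetical):

    import os

    def get_sample_name(file_path):
        base_file_name = os.path.basename(file_path)
        if base_file_name.find(".") > 0:
            # Eliminate the extension.
            return os.path.splitext(base_file_name)[0]
        return base_file_name

    print(get_sample_name("/data/Sample1.fastq"))     # Sample1
    print(get_sample_name("/data/Sample1.fastq.gz"))  # Sample1.fastq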
@@ -89,22 +80,10 @@
         group = ""
         dbkey = ""
     return group, dbkey
 
 
-def get_group_and_dbkey_for_collection(task_queue, finished_queue, dnaprints_dict, timeout):
-    while True:
-        try:
-            tup = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum = tup
-        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
-        finished_queue.put((fastq_file, count_list, group, dbkey))
-        task_queue.task_done()
-
-
 def get_oligo_dict():
     oligo_dict = {}
     oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC"
     oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC"
     oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT"
@@ -136,11 +115,11 @@
 
 
 def get_seq_counts(value, fastq_list, gzipped):
     count = 0
     for fastq_file in fastq_list:
-        if gzipped == "true":
+        if gzipped:
             with gzip.open(fastq_file, 'rt') as fh:
                 for title, seq, qual in FastqGeneralIterator(fh):
                     count += seq.count(value)
         else:
             with open(fastq_file, 'r') as fh:
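Note: gzipped is now a real boolean (see the store_true change in the argparse
hunk below), so the old string test gzipped == "true" goes away. A minimal
counting sketch using Biopython's FastqGeneralIterator and one of the oligos
from get_oligo_dict() (the file name is hypothetical):

    import gzip

    from Bio.SeqIO.QualityIO import FastqGeneralIterator

    oligo = "AATTGTCGGATAGCCTGGCGATAACGACGC"  # 01_ab1 above
    count = 0
    # FastqGeneralIterator yields (title, sequence, quality) tuples.
    with gzip.open("sample_R1.fastq.gz", "rt") as fh:
        for title, seq, qual in FastqGeneralIterator(fh):
            count += seq.count(oligo)  # non-overlapping hits per read
    print(count)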
@@ -162,21 +141,10 @@
         count_list.append(v)
     brucella_sum = sum(count_list[:16])
     bovis_sum = sum(count_list[16:24])
     para_sum = sum(count_list[24:])
     return count_summary, count_list, brucella_sum, bovis_sum, para_sum
-
-
-def get_species_counts_for_collection(task_queue, finished_queue, gzipped, timeout):
-    while True:
-        try:
-            fastq_file = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts([fastq_file], gzipped)
-        finished_queue.put((fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum))
-        task_queue.task_done()
 
 
 def get_species_strings(count_summary):
     binary_dictionary = {}
     for k, v in count_summary.items():
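Note: the slices above encode the fixed oligo layout: the first 16 counts belong
to the Brucella oligos, the next 8 to the bovis oligos, and the remainder to the
para oligos. A worked sketch with made-up counts (6 para oligos are assumed here
purely for illustration; the full oligo dict is elided above):

    count_list = [0] * 30  # hypothetical counts, in dictionary order
    count_list[16:24] = [12, 9, 15, 0, 11, 8, 14, 10]  # bovis hits only

    brucella_sum = sum(count_list[:16])  # 0
    bovis_sum = sum(count_list[16:24])   # 79
    para_sum = sum(count_list[24:])      # 0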
@@ -195,60 +163,24 @@
     para_binary = binary_list[24:]
     para_string = ''.join(str(e) for e in para_binary)
     return brucella_string, bovis_string, para_string
 
 
-def get_species_strings_for_collection(task_queue, finished_queue, timeout):
-    while True:
-        try:
-            tup = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum = tup
-        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
-        finished_queue.put((fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum))
-        task_queue.task_done()
-
-
-def output_dbkey(file_name, dbkey, output_file=None):
+def output_dbkey(file_name, dbkey, output_file):
     # Output the dbkey.
-    if output_file is None:
-        # We're producing a dataset collection.
-        output_file = os.path.join(OUTPUT_DBKEY_DIR, "%s.txt" % file_name)
     with open(output_file, "w") as fh:
         fh.write("%s" % dbkey)
 
 
-def output_files(fastq_file, count_list, group, dbkey, dbkey_file=None, metrics_file=None):
-    base_file_name = get_base_file_name(fastq_file)
-    if dbkey_file is not None:
-        # We're dealing with a single read or
-        # a set of paired reads. If the latter,
-        # the following will hopefully produce a
-        # good sample string.
-        if base_file_name.find("_") > 0:
-            base_file_name = base_file_name.split("_")[0]
+def output_files(fastq_file, count_list, group, dbkey, dbkey_file, metrics_file):
+    base_file_name = get_sample_name(fastq_file)
     output_dbkey(base_file_name, dbkey, dbkey_file)
     output_metrics(base_file_name, count_list, group, dbkey, metrics_file)
 
 
-def output_files_for_collection(task_queue, timeout):
-    while True:
-        try:
-            tup = task_queue.get(block=True, timeout=timeout)
-        except queue.Empty:
-            break
-        fastq_file, count_list, group, dbkey = tup
-        output_files(fastq_file, count_list, group, dbkey)
-        task_queue.task_done()
-
-
-def output_metrics(file_name, count_list, group, dbkey, output_file=None):
+def output_metrics(file_name, count_list, group, dbkey, output_file):
     # Output the metrics.
-    if output_file is None:
-        # We're producing a dataset collection.
-        output_file = os.path.join(OUTPUT_METRICS_DIR, "%s.txt" % file_name)
     with open(output_file, "w") as fh:
         fh.write("Sample: %s\n" % file_name)
         fh.write("Brucella counts: ")
         for i in count_list[:16]:
             fh.write("%d," % i)
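Note: with the collection branch gone, output_metrics() always writes to the
caller-supplied output_file instead of falling back to OUTPUT_METRICS_DIR. The
resulting file looks roughly like this (sample name, group, and dbkey are
made-up values; the Bovis/Para count lines are written by code in the elided
lines above):

    Sample: Sample1
    Brucella counts: 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
    ...
    Group: TB
    dbkey: AF2122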
@@ -260,30 +192,17 @@
             fh.write("%d," % i)
     fh.write("\nGroup: %s" % group)
     fh.write("\ndbkey: %s\n" % dbkey)
 
 
-def set_num_cpus(num_files, processes):
-    num_cpus = int(multiprocessing.cpu_count())
-    if num_files < num_cpus and num_files < processes:
-        return num_files
-    if num_cpus < processes:
-        half_cpus = int(num_cpus / 2)
-        if num_files < half_cpus:
-            return num_files
-        return half_cpus
-    return processes
-
-
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
 
     parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help="List of dnaprints data table value, name and path fields")
-    parser.add_argument('--read1', action='store', dest='read1', required=False, default=None, help='Required: single read')
+    parser.add_argument('--read1', action='store', dest='read1', help='Required: single read')
     parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read')
-    parser.add_argument('--gzipped', action='store', dest='gzipped', help='Input files are gzipped')
+    parser.add_argument('--gzipped', action='store_true', dest='gzipped', help='Input files are gzipped')
-    parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', required=False, default=None, help='Output reference file')
+    parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', help='Output reference file')
-    parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics file')
+    parser.add_argument('--output_metrics', action='store', dest='output_metrics', help='Output metrics file')
-    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')
 
     args = parser.parse_args()
 
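Note on the store_true change above: --gzipped is now a bare flag, so
args.gzipped is False when the option is omitted and True when it is present,
which is exactly what the simplified "if gzipped:" test in get_seq_counts()
expects. A quick sketch:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--gzipped', action='store_true', dest='gzipped')

    print(parser.parse_args([]).gzipped)             # False
    print(parser.parse_args(['--gzipped']).gzipped)  # True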
@@ -290,78 +209,17 @@
-    collection = False
-    fastq_list = []
-    if args.read1 is not None:
-        fastq_list.append(args.read1)
-        if args.read2 is not None:
-            fastq_list.append(args.read2)
-    else:
-        collection = True
-        for file_name in sorted(os.listdir(INPUT_READS_DIR)):
-            file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name))
-            fastq_list.append(file_path)
+    fastq_list = [args.read1]
+    if args.read2 is not None:
+        fastq_list.append(args.read2)
 
     # The value of dnaprint_fields is a list of lists, where each list is
     # the [value, name, path] components of the vsnp_dnaprints data table.
     # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the
     # all_fasta data table to the value column in the vsnp_dnaprints data
     # table to ensure a proper mapping for discovering the dbkey.
     dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields)
 
-    if collection:
-        # Here fastq_list consists of any number of
-        # reads, so each file will be processed and
-        # dataset collections will be produced as outputs.
-        multiprocessing.set_start_method('spawn')
-        queue1 = multiprocessing.JoinableQueue()
-        queue2 = multiprocessing.JoinableQueue()
-        num_files = len(fastq_list)
-        cpus = set_num_cpus(num_files, args.processes)
-        # Set a timeout for get()s in the queue.
-        timeout = 0.05
-
-        for fastq_file in fastq_list:
-            queue1.put(fastq_file)
-
-        # Complete the get_species_counts task.
-        processes = [multiprocessing.Process(target=get_species_counts_for_collection, args=(queue1, queue2, args.gzipped, timeout, )) for _ in range(cpus)]
-        for p in processes:
-            p.start()
-        for p in processes:
-            p.join()
-        queue1.join()
-
-        # Complete the get_species_strings task.
-        processes = [multiprocessing.Process(target=get_species_strings_for_collection, args=(queue2, queue1, timeout, )) for _ in range(cpus)]
-        for p in processes:
-            p.start()
-        for p in processes:
-            p.join()
-        queue2.join()
-
-        # Complete the get_group_and_dbkey task.
-        processes = [multiprocessing.Process(target=get_group_and_dbkey_for_collection, args=(queue1, queue2, dnaprints_dict, timeout, )) for _ in range(cpus)]
-        for p in processes:
-            p.start()
-        for p in processes:
-            p.join()
-        queue1.join()
-
-        # Complete the output_files task.
-        processes = [multiprocessing.Process(target=output_files_for_collection, args=(queue2, timeout, )) for _ in range(cpus)]
-        for p in processes:
-            p.start()
-        for p in processes:
-            p.join()
-        queue2.join()
-
-        if queue1.empty() and queue2.empty():
-            queue1.close()
-            queue1.join_thread()
-            queue2.close()
-            queue2.join_thread()
-    else:
-        # Here fastq_list consists of either a single read
-        # or a set of paired reads, producing single outputs.
-        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped)
-        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
-        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
-        output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics)
+    # Here fastq_list consists of either a single read
+    # or a set of paired reads, producing single outputs.
+    count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped)
+    brucella_string, bovis_string, para_string = get_species_strings(count_summary)
+    group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
+    output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics)
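Note: with the dataset-collection path removed, the script now handles exactly
one sample per invocation. A hypothetical command line for the revised script
(paths and the dnaprint fields are illustrative; the Galaxy tool wrapper
normally supplies them, and --dnaprint_fields takes two values per occurrence
to match nargs=2):

    python vsnp_determine_ref_from_data.py \
        --dnaprint_fields AF2122 /path/to/AF2122.yaml \
        --read1 sample_R1.fastq.gz \
        --read2 sample_R2.fastq.gz \
        --gzipped \
        --output_dbkey output_dbkey.txt \
        --output_metrics output_metrics.txt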