vsnp_determine_ref_from_data.py @ 0:ebc08e5ce646 (draft)

commit message: Uploaded
author:   greg
date:     Tue, 21 Apr 2020 10:08:28 -0400
parents:  (none; initial revision)
children: 36bdf8b439ed
#!/usr/bin/env python
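#
# Determine the reference genome (dbkey) that best matches a sample's fastq
# reads by counting species-specific oligo sequences, and write the dbkey and
# a metrics summary as outputs for the vSNP Galaxy tool.
#
# Example invocation (hypothetical file names) for gzipped paired reads:
#   python vsnp_determine_ref_from_data.py \
#     --read1 sample_R1.fastq.gz --read2 sample_R2.fastq.gz --gzipped true \
#     --dnaprint_fields AF2122 /galaxy/tool-data/vsnp/AF2122/dnaprints/NC_002945v4.yml \
#     --output_dbkey output_dbkey.txt --output_metrics output_metrics.txt \
#     --processes 8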

import argparse
import gzip
import multiprocessing
import os
import queue
import yaml
from Bio.SeqIO.QualityIO import FastqGeneralIterator
from collections import OrderedDict

INPUT_READS_DIR = 'input_reads'
OUTPUT_DBKEY_DIR = 'output_dbkey'
OUTPUT_METRICS_DIR = 'output_metrics'


def get_base_file_name(file_path):
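    # Derive a sample name from the fastq file path by stripping the file
    # extension (or, when the "." was replaced by "_", the trailing
    # underscore-separated items).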
    base_file_name = os.path.basename(file_path)
    if base_file_name.find(".") > 0:
        # Eliminate the extension.
        return os.path.splitext(base_file_name)[0]
    elif base_file_name.find("_") > 0:
        # The dot extension was likely changed to
        # the "_" character.
        items = base_file_name.split("_")
        no_ext = "_".join(items[0:-2])
        if len(no_ext) > 0:
            return no_ext
    return base_file_name


def get_dbkey(dnaprints_dict, key, s):
    # dnaprints_dict looks something like this:
    # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']},
    #  'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
    d = dnaprints_dict.get(key, {})
    for data_table_value, v_list in d.items():
        if s in v_list:
            return data_table_value
    return ""


def get_dnaprints_dict(dnaprint_fields):
    # A dnaprint_fields entry looks something like this:
    # [['AF2122', '/galaxy/tool-data/vsnp/AF2122/dnaprints/NC_002945v4.yml']]
    dnaprints_dict = {}
    for item in dnaprint_fields:
        # Here item is a 2-element list of data
        # table components: value and path.
        value = item[0]
        path = item[1].strip()
        with open(path, "rt") as fh:
            # The format of all dnaprints yaml
            # files is something like this:
            # brucella:
            # - 0111111111111111
            print_dict = yaml.load(fh, Loader=yaml.Loader)
        for print_dict_k, print_dict_v in print_dict.items():
            dnaprints_v_dict = dnaprints_dict.get(print_dict_k, {})
            if len(dnaprints_v_dict) > 0:
                # dnaprints_dict already contains k (e.g., 'brucella'),
                # and dnaprints_v_dict will be a dictionary that
                # looks something like this:
                # {'NC_002945v4': ['11001110', '11011110', '11001100']}
                value_list = dnaprints_v_dict.get(value, [])
                value_list = value_list + print_dict_v
                dnaprints_v_dict[value] = value_list
            else:
                # dnaprints_v_dict is an empty dictionary.
                dnaprints_v_dict[value] = print_dict_v
            dnaprints_dict[print_dict_k] = dnaprints_v_dict
    # dnaprints_dict looks something like this:
    # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']},
    #  'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
    return dnaprints_dict


def get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum):
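    # Classify the sample by oligo hit totals: more than 3 Brucella-oligo
    # hits maps to the Brucella group, more than 3 TB-oligo hits to TB, and
    # at least 1 para hit to paraTB; the group's binary string then selects
    # the dbkey.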
    if brucella_sum > 3:
        group = "Brucella"
        dbkey = get_dbkey(dnaprints_dict, "brucella", brucella_string)
    elif bovis_sum > 3:
        group = "TB"
        dbkey = get_dbkey(dnaprints_dict, "bovis", bovis_string)
    elif para_sum >= 1:
        group = "paraTB"
        dbkey = get_dbkey(dnaprints_dict, "para", para_string)
    else:
        group = ""
        dbkey = ""
    return group, dbkey


def get_group_and_dbkey_for_collection(task_queue, finished_queue, dnaprints_dict, timeout):
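    # Queue worker: consume per-sample tuples from task_queue until it stays
    # empty for `timeout` seconds, classify each sample, and pass the result
    # downstream via finished_queue.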
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum = tup
        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
        finished_queue.put((fastq_file, count_list, group, dbkey))
        task_queue.task_done()


def get_oligo_dict():
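    # Species-specific oligo sequences: entries 01-16 are summed as the
    # Brucella group, 17-24 as the TB group, and 25-27 as the para group.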
    oligo_dict = {}
    oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC"
    oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC"
    oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT"
    oligo_dict["04_mel"] = "TGTCGCGCGTCAAGCGGCGTGAAATCTCTG"
    oligo_dict["05_suis1"] = "TGCGTTGCCGTGAAGCTTAATTCGGCTGAT"
    oligo_dict["06_suis2"] = "GGCAATCATGCGCAGGGCTTTGCATTCGTC"
    oligo_dict["07_suis3"] = "CAAGGCAGATGCACATAATCCGGCGACCCG"
    oligo_dict["08_ceti1"] = "GTGAATATAGGGTGAATTGATCTTCAGCCG"
    oligo_dict["09_ceti2"] = "TTACAAGCAGGCCTATGAGCGCGGCGTGAA"
    oligo_dict["10_canis4"] = "CTGCTACATAAAGCACCCGGCGACCGAGTT"
    oligo_dict["11_canis"] = "ATCGTTTTGCGGCATATCGCTGACCACAGC"
    oligo_dict["12_ovis"] = "CACTCAATCTTCTCTACGGGCGTGGTATCC"
    oligo_dict["13_ether2"] = "CGAAATCGTGGTGAAGGACGGGACCGAACC"
    oligo_dict["14_63B1"] = "CCTGTTTAAAAGAATCGTCGGAACCGCTCT"
    oligo_dict["15_16M0"] = "TCCCGCCGCCATGCCGCCGAAAGTCGCCGT"
    oligo_dict["16_mel1b"] = "TCTGTCCAAACCCCGTGACCGAACAATAGA"
    oligo_dict["17_tb157"] = "CTCTTCGTATACCGTTCCGTCGTCACCATGGTCCT"
    oligo_dict["18_tb7"] = "TCACGCAGCCAACGATATTCGTGTACCGCGACGGT"
    oligo_dict["19_tbbov"] = "CTGGGCGACCCGGCCGACCTGCACACCGCGCATCA"
    oligo_dict["20_tb5"] = "CCGTGGTGGCGTATCGGGCCCCTGGATCGCGCCCT"
    oligo_dict["21_tb2"] = "ATGTCTGCGTAAAGAAGTTCCATGTCCGGGAAGTA"
    oligo_dict["22_tb3"] = "GAAGACCTTGATGCCGATCTGGGTGTCGATCTTGA"
    oligo_dict["23_tb4"] = "CGGTGTTGAAGGGTCCCCCGTTCCAGAAGCCGGTG"
    oligo_dict["24_tb6"] = "ACGGTGATTCGGGTGGTCGACACCGATGGTTCAGA"
    oligo_dict["25_para"] = "CCTTTCTTGAAGGGTGTTCG"
    oligo_dict["26_para_sheep"] = "CGTGGTGGCGACGGCGGCGGGCCTGTCTAT"
    oligo_dict["27_para_cattle"] = "TCTCCTCGGTCGGTGATTCGGGGGCGCGGT"
    return oligo_dict


def get_seq_counts(value, fastq_list, gzipped):
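    # Count occurrences of a single oligo sequence across every read in
    # fastq_list.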
    count = 0
    for fastq_file in fastq_list:
        if gzipped == "true":
            with gzip.open(fastq_file, 'rt') as fh:
                for title, seq, qual in FastqGeneralIterator(fh):
                    count += seq.count(value)
        else:
            with open(fastq_file, 'r') as fh:
                for title, seq, qual in FastqGeneralIterator(fh):
                    count += seq.count(value)
    return value, count


def get_species_counts(fastq_list, gzipped):
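    # Count every oligo across the input reads and total the hits for each
    # species group (Brucella, TB, para).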
    count_summary = {}
    oligo_dict = get_oligo_dict()
    for v1 in oligo_dict.values():
        returned_value, count = get_seq_counts(v1, fastq_list, gzipped)
        for key, v2 in oligo_dict.items():
            if returned_value == v2:
                count_summary.update({key: count})
    count_list = []
    for v in count_summary.values():
        count_list.append(v)
    brucella_sum = sum(count_list[:16])
    bovis_sum = sum(count_list[16:24])
    para_sum = sum(count_list[24:])
    return count_summary, count_list, brucella_sum, bovis_sum, para_sum


def get_species_counts_for_collection(task_queue, finished_queue, gzipped, timeout):
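    # Queue worker: count oligos for one fastq file at a time.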
    while True:
        try:
            fastq_file = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts([fastq_file], gzipped)
        finished_queue.put((fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum))
        task_queue.task_done()


def get_species_strings(count_summary):
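    # Reduce per-oligo counts to a sorted binary fingerprint (1 where an
    # oligo was seen more than once), then split it into the Brucella, TB
    # and para strings used for the dbkey lookup.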
    binary_dictionary = {}
    for k, v in count_summary.items():
        if v > 1:
            binary_dictionary.update({k: 1})
        else:
            binary_dictionary.update({k: 0})
    binary_dictionary = OrderedDict(sorted(binary_dictionary.items()))
    binary_list = []
    for v in binary_dictionary.values():
        binary_list.append(v)
    brucella_binary = binary_list[:16]
    brucella_string = ''.join(str(e) for e in brucella_binary)
    bovis_binary = binary_list[16:24]
    bovis_string = ''.join(str(e) for e in bovis_binary)
    para_binary = binary_list[24:]
    para_string = ''.join(str(e) for e in para_binary)
    return brucella_string, bovis_string, para_string


def get_species_strings_for_collection(task_queue, finished_queue, timeout):
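    # Queue worker: convert each sample's counts to binary fingerprint strings.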
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum = tup
        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
        finished_queue.put((fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum))
        task_queue.task_done()


def output_dbkey(file_name, dbkey, output_file=None):
    # Output the dbkey.
    if output_file is None:
        # We're producing a dataset collection.
        output_file = os.path.join(OUTPUT_DBKEY_DIR, "%s.txt" % file_name)
    with open(output_file, "w") as fh:
        fh.write("%s" % dbkey)


def output_files(fastq_file, count_list, group, dbkey, dbkey_file=None, metrics_file=None):
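    # Write the dbkey and metrics outputs for one sample, deriving the
    # sample name from the fastq file name.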
    base_file_name = get_base_file_name(fastq_file)
    if dbkey_file is not None:
        # We're dealing with a single read or
        # a set of paired reads. If the latter,
        # the following will hopefully produce a
        # good sample string.
        if base_file_name.find("_") > 0:
            base_file_name = base_file_name.split("_")[0]
    output_dbkey(base_file_name, dbkey, dbkey_file)
    output_metrics(base_file_name, count_list, group, dbkey, metrics_file)


def output_files_for_collection(task_queue, timeout):
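    # Queue worker: write the dbkey and metrics outputs for each sample.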
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_list, group, dbkey = tup
        output_files(fastq_file, count_list, group, dbkey)
        task_queue.task_done()


def output_metrics(file_name, count_list, group, dbkey, output_file=None):
    # Output the metrics.
    if output_file is None:
        # We're producing a dataset collection.
        output_file = os.path.join(OUTPUT_METRICS_DIR, "%s.txt" % file_name)
    with open(output_file, "w") as fh:
        fh.write("Sample: %s\n" % file_name)
        fh.write("Brucella counts: ")
        for i in count_list[:16]:
            fh.write("%d," % i)
        fh.write("\nTB counts: ")
        for i in count_list[16:24]:
            fh.write("%d," % i)
        fh.write("\nPara counts: ")
        for i in count_list[24:]:
            fh.write("%d," % i)
        fh.write("\nGroup: %s" % group)
        fh.write("\ndbkey: %s\n" % dbkey)


def set_num_cpus(num_files, processes):
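    # Choose the worker count: never more than the number of input files,
    # and cap at half the available CPUs when the requested number of
    # processes exceeds the machine's CPU count.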
    num_cpus = int(multiprocessing.cpu_count())
    if num_files < num_cpus and num_files < processes:
        return num_files
    if num_cpus < processes:
        half_cpus = int(num_cpus / 2)
        if num_files < half_cpus:
            return num_files
        return half_cpus
    return processes


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help="List of dnaprints data table value and path fields")
    parser.add_argument('--read1', action='store', dest='read1', required=False, default=None, help='Required: single read')
    parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read')
    parser.add_argument('--gzipped', action='store', dest='gzipped', help='Input files are gzipped')
    parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', required=False, default=None, help='Output reference file')
    parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics file')
    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')

    args = parser.parse_args()

    collection = False
    fastq_list = []
    if args.read1 is not None:
        fastq_list.append(args.read1)
        if args.read2 is not None:
            fastq_list.append(args.read2)
    else:
        collection = True
        for file_name in sorted(os.listdir(INPUT_READS_DIR)):
            file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name))
            fastq_list.append(file_path)

    # The value of dnaprint_fields is a list of lists, where each list is
    # the [value, path] components of a vsnp_dnaprints data table entry.
    # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the
    # all_fasta data table to the value column in the vsnp_dnaprints data
    # table to ensure a proper mapping for discovering the dbkey.
    dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields)

    if collection:
        # Here fastq_list consists of any number of
        # reads, so each file will be processed and
        # dataset collections will be produced as outputs.
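        # The work is pipelined in four stages over two joinable queues:
        # count oligos, build fingerprint strings, look up group and dbkey,
        # and write the outputs, with `cpus` worker processes per stage.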
        multiprocessing.set_start_method('spawn')
        queue1 = multiprocessing.JoinableQueue()
        queue2 = multiprocessing.JoinableQueue()
        num_files = len(fastq_list)
        cpus = set_num_cpus(num_files, args.processes)
        # Set a timeout for get()s in the queue.
        timeout = 0.05

        for fastq_file in fastq_list:
            queue1.put(fastq_file)

        # Complete the get_species_counts task.
        processes = [multiprocessing.Process(target=get_species_counts_for_collection, args=(queue1, queue2, args.gzipped, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue1.join()

        # Complete the get_species_strings task.
        processes = [multiprocessing.Process(target=get_species_strings_for_collection, args=(queue2, queue1, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue2.join()

        # Complete the get_group_and_dbkey task.
        processes = [multiprocessing.Process(target=get_group_and_dbkey_for_collection, args=(queue1, queue2, dnaprints_dict, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue1.join()

        # Complete the output_files task.
        processes = [multiprocessing.Process(target=output_files_for_collection, args=(queue2, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue2.join()

        if queue1.empty() and queue2.empty():
            queue1.close()
            queue1.join_thread()
            queue2.close()
            queue2.join_thread()
    else:
        # Here fastq_list consists of either a single read
        # or a set of paired reads, producing single outputs.
        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped)
        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
        output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics)