#!/usr/bin/env python
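"""
Determine the most appropriate reference genome (Galaxy dbkey) for a set of
FASTQ reads: count species-specific oligonucleotides in the reads, convert
the counts into binary "DNA print" strings, and match those strings against
the fingerprints listed in the vsnp_dnaprints data table.
"""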
import argparse
import gzip
import multiprocessing
import os
import queue
from collections import OrderedDict

import yaml
from Bio.SeqIO.QualityIO import FastqGeneralIterator

INPUT_READS_DIR = 'input_reads'
OUTPUT_DBKEY_DIR = 'output_dbkey'
OUTPUT_METRICS_DIR = 'output_metrics'

def get_base_file_name(file_path):
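    """
    Return the basename of file_path with its extension (or a trailing
    underscore-separated suffix) removed, to recover the sample name.
    """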
    base_file_name = os.path.basename(file_path)
    if base_file_name.find(".") > 0:
        # Eliminate the extension.
        return os.path.splitext(base_file_name)[0]
    elif base_file_name.find("_") > 0:
        # The dot extension was likely changed to
        # the "_" character.
        items = base_file_name.split("_")
        no_ext = "_".join(items[0:-2])
        if len(no_ext) > 0:
            return no_ext
    return base_file_name

def get_dbkey(dnaprints_dict, key, s):
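    """
    Return the data table value whose fingerprint list contains the binary
    string s, or an empty string if no fingerprint matches.
    """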
    # dnaprints_dict looks something like this:
    # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']},
    #  'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
    d = dnaprints_dict.get(key, {})
    for data_table_value, v_list in d.items():
        if s in v_list:
            return data_table_value
    return ""

def get_dnaprints_dict(dnaprint_fields):
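    """
    Build a dictionary that maps each group key (e.g., 'brucella') to a
    {value: [fingerprint, ...]} dictionary, loaded from the YAML files
    referenced by the --dnaprint_fields entries.
    """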
    # A dnaprint_fields entry looks something like this:
    # [['AF2122', '/galaxy/tool-data/vsnp/AF2122/dnaprints/NC_002945v4.yml']]
    dnaprints_dict = {}
    for item in dnaprint_fields:
        # Here item is a 2-element list of data
        # table components, value and path.
        value = item[0]
        path = item[1].strip()
        with open(path, "rt") as fh:
            # The format of all dnaprints yaml
            # files is something like this:
            # brucella:
            # - 0111111111111111
            print_dict = yaml.load(fh, Loader=yaml.Loader)
        for print_dict_k, print_dict_v in print_dict.items():
            dnaprints_v_dict = dnaprints_dict.get(print_dict_k, {})
            if len(dnaprints_v_dict) > 0:
                # dnaprints_dict already contains print_dict_k (e.g., 'brucella'),
                # and dnaprints_v_dict is a dictionary that
                # looks something like this:
                # {'NC_002945v4': ['11001110', '11011110', '11001100']}
                value_list = dnaprints_v_dict.get(value, [])
                value_list = value_list + print_dict_v
                dnaprints_v_dict[value] = value_list
            else:
                # dnaprints_v_dict is an empty dictionary.
                dnaprints_v_dict[value] = print_dict_v
            dnaprints_dict[print_dict_k] = dnaprints_v_dict
    # dnaprints_dict looks something like this:
    # {'brucella': {'NC_002945v4': ['11001110', '11011110', '11001100']},
    #  'bovis': {'NC_006895': ['11111110', '00010010', '01111011']}}
    return dnaprints_dict

def get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum):
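    """
    Classify the sample as Brucella, TB or paraTB from the oligo count sums
    and look up the matching dbkey for its fingerprint string.
    """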
    if brucella_sum > 3:
        group = "Brucella"
        dbkey = get_dbkey(dnaprints_dict, "brucella", brucella_string)
    elif bovis_sum > 3:
        group = "TB"
        dbkey = get_dbkey(dnaprints_dict, "bovis", bovis_string)
    elif para_sum >= 1:
        group = "paraTB"
        dbkey = get_dbkey(dnaprints_dict, "para", para_string)
    else:
        group = ""
        dbkey = ""
    return group, dbkey

def get_group_and_dbkey_for_collection(task_queue, finished_queue, dnaprints_dict, timeout):
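    # Worker process: consume tuples from task_queue until it stays empty
    # for `timeout` seconds, then exit.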
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum = tup
        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
        finished_queue.put((fastq_file, count_list, group, dbkey))
        task_queue.task_done()

def get_oligo_dict():
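    # The insertion order of these oligos matters: downstream code slices
    # the counts as Brucella ([:16]), TB ([16:24]) and paraTB ([24:]).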
    oligo_dict = {}
    oligo_dict["01_ab1"] = "AATTGTCGGATAGCCTGGCGATAACGACGC"
    oligo_dict["02_ab3"] = "CACACGCGGGCCGGAACTGCCGCAAATGAC"
    oligo_dict["03_ab5"] = "GCTGAAGCGGCAGACCGGCAGAACGAATAT"
    oligo_dict["04_mel"] = "TGTCGCGCGTCAAGCGGCGTGAAATCTCTG"
    oligo_dict["05_suis1"] = "TGCGTTGCCGTGAAGCTTAATTCGGCTGAT"
    oligo_dict["06_suis2"] = "GGCAATCATGCGCAGGGCTTTGCATTCGTC"
    oligo_dict["07_suis3"] = "CAAGGCAGATGCACATAATCCGGCGACCCG"
    oligo_dict["08_ceti1"] = "GTGAATATAGGGTGAATTGATCTTCAGCCG"
    oligo_dict["09_ceti2"] = "TTACAAGCAGGCCTATGAGCGCGGCGTGAA"
    oligo_dict["10_canis4"] = "CTGCTACATAAAGCACCCGGCGACCGAGTT"
    oligo_dict["11_canis"] = "ATCGTTTTGCGGCATATCGCTGACCACAGC"
    oligo_dict["12_ovis"] = "CACTCAATCTTCTCTACGGGCGTGGTATCC"
    oligo_dict["13_ether2"] = "CGAAATCGTGGTGAAGGACGGGACCGAACC"
    oligo_dict["14_63B1"] = "CCTGTTTAAAAGAATCGTCGGAACCGCTCT"
    oligo_dict["15_16M0"] = "TCCCGCCGCCATGCCGCCGAAAGTCGCCGT"
    oligo_dict["16_mel1b"] = "TCTGTCCAAACCCCGTGACCGAACAATAGA"
    oligo_dict["17_tb157"] = "CTCTTCGTATACCGTTCCGTCGTCACCATGGTCCT"
    oligo_dict["18_tb7"] = "TCACGCAGCCAACGATATTCGTGTACCGCGACGGT"
    oligo_dict["19_tbbov"] = "CTGGGCGACCCGGCCGACCTGCACACCGCGCATCA"
    oligo_dict["20_tb5"] = "CCGTGGTGGCGTATCGGGCCCCTGGATCGCGCCCT"
    oligo_dict["21_tb2"] = "ATGTCTGCGTAAAGAAGTTCCATGTCCGGGAAGTA"
    oligo_dict["22_tb3"] = "GAAGACCTTGATGCCGATCTGGGTGTCGATCTTGA"
    oligo_dict["23_tb4"] = "CGGTGTTGAAGGGTCCCCCGTTCCAGAAGCCGGTG"
    oligo_dict["24_tb6"] = "ACGGTGATTCGGGTGGTCGACACCGATGGTTCAGA"
    oligo_dict["25_para"] = "CCTTTCTTGAAGGGTGTTCG"
    oligo_dict["26_para_sheep"] = "CGTGGTGGCGACGGCGGCGGGCCTGTCTAT"
    oligo_dict["27_para_cattle"] = "TCTCCTCGGTCGGTGATTCGGGGGCGCGGT"
    return oligo_dict

def get_seq_counts(value, fastq_list, gzipped):
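    """
    Count occurrences of the oligo string `value` in the reads of every
    FASTQ file in fastq_list.  `gzipped` is the string "true" when the
    files are gzip-compressed.  Returns a (value, count) tuple.
    """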
    count = 0
    open_func = gzip.open if gzipped == "true" else open
    for fastq_file in fastq_list:
        with open_func(fastq_file, 'rt') as fh:
            for title, seq, qual in FastqGeneralIterator(fh):
                count += seq.count(value)
    return value, count

def get_species_counts(fastq_list, gzipped):
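    """
    Count every species-specific oligo across fastq_list and return the
    per-oligo count summary along with the summed counts for the Brucella,
    TB (bovis) and paraTB oligo groups.
    """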
    count_summary = {}
    oligo_dict = get_oligo_dict()
    for key, oligo in oligo_dict.items():
        _, count = get_seq_counts(oligo, fastq_list, gzipped)
        count_summary[key] = count
    count_list = list(count_summary.values())
    brucella_sum = sum(count_list[:16])
    bovis_sum = sum(count_list[16:24])
    para_sum = sum(count_list[24:])
    return count_summary, count_list, brucella_sum, bovis_sum, para_sum

def get_species_counts_for_collection(task_queue, finished_queue, gzipped, timeout):
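    # Worker process: count oligos in each FASTQ file pulled from
    # task_queue and push the results onto finished_queue.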
    while True:
        try:
            fastq_file = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts([fastq_file], gzipped)
        finished_queue.put((fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum))
        task_queue.task_done()

def get_species_strings(count_summary):
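    """
    Convert the raw oligo counts to binary presence/absence values
    (count > 1 -> 1) and join them, in sorted key order, into the
    Brucella, TB and paraTB fingerprint strings.
    """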
    binary_dictionary = {k: 1 if v > 1 else 0 for k, v in count_summary.items()}
    binary_dictionary = OrderedDict(sorted(binary_dictionary.items()))
    binary_list = list(binary_dictionary.values())
    brucella_string = ''.join(str(e) for e in binary_list[:16])
    bovis_string = ''.join(str(e) for e in binary_list[16:24])
    para_string = ''.join(str(e) for e in binary_list[24:])
    return brucella_string, bovis_string, para_string

def get_species_strings_for_collection(task_queue, finished_queue, timeout):
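    # Worker process: turn each count summary pulled from task_queue into
    # fingerprint strings and push the results onto finished_queue.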
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_summary, count_list, brucella_sum, bovis_sum, para_sum = tup
        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
        finished_queue.put((fastq_file, count_list, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum))
        task_queue.task_done()

def output_dbkey(file_name, dbkey, output_file=None):
    # Output the dbkey.
    if output_file is None:
        # We're producing a dataset collection.
        output_file = os.path.join(OUTPUT_DBKEY_DIR, "%s.txt" % file_name)
    with open(output_file, "w") as fh:
        fh.write("%s" % dbkey)

def output_files(fastq_file, count_list, group, dbkey, dbkey_file=None, metrics_file=None):
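    # Write the dbkey and metrics outputs for one sample.  When dbkey_file
    # is None, the outputs go into the dataset collection directories.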
    base_file_name = get_base_file_name(fastq_file)
    if dbkey_file is not None:
        # We're dealing with a single read or
        # a set of paired reads.  If the latter,
        # the following will hopefully produce a
        # good sample string.
        if base_file_name.find("_") > 0:
            base_file_name = base_file_name.split("_")[0]
    output_dbkey(base_file_name, dbkey, dbkey_file)
    output_metrics(base_file_name, count_list, group, dbkey, metrics_file)

def output_files_for_collection(task_queue, timeout):
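    # Worker process: write the output files for each result tuple pulled
    # from task_queue.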
    while True:
        try:
            tup = task_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        fastq_file, count_list, group, dbkey = tup
        output_files(fastq_file, count_list, group, dbkey)
        task_queue.task_done()

def output_metrics(file_name, count_list, group, dbkey, output_file=None):
    # Output the metrics.
    if output_file is None:
        # We're producing a dataset collection.
        output_file = os.path.join(OUTPUT_METRICS_DIR, "%s.txt" % file_name)
    with open(output_file, "w") as fh:
        fh.write("Sample: %s\n" % file_name)
        fh.write("Brucella counts: ")
        for i in count_list[:16]:
            fh.write("%d," % i)
        fh.write("\nTB counts: ")
        for i in count_list[16:24]:
            fh.write("%d," % i)
        fh.write("\nPara counts: ")
        for i in count_list[24:]:
            fh.write("%d," % i)
        fh.write("\nGroup: %s" % group)
        fh.write("\ndbkey: %s\n" % dbkey)

def set_num_cpus(num_files, processes):
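    # Choose a worker count: never more than the number of input files,
    # and cap at half the available CPUs when the machine has fewer CPUs
    # than the requested number of processes.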
    num_cpus = int(multiprocessing.cpu_count())
    if num_files < num_cpus and num_files < processes:
        return num_files
    if num_cpus < processes:
        half_cpus = int(num_cpus / 2)
        if num_files < half_cpus:
            return num_files
        return half_cpus
    return processes

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('--dnaprint_fields', action='append', dest='dnaprint_fields', nargs=2, help='List of vsnp_dnaprints data table value and path fields')
    parser.add_argument('--read1', action='store', dest='read1', required=False, default=None, help='Required: single read')
    parser.add_argument('--read2', action='store', dest='read2', required=False, default=None, help='Optional: paired read')
    parser.add_argument('--gzipped', action='store', dest='gzipped', help='Whether the input files are gzipped ("true" or "false")')
    parser.add_argument('--output_dbkey', action='store', dest='output_dbkey', required=False, default=None, help='Output reference file')
    parser.add_argument('--output_metrics', action='store', dest='output_metrics', required=False, default=None, help='Output metrics file')
    parser.add_argument('--processes', action='store', dest='processes', type=int, help='User-selected number of processes to use for job splitting')

    args = parser.parse_args()

    collection = False
    fastq_list = []
    if args.read1 is not None:
        fastq_list.append(args.read1)
        if args.read2 is not None:
            fastq_list.append(args.read2)
    else:
        collection = True
        for file_name in sorted(os.listdir(INPUT_READS_DIR)):
            file_path = os.path.abspath(os.path.join(INPUT_READS_DIR, file_name))
            fastq_list.append(file_path)

    # The value of dnaprint_fields is a list of lists, where each list is
    # the [value, path] components of a vsnp_dnaprints data table entry.
    # The data_manager_vsnp_dnaprints tool assigns the dbkey column from the
    # all_fasta data table to the value column in the vsnp_dnaprints data
    # table to ensure a proper mapping for discovering the dbkey.
    dnaprints_dict = get_dnaprints_dict(args.dnaprint_fields)

    if collection:
        # Here fastq_list consists of any number of
        # reads, so each file will be processed and
        # dataset collections will be produced as outputs.
        multiprocessing.set_start_method('spawn')
        queue1 = multiprocessing.JoinableQueue()
        queue2 = multiprocessing.JoinableQueue()
        num_files = len(fastq_list)
        cpus = set_num_cpus(num_files, args.processes)
        # Set a timeout for get()s in the queues; a worker exits
        # when its queue stays empty for this long.
        timeout = 0.05

        for fastq_file in fastq_list:
            queue1.put(fastq_file)

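        # The four stages below ping-pong the work between queue1 and
        # queue2: each stage's workers drain one queue and fill the other.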
        # Complete the get_species_counts task.
        processes = [multiprocessing.Process(target=get_species_counts_for_collection, args=(queue1, queue2, args.gzipped, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue1.join()

        # Complete the get_species_strings task.
        processes = [multiprocessing.Process(target=get_species_strings_for_collection, args=(queue2, queue1, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue2.join()

        # Complete the get_group_and_dbkey task.
        processes = [multiprocessing.Process(target=get_group_and_dbkey_for_collection, args=(queue1, queue2, dnaprints_dict, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue1.join()

        # Complete the output_files task.
        processes = [multiprocessing.Process(target=output_files_for_collection, args=(queue2, timeout, )) for _ in range(cpus)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        queue2.join()

        if queue1.empty() and queue2.empty():
            queue1.close()
            queue1.join_thread()
            queue2.close()
            queue2.join_thread()
    else:
        # Here fastq_list consists of either a single read
        # or a set of paired reads, producing single outputs.
        count_summary, count_list, brucella_sum, bovis_sum, para_sum = get_species_counts(fastq_list, args.gzipped)
        brucella_string, bovis_string, para_string = get_species_strings(count_summary)
        group, dbkey = get_group_and_dbkey(dnaprints_dict, brucella_string, brucella_sum, bovis_string, bovis_sum, para_string, para_sum)
        output_files(args.read1, count_list, group, dbkey, dbkey_file=args.output_dbkey, metrics_file=args.output_metrics)