vsnp_get_snps.py @ 4:e3016c6c5994 draft

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/vsnp commit 95b221f68d19702681babd765c67caeeb24e7f1d"
author iuc
date Tue, 16 Nov 2021 08:28:27 +0000
parents 3:6853676d2bae
children a8560decb495
#!/usr/bin/env python

# Collect quality parsimonious SNPs from vcf files
# and output alignment files in fasta format.

import argparse
import multiprocessing
import os
import queue
import shutil
import sys
import time
from collections import OrderedDict
from datetime import datetime

import pandas
import vcf


def get_time_stamp():
    return datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H-%M-%S')


def set_num_cpus(num_files, processes):
    # Choose the number of worker processes to launch: at most one
    # per input file, and at most half of the available CPUs when
    # more processes are requested than CPUs exist.
    num_cpus = int(multiprocessing.cpu_count())
    if num_files < num_cpus and num_files < processes:
        return num_files
    if num_cpus < processes:
        half_cpus = int(num_cpus / 2)
        if num_files < half_cpus:
            return num_files
        return half_cpus
    return processes
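
# Worked example (editor's illustration, not part of the original tool),
# assuming multiprocessing.cpu_count() returns 8:
#   set_num_cpus(3, 8)   -> 3  (fewer input files than CPUs and processes)
#   set_num_cpus(20, 16) -> 4  (16 > 8 CPUs, so half the CPUs are used)
#   set_num_cpus(20, 4)  -> 4  (the requested process count fits)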


def setup_all_vcfs(vcf_files, vcf_dirs):
    # Create the all_vcfs directory and link
    # all input vcf files into it for processing.
    all_vcfs_dir = 'all_vcf'
    os.makedirs(all_vcfs_dir)
    vcf_dirs.append(all_vcfs_dir)
    for vcf_file in vcf_files:
        file_name_base = os.path.basename(vcf_file)
        dst_file = os.path.join(all_vcfs_dir, file_name_base)
        os.symlink(vcf_file, dst_file)
    return vcf_dirs


class SnpFinder:

    def __init__(self, num_files, dbkey, input_excel, all_isolates, ac, min_mq, quality_score_n_threshold, min_quality_score, input_vcf_dir, output_json_avg_mq_dir, output_json_snps_dir, output_snps_dir, output_summary):
        # Allele count
        self.ac = ac
        # Create a group that will contain all isolates.
        self.all_isolates = all_isolates
        # Evolving positions dictionary.
        self.all_positions = None
        # Isolate groups.
        self.groups = []
        # Excel file for grouping.
        self.input_excel = input_excel
        # Directory of input zero coverage vcf files.
        self.input_vcf_dir = input_vcf_dir
        # Minimum map quality value.
        self.min_mq = min_mq
        # Minimum quality score value.
        self.min_quality_score = min_quality_score
        # Number of input zero coverage vcf files.
        self.num_files = num_files
        # Output directory for json average mq files.
        self.output_json_avg_mq_dir = output_json_avg_mq_dir
        # Output directory for json snps files.
        self.output_json_snps_dir = output_json_snps_dir
        # Output directory for snps files.
        self.output_snps_dir = output_snps_dir
        # Quality score N threshold value.
        self.quality_score_n_threshold = quality_score_n_threshold
        self.dbkey = dbkey
        self.start_time = get_time_stamp()
        self.summary_str = ""
        self.timer_start = datetime.now()
        self.initiate_summary(output_summary)

    def append_to_summary(self, html_str):
        # Append a string to the html summary output file.
        self.summary_str = "%s%s" % (self.summary_str, html_str)

    def bin_input_files(self, filename, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix):
        # Categorize input files into closely related
        # isolate groups based on discovered SNPs, and
        # return a group dictionary.
        sample_groups_list = []
        table_name = self.get_sample_name(filename)
        defining_snp = False
        # Absolute positions in set union of two lists.
        for abs_position in list(defining_snps.keys() & (found_positions.keys() | found_positions_mix.keys())):
            group = defining_snps[abs_position]
            sample_groups_list.append(group)
            self.check_add_group(group)
            if len(list(defining_snps.keys() & found_positions_mix.keys())) > 0:
                table_name = self.get_sample_name(filename)
                table_name = '%s<font color="red">[[MIXED]]</font>' % table_name
            self.copy_file(filename, group)
            defining_snp = True
        if not set(inverted_defining_snps.keys()).intersection(found_positions.keys() | found_positions_mix.keys()):
            for abs_position in list(inverted_defining_snps.keys()):
                group = inverted_defining_snps[abs_position]
                sample_groups_list.append(group)
                self.check_add_group(group)
                self.copy_file(filename, group)
                defining_snp = True
        if defining_snp:
            samples_groups_dict[table_name] = sorted(sample_groups_list)
        else:
            samples_groups_dict[table_name] = ['<font color="red">No defining SNP</font>']
        return samples_groups_dict

    def check_add_group(self, group):
        # Add a group if it is not already in the list.
        if group not in self.groups:
            self.groups.append(group)

    def copy_file(self, filename, dir):
        # Copy a file into dir, creating dir if necessary.
        if not os.path.exists(dir):
            os.makedirs(dir)
        shutil.copy(filename, dir)

    def decide_snps(self, filename):
        # Find the SNPs in a vcf file to produce a pandas data
        # frame and a dictionary containing sample map qualities.
        positions_dict = self.all_positions
        sample_map_qualities = {}
        # Eliminate the path.
        file_name_base = self.get_sample_name(filename)
        vcf_reader = vcf.Reader(open(filename, 'r'))
        sample_dict = {}
        # IUPAC ambiguity codes keyed by the concatenated alt/ref pair,
        # used for ambiguous (AC == 1) positions below.
        iupac_codes = {"AG": "R", "GA": "R", "CT": "Y", "TC": "Y",
                       "GC": "S", "CG": "S", "AT": "W", "TA": "W",
                       "GT": "K", "TG": "K", "AC": "M", "CA": "M"}
        for record in vcf_reader:
            alt = str(record.ALT[0])
            record_position = "%s:%s" % (str(record.CHROM), str(record.POS))
            if record_position in positions_dict:
                if alt == "None":
                    sample_dict.update({record_position: "-"})
                else:
                    # On rare occasions MQM gets called "NaN", thus passing
                    # a string when a number is expected when calculating average.
                    mq_val = self.get_mq_val(record.INFO, filename)
                    if str(mq_val).lower() not in ["nan"]:
                        sample_map_qualities.update({record_position: mq_val})
                    if len(alt) == 1:
                        qual_val = self.val_as_int(record.QUAL)
                        ac = record.INFO['AC'][0]
                        ref = str(record.REF[0])
                        if ac == 2 and qual_val > self.quality_score_n_threshold:
                            # Add the SNP to a group.
                            sample_dict.update({record_position: alt})
                        elif ac == 1 and qual_val > self.quality_score_n_threshold:
                            # The position is ambiguous, so call the
                            # matching IUPAC ambiguity code, defaulting to N.
                            alt_ref = "%s%s" % (alt, ref)
                            sample_dict.update({record_position: iupac_codes.get(alt_ref, "N")})
                        # Poor calls
                        elif qual_val <= 50:
                            # Call the reference allele.
                            # Do not coerce record.REF[0] to a string!
                            sample_dict.update({record_position: record.REF[0]})
                        elif qual_val <= self.quality_score_n_threshold:
                            sample_dict.update({record_position: "N"})
                        else:
                            # Insurance -- Will still report on a possible
                            # SNP even if missed with above statements.
                            # Do not coerce record.REF[0] to a string!
                            sample_dict.update({record_position: record.REF[0]})
        # Merge dictionaries and order
        merge_dict = {}
        merge_dict.update(positions_dict)
        merge_dict.update(sample_dict)
        sample_df = pandas.DataFrame(merge_dict, index=[file_name_base])
        return sample_df, file_name_base, sample_map_qualities

    def df_to_fasta(self, parsimonious_df, group):
        # Generate SNP alignment file from
        # the parsimonious_df data frame.
        snps_file = os.path.join(self.output_snps_dir, "%s.fasta" % group)
        test_duplicates = []
        has_sequence_data = False
        for index, row in parsimonious_df.iterrows():
            for pos in row:
                if len(pos) > 0:
                    has_sequence_data = True
                    break
        if has_sequence_data:
            with open(snps_file, 'w') as fh:
                for index, row in parsimonious_df.iterrows():
                    test_duplicates.append(row.name)
                    if test_duplicates.count(row.name) < 2:
                        print(f'>{row.name}', file=fh)
                        for pos in row:
                            print(pos, end='', file=fh)
                        print("", file=fh)
        return has_sequence_data
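
    # Illustrative fasta output (editor's sketch; sample names and calls
    # are hypothetical). Each record is one sample and each character is
    # the call at one retained SNP position:
    #   >sample_A
    #   ACGTR-N
    #   >sample_B
    #   ACTTAGN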

    def find_initial_positions(self, filename):
        # Find SNP positions in a vcf file.
        # found_positions collects calls whose allele count matches the
        # configured --ac value; found_positions_mix collects AC == 1
        # calls, which flag possibly mixed positions.
        found_positions = {}
        found_positions_mix = {}
        vcf_reader = vcf.Reader(open(filename, 'r'))
        for record in vcf_reader:
            qual_val = self.val_as_int(record.QUAL)
            chrom = record.CHROM
            position = record.POS
            absolute_position = "%s:%s" % (str(chrom), str(position))
            alt = str(record.ALT[0])
            if alt != "None":
                mq_val = self.get_mq_val(record.INFO, filename)
                ac = record.INFO['AC'][0]
                if ac == self.ac and len(record.REF) == 1 and qual_val > self.min_quality_score and mq_val > self.min_mq:
                    found_positions.update({absolute_position: record.REF})
                if ac == 1 and len(record.REF) == 1 and qual_val > self.min_quality_score and mq_val > self.min_mq:
                    found_positions_mix.update({absolute_position: record.REF})
        return found_positions, found_positions_mix

    def gather_and_filter(self, prefilter_df, mq_averages, group_dir):
        # Group a data frame of SNPs.
        if self.input_excel is None:
            filtered_all_df = prefilter_df
            sheet_names = None
        else:
            # Filter positions to be removed from all.
            xl = pandas.ExcelFile(self.input_excel)
            sheet_names = xl.sheet_names
            # Use the first column to filter "all" positions.
            exclusion_list_all = self.get_position_list(sheet_names, 0)
            exclusion_list_group = self.get_position_list(sheet_names, group_dir)
            exclusion_list = exclusion_list_all + exclusion_list_group
            # Filters for all applied.
            filtered_all_df = prefilter_df.drop(columns=exclusion_list, errors='ignore')
        json_snps_file = os.path.join(self.output_json_snps_dir, "%s.json" % group_dir)
        parsimonious_df = self.get_parsimonious_df(filtered_all_df)
        samples_number, columns = parsimonious_df.shape
        if samples_number >= 4:
            # Sufficient samples have been found
            # to build a phylogenetic tree.
            has_sequence_data = self.df_to_fasta(parsimonious_df, group_dir)
            if has_sequence_data:
                json_avg_mq_file = os.path.join(self.output_json_avg_mq_dir, "%s.json" % group_dir)
                mq_averages.to_json(json_avg_mq_file, orient='split')
                parsimonious_df.to_json(json_snps_file, orient='split')
            else:
                msg = "<br/>No sequence data"
                if group_dir is not None:
                    msg = "%s for group: %s" % (msg, group_dir)
                self.append_to_summary("%s<br/>\n" % msg)
        else:
            msg = "<br/>Too few samples to build tree"
            if group_dir is not None:
                msg = "%s for group: %s" % (msg, group_dir)
            self.append_to_summary("%s<br/>\n" % msg)

    def get_sample_name(self, file_path):
        # Return the sample part of a file name.
        base_file_name = os.path.basename(file_path)
        if base_file_name.find(".") > 0:
            # Eliminate the extension.
            return os.path.splitext(base_file_name)[0]
        return base_file_name

    def get_mq_val(self, record_info, filename):
        # Get the MQ (gatk) or MQM (freebayes) value
        # from the record.INFO component of the vcf file.
        try:
            mq_val = record_info['MQM']
            return self.return_val(mq_val)
        except Exception:
            try:
                mq_val = record_info['MQ']
                return self.return_val(mq_val)
            except Exception:
                msg = "Invalid or unsupported vcf header %s in file: %s\n" % (str(record_info), filename)
                sys.exit(msg)
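
    # Editor's note: with pyvcf, freebayes MQM values parse to a
    # per-allele list (e.g. [60.0]) while gatk MQ parses to a scalar;
    # return_val() below unwraps the single-element list form.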

    def get_parsimonious_df(self, filtered_all_df):
        # Get the parsimonious SNPs data frame
        # from a data frame of filtered SNPs.
        try:
            # The 'root' reference row is added in get_snps(),
            # so it should always be present here.
            ref_series = filtered_all_df.loc['root']
            # In all_vcf root needs to be removed.
            filtered_all_df = filtered_all_df.drop(['root'])
        except KeyError:
            pass
        parsimony = filtered_all_df.loc[:, (filtered_all_df != filtered_all_df.iloc[0]).any()]
        parsimony_positions = list(parsimony)
        parse_df = filtered_all_df[parsimony_positions]
        ref_df = ref_series.to_frame()
        ref_df = ref_df.T
        parsimonious_df = pandas.concat([parse_df, ref_df], join='inner')
        return parsimonious_df
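
    # Editor's note, illustrative: after the root row is set aside, a
    # column in which every sample makes the same call is dropped as
    # uninformative; a column where samples disagree (e.g. sample_A=G,
    # sample_B=A) is kept, and the root row is then re-attached.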

    def get_position_list(self, sheet_names, group):
        # Get a list of positions defined by an excel file.
        exclusion_list = []
        try:
            filter_to_all = pandas.read_excel(self.input_excel, header=1, usecols=[group])
            for value in filter_to_all.values:
                value = str(value[0])
                if "-" not in value.split(":")[-1]:
                    exclusion_list.append(value)
                elif "-" in value:
                    try:
                        chrom, sequence_range = value.split(":")
                    except Exception as e:
                        sys.exit(str(e))
                    value = sequence_range.split("-")
                    for position in range(int(value[0].replace(',', '')), int(value[1].replace(',', '')) + 1):
                        exclusion_list.append(chrom + ":" + str(position))
            return exclusion_list
        except ValueError:
            return []

    def get_snps(self, task_queue, timeout):
        # Worker process target: consume group directories
        # from the task queue until it is empty.
        while True:
            try:
                group_dir = task_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                break
            # Parse all vcf files to accumulate
            # the SNPs into a data frame.
            positions_dict = {}
            group_files = []
            for file_name in os.listdir(os.path.abspath(group_dir)):
                file_path = os.path.abspath(os.path.join(group_dir, file_name))
                group_files.append(file_path)
            for file_name in group_files:
                found_positions, found_positions_mix = self.find_initial_positions(file_name)
                positions_dict.update(found_positions)
            # Order before adding to file to match
            # with ordering of individual samples.
            # all_positions is abs_pos:REF
            self.all_positions = OrderedDict(sorted(positions_dict.items()))
            ref_positions_df = pandas.DataFrame(self.all_positions, index=['root'])
            all_map_qualities = {}
            df_list = []
            for file_name in group_files:
                sample_df, file_name_base, sample_map_qualities = self.decide_snps(file_name)
                df_list.append(sample_df)
                all_map_qualities.update({file_name_base: sample_map_qualities})
            all_sample_df = pandas.concat(df_list)
            # All positions have now been selected for each sample,
            # so select parsimony informative SNPs. This removes
            # columns where all fields are the same.
            # Add reference to top row.
            prefilter_df = pandas.concat([ref_positions_df, all_sample_df], join='inner')
            all_mq_df = pandas.DataFrame.from_dict(all_map_qualities)
            mq_averages = all_mq_df.mean(axis=1).astype(int)
            self.gather_and_filter(prefilter_df, mq_averages, group_dir)
            task_queue.task_done()

    def group_vcfs(self, vcf_files):
        # Parse an excel file to produce a
        # grouping dictionary for SNPs.
        xl = pandas.ExcelFile(self.input_excel)
        sheet_names = xl.sheet_names
        ws = pandas.read_excel(self.input_excel, sheet_name=sheet_names[0])
        defining_snps = ws.iloc[0]
        defsnp_iterator = iter(defining_snps.items())
        next(defsnp_iterator)
        defining_snps = {}
        inverted_defining_snps = {}
        for abs_pos, group in defsnp_iterator:
            if '!' in abs_pos:
                inverted_defining_snps[abs_pos.replace('!', '')] = group
            else:
                defining_snps[abs_pos] = group
        samples_groups_dict = {}
        for vcf_file in vcf_files:
            found_positions, found_positions_mix = self.find_initial_positions(vcf_file)
            samples_groups_dict = self.bin_input_files(vcf_file, samples_groups_dict, defining_snps, inverted_defining_snps, found_positions, found_positions_mix)
        # Output summary grouping table.
        self.append_to_summary('<br/>')
        self.append_to_summary('<b>Groupings with %d listed:</b><br/>\n' % len(samples_groups_dict))
        self.append_to_summary('<table cellpadding="5" cellspacing="5" border="1">\n')
        for key, value in samples_groups_dict.items():
            self.append_to_summary('<tr align="left"><th>Sample Name</th>\n')
            self.append_to_summary('<td>%s</td>' % key)
            for group in value:
                self.append_to_summary('<td>%s</td>\n' % group)
            self.append_to_summary('</tr>\n')
        self.append_to_summary('</table><br/>\n')

    def initiate_summary(self, output_summary):
        # Begin the html summary output string.
        self.append_to_summary('<html>\n')
        self.append_to_summary('<head></head>\n')
        self.append_to_summary('<body style="font-size:12px;">')
        self.append_to_summary("<b>Time started:</b> %s<br/>" % get_time_stamp())
        self.append_to_summary("<b>Number of VCF inputs:</b> %d<br/>" % self.num_files)
        self.append_to_summary("<b>Reference:</b> %s<br/>" % self.dbkey)
        self.append_to_summary("<b>All isolates:</b> %s<br/>" % str(self.all_isolates))

    def return_val(self, val, index=0):
        # Handle element and single-element list values.
        if isinstance(val, list):
            return val[index]
        return val

    def val_as_int(self, val):
        # Handle integer value conversion.
        try:
            return int(val)
        except TypeError:
            # val is likely None here.
            return 0


if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    parser.add_argument('--ac', action='store', dest='ac', type=int, help='Allele count value')
    parser.add_argument('--all_isolates', action='store_true', dest='all_isolates', required=False, default=False, help='Create table with all isolates')
    parser.add_argument('--input_excel', action='store', dest='input_excel', required=False, default=None, help='Optional Excel filter file')
    parser.add_argument('--input_vcf_dir', action='store', dest='input_vcf_dir', help='Input vcf directory')
    parser.add_argument('--min_mq', action='store', dest='min_mq', type=int, help='Minimum map quality value')
    parser.add_argument('--min_quality_score', action='store', dest='min_quality_score', type=int, help='Minimum quality score value')
    parser.add_argument('--output_json_avg_mq_dir', action='store', dest='output_json_avg_mq_dir', help='Output json average mq directory')
    parser.add_argument('--output_json_snps_dir', action='store', dest='output_json_snps_dir', help='Output json snps directory')
    parser.add_argument('--output_snps_dir', action='store', dest='output_snps_dir', help='Output snps directory')
    parser.add_argument('--output_summary', action='store', dest='output_summary', help='Output summary html file')
    parser.add_argument('--processes', action='store', dest='processes', type=int, help='Configured processes for job')
    parser.add_argument('--quality_score_n_threshold', action='store', dest='quality_score_n_threshold', type=int, help='Minimum quality score N value for alleles')
    parser.add_argument('--dbkey', action='store', dest='dbkey', help='Galaxy genome build dbkey')

    args = parser.parse_args()

    # Build the list of all input zero coverage vcf
    # files, both the samples and the "database".
    vcf_files = []
    for file_name in os.listdir(args.input_vcf_dir):
        file_path = os.path.abspath(os.path.join(args.input_vcf_dir, file_name))
        vcf_files.append(file_path)

    multiprocessing.set_start_method('spawn')
    queue1 = multiprocessing.JoinableQueue()
    num_files = len(vcf_files)
    cpus = set_num_cpus(num_files, args.processes)
    # Set a timeout for get()s in the queue.
    timeout = 0.05

    # Initialize the snp_finder object.
    snp_finder = SnpFinder(num_files, args.dbkey, args.input_excel, args.all_isolates, args.ac, args.min_mq, args.quality_score_n_threshold, args.min_quality_score, args.input_vcf_dir, args.output_json_avg_mq_dir, args.output_json_snps_dir, args.output_snps_dir, args.output_summary)

    # Define and make the set of directories into which the input_zc_vcf
    # files will be placed. Selected input values (e.g., the use of
    # an Excel file for grouping and filtering, creating a group with
    # all isolates) are used to define the directories.
    vcf_dirs = []
    if args.input_excel is None:
        vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
    else:
        if args.all_isolates:
            vcf_dirs = setup_all_vcfs(vcf_files, vcf_dirs)
        # Parse the Excel file to determine groups for filtering.
        snp_finder.group_vcfs(vcf_files)
        # Append the list of group directories created by
        # the above call to the set of directories containing
        # vcf files for analysis.
        group_dirs = [d for d in os.listdir(os.getcwd()) if os.path.isdir(d) and d in snp_finder.groups]
        vcf_dirs.extend(group_dirs)

    # Populate the queue for job splitting.
    for vcf_dir in vcf_dirs:
        queue1.put(vcf_dir)

    # Start a worker process for each available cpu to run get_snps().
    processes = [multiprocessing.Process(target=snp_finder.get_snps, args=(queue1, timeout)) for _ in range(cpus)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    queue1.join()

    # Finish summary log.
    snp_finder.append_to_summary("<br/><b>Time finished:</b> %s<br/>\n" % get_time_stamp())
    total_run_time = datetime.now() - snp_finder.timer_start
    snp_finder.append_to_summary("<br/><b>Total run time:</b> %s<br/>\n" % str(total_run_time))
    snp_finder.append_to_summary('</body>\n</html>\n')
    with open(args.output_summary, "w") as fh:
        fh.write("%s" % snp_finder.summary_str)
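
# A hypothetical invocation for reference (editor's sketch; the option
# values below are illustrative, not tool defaults):
#   python vsnp_get_snps.py --ac 2 --min_mq 56 --min_quality_score 150 \
#     --quality_score_n_threshold 150 --processes 8 --dbkey 'AF2122/97' \
#     --input_vcf_dir input_vcf --output_snps_dir snps \
#     --output_json_snps_dir json_snps --output_json_avg_mq_dir json_avg_mq \
#     --output_summary summary.html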