comparison vsnp_build_tables.py @ 7:57bd5b859e86 draft default tip

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/vsnp commit c38fd63f7980c70390d104a73ba4c72b266444c3
author iuc
date Fri, 10 Jun 2022 06:10:23 +0000
parents 532a11cdd818
children
comparison
equal deleted inserted replaced
6:532a11cdd818 7:57bd5b859e86
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 2
3 import argparse 3 import argparse
4 import itertools
4 import multiprocessing 5 import multiprocessing
5 import os 6 import os
6 import queue 7 import queue
7 import re 8 import re
8 9
151 # Eliminate the extension. 152 # Eliminate the extension.
152 return os.path.splitext(base_file_name)[0] 153 return os.path.splitext(base_file_name)[0]
153 return base_file_name 154 return base_file_name
154 155
155 156
156 def output_cascade_table(cascade_order, mqdf, group, annotation_dict):
157 cascade_order_mq = pandas.concat([cascade_order, mqdf], join='inner')
158 output_table(cascade_order_mq, "cascade", group, annotation_dict)
159
160
161 def output_excel(df, type_str, group, annotation_dict, count=None): 157 def output_excel(df, type_str, group, annotation_dict, count=None):
162 # Output the temporary json file that 158 # Output the temporary json file that
163 # is used by the excel_formatter. 159 # is used by the excel_formatter.
164 if count is None: 160 if count is None:
165 if group is None: 161 if group is None:
179 json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq_%d.json" % (group, type_str, count)) 175 json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq_%d.json" % (group, type_str, count))
180 excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table_%d.xlsx" % (group, type_str, count)) 176 excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table_%d.xlsx" % (group, type_str, count))
181 df.to_json(json_file_name, orient='split') 177 df.to_json(json_file_name, orient='split')
182 # Output the Excel file. 178 # Output the Excel file.
183 excel_formatter(json_file_name, excel_file_name, group, annotation_dict) 179 excel_formatter(json_file_name, excel_file_name, group, annotation_dict)
184
185
186 def output_sort_table(cascade_order, mqdf, group, annotation_dict):
187 sort_df = cascade_order.T
188 sort_df['abs_value'] = sort_df.index
189 sort_df[['chrom', 'pos']] = sort_df['abs_value'].str.split(':', expand=True)
190 sort_df = sort_df.drop(['abs_value', 'chrom'], axis=1)
191 sort_df.pos = sort_df.pos.astype(int)
192 sort_df = sort_df.sort_values(by=['pos'])
193 sort_df = sort_df.drop(['pos'], axis=1)
194 sort_df = sort_df.T
195 sort_order_mq = pandas.concat([sort_df, mqdf], join='inner')
196 output_table(sort_order_mq, "sort", group, annotation_dict)
197 180
198 181
199 def output_table(df, type_str, group, annotation_dict): 182 def output_table(df, type_str, group, annotation_dict):
200 if isinstance(group, str) and group.startswith("dataset"): 183 if isinstance(group, str) and group.startswith("dataset"):
201 # Inputs are single files, not collections, 184 # Inputs are single files, not collections,
231 try: 214 try:
232 tup = task_queue.get(block=True, timeout=timeout) 215 tup = task_queue.get(block=True, timeout=timeout)
233 except queue.Empty: 216 except queue.Empty:
234 break 217 break
235 newick_file, json_file, json_avg_mq_file = tup 218 newick_file, json_file, json_avg_mq_file = tup
236 avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
237 # Map quality to dataframe.
238 mqdf = avg_mq_series.to_frame(name='MQ')
239 mqdf = mqdf.T
240 # Get the group. 219 # Get the group.
241 group = get_sample_name(newick_file) 220 group = get_sample_name(newick_file)
242 snps_df = pandas.read_json(json_file, orient='split') 221 snps_df = pandas.read_json(json_file, orient='split')
243 with open(newick_file, 'r') as fh: 222 with open(newick_file, 'r') as fh:
244 for line in fh: 223 for line in fh:
245 line = re.sub('[:,]', '\n', line) 224 line = re.sub('[:,]', '\n', line)
246 line = re.sub('[)(]', '', line) 225 line = re.sub('[)(]', '', line)
247 line = re.sub(r'[0-9].*\.[0-9].*\n', '', line) 226 line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
227 line = re.sub("'", '', line)
248 line = re.sub('root\n', '', line) 228 line = re.sub('root\n', '', line)
249 sample_order = line.split('\n') 229 sample_order = line.split('\n')
250 sample_order = list([_f for _f in sample_order if _f]) 230 sample_order = list(filter(None, sample_order))
251 sample_order.insert(0, 'root') 231 sample_order.insert(0, 'root')
252 tree_order = snps_df.loc[sample_order] 232 tree_order_df = snps_df.loc[sample_order]
233 # Output the sorted table.
234 output_table(tree_order_df, "sort", group, annotation_dict)
253 # Count number of SNPs in each column. 235 # Count number of SNPs in each column.
254 snp_per_column = [] 236 snp_per_column = []
255 for column_header in tree_order: 237 for column_header in tree_order_df:
256 count = 0 238 count = 0
257 column = tree_order[column_header] 239 column = tree_order_df[column_header]
258 for element in column: 240 for element in column:
259 if element != column[0]: 241 # column[0] is top row/root/reference,
242 # element is everything below it.
243 if element != column[0] and element != '-':
260 count = count + 1 244 count = count + 1
261 snp_per_column.append(count) 245 snp_per_column.append(count)
262 row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column") 246 row1 = pandas.Series(snp_per_column, tree_order_df.columns, name="snp_per_column")
263 # Count number of SNPS from the 247 # Count number of SNPS from the
264 # top of each column in the table. 248 # top of each column in the table.
265 snp_from_top = [] 249 snp_from_top = []
266 for column_header in tree_order: 250 for column_header in tree_order_df:
267 count = 0 251 count = 0
268 column = tree_order[column_header] 252 column = tree_order_df[column_header]
269 # for each element in the column 253 index_list_of_ref_differences = []
270 # skip the first element 254 for ind, list_item in enumerate(column[1:].to_list()):
271 for element in column[1:]: 255 if list_item not in [column[0], '-']:
272 if element == column[0]: 256 index_list_of_ref_differences.append(ind)
273 count = count + 1 257 c = itertools.count()
274 else: 258 val = max((list(g) for _, g in itertools.groupby(index_list_of_ref_differences, lambda x: x - next(c))), key=len)
275 break 259 # Starting row number with longest continous SNPs in column
276 snp_from_top.append(count) 260 snp_from_top.append(val[0])
277 row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top") 261 row2 = pandas.Series(snp_from_top, tree_order_df.columns, name="snp_from_top")
278 tree_order = tree_order.append([row1]) 262 tree_order_df = tree_order_df.append([row1])
279 tree_order = tree_order.append([row2]) 263 tree_order_df = tree_order_df.append([row2])
280 # In pandas=0.18.1 even this does not work: 264 tree_order_df = tree_order_df.T
281 # abc = row1.to_frame() 265 tree_order_df = tree_order_df.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
282 # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18) 266 tree_order_df = tree_order_df.T
283 # tree_order.append(abc)
284 # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
285 tree_order = tree_order.T
286 tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
287 tree_order = tree_order.T
288 # Remove snp_per_column and snp_from_top rows. 267 # Remove snp_per_column and snp_from_top rows.
289 cascade_order = tree_order[:-2] 268 cascade_order_df = tree_order_df[:-2]
290 # Output the cascade table. 269 # Output the cascade table.
291 output_cascade_table(cascade_order, mqdf, group, annotation_dict) 270 output_table(cascade_order_df, "cascade", group, annotation_dict)
292 # Output the sorted table.
293 output_sort_table(cascade_order, mqdf, group, annotation_dict)
294 task_queue.task_done() 271 task_queue.task_done()
295
296
297 def set_num_cpus(num_files, processes):
298 num_cpus = len(os.sched_getaffinity(0))
299 if num_files < num_cpus and num_files < processes:
300 return num_files
301 if num_cpus < processes:
302 half_cpus = int(num_cpus / 2)
303 if num_files < half_cpus:
304 return num_files
305 return half_cpus
306 return processes
307 272
308 273
309 if __name__ == '__main__': 274 if __name__ == '__main__':
310 parser = argparse.ArgumentParser() 275 parser = argparse.ArgumentParser()
311 276
355 320
356 multiprocessing.set_start_method('spawn') 321 multiprocessing.set_start_method('spawn')
357 queue1 = multiprocessing.JoinableQueue() 322 queue1 = multiprocessing.JoinableQueue()
358 queue2 = multiprocessing.JoinableQueue() 323 queue2 = multiprocessing.JoinableQueue()
359 num_files = len(newick_files) 324 num_files = len(newick_files)
360 cpus = set_num_cpus(num_files, args.processes)
361 # Set a timeout for get()s in the queue. 325 # Set a timeout for get()s in the queue.
362 timeout = 0.05 326 timeout = 0.05
363 327
364 for i, newick_file in enumerate(newick_files): 328 for i, newick_file in enumerate(newick_files):
365 json_file = json_files[i] 329 json_file = json_files[i]
366 json_avg_mq_file = json_avg_mq_files[i] 330 json_avg_mq_file = json_avg_mq_files[i]
367 queue1.put((newick_file, json_file, json_avg_mq_file)) 331 queue1.put((newick_file, json_file, json_avg_mq_file))
368 332
369 # Complete the preprocess_tables task. 333 # Complete the preprocess_tables task.
370 processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)] 334 processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(args.processes)]
371 for p in processes: 335 for p in processes:
372 p.start() 336 p.start()
373 for p in processes: 337 for p in processes:
374 p.join() 338 p.join()
375 queue1.join() 339 queue1.join()