comparison: vsnp_build_tables.py @ 9:40b97055bb99 (draft, default, tip)
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/vsnp commit c38fd63f7980c70390d104a73ba4c72b266444c3
author:   iuc
date:     Fri, 10 Jun 2022 06:08:02 +0000
parents:  18b59c38017e
children: (none)
--- vsnp_build_tables.py (8:18b59c38017e)
+++ vsnp_build_tables.py (9:40b97055bb99)
@@ -1,8 +1,9 @@
 #!/usr/bin/env python

 import argparse
+import itertools
 import multiprocessing
 import os
 import queue
 import re

@@ -151,15 +152,10 @@
         # Eliminate the extension.
         return os.path.splitext(base_file_name)[0]
     return base_file_name


-def output_cascade_table(cascade_order, mqdf, group, annotation_dict):
-    cascade_order_mq = pandas.concat([cascade_order, mqdf], join='inner')
-    output_table(cascade_order_mq, "cascade", group, annotation_dict)
-
-
 def output_excel(df, type_str, group, annotation_dict, count=None):
     # Output the temporary json file that
     # is used by the excel_formatter.
     if count is None:
         if group is None:
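output_excel (above) hands each finished table to the excel_formatter step through a temporary JSON file; the next hunk shows the to_json call with orient='split', which stores index, columns, and data separately so the row and column labels survive the round trip. A minimal sketch of that round trip, using an illustrative file name and toy frame rather than the tool's real data:

    import pandas

    df = pandas.DataFrame([["A", "G"], ["A", "T"]],
                          index=["root", "sample1"],
                          columns=["chrom:15", "chrom:200"])
    # orient='split' keeps {index, columns, data} as separate fields.
    df.to_json("table.json", orient="split")
    # The formatter side reads the same orientation back.
    df2 = pandas.read_json("table.json", orient="split")
    print(df2)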
@@ -179,23 +175,10 @@
             json_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_order_mq_%d.json" % (group, type_str, count))
             excel_file_name = os.path.join(OUTPUT_EXCEL_DIR, "%s_%s_table_%d.xlsx" % (group, type_str, count))
     df.to_json(json_file_name, orient='split')
     # Output the Excel file.
     excel_formatter(json_file_name, excel_file_name, group, annotation_dict)
-
-
-def output_sort_table(cascade_order, mqdf, group, annotation_dict):
-    sort_df = cascade_order.T
-    sort_df['abs_value'] = sort_df.index
-    sort_df[['chrom', 'pos']] = sort_df['abs_value'].str.split(':', expand=True)
-    sort_df = sort_df.drop(['abs_value', 'chrom'], axis=1)
-    sort_df.pos = sort_df.pos.astype(int)
-    sort_df = sort_df.sort_values(by=['pos'])
-    sort_df = sort_df.drop(['pos'], axis=1)
-    sort_df = sort_df.T
-    sort_order_mq = pandas.concat([sort_df, mqdf], join='inner')
-    output_table(sort_order_mq, "sort", group, annotation_dict)


 def output_table(df, type_str, group, annotation_dict):
     if isinstance(group, str) and group.startswith("dataset"):
         # Inputs are single files, not collections,
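Both deleted helpers, output_cascade_table above and output_sort_table here, stacked a one-row average-mapping-quality frame (mqdf) under the SNP table with pandas.concat(join='inner') before writing it out, and output_sort_table additionally reordered columns by the numeric pos part of the "chrom:pos" labels. The new revision drops the MQ row and writes both tables through output_table directly (see the hunks that follow). A sketch of the two removed idioms together, on toy stand-ins for the tool's data:

    import pandas

    snps = pandas.DataFrame([["A", "G"], ["C", "T"]],
                            index=["root", "sample1"],
                            columns=["chrom:200", "chrom:15"])
    avg_mq_series = pandas.Series([52.0, 55.0], index=["chrom:200", "chrom:15"])
    # The deleted code built a one-row MQ frame ...
    mqdf = avg_mq_series.to_frame(name="MQ").T
    # ... and sorted columns numerically on the pos part of "chrom:pos".
    sort_df = snps.T
    sort_df["abs_value"] = sort_df.index
    sort_df[["chrom", "pos"]] = sort_df["abs_value"].str.split(":", expand=True)
    sort_df = sort_df.drop(["abs_value", "chrom"], axis=1)
    sort_df.pos = sort_df.pos.astype(int)
    sort_df = sort_df.sort_values(by=["pos"]).drop(["pos"], axis=1).T
    # join='inner' stacks the MQ row under the matching SNP columns only.
    sort_order_mq = pandas.concat([sort_df, mqdf], join="inner")
    print(sort_order_mq)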
@@ -231,22 +214,21 @@
         try:
             tup = task_queue.get(block=True, timeout=timeout)
         except queue.Empty:
             break
         newick_file, json_file, json_avg_mq_file = tup
-        avg_mq_series = pandas.read_json(json_avg_mq_file, typ='series', orient='split')
-        # Map quality to dataframe.
-        mqdf = avg_mq_series.to_frame(name='MQ')
-        mqdf = mqdf.T
         # Get the group.
         group = get_sample_name(newick_file)
         snps_df = pandas.read_json(json_file, orient='split')
         with open(newick_file, 'r') as fh:
             for line in fh:
                 line = re.sub('[:,]', '\n', line)
                 line = re.sub('[)(]', '', line)
                 line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)
+                line = re.sub("'", '', line)
                 line = re.sub('root\n', '', line)
         sample_order = line.split('\n')
-        sample_order = list([_f for _f in sample_order if _f])
+        sample_order = list(filter(None, sample_order))
         sample_order.insert(0, 'root')
-        tree_order = snps_df.loc[sample_order]
+        tree_order_df = snps_df.loc[sample_order]
+        # Output the sorted table.
+        output_table(tree_order_df, "sort", group, annotation_dict)
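preprocess_tables recovers the sample order by flattening the newick tree with a chain of regular expressions: colons and commas become newlines, parentheses vanish, branch lengths (digit-dot-digit runs ending a line) are deleted, and, new in this revision, single quotes around node names are stripped before 'root' is removed and re-inserted at the front. A standalone trace on an illustrative newick string (note that a sample name containing a digit-dot-digit run would also be eaten by the branch-length pattern):

    import re

    line = "(root:0.0,('sample_A':0.0012,sample_B:0.0034):0.0005);\n"
    line = re.sub('[:,]', '\n', line)
    line = re.sub('[)(]', '', line)
    line = re.sub(r'[0-9].*\.[0-9].*\n', '', line)   # drop branch lengths
    line = re.sub("'", '', line)                     # drop quotes around names
    line = re.sub('root\n', '', line)
    sample_order = list(filter(None, line.split('\n')))
    sample_order.insert(0, 'root')
    print(sample_order)  # ['root', 'sample_A', 'sample_B']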
@@ -253,35 +235,32 @@
         # Count number of SNPs in each column.
         snp_per_column = []
-        for column_header in tree_order:
+        for column_header in tree_order_df:
             count = 0
-            column = tree_order[column_header]
+            column = tree_order_df[column_header]
             for element in column:
-                if element != column[0]:
+                # column[0] is top row/root/reference,
+                # element is everything below it.
+                if element != column[0] and element != '-':
                     count = count + 1
             snp_per_column.append(count)
-        row1 = pandas.Series(snp_per_column, tree_order.columns, name="snp_per_column")
+        row1 = pandas.Series(snp_per_column, tree_order_df.columns, name="snp_per_column")
         # Count number of SNPs from the
         # top of each column in the table.
         snp_from_top = []
-        for column_header in tree_order:
+        for column_header in tree_order_df:
             count = 0
-            column = tree_order[column_header]
-            # for each element in the column
-            # skip the first element
-            for element in column[1:]:
-                if element == column[0]:
-                    count = count + 1
-                else:
-                    break
-            snp_from_top.append(count)
-        row2 = pandas.Series(snp_from_top, tree_order.columns, name="snp_from_top")
-        tree_order = tree_order.append([row1])
-        tree_order = tree_order.append([row2])
-        # In pandas=0.18.1 even this does not work:
-        # abc = row1.to_frame()
-        # abc = abc.T --> tree_order.shape (5, 18), abc.shape (1, 18)
-        # tree_order.append(abc)
-        # Continue to get error: "*** ValueError: all the input arrays must have same number of dimensions"
-        tree_order = tree_order.T
-        tree_order = tree_order.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
-        tree_order = tree_order.T
+            column = tree_order_df[column_header]
+            index_list_of_ref_differences = []
+            for ind, list_item in enumerate(column[1:].to_list()):
+                if list_item not in [column[0], '-']:
+                    index_list_of_ref_differences.append(ind)
+            c = itertools.count()
+            val = max((list(g) for _, g in itertools.groupby(index_list_of_ref_differences, lambda x: x - next(c))), key=len)
+            # Starting row number with longest continuous SNPs in column.
+            snp_from_top.append(val[0])
+        row2 = pandas.Series(snp_from_top, tree_order_df.columns, name="snp_from_top")
+        tree_order_df = tree_order_df.append([row1])
+        tree_order_df = tree_order_df.append([row2])
+        tree_order_df = tree_order_df.T
+        tree_order_df = tree_order_df.sort_values(['snp_from_top', 'snp_per_column'], ascending=[True, False])
+        tree_order_df = tree_order_df.T
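The rewritten snp_from_top no longer counts reference matches down from the top of a column; it collects the row indices at which a sample differs from the reference, groups consecutive indices with the itertools.groupby counter idiom (x - next(c) is constant across a run of consecutive integers), and keeps the starting row of the longest run. The idiom in isolation, on a toy index list:

    import itertools

    index_list = [0, 1, 2, 5, 6, 9]
    c = itertools.count()
    # Consecutive values share the same x - next(c) key, so groupby
    # splits the list into runs of consecutive integers.
    runs = [list(g) for _, g in itertools.groupby(index_list, lambda x: x - next(c))]
    print(runs)            # [[0, 1, 2], [5, 6], [9]]
    longest = max(runs, key=len)
    print(longest[0])      # 0: starting row of the longest run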
@@ -288,24 +267,10 @@
         # Remove snp_per_column and snp_from_top rows.
-        cascade_order = tree_order[:-2]
+        cascade_order_df = tree_order_df[:-2]
         # Output the cascade table.
-        output_cascade_table(cascade_order, mqdf, group, annotation_dict)
-        # Output the sorted table.
-        output_sort_table(cascade_order, mqdf, group, annotation_dict)
+        output_table(cascade_order_df, "cascade", group, annotation_dict)
         task_queue.task_done()
-
-
-def set_num_cpus(num_files, processes):
-    num_cpus = len(os.sched_getaffinity(0))
-    if num_files < num_cpus and num_files < processes:
-        return num_files
-    if num_cpus < processes:
-        half_cpus = int(num_cpus / 2)
-        if num_files < half_cpus:
-            return num_files
-        return half_cpus
-    return processes


 if __name__ == '__main__':
     parser = argparse.ArgumentParser()

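The other removal in this revision is the set_num_cpus helper, which capped the worker count by the CPU affinity of the process; the pool size in the final hunk is now taken directly from args.processes. For reference, the deleted logic with a few worked results (reproduced from the left-hand side above; os.sched_getaffinity is Linux-only):

    import os

    def set_num_cpus(num_files, processes):
        # Deleted helper: cap the worker count by usable CPUs and file count.
        num_cpus = len(os.sched_getaffinity(0))
        if num_files < num_cpus and num_files < processes:
            return num_files
        if num_cpus < processes:
            half_cpus = int(num_cpus / 2)
            if num_files < half_cpus:
                return num_files
            return half_cpus
        return processes

    # On a host exposing 8 usable CPUs:
    #   set_num_cpus(3, 8)   -> 3  (fewer files than CPUs and workers)
    #   set_num_cpus(20, 16) -> 4  (half the CPUs when oversubscribed)
    #   set_num_cpus(20, 4)  -> 4  (the requested process count)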
@@ -355,21 +320,20 @@

     multiprocessing.set_start_method('spawn')
     queue1 = multiprocessing.JoinableQueue()
     queue2 = multiprocessing.JoinableQueue()
     num_files = len(newick_files)
-    cpus = set_num_cpus(num_files, args.processes)
     # Set a timeout for get()s in the queue.
     timeout = 0.05

     for i, newick_file in enumerate(newick_files):
         json_file = json_files[i]
         json_avg_mq_file = json_avg_mq_files[i]
         queue1.put((newick_file, json_file, json_avg_mq_file))

     # Complete the preprocess_tables task.
-    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(cpus)]
+    processes = [multiprocessing.Process(target=preprocess_tables, args=(queue1, annotation_dict, timeout, )) for _ in range(args.processes)]
     for p in processes:
         p.start()
     for p in processes:
         p.join()
     queue1.join()
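The worker-pool pattern itself is unchanged: tasks go onto a JoinableQueue, each spawned worker drains the queue until a get() times out with queue.Empty, and task_done()/join() account for completion. A self-contained sketch of the same pattern; the worker body and item count here are illustrative, not from the tool:

    import multiprocessing
    import queue

    def worker(task_queue, timeout):
        # Drain the queue; an empty get() after `timeout` seconds ends the worker.
        while True:
            try:
                item = task_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                break
            print('processing', item)
            task_queue.task_done()

    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')
        q = multiprocessing.JoinableQueue()
        for i in range(8):
            q.put(i)
        procs = [multiprocessing.Process(target=worker, args=(q, 0.05)) for _ in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        q.join()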