Mercurial > repos > iuc > table_compute
view scripts/table_compute.py @ 4:93a3ce78ce55 draft
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/table_compute commit d741508e5ed912cdeee4f67ec8451b6e6865363c"
author | iuc |
---|---|
date | Tue, 20 Apr 2021 15:46:10 +0000 |
parents | 02c3e335a695 |
children | 3bf5661c0280 |
line wrap: on
line source
#!/usr/bin/env python3 """ Table Compute tool - a wrapper around pandas with parameter input validation. """ __version__ = "0.9.2" import csv import math from sys import argv import numpy as np import pandas as pd from safety import Safety if len(argv) == 2 and argv[1] == "--version": print(__version__) exit(-1) # The import below should be generated in the same directory as # the table_compute.py script. # It is placed here so that the --version switch does not fail import userconfig as uc # noqa: I100,I202 class Utils: @staticmethod def getOneValueMathOp(op_name): "Returns a simple one value math operator such as log, sqrt, etc" return getattr(math, op_name) @staticmethod def getVectorPandaOp(op_name): "Returns a valid DataFrame vector operator" return getattr(pd.DataFrame, op_name) @staticmethod def getTwoValuePandaOp(op_name, pd_obj): "Returns a valid two value DataFrame or Series operator" return getattr(type(pd_obj), "__" + op_name + "__") @staticmethod def readcsv(filedict, narm): data = pd.read_csv( filedict["file"], header=filedict["header"], index_col=filedict["row_names"], keep_default_na=narm, nrows=filedict["nrows"], skipfooter=filedict["skipfooter"], skip_blank_lines=filedict["skip_blank_lines"], sep='\t' ) # Fix whitespace issues in index or column names data.columns = [col.strip() if type(col) is str else col for col in data.columns] data.index = [row.strip() if type(row) is str else row for row in data.index] return(data) @staticmethod def rangemaker(tab): # e.g. "1:3,2:-2" specifies "1,2,3,2,1,0,-1,-2" to give [0,1,2,1,0,-1,-2] # Positive indices are decremented by 1 to reference 0-base numbering # Negative indices are unaltered, so that -1 refers to the last column out = [] err_mess = None for ranges in tab.split(","): nums = ranges.split(":") if len(nums) == 1: numb = int(nums[0]) # Positive numbers get decremented. # i.e. column "3" refers to index 2 # column "-1" still refers to index -1 if numb != 0: out.append(numb if (numb < 0) else (numb - 1)) else: err_mess = "Please do not use 0 as an index" elif len(nums) == 2: left, right = map(int, nums) if 0 in (left, right): err_mess = "Please do not use 0 as an index" elif left < right: if left > 0: # and right > 0 too # 1:3 to 0,1,2 out.extend(range(left - 1, right)) elif right < 0: # and left < 0 too # -3:-1 to -3,-2,-1 out.extend(range(left, right + 1)) elif left < 0 and right > 0: # -2:2 to -2,-1,0,1 out.extend(range(left, 0)) out.extend(range(0, right)) elif right < left: if right > 0: # and left > 0 # 3:1 to 2,1,0 out.extend(range(left - 1, right - 2, -1)) elif left < 0: # and right < 0 # -1:-3 to -1,-2,-3 out.extend(range(left, right - 1, -1)) elif right < 0 and left > 0: # 2:-2 to 1,0,-1,-2 out.extend(range(left - 1, right - 1, -1)) else: err_mess = "%s should not be equal or contain a zero" % nums if err_mess: print(err_mess) return(None) return(out) # Set decimal precision pd.options.display.precision = uc.Default["precision"] user_mode = uc.Default["user_mode"] user_mode_single = None out_table = None params = uc.Data["params"] if user_mode == "single": # Read in TSV file data = Utils.readcsv(uc.Data["tables"][0], uc.Default["narm"]) user_mode_single = params["user_mode_single"] if user_mode_single == "precision": # Useful for changing decimal precision on write out out_table = data elif user_mode_single == "select": cols_specified = params["select_cols_wanted"] rows_specified = params["select_rows_wanted"] # Select all indexes if empty array of values if cols_specified: cols_specified = Utils.rangemaker(cols_specified) else: cols_specified = range(len(data.columns)) if rows_specified: rows_specified = Utils.rangemaker(rows_specified) else: rows_specified = range(len(data)) # do not use duplicate indexes # e.g. [2,3,2,5,5,4,2] to [2,3,5,4] nodupes_col = not params["select_cols_unique"] nodupes_row = not params["select_rows_unique"] if nodupes_col: cols_specified = [x for i, x in enumerate(cols_specified) if x not in cols_specified[:i]] if nodupes_row: rows_specified = [x for i, x in enumerate(rows_specified) if x not in rows_specified[:i]] out_table = data.iloc[rows_specified, cols_specified] elif user_mode_single == "filtersumval": mode = params["filtersumval_mode"] axis = params["filtersumval_axis"] operation = params["filtersumval_op"] compare_operation = params["filtersumval_compare"] value = params["filtersumval_against"] minmatch = params["filtersumval_minmatch"] if mode == "operation": # Perform axis operation summary_op = Utils.getVectorPandaOp(operation) axis_summary = summary_op(data, axis=axis) # Perform vector comparison compare_op = Utils.getTwoValuePandaOp( compare_operation, axis_summary ) axis_bool = compare_op(axis_summary, value) elif mode == "element": if operation.startswith("str_"): data = data.astype("str") value = str(value) # Convert str_eq to eq operation = operation[4:] else: value = float(value) op = Utils.getTwoValuePandaOp(operation, data) bool_mat = op(data, value) axis_bool = np.sum(bool_mat, axis=axis) >= minmatch out_table = data.loc[:, axis_bool] if axis == 0 else data.loc[axis_bool, :] elif user_mode_single == "matrixapply": # 0 - column, 1 - row axis = params["matrixapply_dimension"] # sd, mean, max, min, sum, median, summary operation = params["matrixapply_op"] if operation is None: use_custom = params["matrixapply_custom"] if use_custom: custom_func = params["matrixapply_custom_func"] def fun(vec): """Dummy Function""" return vec ss = Safety(custom_func, ['vec'], 'pd.Series') fun_string = ss.generateFunction() exec(fun_string) # SUPER DUPER SAFE... out_table = data.apply(fun, axis) else: print("No operation given") exit(-1) else: op = getattr(pd.DataFrame, operation) out_table = op(data, axis) elif user_mode_single == "element": # lt, gt, ge, etc. operation = params["element_op"] bool_mat = None if operation is not None: if operation == "rowcol": # Select all indexes if empty array of values if "element_cols" in params: cols_specified = Utils.rangemaker(params["element_cols"]) else: cols_specified = range(len(data.columns)) if "element_rows" in params: rows_specified = Utils.rangemaker(params["element_rows"]) else: rows_specified = range(len(data)) # Inclusive selection: # - True: Giving a row or column will match all elements in that row or column # - False: Give a row or column will match only elements in both those rows or columns inclusive = params["element_inclusive"] # Create a bool matrix (intialised to False) with selected # rows and columns set to True bool_mat = data.copy() bool_mat[:] = False if inclusive: bool_mat.iloc[rows_specified, :] = True bool_mat.iloc[:, cols_specified] = True else: bool_mat.iloc[rows_specified, cols_specified] = True else: op = Utils.getTwoValuePandaOp(operation, data) value = params["element_value"] try: # Could be numeric value = float(value) except ValueError: pass # generate filter matrix of True/False values bool_mat = op(data, value) else: # implement no filtering through a filter matrix filled with # True values. bool_mat = np.full(data.shape, True) # Get the main processing mode mode = params["element_mode"] if mode == "replace": replacement_val = params["element_replace"] out_table = data.mask( bool_mat, data.where(bool_mat).applymap( lambda x: replacement_val.format(elem=x) ) ) elif mode == "modify": mod_op = Utils.getOneValueMathOp(params["element_modify_op"]) out_table = data.mask( bool_mat, data.where(bool_mat).applymap(mod_op) ) elif mode == "scale": scale_op = Utils.getTwoValuePandaOp( params["element_scale_op"], data ) scale_value = params["element_scale_value"] out_table = data.mask( bool_mat, scale_op(data.where(bool_mat), scale_value) ) elif mode == "custom": element_customop = params["element_customop"] def fun(elem): """Dummy Function""" return elem ss = Safety(element_customop, ['elem']) fun_string = ss.generateFunction() exec(fun_string) # SUPER DUPER SAFE... out_table = data.mask( bool_mat, data.where(bool_mat).applymap(fun) ) else: print("No such element mode!", mode) exit(-1) elif user_mode_single == "fulltable": general_mode = params["mode"] if general_mode == "transpose": out_table = data.T elif general_mode == "melt": melt_ids = params["MELT"]["melt_ids"] melt_values = params["MELT"]["melt_values"] out_table = pd.melt(data, id_vars=melt_ids, value_vars=melt_values) elif general_mode == "pivot": pivot_index = params["PIVOT"]["pivot_index"] pivot_column = params["PIVOT"]["pivot_columns"] pivot_values = params["PIVOT"]["pivot_values"] pivot_aggfunc = params["PIVOT"]["pivot_aggfunc"] if not(pivot_aggfunc): out_table = data.pivot( index=pivot_index, columns=pivot_column, values=pivot_values ) else: out_table = data.pivot_table( index=pivot_index, columns=pivot_column, values=pivot_values, aggfunc=pivot_aggfunc ) elif general_mode == "custom": custom_func = params["fulltable_customop"] def fun(tableau): """Dummy Function""" return tableau ss = Safety(custom_func, ['table'], 'pd.DataFrame') fun_string = ss.generateFunction() exec(fun_string) # SUPER DUPER SAFE... out_table = fun(data) else: print("No such mode!", user_mode_single) exit(-1) elif user_mode == "multiple": table_sections = uc.Data["tables"] if not table_sections: print("Multiple table sets not given!") exit(-1) reader_skip = uc.Default["reader_skip"] # Data table = [] # 1-based handlers for users "table1", "table2", etc. table_names = [] # Actual 0-based references "table[0]", "table[1]", etc. table_names_real = [] # Read and populate tables for x, t_sect in enumerate(table_sections): tmp = Utils.readcsv(t_sect, uc.Default["narm"]) table.append(tmp) table_names.append("table" + str(x + 1)) table_names_real.append("table[" + str(x) + "]") custom_op = params["fulltable_customop"] ss = Safety(custom_op, table_names, 'pd.DataFrame') fun_string = ss.generateFunction() # Change the argument to table fun_string = fun_string.replace("fun(table1):", "fun():") # table1 to table[1] for name, name_real in zip(table_names, table_names_real): fun_string = fun_string.replace(name, name_real) fun_string = fun_string.replace("fun():", "fun(table):") exec(fun_string) # SUPER DUPER SAFE... out_table = fun(table) else: print("No such mode!", user_mode) exit(-1) if not isinstance(out_table, (pd.DataFrame, pd.Series)): print('The specified operation did not result in a table to return.') raise RuntimeError( 'The operation did not generate a pd.DataFrame or pd.Series to return.' ) out_parameters = { "sep": "\t", "float_format": "%%.%df" % pd.options.display.precision, "header": uc.Default["out_headers_col"], "index": uc.Default["out_headers_row"] } if user_mode_single not in ('matrixapply', None): out_parameters["quoting"] = csv.QUOTE_NONE out_table.to_csv(uc.Default["outtable"], **out_parameters)