Mercurial > repos > recetox > table_pandas_rename_column
comparison table_scipy_interpolate.py @ 0:3f54cd56a65e draft
planemo upload for repository https://github.com/RECETOX/galaxytools/tree/master/tools/tables commit d0ff40eb2b536fec6c973c3a9ea8e7f31cd9a0d6
| author | recetox |
|---|---|
| date | Wed, 29 Jan 2025 15:35:31 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:3f54cd56a65e |
|---|---|
| 1 import argparse | |
| 2 import logging | |
| 3 from typing import Callable, Tuple | |
| 4 | |
| 5 | |
| 6 import numpy as np | |
| 7 import pandas as pd | |
| 8 from scipy.interpolate import Akima1DInterpolator, CubicSpline, PchipInterpolator | |
| 9 from utils import LoadDataAction, StoreOutputAction | |
| 10 | |
| 11 | |
class InterpolationModelAction(argparse.Action):
    """Argparse action that replaces an interpolation method name with its callable."""

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str,
        option_string: str = None,
    ) -> None:
        """
        Look up the interpolation callable for the given name and store it on the namespace.

        Parameters:
            parser (argparse.ArgumentParser): The argument parser instance (unused here).
            namespace (argparse.Namespace): The namespace receiving the resolved callable.
            values (str): The interpolation method name (linear, cubic, pchip or akima).
            option_string (str): The option string that triggered this action (unused here).

        Raises:
            ValueError: If ``values`` is not one of the supported method names.
        """
        # Supported names and the numpy / scipy callables they stand for.
        known_models = {
            "linear": np.interp,
            "cubic": CubicSpline,
            "pchip": PchipInterpolator,
            "akima": Akima1DInterpolator,
        }
        selected = known_models.get(values)
        if selected is None:
            raise ValueError(f"Unknown interpolation method: {values}")
        setattr(namespace, self.dest, selected)
| 38 | |
| 39 | |
def _column_name_at(frame: pd.DataFrame, position: int, label: str):
    """Return the column label at a 1-based position, rejecting out-of-range positions."""
    n_columns = len(frame.columns)
    # Without this check, position 0 (or a negative value) would become a
    # negative 0-based index and silently select a column counted from the end.
    if not 1 <= position <= n_columns:
        raise IndexError(
            f"{label} index {position} is out of range; "
            f"expected a value between 1 and {n_columns}."
        )
    return frame.columns[position - 1]


def interpolate_data(
    reference: pd.DataFrame,
    query: pd.DataFrame,
    x_col: int,
    y_col: int,
    xnew_col: int,
    model: Callable,
    output_dataset: Tuple[Callable[[pd.DataFrame, str], None], str],
) -> None:
    """
    Interpolate data using the specified model and write the extended query table.

    The interpolated values are added to ``query`` (in place) as a new column
    named after the reference y column, and the resulting table is handed to
    the writer from ``output_dataset``.

    Parameters:
        reference (pd.DataFrame): The reference dataset providing the known x/y pairs.
        query (pd.DataFrame): The query dataset providing the x values to evaluate.
        x_col (int): The 1-based index of the x column in the reference dataset.
        y_col (int): The 1-based index of the y column in the reference dataset.
        xnew_col (int): The 1-based index of the x column in the query dataset.
        model (Callable): Either ``np.interp`` or a class that is constructed
            from ``(x, y)`` and then called with the new x values.
        output_dataset (Tuple[Callable[[pd.DataFrame, str], None], str]): The
            write function and the path it is called with.

    Raises:
        IndexError: If a column index is smaller than 1 or larger than the
            number of columns in the corresponding dataset.
        ValueError: If the reference y column name already exists in the query dataset.
    """
    try:
        # Convert 1-based indices to column names, validating the range first.
        x_col_name = _column_name_at(reference, x_col, "x_col")
        y_col_name = _column_name_at(reference, y_col, "y_col")
        xnew_col_name = _column_name_at(query, xnew_col, "xnew_col")

        # Refuse to overwrite an existing column of the query dataset.
        if y_col_name in query.columns:
            raise ValueError(
                f"Column '{y_col_name}' already exists in the query dataset."
            )

        if model is np.interp:
            # np.interp is a plain function taking (x_new, x, y) in one call.
            query[y_col_name] = model(
                query[xnew_col_name], reference[x_col_name], reference[y_col_name]
            )
        else:
            # The other models are built from (x, y) first and evaluated afterwards.
            model_instance = model(reference[x_col_name], reference[y_col_name])
            query[y_col_name] = model_instance(query[xnew_col_name]).astype(float)

        write_func, file_path = output_dataset
        write_func(query, file_path)
    except Exception as e:
        logging.error(f"Error in interpolate_data function: {e}")
        raise
| 86 | |
| 87 | |
def main(
    reference_dataset: pd.DataFrame,
    query_dataset: pd.DataFrame,
    x_col: int,
    y_col: int,
    xnew_col: int,
    model: Callable,
    output_dataset: Tuple[Callable[[pd.DataFrame, str], None], str],
) -> None:
    """
    Entry point: forward the parsed arguments to ``interpolate_data``.

    Any exception raised by the interpolation step is logged and re-raised.

    Parameters:
        reference_dataset (pd.DataFrame): The reference dataset.
        query_dataset (pd.DataFrame): The query dataset.
        x_col (int): The 1-based index of the x column in the reference dataset.
        y_col (int): The 1-based index of the y column in the reference dataset.
        xnew_col (int): The 1-based index of the x column in the query dataset.
        model (Callable): The interpolation model to use.
        output_dataset (Tuple[Callable[[pd.DataFrame, str], None], str]): The
            write function and the path it is called with.
    """
    try:
        interpolate_data(
            reference=reference_dataset,
            query=query_dataset,
            x_col=x_col,
            y_col=y_col,
            xnew_col=xnew_col,
            model=model,
            output_dataset=output_dataset,
        )
    except Exception as exc:
        logging.error(f"Error in main function: {exc}")
        raise
| 114 | |
| 115 | |
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(
        description="Interpolate data using various methods."
    )

    # Input tables: each option takes two values (path and file extension)
    # that are handed to LoadDataAction from the project's utils module.
    input_options = (
        (
            "--reference_dataset",
            "Path to the reference dataset and its file extension (csv, tsv, parquet)",
        ),
        (
            "--query_dataset",
            "Path to the query dataset and its file extension (csv, tsv, parquet)",
        ),
    )
    for flag, help_text in input_options:
        parser.add_argument(
            flag, nargs=2, action=LoadDataAction, required=True, help=help_text
        )

    # Column selectors: all are required 1-based integer indices.
    column_options = (
        ("--x_col", "1-based index of the x column in the reference dataset"),
        ("--y_col", "1-based index of the y column in the reference dataset"),
        ("--xnew_col", "1-based index of the x column in the query dataset"),
    )
    for flag, help_text in column_options:
        parser.add_argument(flag, type=int, required=True, help=help_text)

    # The method name is resolved to its callable by InterpolationModelAction.
    parser.add_argument(
        "--model",
        type=str,
        action=InterpolationModelAction,
        required=True,
        help="Interpolation model to use (linear, cubic, pchip, akima)",
    )

    # Output table: two values (path and file extension) handled by StoreOutputAction.
    parser.add_argument(
        "--output_dataset",
        nargs=2,
        action=StoreOutputAction,
        required=True,
        help="Path to the output dataset and its file extension (csv, tsv, parquet)",
    )

    args = parser.parse_args()
    main(
        reference_dataset=args.reference_dataset,
        query_dataset=args.query_dataset,
        x_col=args.x_col,
        y_col=args.y_col,
        xnew_col=args.xnew_col,
        model=args.model,
        output_dataset=args.output_dataset,
    )
