Mercurial > repos > recetox > table_pandas_transform
diff utils.py @ 0:b722dba91064 draft default tip
planemo upload for repository https://github.com/RECETOX/galaxytools/tree/master/tools/tables commit d0ff40eb2b536fec6c973c3a9ea8e7f31cd9a0d6
author | recetox |
---|---|
date | Wed, 29 Jan 2025 15:35:51 +0000 (2 months ago) |
parents | |
children |
line wrap: on
line diff
# Reconstructed from the changeset diff of utils.py
# (--- /dev/null  +++ b/utils.py  Wed Jan 29 15:35:51 2025 +0000).
import argparse
from typing import Optional, Tuple


import pandas as pd


class KeyValuePairsAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        """
        Parse key=value pairs from the command line arguments.

        Only the first "=" of each item separates key from value, so values
        may themselves contain "=" (e.g. "1=a=b" maps 1 to "a=b").

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (list): The list of key=value pairs.
        option_string (str): The option string.

        Sets:
        namespace.dest (dict): A dictionary with 1-based column index as key and new column name as value.
        """
        # Without nargs, argparse hands over a single string; iterating it
        # would walk over its characters instead of the one pair it holds.
        if isinstance(values, str):
            values = [values]

        # Name the option that was actually used in the error message; fall
        # back to the historical "--rename" when argparse gives no option.
        flag = option_string or "--rename"

        key_value_pairs = {}
        for item in values:
            try:
                # maxsplit=1: a missing "=" still fails the unpacking below,
                # while additional "=" characters stay part of the value.
                key, value = item.split("=", 1)
                key_value_pairs[int(key)] = value  # Convert key to integer
            except ValueError:
                parser.error(
                    f"Invalid format for {flag}: {item}. Expected format: key=value"
                )
        setattr(namespace, self.dest, key_value_pairs)


class LoadDataAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        """
        Custom argparse action to load a table into a dataframe based on file extension.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (Tuple[str, str]): The file path and file extension.
        option_string (str): The option string.

        Sets:
        namespace.dest (pd.DataFrame): The loaded dataframe.

        Raises:
        ValueError: If the file extension is not csv, tsv, tabular or parquet.
        """
        file_path, file_extension = values
        file_extension = file_extension.lower()  # extension match is case-insensitive
        if file_extension == "csv":
            df = pd.read_csv(file_path)
        elif file_extension in ("tsv", "tabular"):
            df = pd.read_csv(file_path, sep="\t")
        elif file_extension == "parquet":
            df = pd.read_parquet(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, df)


def write_csv(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a CSV file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output CSV file.
    """
    df.to_csv(file_path, index=False)


def write_tsv(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a TSV file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output TSV file.
    """
    df.to_csv(file_path, sep="\t", index=False)


def write_parquet(df: pd.DataFrame, file_path: str) -> None:
    """
    Write the dataframe to a Parquet file.

    Parameters:
    df (pd.DataFrame): The dataframe to write.
    file_path (str): The path to the output Parquet file.
    """
    df.to_parquet(file_path, index=False)


# Lower-cased file extension -> writer function, used by StoreOutputAction.
_WRITERS = {
    "csv": write_csv,
    "tsv": write_tsv,
    "tabular": write_tsv,
    "parquet": write_parquet,
}


class StoreOutputAction(argparse.Action):
    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Tuple[str, str],
        option_string: Optional[str] = None,
    ) -> None:
        """
        Custom argparse action to store the output function and file path based on file extension.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (Tuple[str, str]): The file path and file extension.
        option_string (str): The option string.

        Sets:
        namespace.dest (tuple): A (write function, file path) pair.

        Raises:
        ValueError: If the file extension is not csv, tsv, tabular or parquet.
        """
        file_path, file_extension = values
        file_extension = file_extension.lower()  # extension match is case-insensitive
        write_func = _WRITERS.get(file_extension)
        if write_func is None:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, (write_func, file_path))


class SplitColumnIndicesAction(argparse.Action):
    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str,
        option_string: Optional[str] = None,
    ) -> None:
        """
        Custom argparse action to split a comma-separated list of column indices and convert to 0-based indices.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (str): The comma-separated list of 1-based column indices.
        option_string (str): The option string.

        Sets:
        namespace.dest (list): The 0-based column indices, in the given order.

        Raises:
        ValueError: If an entry is not an integer or is smaller than 1.
        """
        indices = []
        for token in values.split(","):
            one_based = int(token)
            # Without this check an index of 0 would become -1 and silently
            # address the last column instead of being reported as invalid.
            if one_based < 1:
                raise ValueError(
                    f"Column indices are 1-based and must be >= 1, got: {one_based}"
                )
            indices.append(one_based - 1)  # Convert to 0-based indices
        setattr(namespace, self.dest, indices)