view utils.py @ 0:b722dba91064 draft default tip

planemo upload for repository https://github.com/RECETOX/galaxytools/tree/master/tools/tables commit d0ff40eb2b536fec6c973c3a9ea8e7f31cd9a0d6
author recetox
date Wed, 29 Jan 2025 15:35:51 +0000
parents
children
line wrap: on
line source

import argparse
from typing import Tuple


import pandas as pd


class KeyValuePairsAction(argparse.Action):
    """Argparse action that collects ``key=value`` items into a dictionary."""

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Parse key=value pairs from the command line arguments.

        Only the first "=" of an item separates the key from the value, so the
        value may itself contain "=" characters (e.g. "2=a=b" maps 2 to "a=b").

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (list): The list of key=value pairs.
        option_string (str): The option string.

        Sets:
        namespace.dest (dict): A dictionary with 1-based column index as key and new column name as value.

        Errors:
        Reports through parser.error when an item has no "=" or when its key
        cannot be converted to an integer.
        """
        key_value_pairs = {}
        for item in values:
            try:
                # maxsplit=1 keeps any further "=" characters inside the value.
                # An item without "=" yields a single element, so the unpacking
                # raises ValueError just like a non-integer key does below.
                key, value = item.split("=", 1)
                key_value_pairs[int(key)] = value  # Convert key to integer
            except ValueError:
                parser.error(
                    f"Invalid format for --rename: {item}. Expected format: key=value"
                )
        setattr(namespace, self.dest, key_value_pairs)


class LoadDataAction(argparse.Action):
    """Argparse action that reads a table from disk into a pandas dataframe."""

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Load the file named on the command line and store the resulting dataframe.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (sequence): Two items, the file path followed by the file extension.
        option_string (str): The option string.

        Sets:
        namespace.dest (pd.DataFrame): The dataframe read from the file.

        Raises:
        ValueError: If the lower-cased extension is not csv, tsv, tabular or parquet.
        """
        source_path, file_extension = values
        file_extension = file_extension.lower()

        # Extra keyword arguments for pd.read_csv, per delimited-text extension.
        delimited_options = {
            "csv": {},
            "tsv": {"sep": "\t"},
            "tabular": {"sep": "\t"},
        }

        if file_extension == "parquet":
            table = pd.read_parquet(source_path)
        elif file_extension in delimited_options:
            table = pd.read_csv(source_path, **delimited_options[file_extension])
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")

        setattr(namespace, self.dest, table)


def write_csv(df: pd.DataFrame, file_path: str) -> None:
    """
    Save a dataframe as comma-separated text without its index column.

    Parameters:
    df (pd.DataFrame): The table to save.
    file_path (str): Where the CSV output is written.
    """
    df.to_csv(path_or_buf=file_path, index=False)


def write_tsv(df: pd.DataFrame, file_path: str) -> None:
    """
    Save a dataframe as tab-separated text without its index column.

    Parameters:
    df (pd.DataFrame): The table to save.
    file_path (str): Where the TSV output is written.
    """
    tab = "\t"
    df.to_csv(path_or_buf=file_path, index=False, sep=tab)


def write_parquet(df: pd.DataFrame, file_path: str) -> None:
    """
    Save a dataframe in Parquet format without its index.

    Parameters:
    df (pd.DataFrame): The table to save.
    file_path (str): Where the Parquet output is written.
    """
    keep_index = False
    df.to_parquet(file_path, index=keep_index)


class StoreOutputAction(argparse.Action):
    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Tuple[str, str],
        option_string: str = None,
    ) -> None:
        """
        Pick the writer matching the requested output format and store it with the output path.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (Tuple[str, str]): The file path and file extension.
        option_string (str): The option string.

        Sets:
        namespace.dest (tuple): A (write function, file path) pair.

        Raises:
        ValueError: If the lower-cased extension is not csv, tsv, tabular or parquet.
        """
        target_path, file_extension = values
        file_extension = file_extension.lower()

        # Reject unknown formats up front so the selection below only sees valid ones.
        if file_extension not in ("csv", "tsv", "tabular", "parquet"):
            raise ValueError(f"Unsupported file format: {file_extension}")

        if file_extension == "csv":
            writer = write_csv
        elif file_extension == "parquet":
            writer = write_parquet
        else:
            # "tsv" and "tabular" both use the tab-separated writer.
            writer = write_tsv

        setattr(namespace, self.dest, (writer, target_path))


class SplitColumnIndicesAction(argparse.Action):
    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str,
        option_string: str = None,
    ) -> None:
        """
        Custom argparse action to split a comma-separated list of column indices and convert to 0-based indices.

        Parameters:
        parser (argparse.ArgumentParser): The argument parser instance.
        namespace (argparse.Namespace): The namespace to hold the parsed values.
        values (str): The comma-separated list of 1-based column indices.
        option_string (str): The option string.

        Sets:
        namespace.dest (list): The 0-based column indices, in the order given.

        Raises:
        ValueError: If an item is not an integer, or is smaller than 1.
        """
        indices = []
        for token in values.split(","):
            position = int(token)  # raises ValueError for non-integer items
            if position < 1:
                # Without this check, 0 would turn into -1 and silently address
                # positions counted from the end instead of failing.
                raise ValueError(
                    f"Column indices are 1-based and must be >= 1, got: {position}"
                )
            indices.append(position - 1)  # Convert to 0-based index
        setattr(namespace, self.dest, indices)