Mercurial > repos > recetox > table_pandas_rename_columns_regex
comparison utils.py @ 0:505a8e975968 draft
planemo upload for repository https://github.com/RECETOX/galaxytools/tree/master/tools/tables commit d0ff40eb2b536fec6c973c3a9ea8e7f31cd9a0d6
| author | recetox |
|---|---|
| date | Wed, 29 Jan 2025 15:35:08 +0000 |
| parents | |
| children | |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:505a8e975968 |
|---|---|
| 1 import argparse | |
| 2 from typing import Tuple | |
| 3 | |
| 4 | |
| 5 import pandas as pd | |
| 6 | |
| 7 | |
class KeyValuePairsAction(argparse.Action):
    """Argparse action that turns ``key=value`` tokens into a dictionary."""

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Parse key=value pairs from the command line arguments.

        Each item is split on its first "=" only, so the value part may itself
        contain "=" characters (for example ``2=a=b`` maps 2 to ``"a=b"``).

        Parameters:
            parser (argparse.ArgumentParser): The argument parser instance.
            namespace (argparse.Namespace): The namespace to hold the parsed values.
            values (list): The list of key=value pairs.
            option_string (str): The option string.

        Sets:
            namespace.dest (dict): A dictionary with 1-based column index as key and new column name as value.

        Errors:
            An item without "=" or with a non-integer key is reported through
            ``parser.error``, which prints the usage message and exits.
        """
        key_value_pairs = {}
        for item in values:
            try:
                # maxsplit=1 keeps any further "=" characters inside the value;
                # an item with no "=" still fails the unpacking with ValueError.
                key, value = item.split("=", 1)
                key_value_pairs[int(key)] = value  # Convert key to integer
            except ValueError:
                parser.error(
                    f"Invalid format for --rename: {item}. Expected format: key=value"
                )
        setattr(namespace, self.dest, key_value_pairs)
| 32 | |
| 33 | |
class LoadDataAction(argparse.Action):
    """Argparse action that reads a table file into a pandas dataframe."""

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Load the file described by ``values`` and store the resulting dataframe.

        Parameters:
            parser (argparse.ArgumentParser): The argument parser instance.
            namespace (argparse.Namespace): The namespace to hold the parsed values.
            values (sequence): Two items, the file path and the file extension.
            option_string (str): The option string.

        Sets:
            namespace.dest (pd.DataFrame): The dataframe read from the file.

        Raises:
            ValueError: If the lower-cased extension is not one of
                "csv", "tsv", "tabular" or "parquet".
        """
        file_path, raw_extension = values
        file_extension = raw_extension.lower()

        def read_tab_separated(path):
            return pd.read_csv(path, sep="\t")

        # One reader per supported extension; "tsv" and "tabular" share a reader.
        readers = {
            "csv": lambda path: pd.read_csv(path),
            "tsv": read_tab_separated,
            "tabular": read_tab_separated,
            "parquet": lambda path: pd.read_parquet(path),
        }
        reader = readers.get(file_extension)
        if reader is None:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, reader(file_path))
| 47 | |
| 48 | |
def write_csv(df: pd.DataFrame, file_path: str) -> None:
    """
    Save a dataframe as comma-separated text without the index column.

    Parameters:
        df (pd.DataFrame): The dataframe to write.
        file_path (str): The path to the output CSV file.
    """
    # The comma is spelled out to mirror write_tsv; it is also the pandas default.
    df.to_csv(file_path, sep=",", index=False)
| 58 | |
| 59 | |
def write_tsv(df: pd.DataFrame, file_path: str) -> None:
    """
    Save a dataframe as tab-separated text without the index column.

    Parameters:
        df (pd.DataFrame): The dataframe to write.
        file_path (str): The path to the output TSV file.
    """
    tsv_options = {"sep": "\t", "index": False}
    df.to_csv(file_path, **tsv_options)
| 69 | |
| 70 | |
def write_parquet(df: pd.DataFrame, file_path: str) -> None:
    """
    Save a dataframe in Parquet format without the index.

    Parameters:
        df (pd.DataFrame): The dataframe to write.
        file_path (str): The path to the output Parquet file.
    """
    keep_index = False
    df.to_parquet(file_path, index=keep_index)
| 80 | |
| 81 | |
class StoreOutputAction(argparse.Action):
    """Argparse action that pairs an output path with its writer function."""

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Tuple[str, str],
        option_string: str = None,
    ) -> None:
        """
        Select the writer function matching the file extension and store it with the path.

        Parameters:
            parser (argparse.ArgumentParser): The argument parser instance.
            namespace (argparse.Namespace): The namespace to hold the parsed values.
            values (Tuple[str, str]): The file path and file extension.
            option_string (str): The option string.

        Sets:
            namespace.dest (tuple): A ``(write_function, file_path)`` pair.

        Raises:
            ValueError: If the lower-cased extension is not one of
                "csv", "tsv", "tabular" or "parquet".
        """
        file_path, file_extension = values
        file_extension = file_extension.lower()
        # One writer per supported extension; "tsv" and "tabular" share a writer.
        writers = {
            "csv": write_csv,
            "tsv": write_tsv,
            "tabular": write_tsv,
            "parquet": write_parquet,
        }
        if file_extension not in writers:
            raise ValueError(f"Unsupported file format: {file_extension}")
        setattr(namespace, self.dest, (writers[file_extension], file_path))
| 110 | |
| 111 | |
class SplitColumnIndicesAction(argparse.Action):
    """Argparse action that converts 1-based column indices to 0-based ones."""

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str,
        option_string: str = None,
    ) -> None:
        """
        Custom argparse action to split a comma-separated list of column indices and convert to 0-based indices.

        Parameters:
            parser (argparse.ArgumentParser): The argument parser instance.
            namespace (argparse.Namespace): The namespace to hold the parsed values.
            values (str): The comma-separated list of 1-based column indices.
            option_string (str): The option string.

        Sets:
            namespace.dest (list): The 0-based indices, in the order given.

        Raises:
            ValueError: If an item is not an integer, or is smaller than 1.
        """
        indices = []
        for token in values.split(","):
            position = int(token)  # Raises ValueError for non-integer items
            if position < 1:
                # Without this check, 0 would become -1 and silently address
                # the last column instead of being reported as invalid.
                raise ValueError(
                    f"Column indices are 1-based and must be >= 1, got: {token.strip()}"
                )
            indices.append(position - 1)  # Convert to 0-based index
        setattr(namespace, self.dest, indices)
