changeset 147:3fca9b568faf draft

Uploaded
author bimib
date Wed, 06 Nov 2024 13:57:24 +0000
parents 88cf4543e210
children 5683406a8cfd
files COBRAxy/custom_data_generator.py COBRAxy/flux_simulation.py COBRAxy/flux_to_map.py COBRAxy/marea.py COBRAxy/marea_cluster.py COBRAxy/ras_generator.py COBRAxy/ras_to_bounds.py COBRAxy/rps_generator.py COBRAxy/utils/__pycache__/general_utils.cpython-312.pyc
diffstat 9 files changed, 91 insertions(+), 73 deletions(-) [+]
line wrap: on
line diff
--- a/COBRAxy/custom_data_generator.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/custom_data_generator.py	Wed Nov 06 13:57:24 2024 +0000
@@ -6,11 +6,11 @@
 import pandas as pd
 import utils.general_utils as utils
 import utils.rule_parsing  as rulesUtils
-from typing import Optional, Tuple, Union, Dict
+from typing import Optional, Tuple, Union, List, Dict
 import utils.reaction_parsing as reactionUtils
 
 ARGS : argparse.Namespace
-def process_args() -> argparse.Namespace:
+def process_args(args:List[str] = None) -> argparse.Namespace:
     """
     Interfaces the script of a module with its frontend, making the user's choices for
     various parameters available as values in code.
@@ -35,9 +35,8 @@
     parser.add_argument("-id", "--input",   type = str, required = True, help = "Input model")
     parser.add_argument("-mn", "--name",    type = str, required = True, help = "Input model name")
     # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
-    
-    argsNamespace = parser.parse_args()
-    argsNamespace.out_dir = "result"
+    parser.add_argument('-idop', '--output_path', type = str, default='result', help = 'output path for maps')
+    argsNamespace = parser.parse_args(args)
     # ^ can't get this one to work from xml, there doesn't seem to be a way to get the directory attribute from the collection
 
     return argsNamespace
@@ -184,7 +183,7 @@
             writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
 
 ###############################- ENTRY POINT -################################
-def main() -> None:
+def main(args:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
     
@@ -193,10 +192,10 @@
     """
     # get args from frontend (related xml)
     global ARGS
-    ARGS = process_args()
+    ARGS = process_args(args)
 
     # this is the worst thing I've seen so far, congrats to the former MaREA devs for suggesting this!
-    if os.path.isdir(ARGS.out_dir) == False: os.makedirs(ARGS.out_dir)
+    if os.path.isdir(ARGS.output_path) == False: os.makedirs(ARGS.output_path)
 
     # load custom model
     model = load_custom_model(
--- a/COBRAxy/flux_simulation.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/flux_simulation.py	Wed Nov 06 13:57:24 2024 +0000
@@ -11,7 +11,7 @@
 import sys
 
 ################################# process args ###############################
-def process_args(args :List[str]) -> argparse.Namespace:
+def process_args(args :List[str] = None) -> argparse.Namespace:
     """
     Processes command-line arguments.
 
@@ -88,7 +88,13 @@
                         required = False,
                         help = 'output type analysis')
     
-    ARGS = parser.parse_args()
+    parser.add_argument(
+        '-idop', '--output_path',
+        type = str,
+        default='flux_simulation/',  # folder that was hard-coded before; joined by plain string concatenation, so the trailing slash is required
+        help = 'output folder prefix for simulation results (must end with a path separator)')
+    
+    ARGS = parser.parse_args(args)
     return ARGS
 
 ########################### warning ###########################################
@@ -109,7 +115,7 @@
 
 def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None:
     dataset.index.name = 'Reactions'
-    dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = keep_index)
+    dataset.to_csv(ARGS.output_path + name + ".csv", sep = '\t', index = keep_index)
 
 ############################ dataset input ####################################
 def read_dataset(data :str, name :str) -> pd.DataFrame:
@@ -156,17 +162,17 @@
     for i in range(0, n_batches):
         optgp = OptGPSampler(model, thinning, seed)
         samples = optgp.sample(n_samples)
-        samples.to_csv(ARGS.output_folder +  model_name + '_'+ str(i)+'_OPTGP.csv', index=False)
+        samples.to_csv(ARGS.output_path +  model_name + '_'+ str(i)+'_OPTGP.csv', index=False)
         seed+=1
     samplesTotal = pd.DataFrame()
     for i in range(0, n_batches):
-        samples_batch = pd.read_csv(ARGS.output_folder  +  model_name + '_'+ str(i)+'_OPTGP.csv')
+        samples_batch = pd.read_csv(ARGS.output_path  +  model_name + '_'+ str(i)+'_OPTGP.csv')
         samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
 
     write_to_file(samplesTotal.T, model_name, True)
 
     for i in range(0, n_batches):
-        os.remove(ARGS.output_folder +   model_name + '_'+ str(i)+'_OPTGP.csv')
+        os.remove(ARGS.output_path +   model_name + '_'+ str(i)+'_OPTGP.csv')
     pass
 
 
@@ -199,18 +205,18 @@
             ARGS.out_log)
             CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], 
                                                     samples)
-        utils.logWarning(ARGS.output_folder +  model_name + '_'+ str(i)+'_CBS.csv', ARGS.out_log)
-        samples.to_csv(ARGS.output_folder +  model_name + '_'+ str(i)+'_CBS.csv', index=False)
+        utils.logWarning(ARGS.output_path +  model_name + '_'+ str(i)+'_CBS.csv', ARGS.out_log)
+        samples.to_csv(ARGS.output_path +  model_name + '_'+ str(i)+'_CBS.csv', index=False)
 
     samplesTotal = pd.DataFrame()
     for i in range(0, n_batches):
-        samples_batch = pd.read_csv(ARGS.output_folder  +  model_name + '_'+ str(i)+'_CBS.csv')
+        samples_batch = pd.read_csv(ARGS.output_path  +  model_name + '_'+ str(i)+'_CBS.csv')
         samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
 
     write_to_file(samplesTotal.T, model_name, True)
 
     for i in range(0, n_batches):
-        os.remove(ARGS.output_folder +   model_name + '_'+ str(i)+'_CBS.csv')
+        os.remove(ARGS.output_path +   model_name + '_'+ str(i)+'_CBS.csv')
     pass
 
 
@@ -244,7 +250,7 @@
     df_mean, df_median, df_quantiles = fluxes_statistics(name, ARGS.output_types)
 
     if("fluxes" not in ARGS.output_types):
-        os.remove(ARGS.output_folder  +  name + '.csv')
+        os.remove(ARGS.output_path  +  name + '.csv')
 
     returnList = []
     returnList.append(df_mean)
@@ -278,7 +284,7 @@
     df_median= pd.DataFrame()
     df_quantiles= pd.DataFrame()
 
-    df_samples = pd.read_csv(ARGS.output_folder  +  model_name + '.csv', sep = '\t', index_col = 0).T
+    df_samples = pd.read_csv(ARGS.output_path  +  model_name + '.csv', sep = '\t', index_col = 0).T
     df_samples = df_samples.round(8)
 
     for output_type in output_types:
@@ -363,7 +369,7 @@
     return df_pFBA, df_FVA, df_sensitivity
 
 ############################# main ###########################################
-def main() -> None:
+def main(args :List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
 
@@ -376,10 +382,8 @@
     num_processors = cpu_count()
 
     global ARGS
-    ARGS = process_args(sys.argv)
-
-    ARGS.output_folder = 'flux_simulation/'
-    
+    ARGS = process_args(args)
+  
     
     model_type :utils.Model = ARGS.model_selector
     if model_type is utils.Model.Custom:
--- a/COBRAxy/flux_to_map.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/flux_to_map.py	Wed Nov 06 13:57:24 2024 +0000
@@ -22,7 +22,7 @@
 ERRORS = []
 ########################## argparse ##########################################
 ARGS :argparse.Namespace
-def process_args() -> argparse.Namespace:
+def process_args(args:List[str] = None) -> argparse.Namespace:
     """
     Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
 
@@ -119,8 +119,14 @@
         '-colorm',  '--color_map',
         type = str,
         choices = ["jet", "viridis"])
+    
+    parser.add_argument(
+        '-idop', '--output_path', 
+        type = str,
+        default='result',
+        help = 'output path for maps')
 
-    args :argparse.Namespace = parser.parse_args()
+    args :argparse.Namespace = parser.parse_args(args)
     args.net = True
 
     return args
@@ -643,7 +649,7 @@
         # all output files: I don't care, this was never the performance bottleneck of the tool and
         # there is no other net gain in saving and re-using the built string.
         ext,
-        prefix = "result")
+        prefix = ARGS.output_path)
 
 FIELD_NOT_AVAILABLE = '/'
 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
@@ -922,8 +928,8 @@
     medians = {key: median/max_flux_medians for key, median in medians.items()}
     means = {key: mean/max_flux_means for key, mean in means.items()}
 
-    save_colormap_image(min_flux_medians, max_flux_medians, utils.FilePath("Color map median", ext=utils.FileFormat.PNG, prefix="result"), colormap)
-    save_colormap_image(min_flux_means, max_flux_means, utils.FilePath("Color map mean", ext=utils.FileFormat.PNG, prefix="result"), colormap)
+    save_colormap_image(min_flux_medians, max_flux_medians, utils.FilePath("Color map median", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
+    save_colormap_image(min_flux_means, max_flux_means, utils.FilePath("Color map mean", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
 
     cmap = plt.get_cmap(colormap)
 
@@ -981,11 +987,11 @@
     Returns:
         None
     """
-    svgFilePath = utils.FilePath(f"SVG Map {map_type} - {key}", ext=utils.FileFormat.SVG, prefix="result")
+    svgFilePath = utils.FilePath(f"SVG Map {map_type} - {key}", ext=utils.FileFormat.SVG, prefix=ARGS.output_path)
     utils.writeSvg(svgFilePath, metabMap)
     if ARGS.generate_pdf:
-        pngPath = utils.FilePath(f"PNG Map {map_type} - {key}", ext=utils.FileFormat.PNG, prefix="result")
-        pdfPath = utils.FilePath(f"PDF Map {map_type} - {key}", ext=utils.FileFormat.PDF, prefix="result")
+        pngPath = utils.FilePath(f"PNG Map {map_type} - {key}", ext=utils.FileFormat.PNG, prefix=ARGS.output_path)
+        pdfPath = utils.FilePath(f"PDF Map {map_type} - {key}", ext=utils.FileFormat.PDF, prefix=ARGS.output_path)
         convert_to_pdf(svgFilePath, pngPath, pdfPath)
     if not ARGS.generate_svg:
         os.remove(svgFilePath.show())
@@ -994,7 +1000,7 @@
 
     
 ############################ MAIN #############################################
-def main() -> None:
+def main(args:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
 
@@ -1006,9 +1012,9 @@
     """
 
     global ARGS
-    ARGS = process_args()
+    ARGS = process_args(args)
 
-    if os.path.isdir('result') == False: os.makedirs('result')
+    if os.path.isdir(ARGS.output_path) == False: os.makedirs(ARGS.output_path)
     
     core_map :ET.ElementTree = ARGS.choice_map.getMap(
         ARGS.tool_dir,
--- a/COBRAxy/marea.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/marea.py	Wed Nov 06 13:57:24 2024 +0000
@@ -20,7 +20,7 @@
 ERRORS = []
 ########################## argparse ##########################################
 ARGS :argparse.Namespace
-def process_args(args=None) -> argparse.Namespace:
+def process_args(args:List[str] = None) -> argparse.Namespace:
     """
     Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
 
@@ -871,7 +871,7 @@
     return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
 
 ############################ MAIN #############################################
-def main(args=None) -> None:
+def main(args:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
 
--- a/COBRAxy/marea_cluster.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/marea_cluster.py	Wed Nov 06 13:57:24 2024 +0000
@@ -20,7 +20,7 @@
 from typing import Optional, Dict, List
 
 ################################# process args ###############################
-def process_args(args :List[str]) -> argparse.Namespace:
+def process_args(args :List[str] = None) -> argparse.Namespace:
     """
     Processes command-line arguments.
 
@@ -86,9 +86,13 @@
                         type = str,
                         help = 'output of best cluster tsv')
     				
-    
+    parser.add_argument(
+        '-idop', '--output_path',
+        type = str,
+        default='clustering',  # directory name that was hard-coded before this option existed
+        help = 'output directory for clustering results')
     
-    args = parser.parse_args()
+    args = parser.parse_args(args)
     return args
 
 ########################### warning ###########################################
@@ -217,8 +221,8 @@
     Returns:
         None
     """
-    if not os.path.exists('clustering'):
-        os.makedirs('clustering')
+    if not os.path.exists(args.output_path):
+        os.makedirs(args.output_path)
     
         
     if elbow == 'true':
@@ -259,7 +263,7 @@
         if (i + k_min == best):
             prefix = '_BEST'
             
-        write_to_csv(dataset, all_labels[i], 'clustering/kmeans_with_' + str(i + k_min) + prefix + '_clusters.tsv')
+        write_to_csv(dataset, all_labels[i], f'{args.output_path}/kmeans_with_' + str(i + k_min) + prefix + '_clusters.tsv')
         
         
         if (prefix == '_BEST'):
@@ -272,7 +276,7 @@
         
        
         if silhouette:
-            silhouette_draw(dataset, all_labels[i], i + k_min, 'clustering/silhouette_with_' + str(i + k_min) + prefix + '_clusters.png')
+            silhouette_draw(dataset, all_labels[i], i + k_min, f'{args.output_path}/silhouette_with_' + str(i + k_min) + prefix + '_clusters.png')
         
         
     if elbow:
@@ -303,7 +307,7 @@
     plt.plot(x, distortions, marker = 'o')
     plt.xlabel('Number of clusters (k)')
     plt.ylabel('Distortion')
-    s = 'clustering/elbow_plot.png'
+    s = f'{args.output_path}/elbow_plot.png'
     fig = plt.gcf()
     fig.set_size_inches(18.5, 10.5, forward = True)
     fig.savefig(s, dpi=100)
@@ -406,8 +410,8 @@
     Returns:
         None
     """
-    if not os.path.exists('clustering'):
-        os.makedirs('clustering')
+    if not os.path.exists(args.output_path):
+        os.makedirs(args.output_path)
         
     if eps is not None:
         clusterer = DBSCAN(eps = eps, min_samples = min_samples)
@@ -445,14 +449,14 @@
     Returns:
         None
     """
-    if not os.path.exists('clustering'):
-        os.makedirs('clustering')
+    if not os.path.exists(args.output_path):
+        os.makedirs(args.output_path)
     
     plt.figure(figsize=(10, 7))  
     plt.title("Customer Dendograms")  
     shc.dendrogram(shc.linkage(dataset, method='ward'), labels=dataset.index.values.tolist())  
     fig = plt.gcf()
-    fig.savefig('clustering/dendogram.png', dpi=200)
+    fig.savefig(f'{args.output_path}/dendogram.png', dpi=200)
     
     range_n_clusters = [i for i in range(k_min, k_max+1)]
 
@@ -466,7 +470,7 @@
         cluster.fit_predict(dataset)  
         cluster_labels = cluster.labels_
         labels.append(cluster_labels)
-        write_to_csv(dataset, cluster_labels, 'clustering/hierarchical_with_' + str(n_clusters) + '_clusters.tsv')
+        write_to_csv(dataset, cluster_labels, f'{args.output_path}/hierarchical_with_' + str(n_clusters) + '_clusters.tsv')
         
     best = max_index(scores) + k_min
     
@@ -475,7 +479,7 @@
         if (i + k_min == best):
             prefix = '_BEST'
         if silhouette == 'true':
-            silhouette_draw(dataset, labels[i], i + k_min, 'clustering/silhouette_with_' + str(i + k_min) + prefix + '_clusters.png')
+            silhouette_draw(dataset, labels[i], i + k_min, f'{args.output_path}/silhouette_with_' + str(i + k_min) + prefix + '_clusters.png')
      
     for i in range(len(labels)):
         if (i + k_min == best):
@@ -486,17 +490,18 @@
             
     
 ############################# main ###########################################
-def main() -> None:
+def main(args_in:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
 
     Returns:
         None
     """
-    if not os.path.exists('clustering'):
-        os.makedirs('clustering')
+    global args
+    args = process_args(args_in)
 
-    args = process_args(sys.argv)
+    if not os.path.exists(args.output_path):
+        os.makedirs(args.output_path)
     
     #Data read
     
--- a/COBRAxy/ras_generator.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/ras_generator.py	Wed Nov 06 13:57:24 2024 +0000
@@ -12,7 +12,7 @@
 ERRORS = []
 ########################## argparse ##########################################
 ARGS :argparse.Namespace
-def process_args() -> argparse.Namespace:
+def process_args(args:List[str] = None) -> argparse.Namespace:
     """
     Processes command-line arguments.
 
@@ -61,8 +61,9 @@
         '-ra', '--ras_output',
         type = str,
         required = True, help = 'ras output')
+
     
-    return parser.parse_args()
+    return parser.parse_args(args)
 
 ############################ dataset input ####################################
 def read_dataset(data :str, name :str) -> pd.DataFrame:
@@ -647,7 +648,7 @@
     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
     return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
 
-def main() -> None:
+def main(args:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
     
@@ -656,7 +657,7 @@
     """
     # get args from frontend (related xml)
     global ARGS
-    ARGS = process_args()
+    ARGS = process_args(args)
     print(ARGS.rules_selector)
     # read dataset
     dataset = read_dataset(ARGS.input, "dataset")
--- a/COBRAxy/ras_to_bounds.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/ras_to_bounds.py	Wed Nov 06 13:57:24 2024 +0000
@@ -10,7 +10,7 @@
 from joblib import Parallel, delayed, cpu_count
 
 ################################# process args ###############################
-def process_args(args :List[str]) -> argparse.Namespace:
+def process_args(args :List[str] = None) -> argparse.Namespace:
     """
     Processes command-line arguments.
 
@@ -66,9 +66,14 @@
     parser.add_argument('-cc', '--cell_class',
                     type = str,
                     help = 'output of cell class')
+    parser.add_argument(
+        '-idop', '--output_path',
+        type = str,
+        default='ras_to_bounds/',  # forwarded to generate_bounds(output_folder=...)
+        help = 'output folder for the generated bounds')
     
     
-    ARGS = parser.parse_args()
+    ARGS = parser.parse_args(args)
     return ARGS
 
 ########################### warning ###########################################
@@ -201,7 +206,7 @@
 
 
 ############################# main ###########################################
-def main() -> None:
+def main(args:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
 
@@ -213,9 +218,7 @@
 
 
     global ARGS
-    ARGS = process_args(sys.argv)
-
-    ARGS.output_folder = 'ras_to_bounds/'
+    ARGS = process_args(args)
 
     if(ARGS.ras_selector == True):
         ras_file_list = ARGS.input_ras.split(",")
@@ -269,10 +272,10 @@
         medium = medium[ARGS.medium_selector].to_dict()
 
     if(ARGS.ras_selector == True):
-        generate_bounds(model, medium, ras = ras_combined, output_folder=ARGS.output_folder)
+        generate_bounds(model, medium, ras = ras_combined, output_folder=ARGS.output_path)
         class_assignments.to_csv(ARGS.cell_class, sep = '\t', index = False)
     else:
-        generate_bounds(model, medium, output_folder=ARGS.output_folder)
+        generate_bounds(model, medium, output_folder=ARGS.output_path)
 
     pass
         
--- a/COBRAxy/rps_generator.py	Wed Nov 06 10:12:52 2024 +0000
+++ b/COBRAxy/rps_generator.py	Wed Nov 06 13:57:24 2024 +0000
@@ -16,7 +16,7 @@
 
 ########################## argparse ##########################################
 ARGS :argparse.Namespace
-def process_args() -> argparse.Namespace:
+def process_args(args:List[str] = None) -> argparse.Namespace:
     """
     Processes command-line arguments.
 
@@ -51,7 +51,7 @@
                         required = True,
                         help = 'rps output')
     
-    args = parser.parse_args()
+    args = parser.parse_args(args)
     return args
 
 ############################ dataset name #####################################
@@ -222,7 +222,7 @@
     df.to_csv(ARGS.rps_output, sep = '\t', na_rep = "None", index = False)
 
 ############################ main ####################################
-def main() -> None:
+def main(args:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
 
@@ -230,7 +230,7 @@
         None
     """
     global ARGS
-    ARGS = process_args()
+    ARGS = process_args(args)
 
     # TODO:use utils functions vvv
     with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl:
Binary file COBRAxy/utils/__pycache__/general_utils.cpython-312.pyc has changed