4
|
1 from __future__ import division
|
|
2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
|
|
3 import sys
|
|
4 import argparse
|
|
5 import collections
|
|
6 import pandas as pd
|
|
7 import pickle as pk
|
|
8 import utils.general_utils as utils
|
|
9 import utils.rule_parsing as ruleUtils
|
|
10 from typing import Union, Optional, List, Dict, Tuple, TypeVar
|
|
11
|
|
12 ERRORS = []
|
|
13 ########################## argparse ##########################################
|
|
14 ARGS :argparse.Namespace
|
|
15 def process_args() -> argparse.Namespace:
|
|
16 """
|
|
17 Processes command-line arguments.
|
|
18
|
|
19 Args:
|
|
20 args (list): List of command-line arguments.
|
|
21
|
|
22 Returns:
|
|
23 Namespace: An object containing parsed arguments.
|
|
24 """
|
|
25 parser = argparse.ArgumentParser(
|
|
26 usage = '%(prog)s [options]',
|
|
27 description = "process some value's genes to create a comparison's map.")
|
|
28
|
|
29 parser.add_argument(
|
|
30 '-rs', '--rules_selector',
|
|
31 type = utils.Model, default = utils.Model.HMRcore, choices = list(utils.Model),
|
|
32 help = 'chose which type of dataset you want use')
|
|
33
|
|
34 parser.add_argument("-rl", "--rule_list", type = str,
|
|
35 help = "path to input file with custom rules, if provided")
|
|
36
|
|
37 parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
|
|
38 # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
|
|
39
|
|
40 parser.add_argument(
|
|
41 '-n', '--none',
|
|
42 type = utils.Bool("none"), default = True,
|
|
43 help = 'compute Nan values')
|
|
44
|
|
45 parser.add_argument(
|
|
46 '-td', '--tool_dir',
|
|
47 type = str,
|
|
48 required = True, help = 'your tool directory')
|
|
49
|
|
50 parser.add_argument(
|
|
51 '-ol', '--out_log',
|
|
52 type = str,
|
|
53 help = "Output log")
|
|
54
|
|
55 parser.add_argument(
|
|
56 '-in', '--input', #id รจ diventato in
|
|
57 type = str,
|
|
58 help = 'input dataset')
|
|
59
|
|
60 parser.add_argument(
|
|
61 '-ra', '--ras_output',
|
|
62 type = str,
|
|
63 required = True, help = 'ras output')
|
|
64
|
|
65 return parser.parse_args()
|
|
66
|
|
67 ############################ dataset input ####################################
|
|
68 def read_dataset(data :str, name :str) -> pd.DataFrame:
|
|
69 """
|
|
70 Read a dataset from a CSV file and return it as a pandas DataFrame.
|
|
71
|
|
72 Args:
|
|
73 data (str): Path to the CSV file containing the dataset.
|
|
74 name (str): Name of the dataset, used in error messages.
|
|
75
|
|
76 Returns:
|
|
77 pandas.DataFrame: DataFrame containing the dataset.
|
|
78
|
|
79 Raises:
|
|
80 pd.errors.EmptyDataError: If the CSV file is empty.
|
|
81 sys.exit: If the CSV file has the wrong format, the execution is aborted.
|
|
82 """
|
|
83 try:
|
|
84 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
|
|
85 except pd.errors.EmptyDataError:
|
|
86 sys.exit('Execution aborted: wrong format of ' + name + '\n')
|
|
87 if len(dataset.columns) < 2:
|
|
88 sys.exit('Execution aborted: wrong format of ' + name + '\n')
|
|
89 return dataset
|
|
90
|
|
91 ############################ load id e rules ##################################
|
|
92 def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
|
|
93 """
|
|
94 Load IDs and rules from a dictionary of reactions.
|
|
95
|
|
96 Args:
|
|
97 reactions (dict): A dictionary where keys are IDs and values are rules.
|
|
98
|
|
99 Returns:
|
|
100 tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
|
|
101 """
|
|
102 ids, rules = [], []
|
|
103 for key, value in reactions.items():
|
|
104 ids.append(key)
|
|
105 rules.append(value)
|
|
106 return (ids, rules)
|
|
107
|
|
108 ############################ check_methods ####################################
|
|
109 def gene_type(l :str, name :str) -> str:
|
|
110 """
|
|
111 Determine the type of gene ID.
|
|
112
|
|
113 Args:
|
|
114 l (str): The gene identifier to check.
|
|
115 name (str): The name of the dataset, used in error messages.
|
|
116
|
|
117 Returns:
|
|
118 str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
|
|
119
|
|
120 Raises:
|
|
121 sys.exit: If the gene ID type is not supported, the execution is aborted.
|
|
122 """
|
|
123 if check_hgnc(l):
|
|
124 return 'hugo_id'
|
|
125 elif check_ensembl(l):
|
|
126 return 'ensembl_gene_id'
|
|
127 elif check_symbol(l):
|
|
128 return 'symbol'
|
|
129 elif check_entrez(l):
|
|
130 return 'entrez_id'
|
|
131 else:
|
|
132 sys.exit('Execution aborted:\n' +
|
|
133 'gene ID type in ' + name + ' not supported. Supported ID'+
|
|
134 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
|
|
135
|
|
136 def check_hgnc(l :str) -> bool:
|
|
137 """
|
|
138 Check if a gene identifier follows the HGNC format.
|
|
139
|
|
140 Args:
|
|
141 l (str): The gene identifier to check.
|
|
142
|
|
143 Returns:
|
|
144 bool: True if the gene identifier follows the HGNC format, False otherwise.
|
|
145 """
|
|
146 if len(l) > 5:
|
|
147 if (l.upper()).startswith('HGNC:'):
|
|
148 return l[5:].isdigit()
|
|
149 else:
|
|
150 return False
|
|
151 else:
|
|
152 return False
|
|
153
|
|
154 def check_ensembl(l :str) -> bool:
|
|
155 """
|
|
156 Check if a gene identifier follows the Ensembl format.
|
|
157
|
|
158 Args:
|
|
159 l (str): The gene identifier to check.
|
|
160
|
|
161 Returns:
|
|
162 bool: True if the gene identifier follows the Ensembl format, False otherwise.
|
|
163 """
|
|
164 return l.upper().startswith('ENS')
|
|
165
|
|
166
|
|
167 def check_symbol(l :str) -> bool:
|
|
168 """
|
|
169 Check if a gene identifier follows the symbol format.
|
|
170
|
|
171 Args:
|
|
172 l (str): The gene identifier to check.
|
|
173
|
|
174 Returns:
|
|
175 bool: True if the gene identifier follows the symbol format, False otherwise.
|
|
176 """
|
|
177 if len(l) > 0:
|
|
178 if l[0].isalpha() and l[1:].isalnum():
|
|
179 return True
|
|
180 else:
|
|
181 return False
|
|
182 else:
|
|
183 return False
|
|
184
|
|
185 def check_entrez(l :str) -> bool:
|
|
186 """
|
|
187 Check if a gene identifier follows the Entrez ID format.
|
|
188
|
|
189 Args:
|
|
190 l (str): The gene identifier to check.
|
|
191
|
|
192 Returns:
|
|
193 bool: True if the gene identifier follows the Entrez ID format, False otherwise.
|
|
194 """
|
|
195 if len(l) > 0:
|
|
196 return l.isdigit()
|
|
197 else:
|
|
198 return False
|
|
199
|
|
200 ############################ gene #############################################
|
|
201 def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
|
|
202 """
|
|
203 Process gene data to ensure correct formatting and handle duplicates.
|
|
204
|
|
205 Args:
|
|
206 gene (DataFrame): DataFrame containing gene data.
|
|
207 type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
|
|
208 name (str): Name of the dataset.
|
|
209 gene_custom (dict or None): Custom gene data dictionary if provided.
|
|
210
|
|
211 Returns:
|
|
212 dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
|
|
213 """
|
|
214 args = process_args()
|
|
215 for i in range(len(gene)):
|
|
216 tmp = gene.iloc[i, 0]
|
|
217 gene.iloc[i, 0] = tmp.strip().split('.')[0]
|
|
218
|
|
219 gene_dup = [item for item, count in
|
|
220 collections.Counter(gene[gene.columns[0]]).items() if count > 1]
|
|
221 pat_dup = [item for item, count in
|
|
222 collections.Counter(list(gene.columns)).items() if count > 1]
|
|
223
|
|
224 if gene_dup:
|
|
225 if gene_custom == None:
|
|
226 if args.rules_selector == 'HMRcore':
|
|
227 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
|
|
228
|
|
229 elif args.rules_selector == 'Recon':
|
|
230 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
|
|
231
|
|
232 elif args.rules_selector == 'ENGRO2':
|
|
233 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
|
41
|
234 print(gene_in_rule)
|
|
235 print(type_gene)
|
4
|
236 gene_in_rule = gene_in_rule.get(type_gene)
|
|
237
|
|
238 else:
|
|
239 gene_in_rule = gene_custom
|
|
240 tmp = []
|
|
241 for i in gene_dup:
|
|
242 if gene_in_rule.get(i) == 'ok':
|
|
243 tmp.append(i)
|
|
244 if tmp:
|
|
245 sys.exit('Execution aborted because gene ID '
|
|
246 +str(tmp)+' in '+name+' is duplicated\n')
|
|
247
|
|
248 if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
|
|
249 return (gene.set_index(gene.columns[0])).to_dict()
|
|
250
|
|
251 ############################ resolve ##########################################
|
|
252 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
|
|
253 """
|
|
254 Replace gene identifiers with corresponding values from a dictionary.
|
|
255
|
|
256 Args:
|
|
257 l (str): String of gene identifier.
|
|
258 d (str): String corresponding to its value.
|
|
259
|
|
260 Returns:
|
|
261 tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
|
|
262 """
|
|
263 tmp = []
|
|
264 err = []
|
|
265 while l:
|
|
266 if isinstance(l[0], list):
|
|
267 tmp_rules, tmp_err = replace_gene_value(l[0], d)
|
|
268 tmp.append(tmp_rules)
|
|
269 err.extend(tmp_err)
|
|
270 else:
|
|
271 value = replace_gene(l[0], d)
|
|
272 tmp.append(value)
|
|
273 if value == None:
|
|
274 err.append(l[0])
|
|
275 l = l[1:]
|
|
276 return (tmp, err)
|
|
277
|
|
278 def replace_gene(l :str, d :str) -> Union[int, float]:
|
|
279 """
|
|
280 Replace a single gene identifier with its corresponding value from a dictionary.
|
|
281
|
|
282 Args:
|
|
283 l (str): Gene identifier to replace.
|
|
284 d (str): String corresponding to its value.
|
|
285
|
|
286 Returns:
|
|
287 float/int: Corresponding value from the dictionary if found, None otherwise.
|
|
288
|
|
289 Raises:
|
|
290 sys.exit: If the value associated with the gene identifier is not valid.
|
|
291 """
|
|
292 if l =='and' or l == 'or':
|
|
293 return l
|
|
294 else:
|
|
295 value = d.get(l, None)
|
|
296 if not(value == None or isinstance(value, (int, float))):
|
|
297 sys.exit('Execution aborted: ' + value + ' value not valid\n')
|
|
298 return value
|
|
299
|
|
300 T = TypeVar("T", bound = Optional[Union[int, float]])
|
|
301 def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
|
|
302 """
|
|
303 Compute the RAS value between two value and an operator ('and' or 'or').
|
|
304
|
|
305 Args:
|
|
306 val1(Optional(Union[float, int])): First value.
|
|
307 op (str): Operator ('and' or 'or').
|
|
308 val2(Optional(Union[float, int])): Second value.
|
|
309 cn (bool): Control boolean value.
|
|
310
|
|
311 Returns:
|
|
312 Optional(Union[float, int]): Result of the computation.
|
|
313 """
|
|
314 if val1 != None and val2 != None:
|
|
315 if op == 'and':
|
|
316 return min(val1, val2)
|
|
317 else:
|
|
318 return val1 + val2
|
|
319 elif op == 'and':
|
|
320 if cn is True:
|
|
321 if val1 != None:
|
|
322 return val1
|
|
323 elif val2 != None:
|
|
324 return val2
|
|
325 else:
|
|
326 return None
|
|
327 else:
|
|
328 return None
|
|
329 else:
|
|
330 if val1 != None:
|
|
331 return val1
|
|
332 elif val2 != None:
|
|
333 return val2
|
|
334 else:
|
|
335 return None
|
|
336
|
|
337 # ris should be Literal[None] but Literal is not supported in Python 3.7
|
|
338 def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
|
|
339 """
|
|
340 Control the format of the expression.
|
|
341
|
|
342 Args:
|
|
343 ris: Intermediate result.
|
|
344 l (list): Expression to control.
|
|
345 cn (bool): Control boolean value.
|
|
346
|
|
347 Returns:
|
|
348 Union[Literal[False], int, float]: Result of the control.
|
|
349 """
|
|
350 if len(l) == 1:
|
|
351 if isinstance(l[0], (float, int)) or l[0] == None:
|
|
352 return l[0]
|
|
353 elif isinstance(l[0], list):
|
|
354 return control(None, l[0], cn)
|
|
355 else:
|
|
356 return False
|
|
357 elif len(l) > 2:
|
|
358 return control_list(ris, l, cn)
|
|
359 else:
|
|
360 return False
|
|
361
|
|
362 def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
|
|
363 """
|
|
364 Control the format of a list of expressions.
|
|
365
|
|
366 Args:
|
|
367 ris: Intermediate result.
|
|
368 l (list): List of expressions to control.
|
|
369 cn (bool): Control boolean value.
|
|
370
|
|
371 Returns:
|
|
372 Optional[Literal[False]]: Result of the control.
|
|
373 """
|
|
374 while l:
|
|
375 if len(l) == 1:
|
|
376 return False
|
|
377 elif (isinstance(l[0], (float, int)) or
|
|
378 l[0] == None) and l[1] in ['and', 'or']:
|
|
379 if isinstance(l[2], (float, int)) or l[2] == None:
|
|
380 ris = computes(l[0], l[1], l[2], cn)
|
|
381 elif isinstance(l[2], list):
|
|
382 tmp = control(None, l[2], cn)
|
|
383 if tmp is False:
|
|
384 return False
|
|
385 else:
|
|
386 ris = computes(l[0], l[1], tmp, cn)
|
|
387 else:
|
|
388 return False
|
|
389 l = l[3:]
|
|
390 elif l[0] in ['and', 'or']:
|
|
391 if isinstance(l[1], (float, int)) or l[1] == None:
|
|
392 ris = computes(ris, l[0], l[1], cn)
|
|
393 elif isinstance(l[1], list):
|
|
394 tmp = control(None,l[1], cn)
|
|
395 if tmp is False:
|
|
396 return False
|
|
397 else:
|
|
398 ris = computes(ris, l[0], tmp, cn)
|
|
399 else:
|
|
400 return False
|
|
401 l = l[2:]
|
|
402 elif isinstance(l[0], list) and l[1] in ['and', 'or']:
|
|
403 if isinstance(l[2], (float, int)) or l[2] == None:
|
|
404 tmp = control(None, l[0], cn)
|
|
405 if tmp is False:
|
|
406 return False
|
|
407 else:
|
|
408 ris = computes(tmp, l[1], l[2], cn)
|
|
409 elif isinstance(l[2], list):
|
|
410 tmp = control(None, l[0], cn)
|
|
411 tmp2 = control(None, l[2], cn)
|
|
412 if tmp is False or tmp2 is False:
|
|
413 return False
|
|
414 else:
|
|
415 ris = computes(tmp, l[1], tmp2, cn)
|
|
416 else:
|
|
417 return False
|
|
418 l = l[3:]
|
|
419 else:
|
|
420 return False
|
|
421 return ris
|
|
422
|
|
423 ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
|
|
424 def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
|
|
425 """
|
|
426 Resolve rules using gene data to compute scores for each rule.
|
|
427
|
|
428 Args:
|
|
429 genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
|
|
430 rules (list): List of rules to resolve.
|
|
431 ids (list): List of IDs corresponding to the rules.
|
|
432 resolve_none (bool): Flag indicating whether to resolve None values in the rules.
|
|
433 name (str): Name of the dataset.
|
|
434
|
|
435 Returns:
|
|
436 tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
|
|
437 """
|
|
438 resolve_rules = {}
|
|
439 not_found = []
|
|
440 flag = False
|
|
441 for key, value in genes.items():
|
|
442 tmp_resolve = []
|
|
443 for i in range(len(rules)):
|
|
444 tmp = rules[i]
|
|
445 if tmp:
|
|
446 tmp, err = replace_gene_value(tmp, value)
|
|
447 if err:
|
|
448 not_found.extend(err)
|
|
449 ris = control(None, tmp, resolve_none)
|
|
450 if ris is False or ris == None:
|
|
451 tmp_resolve.append(None)
|
|
452 else:
|
|
453 tmp_resolve.append(ris)
|
|
454 flag = True
|
|
455 else:
|
|
456 tmp_resolve.append(None)
|
|
457 resolve_rules[key] = tmp_resolve
|
|
458
|
|
459 if flag is False:
|
|
460 utils.logWarning(
|
|
461 f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
|
|
462 ARGS.out_log)
|
|
463
|
|
464 return (None, None)
|
|
465
|
|
466 return (resolve_rules, list(set(not_found)))
|
|
467 ############################ create_ras #######################################
|
|
468 def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
|
|
469 """
|
|
470 Create a RAS (Reaction Activity Score) file from resolved rules.
|
|
471
|
|
472 Args:
|
|
473 resolve_rules (dict): Dictionary containing resolved rules.
|
|
474 dataset_name (str): Name of the dataset.
|
|
475 rules (list): List of rules.
|
|
476 file (str): Path to the output RAS file.
|
|
477
|
|
478 Returns:
|
|
479 None
|
|
480 """
|
|
481 if resolve_rules is None:
|
|
482 utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
|
|
483
|
|
484 for geni in resolve_rules.values():
|
|
485 for i, valori in enumerate(geni):
|
|
486 if valori == None:
|
|
487 geni[i] = 'None'
|
|
488
|
|
489 output_ras = pd.DataFrame.from_dict(resolve_rules)
|
|
490
|
|
491 output_ras.insert(0, 'Reactions', ids)
|
|
492 output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
|
|
493
|
|
494 text_file = open(file, "w")
|
|
495
|
|
496 text_file.write(output_to_csv)
|
|
497 text_file.close()
|
|
498
|
|
499 ################################- NEW RAS COMPUTATION -################################
|
|
500 Expr = Optional[Union[int, float]]
|
|
501 Ras = Expr
|
|
502 def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
|
|
503 """
|
|
504 Generates the RAS scores for each cell line found in the dataset.
|
|
505
|
|
506 Args:
|
|
507 dataset (pd.DataFrame): Dataset containing gene values.
|
|
508 rules (dict): The dict containing reaction ids as keys and rules as values.
|
|
509
|
|
510 Side effects:
|
|
511 dataset : mut
|
|
512
|
|
513 Returns:
|
|
514 dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
|
|
515 where each key corresponds to a reaction ID and each value is its computed RAS score.
|
|
516 """
|
|
517 ras_values_by_cell_line = {}
|
|
518 dataset.set_index(dataset.columns[0], inplace=True)
|
|
519 # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata
|
|
520 for cell_line_name in dataset.columns[1:]:
|
|
521 cell_line = dataset[cell_line_name].to_dict()
|
|
522 ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
|
|
523 return ras_values_by_cell_line
|
|
524
|
|
525 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
|
|
526 """
|
|
527 Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
|
|
528
|
|
529 Args:
|
|
530 value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
|
|
531 dataset : gene expression data of one cell line.
|
|
532
|
|
533 Returns:
|
|
534 dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
|
|
535 """
|
|
536 return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
|
|
537
|
|
538 def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
|
|
539 """
|
|
540 Extracts the gene expression of the given gene from a cell line dataset.
|
|
541
|
|
542 Args:
|
|
543 dataset : gene expression data of one cell line.
|
|
544 name : gene name.
|
|
545
|
|
546 Returns:
|
|
547 Expr : the gene's expression value.
|
|
548 """
|
|
549 expr = dataset.get(name, None)
|
|
550 if expr is None: ERRORS.append(name)
|
|
551
|
|
552 return expr
|
|
553
|
|
554 def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
|
|
555 """
|
|
556 Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior.
|
|
557
|
|
558 Args:
|
|
559 op_list (OpList): The OpList representing a rule with gene values.
|
|
560 dataset : gene expression data of one cell line.
|
|
561
|
|
562 Returns:
|
|
563 Ras: The computed RAS value for the given OpList.
|
|
564 """
|
|
565 op = op_list.op
|
|
566 ras_value :Ras = None
|
|
567 if not op: return get_gene_expr(dataset, op_list[0])
|
|
568 if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None
|
|
569
|
|
570 for i in range(len(op_list)):
|
|
571 item = op_list[i]
|
|
572 if isinstance(item, ruleUtils.OpList):
|
|
573 item = ras_op_list(item, dataset)
|
|
574
|
|
575 else:
|
|
576 item = get_gene_expr(dataset, item)
|
|
577
|
|
578 if item is None:
|
|
579 if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
|
|
580 continue
|
|
581
|
|
582 if ras_value is None:
|
|
583 ras_value = item
|
|
584 else:
|
|
585 ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)
|
|
586
|
|
587 return ras_value
|
|
588
|
|
589 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
|
|
590 """
|
|
591 Save computed ras scores to the given path, as a tsv file.
|
|
592
|
|
593 Args:
|
|
594 rasScores : the computed ras scores.
|
|
595 path : the output tsv file's path.
|
|
596
|
|
597 Returns:
|
|
598 None
|
|
599 """
|
|
600 for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
|
|
601 for reactId, score in scores.items():
|
|
602 if score is None: scores[reactId] = "None"
|
|
603
|
|
604 output_ras = pd.DataFrame.from_dict(rasScores)
|
|
605 output_ras.insert(0, 'Reactions', reactions)
|
|
606 output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
|
|
607
|
|
608 ############################ MAIN #############################################
|
|
609 #TODO: not used but keep, it will be when the new translator dicts will be used.
|
|
610 def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
|
|
611 """
|
|
612 Translate gene from any supported encoding to HugoID.
|
|
613
|
|
614 Args:
|
|
615 geneName (str): the name of the gene in its current encoding.
|
|
616 encoding (str): the encoding.
|
|
617 geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
|
|
618 and encodings in the current model, mapping each to the corresponding HugoID encoding.
|
|
619
|
|
620 Raises:
|
|
621 ValueError: When the gene isn't supported in the model.
|
|
622
|
|
623 Returns:
|
|
624 str: the gene in HugoID encoding.
|
|
625 """
|
|
626 supportedGenesInEncoding = geneTranslator[encoding]
|
|
627 if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
|
|
628 raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
|
|
629
|
|
630 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
|
|
631 """
|
|
632 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
|
|
633 performed, significantly impacting the runtime.
|
|
634
|
|
635 Returns:
|
|
636 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
|
|
637 """
|
|
638 datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
|
|
639
|
|
640 try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
|
|
641 except utils.PathErr as err:
|
|
642 raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}")
|
|
643
|
|
644 if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
|
|
645
|
|
646 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
|
|
647 return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
|
|
648
|
|
649 def main() -> None:
|
|
650 """
|
|
651 Initializes everything and sets the program in motion based on the fronted input arguments.
|
|
652
|
|
653 Returns:
|
|
654 None
|
|
655 """
|
|
656 # get args from frontend (related xml)
|
|
657 global ARGS
|
|
658 ARGS = process_args()
|
|
659
|
|
660 # read dataset
|
|
661 dataset = read_dataset(ARGS.input, "dataset")
|
|
662 dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
|
|
663
|
|
664 # remove versioning from gene names
|
|
665 dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
|
|
666
|
|
667 # handle custom models
|
|
668 model :utils.Model = ARGS.rules_selector
|
|
669 if model is utils.Model.Custom:
|
|
670 rules = load_custom_rules()
|
|
671 reactions = list(rules.keys())
|
|
672
|
|
673 save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
|
|
674 if ERRORS: utils.logWarning(
|
|
675 f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
|
|
676 ARGS.out_log)
|
|
677
|
|
678 return
|
|
679
|
|
680 # This is the standard flow of the ras_generator program, for non-custom models.
|
|
681 name = "RAS Dataset"
|
|
682 type_gene = gene_type(dataset.iloc[0, 0], name)
|
|
683
|
|
684 rules = model.getRules(ARGS.tool_dir)
|
|
685 genes = data_gene(dataset, type_gene, name, None)
|
|
686 ids, rules = load_id_rules(rules.get(type_gene))
|
|
687
|
|
688 resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
|
|
689 create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
|
|
690
|
|
691 if err: utils.logWarning(
|
|
692 f"Warning: gene(s) {err} not found in class \"{name}\", " +
|
|
693 "the expression level for this gene will be considered NaN",
|
|
694 ARGS.out_log)
|
|
695
|
|
696 print("Execution succeded")
|
|
697
|
|
698 ###############################################################################
|
|
699 if __name__ == "__main__":
|
|
700 main() |