comparison cobraxy-9688ad27287b/COBRAxy/ras_generator.py @ 90:a48b2e06ebe7 draft

Uploaded
author luca_milaz
date Sun, 13 Oct 2024 11:35:56 +0000
parents
children
comparison
equal deleted inserted replaced
89:6ddfc81e97d1 90:a48b2e06ebe7
1 from __future__ import division
2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
3 import sys
4 import argparse
5 import collections
6 import pandas as pd
7 import pickle as pk
8 import utils.general_utils as utils
9 import utils.rule_parsing as ruleUtils
10 from typing import Union, Optional, List, Dict, Tuple, TypeVar
11
12 ERRORS = []
13 ########################## argparse ##########################################
14 ARGS :argparse.Namespace
15 def process_args() -> argparse.Namespace:
16 """
17 Processes command-line arguments.
18
19 Args:
20 args (list): List of command-line arguments.
21
22 Returns:
23 Namespace: An object containing parsed arguments.
24 """
25 parser = argparse.ArgumentParser(
26 usage = '%(prog)s [options]',
27 description = "process some value's genes to create a comparison's map.")
28
29 parser.add_argument(
30 '-rs', '--rules_selector',
31 type = utils.Model, default = utils.Model.HMRcore, choices = list(utils.Model),
32 help = 'chose which type of dataset you want use')
33
34 parser.add_argument("-rl", "--rule_list", type = str,
35 help = "path to input file with custom rules, if provided")
36
37 parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
38 # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
39
40 parser.add_argument(
41 '-n', '--none',
42 type = utils.Bool("none"), default = True,
43 help = 'compute Nan values')
44
45 parser.add_argument(
46 '-td', '--tool_dir',
47 type = str,
48 required = True, help = 'your tool directory')
49
50 parser.add_argument(
51 '-ol', '--out_log',
52 type = str,
53 help = "Output log")
54
55 parser.add_argument(
56 '-in', '--input', #id รจ diventato in
57 type = str,
58 help = 'input dataset')
59
60 parser.add_argument(
61 '-ra', '--ras_output',
62 type = str,
63 required = True, help = 'ras output')
64
65 return parser.parse_args()
66
67 ############################ dataset input ####################################
68 def read_dataset(data :str, name :str) -> pd.DataFrame:
69 """
70 Read a dataset from a CSV file and return it as a pandas DataFrame.
71
72 Args:
73 data (str): Path to the CSV file containing the dataset.
74 name (str): Name of the dataset, used in error messages.
75
76 Returns:
77 pandas.DataFrame: DataFrame containing the dataset.
78
79 Raises:
80 pd.errors.EmptyDataError: If the CSV file is empty.
81 sys.exit: If the CSV file has the wrong format, the execution is aborted.
82 """
83 try:
84 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
85 except pd.errors.EmptyDataError:
86 sys.exit('Execution aborted: wrong format of ' + name + '\n')
87 if len(dataset.columns) < 2:
88 sys.exit('Execution aborted: wrong format of ' + name + '\n')
89 return dataset
90
91 ############################ load id e rules ##################################
92 def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
93 """
94 Load IDs and rules from a dictionary of reactions.
95
96 Args:
97 reactions (dict): A dictionary where keys are IDs and values are rules.
98
99 Returns:
100 tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
101 """
102 ids, rules = [], []
103 for key, value in reactions.items():
104 ids.append(key)
105 rules.append(value)
106 return (ids, rules)
107
108 ############################ check_methods ####################################
109 def gene_type(l :str, name :str) -> str:
110 """
111 Determine the type of gene ID.
112
113 Args:
114 l (str): The gene identifier to check.
115 name (str): The name of the dataset, used in error messages.
116
117 Returns:
118 str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
119
120 Raises:
121 sys.exit: If the gene ID type is not supported, the execution is aborted.
122 """
123 if check_hgnc(l):
124 return 'hugo_id'
125 elif check_ensembl(l):
126 return 'ensembl_gene_id'
127 elif check_symbol(l):
128 return 'symbol'
129 elif check_entrez(l):
130 return 'entrez_id'
131 else:
132 sys.exit('Execution aborted:\n' +
133 'gene ID type in ' + name + ' not supported. Supported ID'+
134 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
135
136 def check_hgnc(l :str) -> bool:
137 """
138 Check if a gene identifier follows the HGNC format.
139
140 Args:
141 l (str): The gene identifier to check.
142
143 Returns:
144 bool: True if the gene identifier follows the HGNC format, False otherwise.
145 """
146 if len(l) > 5:
147 if (l.upper()).startswith('HGNC:'):
148 return l[5:].isdigit()
149 else:
150 return False
151 else:
152 return False
153
154 def check_ensembl(l :str) -> bool:
155 """
156 Check if a gene identifier follows the Ensembl format.
157
158 Args:
159 l (str): The gene identifier to check.
160
161 Returns:
162 bool: True if the gene identifier follows the Ensembl format, False otherwise.
163 """
164 return l.upper().startswith('ENS')
165
166
167 def check_symbol(l :str) -> bool:
168 """
169 Check if a gene identifier follows the symbol format.
170
171 Args:
172 l (str): The gene identifier to check.
173
174 Returns:
175 bool: True if the gene identifier follows the symbol format, False otherwise.
176 """
177 if len(l) > 0:
178 if l[0].isalpha() and l[1:].isalnum():
179 return True
180 else:
181 return False
182 else:
183 return False
184
185 def check_entrez(l :str) -> bool:
186 """
187 Check if a gene identifier follows the Entrez ID format.
188
189 Args:
190 l (str): The gene identifier to check.
191
192 Returns:
193 bool: True if the gene identifier follows the Entrez ID format, False otherwise.
194 """
195 if len(l) > 0:
196 return l.isdigit()
197 else:
198 return False
199
200 ############################ gene #############################################
201 def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
202 """
203 Process gene data to ensure correct formatting and handle duplicates.
204
205 Args:
206 gene (DataFrame): DataFrame containing gene data.
207 type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
208 name (str): Name of the dataset.
209 gene_custom (dict or None): Custom gene data dictionary if provided.
210
211 Returns:
212 dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
213 """
214 args = process_args()
215 for i in range(len(gene)):
216 tmp = gene.iloc[i, 0]
217 gene.iloc[i, 0] = tmp.strip().split('.')[0]
218
219 gene_dup = [item for item, count in
220 collections.Counter(gene[gene.columns[0]]).items() if count > 1]
221 pat_dup = [item for item, count in
222 collections.Counter(list(gene.columns)).items() if count > 1]
223
224 if gene_dup:
225 if gene_custom == None:
226 if args.rules_selector == 'HMRcore':
227 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
228
229 elif args.rules_selector == 'Recon':
230 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
231
232 elif args.rules_selector == 'ENGRO2':
233 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
234 print(f"{args.tool_dir}'/local/pickle files/ENGRO2_genes.p'")
235 utils.logWarning(f"{args.tool_dir}'/local/pickle files/ENGRO2_genes.p'", ARGS.out_log)
236 print(args.rules_selector)
237 gene_in_rule = gene_in_rule.get(type_gene)
238
239 else:
240 gene_in_rule = gene_custom
241 tmp = []
242 for i in gene_dup:
243 if gene_in_rule.get(i) == 'ok':
244 tmp.append(i)
245 if tmp:
246 sys.exit('Execution aborted because gene ID '
247 +str(tmp)+' in '+name+' is duplicated\n')
248
249 if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
250 return (gene.set_index(gene.columns[0])).to_dict()
251
252 ############################ resolve ##########################################
253 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
254 """
255 Replace gene identifiers with corresponding values from a dictionary.
256
257 Args:
258 l (str): String of gene identifier.
259 d (str): String corresponding to its value.
260
261 Returns:
262 tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
263 """
264 tmp = []
265 err = []
266 while l:
267 if isinstance(l[0], list):
268 tmp_rules, tmp_err = replace_gene_value(l[0], d)
269 tmp.append(tmp_rules)
270 err.extend(tmp_err)
271 else:
272 value = replace_gene(l[0], d)
273 tmp.append(value)
274 if value == None:
275 err.append(l[0])
276 l = l[1:]
277 return (tmp, err)
278
279 def replace_gene(l :str, d :str) -> Union[int, float]:
280 """
281 Replace a single gene identifier with its corresponding value from a dictionary.
282
283 Args:
284 l (str): Gene identifier to replace.
285 d (str): String corresponding to its value.
286
287 Returns:
288 float/int: Corresponding value from the dictionary if found, None otherwise.
289
290 Raises:
291 sys.exit: If the value associated with the gene identifier is not valid.
292 """
293 if l =='and' or l == 'or':
294 return l
295 else:
296 value = d.get(l, None)
297 if not(value == None or isinstance(value, (int, float))):
298 sys.exit('Execution aborted: ' + value + ' value not valid\n')
299 return value
300
301 T = TypeVar("T", bound = Optional[Union[int, float]])
302 def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
303 """
304 Compute the RAS value between two value and an operator ('and' or 'or').
305
306 Args:
307 val1(Optional(Union[float, int])): First value.
308 op (str): Operator ('and' or 'or').
309 val2(Optional(Union[float, int])): Second value.
310 cn (bool): Control boolean value.
311
312 Returns:
313 Optional(Union[float, int]): Result of the computation.
314 """
315 if val1 != None and val2 != None:
316 if op == 'and':
317 return min(val1, val2)
318 else:
319 return val1 + val2
320 elif op == 'and':
321 if cn is True:
322 if val1 != None:
323 return val1
324 elif val2 != None:
325 return val2
326 else:
327 return None
328 else:
329 return None
330 else:
331 if val1 != None:
332 return val1
333 elif val2 != None:
334 return val2
335 else:
336 return None
337
338 # ris should be Literal[None] but Literal is not supported in Python 3.7
339 def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
340 """
341 Control the format of the expression.
342
343 Args:
344 ris: Intermediate result.
345 l (list): Expression to control.
346 cn (bool): Control boolean value.
347
348 Returns:
349 Union[Literal[False], int, float]: Result of the control.
350 """
351 if len(l) == 1:
352 if isinstance(l[0], (float, int)) or l[0] == None:
353 return l[0]
354 elif isinstance(l[0], list):
355 return control(None, l[0], cn)
356 else:
357 return False
358 elif len(l) > 2:
359 return control_list(ris, l, cn)
360 else:
361 return False
362
363 def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
364 """
365 Control the format of a list of expressions.
366
367 Args:
368 ris: Intermediate result.
369 l (list): List of expressions to control.
370 cn (bool): Control boolean value.
371
372 Returns:
373 Optional[Literal[False]]: Result of the control.
374 """
375 while l:
376 if len(l) == 1:
377 return False
378 elif (isinstance(l[0], (float, int)) or
379 l[0] == None) and l[1] in ['and', 'or']:
380 if isinstance(l[2], (float, int)) or l[2] == None:
381 ris = computes(l[0], l[1], l[2], cn)
382 elif isinstance(l[2], list):
383 tmp = control(None, l[2], cn)
384 if tmp is False:
385 return False
386 else:
387 ris = computes(l[0], l[1], tmp, cn)
388 else:
389 return False
390 l = l[3:]
391 elif l[0] in ['and', 'or']:
392 if isinstance(l[1], (float, int)) or l[1] == None:
393 ris = computes(ris, l[0], l[1], cn)
394 elif isinstance(l[1], list):
395 tmp = control(None,l[1], cn)
396 if tmp is False:
397 return False
398 else:
399 ris = computes(ris, l[0], tmp, cn)
400 else:
401 return False
402 l = l[2:]
403 elif isinstance(l[0], list) and l[1] in ['and', 'or']:
404 if isinstance(l[2], (float, int)) or l[2] == None:
405 tmp = control(None, l[0], cn)
406 if tmp is False:
407 return False
408 else:
409 ris = computes(tmp, l[1], l[2], cn)
410 elif isinstance(l[2], list):
411 tmp = control(None, l[0], cn)
412 tmp2 = control(None, l[2], cn)
413 if tmp is False or tmp2 is False:
414 return False
415 else:
416 ris = computes(tmp, l[1], tmp2, cn)
417 else:
418 return False
419 l = l[3:]
420 else:
421 return False
422 return ris
423
424 ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
425 def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
426 """
427 Resolve rules using gene data to compute scores for each rule.
428
429 Args:
430 genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
431 rules (list): List of rules to resolve.
432 ids (list): List of IDs corresponding to the rules.
433 resolve_none (bool): Flag indicating whether to resolve None values in the rules.
434 name (str): Name of the dataset.
435
436 Returns:
437 tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
438 """
439 resolve_rules = {}
440 not_found = []
441 flag = False
442 for key, value in genes.items():
443 tmp_resolve = []
444 for i in range(len(rules)):
445 tmp = rules[i]
446 if tmp:
447 tmp, err = replace_gene_value(tmp, value)
448 if err:
449 not_found.extend(err)
450 ris = control(None, tmp, resolve_none)
451 if ris is False or ris == None:
452 tmp_resolve.append(None)
453 else:
454 tmp_resolve.append(ris)
455 flag = True
456 else:
457 tmp_resolve.append(None)
458 resolve_rules[key] = tmp_resolve
459
460 if flag is False:
461 utils.logWarning(
462 f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
463 ARGS.out_log)
464
465 return (None, None)
466
467 return (resolve_rules, list(set(not_found)))
468 ############################ create_ras #######################################
469 def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
470 """
471 Create a RAS (Reaction Activity Score) file from resolved rules.
472
473 Args:
474 resolve_rules (dict): Dictionary containing resolved rules.
475 dataset_name (str): Name of the dataset.
476 rules (list): List of rules.
477 file (str): Path to the output RAS file.
478
479 Returns:
480 None
481 """
482 if resolve_rules is None:
483 utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
484
485 for geni in resolve_rules.values():
486 for i, valori in enumerate(geni):
487 if valori == None:
488 geni[i] = 'None'
489
490 output_ras = pd.DataFrame.from_dict(resolve_rules)
491
492 output_ras.insert(0, 'Reactions', ids)
493 output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
494
495 text_file = open(file, "w")
496
497 text_file.write(output_to_csv)
498 text_file.close()
499
500 ################################- NEW RAS COMPUTATION -################################
501 Expr = Optional[Union[int, float]]
502 Ras = Expr
503 def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
504 """
505 Generates the RAS scores for each cell line found in the dataset.
506
507 Args:
508 dataset (pd.DataFrame): Dataset containing gene values.
509 rules (dict): The dict containing reaction ids as keys and rules as values.
510
511 Side effects:
512 dataset : mut
513
514 Returns:
515 dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
516 where each key corresponds to a reaction ID and each value is its computed RAS score.
517 """
518 ras_values_by_cell_line = {}
519 dataset.set_index(dataset.columns[0], inplace=True)
520 # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata
521 for cell_line_name in dataset.columns[1:]:
522 cell_line = dataset[cell_line_name].to_dict()
523 ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
524 return ras_values_by_cell_line
525
526 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
527 """
528 Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
529
530 Args:
531 value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
532 dataset : gene expression data of one cell line.
533
534 Returns:
535 dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
536 """
537 return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
538
539 def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
540 """
541 Extracts the gene expression of the given gene from a cell line dataset.
542
543 Args:
544 dataset : gene expression data of one cell line.
545 name : gene name.
546
547 Returns:
548 Expr : the gene's expression value.
549 """
550 expr = dataset.get(name, None)
551 if expr is None: ERRORS.append(name)
552
553 return expr
554
555 def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
556 """
557 Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior.
558
559 Args:
560 op_list (OpList): The OpList representing a rule with gene values.
561 dataset : gene expression data of one cell line.
562
563 Returns:
564 Ras: The computed RAS value for the given OpList.
565 """
566 op = op_list.op
567 ras_value :Ras = None
568 if not op: return get_gene_expr(dataset, op_list[0])
569 if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None
570
571 for i in range(len(op_list)):
572 item = op_list[i]
573 if isinstance(item, ruleUtils.OpList):
574 item = ras_op_list(item, dataset)
575
576 else:
577 item = get_gene_expr(dataset, item)
578
579 if item is None:
580 if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
581 continue
582
583 if ras_value is None:
584 ras_value = item
585 else:
586 ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)
587
588 return ras_value
589
590 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
591 """
592 Save computed ras scores to the given path, as a tsv file.
593
594 Args:
595 rasScores : the computed ras scores.
596 path : the output tsv file's path.
597
598 Returns:
599 None
600 """
601 for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
602 for reactId, score in scores.items():
603 if score is None: scores[reactId] = "None"
604
605 output_ras = pd.DataFrame.from_dict(rasScores)
606 output_ras.insert(0, 'Reactions', reactions)
607 output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
608
609 ############################ MAIN #############################################
610 #TODO: not used but keep, it will be when the new translator dicts will be used.
611 def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
612 """
613 Translate gene from any supported encoding to HugoID.
614
615 Args:
616 geneName (str): the name of the gene in its current encoding.
617 encoding (str): the encoding.
618 geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
619 and encodings in the current model, mapping each to the corresponding HugoID encoding.
620
621 Raises:
622 ValueError: When the gene isn't supported in the model.
623
624 Returns:
625 str: the gene in HugoID encoding.
626 """
627 supportedGenesInEncoding = geneTranslator[encoding]
628 if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
629 raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
630
631 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
632 """
633 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
634 performed, significantly impacting the runtime.
635
636 Returns:
637 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
638 """
639 datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
640
641 try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
642 except utils.PathErr as err:
643 raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}")
644
645 if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
646
647 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
648 return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
649
650 def main() -> None:
651 """
652 Initializes everything and sets the program in motion based on the fronted input arguments.
653
654 Returns:
655 None
656 """
657 # get args from frontend (related xml)
658 global ARGS
659 ARGS = process_args()
660 print(ARGS.rules_selector)
661 # read dataset
662 dataset = read_dataset(ARGS.input, "dataset")
663 dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
664
665 # remove versioning from gene names
666 dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
667
668 # handle custom models
669 model :utils.Model = ARGS.rules_selector
670 if model is utils.Model.Custom:
671 rules = load_custom_rules()
672 reactions = list(rules.keys())
673
674 save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
675 if ERRORS: utils.logWarning(
676 f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
677 ARGS.out_log)
678
679 return
680
681 # This is the standard flow of the ras_generator program, for non-custom models.
682 name = "RAS Dataset"
683 type_gene = gene_type(dataset.iloc[0, 0], name)
684
685 rules = model.getRules(ARGS.tool_dir)
686 genes = data_gene(dataset, type_gene, name, None)
687 ids, rules = load_id_rules(rules.get(type_gene))
688
689 resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
690 create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
691
692 if err: utils.logWarning(
693 f"Warning: gene(s) {err} not found in class \"{name}\", " +
694 "the expression level for this gene will be considered NaN",
695 ARGS.out_log)
696
697 print("Execution succeded")
698
699 ###############################################################################
700 if __name__ == "__main__":
701 main()