comparison COBRAxy/ras_generator.py @ 4:41f35c2f0c7b draft

Uploaded
author luca_milaz
date Wed, 18 Sep 2024 10:59:10 +0000
parents
children a1ab05a70185
comparison
equal deleted inserted replaced
3:1f3ac6fd9867 4:41f35c2f0c7b
1 from __future__ import division
2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
3 import sys
4 import argparse
5 import collections
6 import pandas as pd
7 import pickle as pk
8 import utils.general_utils as utils
9 import utils.rule_parsing as ruleUtils
10 from typing import Union, Optional, List, Dict, Tuple, TypeVar
11
12 ERRORS = []
13 ########################## argparse ##########################################
14 ARGS :argparse.Namespace
15 def process_args() -> argparse.Namespace:
16 """
17 Processes command-line arguments.
18
19 Args:
20 args (list): List of command-line arguments.
21
22 Returns:
23 Namespace: An object containing parsed arguments.
24 """
25 parser = argparse.ArgumentParser(
26 usage = '%(prog)s [options]',
27 description = "process some value's genes to create a comparison's map.")
28
29 parser.add_argument(
30 '-rs', '--rules_selector',
31 type = utils.Model, default = utils.Model.HMRcore, choices = list(utils.Model),
32 help = 'chose which type of dataset you want use')
33
34 parser.add_argument("-rl", "--rule_list", type = str,
35 help = "path to input file with custom rules, if provided")
36
37 parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
38 # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
39
40 parser.add_argument(
41 '-n', '--none',
42 type = utils.Bool("none"), default = True,
43 help = 'compute Nan values')
44
45 parser.add_argument(
46 '-td', '--tool_dir',
47 type = str,
48 required = True, help = 'your tool directory')
49
50 parser.add_argument(
51 '-ol', '--out_log',
52 type = str,
53 help = "Output log")
54
55 parser.add_argument(
56 '-in', '--input', #id รจ diventato in
57 type = str,
58 help = 'input dataset')
59
60 parser.add_argument(
61 '-ra', '--ras_output',
62 type = str,
63 required = True, help = 'ras output')
64
65 return parser.parse_args()
66
67 ############################ dataset input ####################################
68 def read_dataset(data :str, name :str) -> pd.DataFrame:
69 """
70 Read a dataset from a CSV file and return it as a pandas DataFrame.
71
72 Args:
73 data (str): Path to the CSV file containing the dataset.
74 name (str): Name of the dataset, used in error messages.
75
76 Returns:
77 pandas.DataFrame: DataFrame containing the dataset.
78
79 Raises:
80 pd.errors.EmptyDataError: If the CSV file is empty.
81 sys.exit: If the CSV file has the wrong format, the execution is aborted.
82 """
83 try:
84 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
85 except pd.errors.EmptyDataError:
86 sys.exit('Execution aborted: wrong format of ' + name + '\n')
87 if len(dataset.columns) < 2:
88 sys.exit('Execution aborted: wrong format of ' + name + '\n')
89 return dataset
90
91 ############################ load id e rules ##################################
92 def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
93 """
94 Load IDs and rules from a dictionary of reactions.
95
96 Args:
97 reactions (dict): A dictionary where keys are IDs and values are rules.
98
99 Returns:
100 tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
101 """
102 ids, rules = [], []
103 for key, value in reactions.items():
104 ids.append(key)
105 rules.append(value)
106 return (ids, rules)
107
108 ############################ check_methods ####################################
109 def gene_type(l :str, name :str) -> str:
110 """
111 Determine the type of gene ID.
112
113 Args:
114 l (str): The gene identifier to check.
115 name (str): The name of the dataset, used in error messages.
116
117 Returns:
118 str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
119
120 Raises:
121 sys.exit: If the gene ID type is not supported, the execution is aborted.
122 """
123 if check_hgnc(l):
124 return 'hugo_id'
125 elif check_ensembl(l):
126 return 'ensembl_gene_id'
127 elif check_symbol(l):
128 return 'symbol'
129 elif check_entrez(l):
130 return 'entrez_id'
131 else:
132 sys.exit('Execution aborted:\n' +
133 'gene ID type in ' + name + ' not supported. Supported ID'+
134 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
135
136 def check_hgnc(l :str) -> bool:
137 """
138 Check if a gene identifier follows the HGNC format.
139
140 Args:
141 l (str): The gene identifier to check.
142
143 Returns:
144 bool: True if the gene identifier follows the HGNC format, False otherwise.
145 """
146 if len(l) > 5:
147 if (l.upper()).startswith('HGNC:'):
148 return l[5:].isdigit()
149 else:
150 return False
151 else:
152 return False
153
154 def check_ensembl(l :str) -> bool:
155 """
156 Check if a gene identifier follows the Ensembl format.
157
158 Args:
159 l (str): The gene identifier to check.
160
161 Returns:
162 bool: True if the gene identifier follows the Ensembl format, False otherwise.
163 """
164 return l.upper().startswith('ENS')
165
166
167 def check_symbol(l :str) -> bool:
168 """
169 Check if a gene identifier follows the symbol format.
170
171 Args:
172 l (str): The gene identifier to check.
173
174 Returns:
175 bool: True if the gene identifier follows the symbol format, False otherwise.
176 """
177 if len(l) > 0:
178 if l[0].isalpha() and l[1:].isalnum():
179 return True
180 else:
181 return False
182 else:
183 return False
184
185 def check_entrez(l :str) -> bool:
186 """
187 Check if a gene identifier follows the Entrez ID format.
188
189 Args:
190 l (str): The gene identifier to check.
191
192 Returns:
193 bool: True if the gene identifier follows the Entrez ID format, False otherwise.
194 """
195 if len(l) > 0:
196 return l.isdigit()
197 else:
198 return False
199
200 ############################ gene #############################################
201 def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
202 """
203 Process gene data to ensure correct formatting and handle duplicates.
204
205 Args:
206 gene (DataFrame): DataFrame containing gene data.
207 type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
208 name (str): Name of the dataset.
209 gene_custom (dict or None): Custom gene data dictionary if provided.
210
211 Returns:
212 dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
213 """
214 args = process_args()
215 for i in range(len(gene)):
216 tmp = gene.iloc[i, 0]
217 gene.iloc[i, 0] = tmp.strip().split('.')[0]
218
219 gene_dup = [item for item, count in
220 collections.Counter(gene[gene.columns[0]]).items() if count > 1]
221 pat_dup = [item for item, count in
222 collections.Counter(list(gene.columns)).items() if count > 1]
223
224 if gene_dup:
225 if gene_custom == None:
226 if args.rules_selector == 'HMRcore':
227 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
228
229 elif args.rules_selector == 'Recon':
230 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
231
232 elif args.rules_selector == 'ENGRO2':
233 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
234
235 gene_in_rule = gene_in_rule.get(type_gene)
236
237 else:
238 gene_in_rule = gene_custom
239 tmp = []
240 for i in gene_dup:
241 if gene_in_rule.get(i) == 'ok':
242 tmp.append(i)
243 if tmp:
244 sys.exit('Execution aborted because gene ID '
245 +str(tmp)+' in '+name+' is duplicated\n')
246
247 if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
248 return (gene.set_index(gene.columns[0])).to_dict()
249
250 ############################ resolve ##########################################
251 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
252 """
253 Replace gene identifiers with corresponding values from a dictionary.
254
255 Args:
256 l (str): String of gene identifier.
257 d (str): String corresponding to its value.
258
259 Returns:
260 tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
261 """
262 tmp = []
263 err = []
264 while l:
265 if isinstance(l[0], list):
266 tmp_rules, tmp_err = replace_gene_value(l[0], d)
267 tmp.append(tmp_rules)
268 err.extend(tmp_err)
269 else:
270 value = replace_gene(l[0], d)
271 tmp.append(value)
272 if value == None:
273 err.append(l[0])
274 l = l[1:]
275 return (tmp, err)
276
277 def replace_gene(l :str, d :str) -> Union[int, float]:
278 """
279 Replace a single gene identifier with its corresponding value from a dictionary.
280
281 Args:
282 l (str): Gene identifier to replace.
283 d (str): String corresponding to its value.
284
285 Returns:
286 float/int: Corresponding value from the dictionary if found, None otherwise.
287
288 Raises:
289 sys.exit: If the value associated with the gene identifier is not valid.
290 """
291 if l =='and' or l == 'or':
292 return l
293 else:
294 value = d.get(l, None)
295 if not(value == None or isinstance(value, (int, float))):
296 sys.exit('Execution aborted: ' + value + ' value not valid\n')
297 return value
298
299 T = TypeVar("T", bound = Optional[Union[int, float]])
300 def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
301 """
302 Compute the RAS value between two value and an operator ('and' or 'or').
303
304 Args:
305 val1(Optional(Union[float, int])): First value.
306 op (str): Operator ('and' or 'or').
307 val2(Optional(Union[float, int])): Second value.
308 cn (bool): Control boolean value.
309
310 Returns:
311 Optional(Union[float, int]): Result of the computation.
312 """
313 if val1 != None and val2 != None:
314 if op == 'and':
315 return min(val1, val2)
316 else:
317 return val1 + val2
318 elif op == 'and':
319 if cn is True:
320 if val1 != None:
321 return val1
322 elif val2 != None:
323 return val2
324 else:
325 return None
326 else:
327 return None
328 else:
329 if val1 != None:
330 return val1
331 elif val2 != None:
332 return val2
333 else:
334 return None
335
336 # ris should be Literal[None] but Literal is not supported in Python 3.7
337 def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
338 """
339 Control the format of the expression.
340
341 Args:
342 ris: Intermediate result.
343 l (list): Expression to control.
344 cn (bool): Control boolean value.
345
346 Returns:
347 Union[Literal[False], int, float]: Result of the control.
348 """
349 if len(l) == 1:
350 if isinstance(l[0], (float, int)) or l[0] == None:
351 return l[0]
352 elif isinstance(l[0], list):
353 return control(None, l[0], cn)
354 else:
355 return False
356 elif len(l) > 2:
357 return control_list(ris, l, cn)
358 else:
359 return False
360
361 def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
362 """
363 Control the format of a list of expressions.
364
365 Args:
366 ris: Intermediate result.
367 l (list): List of expressions to control.
368 cn (bool): Control boolean value.
369
370 Returns:
371 Optional[Literal[False]]: Result of the control.
372 """
373 while l:
374 if len(l) == 1:
375 return False
376 elif (isinstance(l[0], (float, int)) or
377 l[0] == None) and l[1] in ['and', 'or']:
378 if isinstance(l[2], (float, int)) or l[2] == None:
379 ris = computes(l[0], l[1], l[2], cn)
380 elif isinstance(l[2], list):
381 tmp = control(None, l[2], cn)
382 if tmp is False:
383 return False
384 else:
385 ris = computes(l[0], l[1], tmp, cn)
386 else:
387 return False
388 l = l[3:]
389 elif l[0] in ['and', 'or']:
390 if isinstance(l[1], (float, int)) or l[1] == None:
391 ris = computes(ris, l[0], l[1], cn)
392 elif isinstance(l[1], list):
393 tmp = control(None,l[1], cn)
394 if tmp is False:
395 return False
396 else:
397 ris = computes(ris, l[0], tmp, cn)
398 else:
399 return False
400 l = l[2:]
401 elif isinstance(l[0], list) and l[1] in ['and', 'or']:
402 if isinstance(l[2], (float, int)) or l[2] == None:
403 tmp = control(None, l[0], cn)
404 if tmp is False:
405 return False
406 else:
407 ris = computes(tmp, l[1], l[2], cn)
408 elif isinstance(l[2], list):
409 tmp = control(None, l[0], cn)
410 tmp2 = control(None, l[2], cn)
411 if tmp is False or tmp2 is False:
412 return False
413 else:
414 ris = computes(tmp, l[1], tmp2, cn)
415 else:
416 return False
417 l = l[3:]
418 else:
419 return False
420 return ris
421
422 ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
423 def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
424 """
425 Resolve rules using gene data to compute scores for each rule.
426
427 Args:
428 genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
429 rules (list): List of rules to resolve.
430 ids (list): List of IDs corresponding to the rules.
431 resolve_none (bool): Flag indicating whether to resolve None values in the rules.
432 name (str): Name of the dataset.
433
434 Returns:
435 tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
436 """
437 resolve_rules = {}
438 not_found = []
439 flag = False
440 for key, value in genes.items():
441 tmp_resolve = []
442 for i in range(len(rules)):
443 tmp = rules[i]
444 if tmp:
445 tmp, err = replace_gene_value(tmp, value)
446 if err:
447 not_found.extend(err)
448 ris = control(None, tmp, resolve_none)
449 if ris is False or ris == None:
450 tmp_resolve.append(None)
451 else:
452 tmp_resolve.append(ris)
453 flag = True
454 else:
455 tmp_resolve.append(None)
456 resolve_rules[key] = tmp_resolve
457
458 if flag is False:
459 utils.logWarning(
460 f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
461 ARGS.out_log)
462
463 return (None, None)
464
465 return (resolve_rules, list(set(not_found)))
466 ############################ create_ras #######################################
467 def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
468 """
469 Create a RAS (Reaction Activity Score) file from resolved rules.
470
471 Args:
472 resolve_rules (dict): Dictionary containing resolved rules.
473 dataset_name (str): Name of the dataset.
474 rules (list): List of rules.
475 file (str): Path to the output RAS file.
476
477 Returns:
478 None
479 """
480 if resolve_rules is None:
481 utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
482
483 for geni in resolve_rules.values():
484 for i, valori in enumerate(geni):
485 if valori == None:
486 geni[i] = 'None'
487
488 output_ras = pd.DataFrame.from_dict(resolve_rules)
489
490 output_ras.insert(0, 'Reactions', ids)
491 output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
492
493 text_file = open(file, "w")
494
495 text_file.write(output_to_csv)
496 text_file.close()
497
498 ################################- NEW RAS COMPUTATION -################################
499 Expr = Optional[Union[int, float]]
500 Ras = Expr
501 def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
502 """
503 Generates the RAS scores for each cell line found in the dataset.
504
505 Args:
506 dataset (pd.DataFrame): Dataset containing gene values.
507 rules (dict): The dict containing reaction ids as keys and rules as values.
508
509 Side effects:
510 dataset : mut
511
512 Returns:
513 dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
514 where each key corresponds to a reaction ID and each value is its computed RAS score.
515 """
516 ras_values_by_cell_line = {}
517 dataset.set_index(dataset.columns[0], inplace=True)
518 # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata
519 for cell_line_name in dataset.columns[1:]:
520 cell_line = dataset[cell_line_name].to_dict()
521 ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
522 return ras_values_by_cell_line
523
524 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
525 """
526 Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
527
528 Args:
529 value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
530 dataset : gene expression data of one cell line.
531
532 Returns:
533 dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
534 """
535 return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
536
537 def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
538 """
539 Extracts the gene expression of the given gene from a cell line dataset.
540
541 Args:
542 dataset : gene expression data of one cell line.
543 name : gene name.
544
545 Returns:
546 Expr : the gene's expression value.
547 """
548 expr = dataset.get(name, None)
549 if expr is None: ERRORS.append(name)
550
551 return expr
552
553 def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
554 """
555 Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior.
556
557 Args:
558 op_list (OpList): The OpList representing a rule with gene values.
559 dataset : gene expression data of one cell line.
560
561 Returns:
562 Ras: The computed RAS value for the given OpList.
563 """
564 op = op_list.op
565 ras_value :Ras = None
566 if not op: return get_gene_expr(dataset, op_list[0])
567 if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None
568
569 for i in range(len(op_list)):
570 item = op_list[i]
571 if isinstance(item, ruleUtils.OpList):
572 item = ras_op_list(item, dataset)
573
574 else:
575 item = get_gene_expr(dataset, item)
576
577 if item is None:
578 if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
579 continue
580
581 if ras_value is None:
582 ras_value = item
583 else:
584 ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)
585
586 return ras_value
587
588 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
589 """
590 Save computed ras scores to the given path, as a tsv file.
591
592 Args:
593 rasScores : the computed ras scores.
594 path : the output tsv file's path.
595
596 Returns:
597 None
598 """
599 for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
600 for reactId, score in scores.items():
601 if score is None: scores[reactId] = "None"
602
603 output_ras = pd.DataFrame.from_dict(rasScores)
604 output_ras.insert(0, 'Reactions', reactions)
605 output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
606
607 ############################ MAIN #############################################
608 #TODO: not used but keep, it will be when the new translator dicts will be used.
609 def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
610 """
611 Translate gene from any supported encoding to HugoID.
612
613 Args:
614 geneName (str): the name of the gene in its current encoding.
615 encoding (str): the encoding.
616 geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
617 and encodings in the current model, mapping each to the corresponding HugoID encoding.
618
619 Raises:
620 ValueError: When the gene isn't supported in the model.
621
622 Returns:
623 str: the gene in HugoID encoding.
624 """
625 supportedGenesInEncoding = geneTranslator[encoding]
626 if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
627 raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
628
629 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
630 """
631 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
632 performed, significantly impacting the runtime.
633
634 Returns:
635 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
636 """
637 datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
638
639 try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
640 except utils.PathErr as err:
641 raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}")
642
643 if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
644
645 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
646 return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
647
648 def main() -> None:
649 """
650 Initializes everything and sets the program in motion based on the fronted input arguments.
651
652 Returns:
653 None
654 """
655 # get args from frontend (related xml)
656 global ARGS
657 ARGS = process_args()
658
659 # read dataset
660 dataset = read_dataset(ARGS.input, "dataset")
661 dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
662
663 # remove versioning from gene names
664 dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
665
666 # handle custom models
667 model :utils.Model = ARGS.rules_selector
668 if model is utils.Model.Custom:
669 rules = load_custom_rules()
670 reactions = list(rules.keys())
671
672 save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
673 if ERRORS: utils.logWarning(
674 f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
675 ARGS.out_log)
676
677 return
678
679 # This is the standard flow of the ras_generator program, for non-custom models.
680 name = "RAS Dataset"
681 type_gene = gene_type(dataset.iloc[0, 0], name)
682
683 rules = model.getRules(ARGS.tool_dir)
684 genes = data_gene(dataset, type_gene, name, None)
685 ids, rules = load_id_rules(rules.get(type_gene))
686
687 resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
688 create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
689
690 if err: utils.logWarning(
691 f"Warning: gene(s) {err} not found in class \"{name}\", " +
692 "the expression level for this gene will be considered NaN",
693 ARGS.out_log)
694
695 print("Execution succeded")
696
697 ###############################################################################
698 if __name__ == "__main__":
699 main()