comparison ras_generator.py @ 262:d2d6a332d269 draft

Uploaded
author francesco_lapi
date Tue, 04 Mar 2025 15:23:25 +0000
parents
children
comparison
equal deleted inserted replaced
261:1df2d8de156f 262:d2d6a332d269
1 from __future__ import division
2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
3 import sys
4 import argparse
5 import collections
6 import pandas as pd
7 import pickle as pk
8 import utils.general_utils as utils
9 import utils.rule_parsing as ruleUtils
10 from typing import Union, Optional, List, Dict, Tuple, TypeVar
11
12 ERRORS = []
13 ########################## argparse ##########################################
14 ARGS :argparse.Namespace
15 def process_args(args:List[str] = None) -> argparse.Namespace:
16 """
17 Processes command-line arguments.
18
19 Args:
20 args (list): List of command-line arguments.
21
22 Returns:
23 Namespace: An object containing parsed arguments.
24 """
25 parser = argparse.ArgumentParser(
26 usage = '%(prog)s [options]',
27 description = "process some value's genes to create a comparison's map.")
28
29 parser.add_argument(
30 '-rs', '--rules_selector',
31 type = utils.Model, default = utils.Model.HMRcore, choices = list(utils.Model),
32 help = 'chose which type of dataset you want use')
33
34 parser.add_argument("-rl", "--rule_list", type = str,
35 help = "path to input file with custom rules, if provided")
36
37 parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
38 # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
39
40 parser.add_argument(
41 '-n', '--none',
42 type = utils.Bool("none"), default = True,
43 help = 'compute Nan values')
44
45 parser.add_argument(
46 '-td', '--tool_dir',
47 type = str,
48 required = True, help = 'your tool directory')
49
50 parser.add_argument(
51 '-ol', '--out_log',
52 type = str,
53 help = "Output log")
54
55 parser.add_argument(
56 '-in', '--input', #id รจ diventato in
57 type = str,
58 help = 'input dataset')
59
60 parser.add_argument(
61 '-ra', '--ras_output',
62 type = str,
63 required = True, help = 'ras output')
64
65
66 return parser.parse_args(args)
67
68 ############################ dataset input ####################################
69 def read_dataset(data :str, name :str) -> pd.DataFrame:
70 """
71 Read a dataset from a CSV file and return it as a pandas DataFrame.
72
73 Args:
74 data (str): Path to the CSV file containing the dataset.
75 name (str): Name of the dataset, used in error messages.
76
77 Returns:
78 pandas.DataFrame: DataFrame containing the dataset.
79
80 Raises:
81 pd.errors.EmptyDataError: If the CSV file is empty.
82 sys.exit: If the CSV file has the wrong format, the execution is aborted.
83 """
84 try:
85 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
86 except pd.errors.EmptyDataError:
87 sys.exit('Execution aborted: wrong format of ' + name + '\n')
88 if len(dataset.columns) < 2:
89 sys.exit('Execution aborted: wrong format of ' + name + '\n')
90 return dataset
91
92 ############################ load id e rules ##################################
93 def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
94 """
95 Load IDs and rules from a dictionary of reactions.
96
97 Args:
98 reactions (dict): A dictionary where keys are IDs and values are rules.
99
100 Returns:
101 tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
102 """
103 ids, rules = [], []
104 for key, value in reactions.items():
105 ids.append(key)
106 rules.append(value)
107 return (ids, rules)
108
109 ############################ check_methods ####################################
110 def gene_type(l :str, name :str) -> str:
111 """
112 Determine the type of gene ID.
113
114 Args:
115 l (str): The gene identifier to check.
116 name (str): The name of the dataset, used in error messages.
117
118 Returns:
119 str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
120
121 Raises:
122 sys.exit: If the gene ID type is not supported, the execution is aborted.
123 """
124 if check_hgnc(l):
125 return 'hugo_id'
126 elif check_ensembl(l):
127 return 'ensembl_gene_id'
128 elif check_symbol(l):
129 return 'symbol'
130 elif check_entrez(l):
131 return 'entrez_id'
132 else:
133 sys.exit('Execution aborted:\n' +
134 'gene ID type in ' + name + ' not supported. Supported ID'+
135 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
136
137 def check_hgnc(l :str) -> bool:
138 """
139 Check if a gene identifier follows the HGNC format.
140
141 Args:
142 l (str): The gene identifier to check.
143
144 Returns:
145 bool: True if the gene identifier follows the HGNC format, False otherwise.
146 """
147 if len(l) > 5:
148 if (l.upper()).startswith('HGNC:'):
149 return l[5:].isdigit()
150 else:
151 return False
152 else:
153 return False
154
155 def check_ensembl(l :str) -> bool:
156 """
157 Check if a gene identifier follows the Ensembl format.
158
159 Args:
160 l (str): The gene identifier to check.
161
162 Returns:
163 bool: True if the gene identifier follows the Ensembl format, False otherwise.
164 """
165 return l.upper().startswith('ENS')
166
167
168 def check_symbol(l :str) -> bool:
169 """
170 Check if a gene identifier follows the symbol format.
171
172 Args:
173 l (str): The gene identifier to check.
174
175 Returns:
176 bool: True if the gene identifier follows the symbol format, False otherwise.
177 """
178 if len(l) > 0:
179 if l[0].isalpha() and l[1:].isalnum():
180 return True
181 else:
182 return False
183 else:
184 return False
185
186 def check_entrez(l :str) -> bool:
187 """
188 Check if a gene identifier follows the Entrez ID format.
189
190 Args:
191 l (str): The gene identifier to check.
192
193 Returns:
194 bool: True if the gene identifier follows the Entrez ID format, False otherwise.
195 """
196 if len(l) > 0:
197 return l.isdigit()
198 else:
199 return False
200
201 ############################ gene #############################################
202 def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
203 """
204 Process gene data to ensure correct formatting and handle duplicates.
205
206 Args:
207 gene (DataFrame): DataFrame containing gene data.
208 type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
209 name (str): Name of the dataset.
210 gene_custom (dict or None): Custom gene data dictionary if provided.
211
212 Returns:
213 dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
214 """
215 args = process_args()
216 for i in range(len(gene)):
217 tmp = gene.iloc[i, 0]
218 gene.iloc[i, 0] = tmp.strip().split('.')[0]
219
220 gene_dup = [item for item, count in
221 collections.Counter(gene[gene.columns[0]]).items() if count > 1]
222 pat_dup = [item for item, count in
223 collections.Counter(list(gene.columns)).items() if count > 1]
224
225 gene_in_rule = None
226
227 if gene_dup:
228 if gene_custom == None:
229 print(args.rules_selector)
230 print(args.rules_selector == 'ENGRO2')
231
232 if args.rules_selector == 'HMRcore':
233 print(1)
234 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
235
236 elif args.rules_selector == 'Recon':
237 print(2)
238 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
239
240 elif args.rules_selector == 'ENGRO2':
241 print(3)
242 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
243
244 utils.logWarning(f"{args.tool_dir}'/local/pickle files/ENGRO2_genes.p'", ARGS.out_log)
245
246 gene_in_rule = gene_in_rule.get(type_gene)
247
248 else:
249 gene_in_rule = gene_custom
250
251 tmp = []
252 for i in gene_dup:
253 if gene_in_rule.get(i) == 'ok':
254 tmp.append(i)
255 if tmp:
256 sys.exit('Execution aborted because gene ID '
257 +str(tmp)+' in '+name+' is duplicated\n')
258
259 if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
260 return (gene.set_index(gene.columns[0])).to_dict()
261
262 ############################ resolve ##########################################
263 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
264 """
265 Replace gene identifiers with corresponding values from a dictionary.
266
267 Args:
268 l (str): String of gene identifier.
269 d (str): String corresponding to its value.
270
271 Returns:
272 tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
273 """
274 tmp = []
275 err = []
276 while l:
277 if isinstance(l[0], list):
278 tmp_rules, tmp_err = replace_gene_value(l[0], d)
279 tmp.append(tmp_rules)
280 err.extend(tmp_err)
281 else:
282 value = replace_gene(l[0], d)
283 tmp.append(value)
284 if value == None:
285 err.append(l[0])
286 l = l[1:]
287 return (tmp, err)
288
289 def replace_gene(l :str, d :str) -> Union[int, float]:
290 """
291 Replace a single gene identifier with its corresponding value from a dictionary.
292
293 Args:
294 l (str): Gene identifier to replace.
295 d (str): String corresponding to its value.
296
297 Returns:
298 float/int: Corresponding value from the dictionary if found, None otherwise.
299
300 Raises:
301 sys.exit: If the value associated with the gene identifier is not valid.
302 """
303 if l =='and' or l == 'or':
304 return l
305 else:
306 value = d.get(l, None)
307 if not(value == None or isinstance(value, (int, float))):
308 sys.exit('Execution aborted: ' + value + ' value not valid\n')
309 return value
310
311 T = TypeVar("T", bound = Optional[Union[int, float]])
312 def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
313 """
314 Compute the RAS value between two value and an operator ('and' or 'or').
315
316 Args:
317 val1(Optional(Union[float, int])): First value.
318 op (str): Operator ('and' or 'or').
319 val2(Optional(Union[float, int])): Second value.
320 cn (bool): Control boolean value.
321
322 Returns:
323 Optional(Union[float, int]): Result of the computation.
324 """
325 if val1 != None and val2 != None:
326 if op == 'and':
327 return min(val1, val2)
328 else:
329 return val1 + val2
330 elif op == 'and':
331 if cn is True:
332 if val1 != None:
333 return val1
334 elif val2 != None:
335 return val2
336 else:
337 return None
338 else:
339 return None
340 else:
341 if val1 != None:
342 return val1
343 elif val2 != None:
344 return val2
345 else:
346 return None
347
348 # ris should be Literal[None] but Literal is not supported in Python 3.7
349 def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
350 """
351 Control the format of the expression.
352
353 Args:
354 ris: Intermediate result.
355 l (list): Expression to control.
356 cn (bool): Control boolean value.
357
358 Returns:
359 Union[Literal[False], int, float]: Result of the control.
360 """
361 if len(l) == 1:
362 if isinstance(l[0], (float, int)) or l[0] == None:
363 return l[0]
364 elif isinstance(l[0], list):
365 return control(None, l[0], cn)
366 else:
367 return False
368 elif len(l) > 2:
369 return control_list(ris, l, cn)
370 else:
371 return False
372
373 def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
374 """
375 Control the format of a list of expressions.
376
377 Args:
378 ris: Intermediate result.
379 l (list): List of expressions to control.
380 cn (bool): Control boolean value.
381
382 Returns:
383 Optional[Literal[False]]: Result of the control.
384 """
385 while l:
386 if len(l) == 1:
387 return False
388 elif (isinstance(l[0], (float, int)) or
389 l[0] == None) and l[1] in ['and', 'or']:
390 if isinstance(l[2], (float, int)) or l[2] == None:
391 ris = computes(l[0], l[1], l[2], cn)
392 elif isinstance(l[2], list):
393 tmp = control(None, l[2], cn)
394 if tmp is False:
395 return False
396 else:
397 ris = computes(l[0], l[1], tmp, cn)
398 else:
399 return False
400 l = l[3:]
401 elif l[0] in ['and', 'or']:
402 if isinstance(l[1], (float, int)) or l[1] == None:
403 ris = computes(ris, l[0], l[1], cn)
404 elif isinstance(l[1], list):
405 tmp = control(None,l[1], cn)
406 if tmp is False:
407 return False
408 else:
409 ris = computes(ris, l[0], tmp, cn)
410 else:
411 return False
412 l = l[2:]
413 elif isinstance(l[0], list) and l[1] in ['and', 'or']:
414 if isinstance(l[2], (float, int)) or l[2] == None:
415 tmp = control(None, l[0], cn)
416 if tmp is False:
417 return False
418 else:
419 ris = computes(tmp, l[1], l[2], cn)
420 elif isinstance(l[2], list):
421 tmp = control(None, l[0], cn)
422 tmp2 = control(None, l[2], cn)
423 if tmp is False or tmp2 is False:
424 return False
425 else:
426 ris = computes(tmp, l[1], tmp2, cn)
427 else:
428 return False
429 l = l[3:]
430 else:
431 return False
432 return ris
433
434 ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
435 def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
436 """
437 Resolve rules using gene data to compute scores for each rule.
438
439 Args:
440 genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
441 rules (list): List of rules to resolve.
442 ids (list): List of IDs corresponding to the rules.
443 resolve_none (bool): Flag indicating whether to resolve None values in the rules.
444 name (str): Name of the dataset.
445
446 Returns:
447 tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
448 """
449 resolve_rules = {}
450 not_found = []
451 flag = False
452 for key, value in genes.items():
453 tmp_resolve = []
454 for i in range(len(rules)):
455 tmp = rules[i]
456 if tmp:
457 tmp, err = replace_gene_value(tmp, value)
458 if err:
459 not_found.extend(err)
460 ris = control(None, tmp, resolve_none)
461 if ris is False or ris == None:
462 tmp_resolve.append(None)
463 else:
464 tmp_resolve.append(ris)
465 flag = True
466 else:
467 tmp_resolve.append(None)
468 resolve_rules[key] = tmp_resolve
469
470 if flag is False:
471 utils.logWarning(
472 f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
473 ARGS.out_log)
474
475 return (None, None)
476
477 return (resolve_rules, list(set(not_found)))
478 ############################ create_ras #######################################
479 def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
480 """
481 Create a RAS (Reaction Activity Score) file from resolved rules.
482
483 Args:
484 resolve_rules (dict): Dictionary containing resolved rules.
485 dataset_name (str): Name of the dataset.
486 rules (list): List of rules.
487 file (str): Path to the output RAS file.
488
489 Returns:
490 None
491 """
492 if resolve_rules is None:
493 utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
494
495 for geni in resolve_rules.values():
496 for i, valori in enumerate(geni):
497 if valori == None:
498 geni[i] = 'None'
499
500 output_ras = pd.DataFrame.from_dict(resolve_rules)
501
502 output_ras.insert(0, 'Reactions', ids)
503 output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
504
505 text_file = open(file, "w")
506
507 text_file.write(output_to_csv)
508 text_file.close()
509
510 ################################- NEW RAS COMPUTATION -################################
511 Expr = Optional[Union[int, float]]
512 Ras = Expr
513 def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
514 """
515 Generates the RAS scores for each cell line found in the dataset.
516
517 Args:
518 dataset (pd.DataFrame): Dataset containing gene values.
519 rules (dict): The dict containing reaction ids as keys and rules as values.
520
521 Side effects:
522 dataset : mut
523
524 Returns:
525 dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
526 where each key corresponds to a reaction ID and each value is its computed RAS score.
527 """
528 ras_values_by_cell_line = {}
529 dataset.set_index(dataset.columns[0], inplace=True)
530 # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata
531 for cell_line_name in dataset.columns[1:]:
532 cell_line = dataset[cell_line_name].to_dict()
533 ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
534 return ras_values_by_cell_line
535
536 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
537 """
538 Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
539
540 Args:
541 value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
542 dataset : gene expression data of one cell line.
543
544 Returns:
545 dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
546 """
547 return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
548
549 def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
550 """
551 Extracts the gene expression of the given gene from a cell line dataset.
552
553 Args:
554 dataset : gene expression data of one cell line.
555 name : gene name.
556
557 Returns:
558 Expr : the gene's expression value.
559 """
560 expr = dataset.get(name, None)
561 if expr is None: ERRORS.append(name)
562
563 return expr
564
565 def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
566 """
567 Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior.
568
569 Args:
570 op_list (OpList): The OpList representing a rule with gene values.
571 dataset : gene expression data of one cell line.
572
573 Returns:
574 Ras: The computed RAS value for the given OpList.
575 """
576 op = op_list.op
577 ras_value :Ras = None
578 if not op: return get_gene_expr(dataset, op_list[0])
579 if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None
580
581 for i in range(len(op_list)):
582 item = op_list[i]
583 if isinstance(item, ruleUtils.OpList):
584 item = ras_op_list(item, dataset)
585
586 else:
587 item = get_gene_expr(dataset, item)
588
589 if item is None:
590 if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
591 continue
592
593 if ras_value is None:
594 ras_value = item
595 else:
596 ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)
597
598 return ras_value
599
600 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
601 """
602 Save computed ras scores to the given path, as a tsv file.
603
604 Args:
605 rasScores : the computed ras scores.
606 path : the output tsv file's path.
607
608 Returns:
609 None
610 """
611 for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
612 for reactId, score in scores.items():
613 if score is None: scores[reactId] = "None"
614
615 output_ras = pd.DataFrame.from_dict(rasScores)
616 output_ras.insert(0, 'Reactions', reactions)
617 output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
618
619 ############################ MAIN #############################################
620 #TODO: not used but keep, it will be when the new translator dicts will be used.
621 def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
622 """
623 Translate gene from any supported encoding to HugoID.
624
625 Args:
626 geneName (str): the name of the gene in its current encoding.
627 encoding (str): the encoding.
628 geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
629 and encodings in the current model, mapping each to the corresponding HugoID encoding.
630
631 Raises:
632 ValueError: When the gene isn't supported in the model.
633
634 Returns:
635 str: the gene in HugoID encoding.
636 """
637 supportedGenesInEncoding = geneTranslator[encoding]
638 if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
639 raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
640
641 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
642 """
643 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
644 performed, significantly impacting the runtime.
645
646 Returns:
647 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
648 """
649 datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
650
651 try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
652 except utils.PathErr as err:
653 raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}")
654
655 if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
656
657 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
658 return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
659
660 def main(args:List[str] = None) -> None:
661 """
662 Initializes everything and sets the program in motion based on the fronted input arguments.
663
664 Returns:
665 None
666 """
667 # get args from frontend (related xml)
668 global ARGS
669 ARGS = process_args(args)
670 print(ARGS.rules_selector)
671 # read dataset
672 dataset = read_dataset(ARGS.input, "dataset")
673 dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
674
675 # remove versioning from gene names
676 dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
677
678 # handle custom models
679 model :utils.Model = ARGS.rules_selector
680 if model is utils.Model.Custom:
681 rules = load_custom_rules()
682 reactions = list(rules.keys())
683
684 save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
685 if ERRORS: utils.logWarning(
686 f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
687 ARGS.out_log)
688
689 return
690
691 # This is the standard flow of the ras_generator program, for non-custom models.
692 name = "RAS Dataset"
693 type_gene = gene_type(dataset.iloc[0, 0], name)
694
695 rules = model.getRules(ARGS.tool_dir)
696 genes = data_gene(dataset, type_gene, name, None)
697 ids, rules = load_id_rules(rules.get(type_gene))
698
699 resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
700 create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
701
702 if err: utils.logWarning(
703 f"Warning: gene(s) {err} not found in class \"{name}\", " +
704 "the expression level for this gene will be considered NaN",
705 ARGS.out_log)
706
707 print("Execution succeded")
708
709 ###############################################################################
710 if __name__ == "__main__":
711 main()