1 from __future__ import division
2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
3 import sys
4 import argparse
5 import collections
6 import pandas as pd
7 import pickle as pk
8 import utils.general_utils as utils
9 import utils.rule_parsing as ruleUtils
10 from typing import Union, Optional, List, Dict, Tuple, TypeVar
12 ERRORS = []
13 ########################## argparse ##########################################
14 ARGS :argparse.Namespace
15 def process_args() -> argparse.Namespace:
16 """
17 Processes command-line arguments.
19 Args:
20 args (list): List of command-line arguments.
22 Returns:
23 Namespace: An object containing parsed arguments.
24 """
25 parser = argparse.ArgumentParser(
26 usage = '%(prog)s [options]',
27 description = "process some value's genes to create a comparison's map.")
29 parser.add_argument(
30 '-rs', '--rules_selector',
31 type = utils.Model, default = utils.Model.HMRcore, choices = list(utils.Model),
32 help = 'chose which type of dataset you want use')
34 parser.add_argument("-rl", "--rule_list", type = str,
35 help = "path to input file with custom rules, if provided")
37 parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
38 # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
40 parser.add_argument(
41 '-n', '--none',
42 type = utils.Bool("none"), default = True,
43 help = 'compute Nan values')
45 parser.add_argument(
46 '-td', '--tool_dir',
47 type = str,
48 required = True, help = 'your tool directory')
50 parser.add_argument(
51 '-ol', '--out_log',
52 type = str,
53 help = "Output log")
55 parser.add_argument(
56 '-in', '--input', #id รจ diventato in
57 type = str,
58 help = 'input dataset')
60 parser.add_argument(
61 '-ra', '--ras_output',
62 type = str,
63 required = True, help = 'ras output')
65 return parser.parse_args()
67 ############################ dataset input ####################################
68 def read_dataset(data :str, name :str) -> pd.DataFrame:
69 """
70 Read a dataset from a CSV file and return it as a pandas DataFrame.
72 Args:
73 data (str): Path to the CSV file containing the dataset.
74 name (str): Name of the dataset, used in error messages.
76 Returns:
77 pandas.DataFrame: DataFrame containing the dataset.
79 Raises:
80 pd.errors.EmptyDataError: If the CSV file is empty.
81 sys.exit: If the CSV file has the wrong format, the execution is aborted.
82 """
83 try:
84 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
85 except pd.errors.EmptyDataError:
86 sys.exit('Execution aborted: wrong format of ' + name + '\n')
87 if len(dataset.columns) < 2:
88 sys.exit('Execution aborted: wrong format of ' + name + '\n')
89 return dataset
91 ############################ load id e rules ##################################
92 def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
93 """
94 Load IDs and rules from a dictionary of reactions.
96 Args:
97 reactions (dict): A dictionary where keys are IDs and values are rules.
99 Returns:
100 tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
101 """
102 ids, rules = [], []
103 for key, value in reactions.items():
104 ids.append(key)
105 rules.append(value)
106 return (ids, rules)
108 ############################ check_methods ####################################
109 def gene_type(l :str, name :str) -> str:
110 """
111 Determine the type of gene ID.
113 Args:
114 l (str): The gene identifier to check.
115 name (str): The name of the dataset, used in error messages.
117 Returns:
118 str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
120 Raises:
121 sys.exit: If the gene ID type is not supported, the execution is aborted.
122 """
123 if check_hgnc(l):
124 return 'hugo_id'
125 elif check_ensembl(l):
126 return 'ensembl_gene_id'
127 elif check_symbol(l):
128 return 'symbol'
129 elif check_entrez(l):
130 return 'entrez_id'
131 else:
132 sys.exit('Execution aborted:\n' +
133 'gene ID type in ' + name + ' not supported. Supported ID'+
134 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
136 def check_hgnc(l :str) -> bool:
137 """
138 Check if a gene identifier follows the HGNC format.
140 Args:
141 l (str): The gene identifier to check.
143 Returns:
144 bool: True if the gene identifier follows the HGNC format, False otherwise.
145 """
146 if len(l) > 5:
147 if (l.upper()).startswith('HGNC:'):
148 return l[5:].isdigit()
149 else:
150 return False
151 else:
152 return False
154 def check_ensembl(l :str) -> bool:
155 """
156 Check if a gene identifier follows the Ensembl format.
158 Args:
159 l (str): The gene identifier to check.
161 Returns:
162 bool: True if the gene identifier follows the Ensembl format, False otherwise.
163 """
164 return l.upper().startswith('ENS')
167 def check_symbol(l :str) -> bool:
168 """
169 Check if a gene identifier follows the symbol format.
171 Args:
172 l (str): The gene identifier to check.
174 Returns:
175 bool: True if the gene identifier follows the symbol format, False otherwise.
176 """
177 if len(l) > 0:
178 if l[0].isalpha() and l[1:].isalnum():
179 return True
180 else:
181 return False
182 else:
183 return False
185 def check_entrez(l :str) -> bool:
186 """
187 Check if a gene identifier follows the Entrez ID format.
189 Args:
190 l (str): The gene identifier to check.
192 Returns:
193 bool: True if the gene identifier follows the Entrez ID format, False otherwise.
194 """
195 if len(l) > 0:
196 return l.isdigit()
197 else:
198 return False
200 ############################ gene #############################################
201 def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
202 """
203 Process gene data to ensure correct formatting and handle duplicates.
205 Args:
206 gene (DataFrame): DataFrame containing gene data.
207 type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
208 name (str): Name of the dataset.
209 gene_custom (dict or None): Custom gene data dictionary if provided.
211 Returns:
212 dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
213 """
214 args = process_args()
215 for i in range(len(gene)):
216 tmp = gene.iloc[i, 0]
217 gene.iloc[i, 0] = tmp.strip().split('.')[0]
219 gene_dup = [item for item, count in
220 collections.Counter(gene[gene.columns[0]]).items() if count > 1]
221 pat_dup = [item for item, count in
222 collections.Counter(list(gene.columns)).items() if count > 1]
224 if gene_dup:
225 if gene_custom == None:
226 if args.rules_selector == 'HMRcore':
227 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
229 elif args.rules_selector == 'Recon':
230 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
232 elif args.rules_selector == 'ENGRO2':
233 gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
234 print(gene_in_rule)
235 print(type_gene)
236 gene_in_rule = gene_in_rule.get(type_gene)
238 else:
239 gene_in_rule = gene_custom
240 tmp = []
241 for i in gene_dup:
242 if gene_in_rule.get(i) == 'ok':
243 tmp.append(i)
244 if tmp:
245 sys.exit('Execution aborted because gene ID '
246 +str(tmp)+' in '+name+' is duplicated\n')
248 if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
249 return (gene.set_index(gene.columns[0])).to_dict()
251 ############################ resolve ##########################################
252 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
253 """
254 Replace gene identifiers with corresponding values from a dictionary.
256 Args:
257 l (str): String of gene identifier.
258 d (str): String corresponding to its value.
260 Returns:
261 tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
262 """
263 tmp = []
264 err = []
265 while l:
266 if isinstance(l[0], list):
267 tmp_rules, tmp_err = replace_gene_value(l[0], d)
268 tmp.append(tmp_rules)
269 err.extend(tmp_err)
270 else:
271 value = replace_gene(l[0], d)
272 tmp.append(value)
273 if value == None:
274 err.append(l[0])
275 l = l[1:]
276 return (tmp, err)
278 def replace_gene(l :str, d :str) -> Union[int, float]:
279 """
280 Replace a single gene identifier with its corresponding value from a dictionary.
282 Args:
283 l (str): Gene identifier to replace.
284 d (str): String corresponding to its value.
286 Returns:
287 float/int: Corresponding value from the dictionary if found, None otherwise.
289 Raises:
290 sys.exit: If the value associated with the gene identifier is not valid.
291 """
292 if l =='and' or l == 'or':
293 return l
294 else:
295 value = d.get(l, None)
296 if not(value == None or isinstance(value, (int, float))):
297 sys.exit('Execution aborted: ' + value + ' value not valid\n')
298 return value
300 T = TypeVar("T", bound = Optional[Union[int, float]])
301 def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
302 """
303 Compute the RAS value between two value and an operator ('and' or 'or').
305 Args:
306 val1(Optional(Union[float, int])): First value.
307 op (str): Operator ('and' or 'or').
308 val2(Optional(Union[float, int])): Second value.
309 cn (bool): Control boolean value.
311 Returns:
312 Optional(Union[float, int]): Result of the computation.
313 """
314 if val1 != None and val2 != None:
315 if op == 'and':
316 return min(val1, val2)
317 else:
318 return val1 + val2
319 elif op == 'and':
320 if cn is True:
321 if val1 != None:
322 return val1
323 elif val2 != None:
324 return val2
325 else:
326 return None
327 else:
328 return None
329 else:
330 if val1 != None:
331 return val1
332 elif val2 != None:
333 return val2
334 else:
335 return None
337 # ris should be Literal[None] but Literal is not supported in Python 3.7
338 def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
339 """
340 Control the format of the expression.
342 Args:
343 ris: Intermediate result.
344 l (list): Expression to control.
345 cn (bool): Control boolean value.
347 Returns:
348 Union[Literal[False], int, float]: Result of the control.
349 """
350 if len(l) == 1:
351 if isinstance(l[0], (float, int)) or l[0] == None:
352 return l[0]
353 elif isinstance(l[0], list):
354 return control(None, l[0], cn)
355 else:
356 return False
357 elif len(l) > 2:
358 return control_list(ris, l, cn)
359 else:
360 return False
362 def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
363 """
364 Control the format of a list of expressions.
366 Args:
367 ris: Intermediate result.
368 l (list): List of expressions to control.
369 cn (bool): Control boolean value.
371 Returns:
372 Optional[Literal[False]]: Result of the control.
373 """
374 while l:
375 if len(l) == 1:
376 return False
377 elif (isinstance(l[0], (float, int)) or
378 l[0] == None) and l[1] in ['and', 'or']:
379 if isinstance(l[2], (float, int)) or l[2] == None:
380 ris = computes(l[0], l[1], l[2], cn)
381 elif isinstance(l[2], list):
382 tmp = control(None, l[2], cn)
383 if tmp is False:
384 return False
385 else:
386 ris = computes(l[0], l[1], tmp, cn)
387 else:
388 return False
389 l = l[3:]
390 elif l[0] in ['and', 'or']:
391 if isinstance(l[1], (float, int)) or l[1] == None:
392 ris = computes(ris, l[0], l[1], cn)
393 elif isinstance(l[1], list):
394 tmp = control(None,l[1], cn)
395 if tmp is False:
396 return False
397 else:
398 ris = computes(ris, l[0], tmp, cn)
399 else:
400 return False
401 l = l[2:]
402 elif isinstance(l[0], list) and l[1] in ['and', 'or']:
403 if isinstance(l[2], (float, int)) or l[2] == None:
404 tmp = control(None, l[0], cn)
405 if tmp is False:
406 return False
407 else:
408 ris = computes(tmp, l[1], l[2], cn)
409 elif isinstance(l[2], list):
410 tmp = control(None, l[0], cn)
411 tmp2 = control(None, l[2], cn)
412 if tmp is False or tmp2 is False:
413 return False
414 else:
415 ris = computes(tmp, l[1], tmp2, cn)
416 else:
417 return False
418 l = l[3:]
419 else:
420 return False
421 return ris
423 ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
424 def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
425 """
426 Resolve rules using gene data to compute scores for each rule.
428 Args:
429 genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
430 rules (list): List of rules to resolve.
431 ids (list): List of IDs corresponding to the rules.
432 resolve_none (bool): Flag indicating whether to resolve None values in the rules.
433 name (str): Name of the dataset.
435 Returns:
436 tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
437 """
438 resolve_rules = {}
439 not_found = []
440 flag = False
441 for key, value in genes.items():
442 tmp_resolve = []
443 for i in range(len(rules)):
444 tmp = rules[i]
445 if tmp:
446 tmp, err = replace_gene_value(tmp, value)
447 if err:
448 not_found.extend(err)
449 ris = control(None, tmp, resolve_none)
450 if ris is False or ris == None:
451 tmp_resolve.append(None)
452 else:
453 tmp_resolve.append(ris)
454 flag = True
455 else:
456 tmp_resolve.append(None)
457 resolve_rules[key] = tmp_resolve
459 if flag is False:
460 utils.logWarning(
461 f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
462 ARGS.out_log)
464 return (None, None)
466 return (resolve_rules, list(set(not_found)))
467 ############################ create_ras #######################################
468 def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
469 """
470 Create a RAS (Reaction Activity Score) file from resolved rules.
472 Args:
473 resolve_rules (dict): Dictionary containing resolved rules.
474 dataset_name (str): Name of the dataset.
475 rules (list): List of rules.
476 file (str): Path to the output RAS file.
478 Returns:
479 None
480 """
481 if resolve_rules is None:
482 utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
484 for geni in resolve_rules.values():
485 for i, valori in enumerate(geni):
486 if valori == None:
487 geni[i] = 'None'
489 output_ras = pd.DataFrame.from_dict(resolve_rules)
491 output_ras.insert(0, 'Reactions', ids)
492 output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
494 text_file = open(file, "w")
496 text_file.write(output_to_csv)
497 text_file.close()
499 ################################- NEW RAS COMPUTATION -################################
500 Expr = Optional[Union[int, float]]
501 Ras = Expr
502 def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
503 """
504 Generates the RAS scores for each cell line found in the dataset.
506 Args:
507 dataset (pd.DataFrame): Dataset containing gene values.
508 rules (dict): The dict containing reaction ids as keys and rules as values.
510 Side effects:
511 dataset : mut
513 Returns:
514 dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
515 where each key corresponds to a reaction ID and each value is its computed RAS score.
516 """
517 ras_values_by_cell_line = {}
518 dataset.set_index(dataset.columns[0], inplace=True)
519 # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata
520 for cell_line_name in dataset.columns[1:]:
521 cell_line = dataset[cell_line_name].to_dict()
522 ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
523 return ras_values_by_cell_line
525 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
526 """
527 Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
529 Args:
530 value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
531 dataset : gene expression data of one cell line.
533 Returns:
534 dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
535 """
536 return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
538 def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
539 """
540 Extracts the gene expression of the given gene from a cell line dataset.
542 Args:
543 dataset : gene expression data of one cell line.
544 name : gene name.
546 Returns:
547 Expr : the gene's expression value.
548 """
549 expr = dataset.get(name, None)
550 if expr is None: ERRORS.append(name)
552 return expr
554 def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
555 """
556 Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior.
558 Args:
559 op_list (OpList): The OpList representing a rule with gene values.
560 dataset : gene expression data of one cell line.
562 Returns:
563 Ras: The computed RAS value for the given OpList.
564 """
565 op = op_list.op
566 ras_value :Ras = None
567 if not op: return get_gene_expr(dataset, op_list[0])
568 if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None
570 for i in range(len(op_list)):
571 item = op_list[i]
572 if isinstance(item, ruleUtils.OpList):
573 item = ras_op_list(item, dataset)
575 else:
576 item = get_gene_expr(dataset, item)
578 if item is None:
579 if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
580 continue
582 if ras_value is None:
583 ras_value = item
584 else:
585 ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)
587 return ras_value
589 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
590 """
591 Save computed ras scores to the given path, as a tsv file.
593 Args:
594 rasScores : the computed ras scores.
595 path : the output tsv file's path.
597 Returns:
598 None
599 """
600 for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
601 for reactId, score in scores.items():
602 if score is None: scores[reactId] = "None"
604 output_ras = pd.DataFrame.from_dict(rasScores)
605 output_ras.insert(0, 'Reactions', reactions)
606 output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
608 ############################ MAIN #############################################
609 #TODO: not used but keep, it will be when the new translator dicts will be used.
610 def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
611 """
612 Translate gene from any supported encoding to HugoID.
614 Args:
615 geneName (str): the name of the gene in its current encoding.
616 encoding (str): the encoding.
617 geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
618 and encodings in the current model, mapping each to the corresponding HugoID encoding.
620 Raises:
621 ValueError: When the gene isn't supported in the model.
623 Returns:
624 str: the gene in HugoID encoding.
625 """
626 supportedGenesInEncoding = geneTranslator[encoding]
627 if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
628 raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
630 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
631 """
632 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
633 performed, significantly impacting the runtime.
635 Returns:
636 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
637 """
638 datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
640 try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
641 except utils.PathErr as err:
642 raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}")
644 if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
646 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
647 return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
649 def main() -> None:
650 """
651 Initializes everything and sets the program in motion based on the fronted input arguments.
653 Returns:
654 None
655 """
656 # get args from frontend (related xml)
657 global ARGS
658 ARGS = process_args()
660 # read dataset
661 dataset = read_dataset(ARGS.input, "dataset")
662 dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
664 # remove versioning from gene names
665 dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
667 # handle custom models
668 model :utils.Model = ARGS.rules_selector
669 if model is utils.Model.Custom:
670 rules = load_custom_rules()
671 reactions = list(rules.keys())
673 save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
674 if ERRORS: utils.logWarning(
675 f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
676 ARGS.out_log)
678 return
680 # This is the standard flow of the ras_generator program, for non-custom models.
681 name = "RAS Dataset"
682 type_gene = gene_type(dataset.iloc[0, 0], name)
684 rules = model.getRules(ARGS.tool_dir)
685 genes = data_gene(dataset, type_gene, name, None)
686 ids, rules = load_id_rules(rules.get(type_gene))
688 resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
689 create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
691 if err: utils.logWarning(
692 f"Warning: gene(s) {err} not found in class \"{name}\", " +
693 "the expression level for this gene will be considered NaN",
694 ARGS.out_log)
696 print("Execution succeded")
698 ###############################################################################
699 if __name__ == "__main__":
700 main() |