Mercurial > repos > bimib > cobraxy
comparison COBRAxy/ras_generator.py @ 510:c17c6c9d112c draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Tue, 07 Oct 2025 09:48:18 +0000 |
| parents | 5956dcf94277 |
| children | f32d3c9089fc |
comparison
equal
deleted
inserted
replaced
| 509:5956dcf94277 | 510:c17c6c9d112c |
|---|---|
| 174 # build useful structures for RAS computation | 174 # build useful structures for RAS computation |
| 175 genes =dataset.index.intersection(rules_total_string) | 175 genes =dataset.index.intersection(rules_total_string) |
| 176 | 176 |
| 177 #check if there is one gene at least | 177 #check if there is one gene at least |
| 178 if len(genes)==0: | 178 if len(genes)==0: |
| 179 print("ERROR: no gene of the count matrix is in the metabolic model!") | 179 raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") |
| 180 print(" are you sure that the gene annotation is the same for the model and the count matrix?") | |
| 181 return -1 | |
| 182 | 180 |
| 183 cell_ids = list(dataset.columns) | 181 cell_ids = list(dataset.columns) |
| 184 count_df_filtered = dataset.loc[genes] | 182 count_df_filtered = dataset.loc[genes] |
| 185 count_df_filtered = count_df_filtered.rename(index=lambda x: x.replace("-", "_")) | 183 count_df_filtered = count_df_filtered.rename(index=lambda x: x.replace("-", "_").replace(":", "_")) |
| 186 | 184 |
| 187 ras_df=np.full((len(dict_rule_reactions), len(cell_ids)), np.nan) | 185 ras_df=np.full((len(dict_rule_reactions), len(cell_ids)), np.nan) |
| 188 | 186 |
| 189 # for loop on rules | 187 # for loop on rules |
| 188 genes_not_mapped=[] | |
| 190 ind = 0 | 189 ind = 0 |
| 191 for rule, reaction_ids in dict_rule_reactions.items(): | 190 for rule, reaction_ids in dict_rule_reactions.items(): |
| 192 if len(rule) != 0: | 191 if len(rule) != 0: |
| 193 # there is one gene at least in the formula | 192 # there is one gene at least in the formula |
| 194 rule = rule.replace("-","_") | 193 warning_rule="_" |
| 194 if "-" in rule: | |
| 195 warning_rule="-" | |
| 196 if ":" in rule: | |
| 197 warning_rule=":" | |
| 198 rule_orig=rule.split().copy() #original rule in list | |
| 199 rule = rule.replace(warning_rule,"_") | |
| 200 #modified rule | |
| 195 rule_split = rule.split() | 201 rule_split = rule.split() |
| 196 rule_split_elements = list(set(rule_split)) # genes in formula | 202 rule_split_elements = list(filter(lambda x: x not in logic_operators, rule_split)) # remove of all logical operators |
| 203 rule_split_elements = list(set(rule_split_elements)) # genes in formula | |
| 197 | 204 |
| 198 # which genes are in the count matrix? | 205 # which genes are in the count matrix? |
| 199 genes_in_count_matrix = [el for el in rule_split_elements if el in genes] | 206 genes_in_count_matrix = [el for el in rule_split_elements if el in genes] |
| 200 genes_notin_count_matrix = [el for el in rule_split_elements if el not in genes] | 207 genes_notin_count_matrix = [] |
| 201 | 208 for el in rule_split_elements: |
| 202 if len(genes_in_count_matrix) > 0: #there is at least one gene in the count matrix | 209 if el not in genes: #not present in original dataset |
| 210 if el.replace("_",warning_rule) in rule_orig: | |
| 211 genes_notin_count_matrix.append(el.replace("_",warning_rule)) | |
| 212 else: | |
| 213 genes_notin_count_matrix.append(el) | |
| 214 genes_not_mapped.extend(genes_notin_count_matrix) | |
| 215 | |
| 216 # add genes not present in the data | |
| 217 if len(genes_in_count_matrix) > 0: #there is at least one gene in the count matrix | |
| 203 if len(rule_split) == 1: | 218 if len(rule_split) == 1: |
| 204 #one gene --> one reaction | 219 #one gene --> one reaction |
| 205 ras_df[ind] = count_df_filtered.loc[genes_in_count_matrix] | 220 ras_df[ind] = count_df_filtered.loc[genes_in_count_matrix] |
| 206 else: | 221 else: |
| 207 if len(genes_notin_count_matrix) > 0 and ignore_nan == False: | 222 if len(genes_notin_count_matrix) > 0 and ignore_nan == False: |
| 233 ras_df['Reactions'] = [reaction_ids for rule,reaction_ids in dict_rule_reactions.items()] | 248 ras_df['Reactions'] = [reaction_ids for rule,reaction_ids in dict_rule_reactions.items()] |
| 234 | 249 |
| 235 #create the reaction dataframe for ras (reactions x samples) | 250 #create the reaction dataframe for ras (reactions x samples) |
| 236 ras_df = ras_df.explode("Reactions").set_index("Reactions") | 251 ras_df = ras_df.explode("Reactions").set_index("Reactions") |
| 237 | 252 |
| 238 return ras_df | 253 #total genes not mapped from the gpr |
| 239 | 254 genes_not_mapped = sorted(set(genes_not_mapped)) |
| 255 | |
| 256 return ras_df,genes_not_mapped | |
| 257 | |
| 258 # function to evalute a complex boolean expression e.g. A or (B and C) | |
| 240 # function to evalute a complex boolean expression e.g. A or (B and C) | 259 # function to evalute a complex boolean expression e.g. A or (B and C) |
| 241 def _evaluate_ast( node, values,or_function,and_function): | 260 def _evaluate_ast( node, values,or_function,and_function): |
| 242 if isinstance(node, ast.BoolOp): | 261 if isinstance(node, ast.BoolOp): |
| 243 # Ricorsione sugli argomenti | 262 |
| 244 vals = [_evaluate_ast(v, values,or_function,and_function) for v in node.values] | 263 vals = [_evaluate_ast(v, values,or_function,and_function) for v in node.values] |
| 245 | 264 |
| 246 vals = [v for v in vals if v is not None] | 265 vals = [v for v in vals if v is not None] |
| 247 if not vals: | 266 if not vals: |
| 248 return None | 267 return np.nan |
| 249 | 268 |
| 250 vals = [np.array(v) if isinstance(v, (list, np.ndarray)) else v for v in vals] | 269 vals = [np.array(v) if isinstance(v, (list, np.ndarray)) else v for v in vals] |
| 251 | 270 |
| 252 if isinstance(node.op, ast.Or): | 271 if isinstance(node.op, ast.Or): |
| 253 return or_function(vals) | 272 return or_function(vals) |
| 255 return and_function(vals) | 274 return and_function(vals) |
| 256 | 275 |
| 257 elif isinstance(node, ast.Name): | 276 elif isinstance(node, ast.Name): |
| 258 return values.get(node.id, None) | 277 return values.get(node.id, None) |
| 259 elif isinstance(node, ast.Constant): | 278 elif isinstance(node, ast.Constant): |
| 260 return node.value | 279 key = str(node.value) #convert in str |
| 280 return values.get(key, None) | |
| 261 else: | 281 else: |
| 262 raise ValueError(f"Error in boolean expression: {ast.dump(node)}") | 282 raise ValueError(f"Error in boolean expression: {ast.dump(node)}") |
| 263 | 283 |
| 264 def main(args:List[str] = None) -> None: | 284 def main(args:List[str] = None) -> None: |
| 265 """ | 285 """ |
| 272 global ARGS | 292 global ARGS |
| 273 ARGS = process_args(args) | 293 ARGS = process_args(args) |
| 274 | 294 |
| 275 # read dataset and remove versioning from gene names | 295 # read dataset and remove versioning from gene names |
| 276 dataset = read_dataset(ARGS.input, "dataset") | 296 dataset = read_dataset(ARGS.input, "dataset") |
| 277 dataset.index = [str(el.split(".")[0]) for el in dataset.index] | 297 orig_gene_list=dataset.index.copy() |
| 298 dataset.index = [str(el.split(".")[0]) for el in dataset.index] | |
| 299 | |
| 300 if any(dataset.index.duplicated(keep=False)): | |
| 301 list_str=", ".join(orig_gene_list[dataset.index.duplicated(keep=False)]) | |
| 302 raise ValueError(f"ERROR: Duplicate entries in the gene dataset. The following genes are duplicated: "+list_str) | |
| 278 | 303 |
| 279 #load GPR rules | 304 #load GPR rules |
| 280 rules = load_custom_rules() | 305 rules = load_custom_rules() |
| 281 | 306 |
| 282 #create a list of all the gpr | 307 #create a list of all the gpr |
| 284 for id,rule in rules.items(): | 309 for id,rule in rules.items(): |
| 285 rules_total_string+=rule.replace("(","").replace(")","") + " " | 310 rules_total_string+=rule.replace("(","").replace(")","") + " " |
| 286 rules_total_string=list(set(rules_total_string.split(" "))) | 311 rules_total_string=list(set(rules_total_string.split(" "))) |
| 287 | 312 |
| 288 #check if nan value must be ignored in the GPR | 313 #check if nan value must be ignored in the GPR |
| 289 print(ARGS.none,"\n\n\n*************",ARGS.none==True) | |
| 290 if ARGS.none: | 314 if ARGS.none: |
| 291 # #e.g. (A or nan --> A) | 315 # #e.g. (A or nan --> A) |
| 292 ignore_nan = True | 316 ignore_nan = True |
| 293 else: | 317 else: |
| 294 #e.g. (A or nan --> nan) | 318 #e.g. (A or nan --> nan) |
| 295 ignore_nan = False | 319 ignore_nan = False |
| 296 | 320 |
| 297 #compure ras | 321 #compure ras |
| 298 ras_df=computeRAS(dataset,rules, | 322 ras_df,genes_not_mapped=computeRAS(dataset,rules, |
| 299 rules_total_string, | 323 rules_total_string, |
| 300 or_function=np.sum, # type of operation to do in case of an or expression (max, sum, mean) | 324 or_function=np.sum, # type of operation to do in case of an or expression (max, sum, mean) |
| 301 and_function=np.min, | 325 and_function=np.min, |
| 302 ignore_nan=ignore_nan) | 326 ignore_nan=ignore_nan) |
| 303 | 327 |
| 304 #save to csv and replace nan with None | 328 #save to csv and replace nan with None |
| 305 ras_df.replace(np.nan,"None").to_csv(ARGS.ras_output, sep = '\t') | 329 ras_df.replace([np.nan,None],"None").to_csv(ARGS.ras_output, sep = '\t') |
| 306 | 330 |
| 307 #print | 331 #report genes not present in the data |
| 332 if len(genes_not_mapped)>0: | |
| 333 genes_not_mapped_str=", ".join(genes_not_mapped) | |
| 334 utils.logWarning( | |
| 335 f"The following genes are mentioned in the GPR rules but don't appear in the dataset: "+genes_not_mapped_str, | |
| 336 ARGS.out_log) | |
| 337 | |
| 308 print("Execution succeeded") | 338 print("Execution succeeded") |
| 309 | 339 |
| 310 ############################################################################### | 340 ############################################################################### |
| 311 if __name__ == "__main__": | 341 if __name__ == "__main__": |
| 312 main() | 342 main() |
