Mercurial > repos > bimib > marea
view Marea/ras_generator.py @ 85:035ba1736d38 draft
Uploaded
author | bimib |
---|---|
date | Mon, 07 Jun 2021 14:12:52 +0000 |
parents | 3b0e71e28c0b |
children |
line wrap: on
line source
from __future__ import division import sys import pandas as pd import collections import pickle as pk import math import argparse ########################## argparse ########################################## def process_args(args): parser = argparse.ArgumentParser(usage = '%(prog)s [options]', description = 'process some value\'s'+ ' genes to create a comparison\'s map.') parser.add_argument('-rs', '--rules_selector', type = str, default = 'HMRcore', choices = ['HMRcore', 'Recon', 'Custom'], help = 'chose which type of dataset you want use') parser.add_argument('-cr', '--custom', type = str, help='your dataset if you want custom rules') parser.add_argument('-n', '--none', type = str, default = 'true', choices = ['true', 'false'], help = 'compute Nan values') parser.add_argument('-td', '--tool_dir', type = str, required = True, help = 'your tool directory') parser.add_argument('-ol', '--out_log', help = "Output log") parser.add_argument('-id', '--input', type = str, help = 'input dataset') parser.add_argument('-ra', '--ras_output', type = str, required = True, help = 'ras output') args = parser.parse_args() return args ########################### warning ########################################### def warning(s): args = process_args(sys.argv) with open(args.out_log, 'a') as log: log.write(s) ############################ dataset input #################################### def read_dataset(data, name): try: dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python') except pd.errors.EmptyDataError: sys.exit('Execution aborted: wrong format of ' + name + '\n') if len(dataset.columns) < 2: sys.exit('Execution aborted: wrong format of ' + name + '\n') return dataset ############################ dataset name ##################################### def name_dataset(name_data, count): if str(name_data) == 'Dataset': return str(name_data) + '_' + str(count) else: return str(name_data) ############################ load id e rules ################################## def load_id_rules(reactions): ids, rules = [], [] for key, value in reactions.items(): ids.append(key) rules.append(value) return (ids, rules) ############################ check_methods #################################### def gene_type(l, name): if check_hgnc(l): return 'hugo_id' elif check_ensembl(l): return 'ensembl_gene_id' elif check_symbol(l): return 'symbol' elif check_entrez(l): return 'entrez_id' else: sys.exit('Execution aborted:\n' + 'gene ID type in ' + name + ' not supported. Supported ID'+ 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n') def check_hgnc(l): if len(l) > 5: if (l.upper()).startswith('HGNC:'): return l[5:].isdigit() else: return False else: return False def check_ensembl(l): if len(l) == 15: if (l.upper()).startswith('ENS'): return l[4:].isdigit() else: return False else: return False def check_symbol(l): if len(l) > 0: if l[0].isalpha() and l[1:].isalnum(): return True else: return False else: return False def check_entrez(l): if len(l) > 0: return l.isdigit() else: return False def check_bool(b): if b == 'true': return True elif b == 'false': return False ############################ resolve_methods ################################## def replace_gene_value(l, d): tmp = [] err = [] while l: if isinstance(l[0], list): tmp_rules, tmp_err = replace_gene_value(l[0], d) tmp.append(tmp_rules) err.extend(tmp_err) else: value = replace_gene(l[0], d) tmp.append(value) if value == None: err.append(l[0]) l = l[1:] return (tmp, err) def replace_gene(l, d): if l =='and' or l == 'or': return l else: value = d.get(l, None) if not(value == None or isinstance(value, (int, float))): sys.exit('Execution aborted: ' + value + ' value not valid\n') return value def computes(val1, op, val2, cn): if val1 != None and val2 != None: if op == 'and': return min(val1, val2) else: return val1 + val2 elif op == 'and': if cn is True: if val1 != None: return val1 elif val2 != None: return val2 else: return None else: return None else: if val1 != None: return val1 elif val2 != None: return val2 else: return None def control(ris, l, cn): if len(l) == 1: if isinstance(l[0], (float, int)) or l[0] == None: return l[0] elif isinstance(l[0], list): return control(None, l[0], cn) else: return False elif len(l) > 2: return control_list(ris, l, cn) else: return False def control_list(ris, l, cn): while l: if len(l) == 1: return False elif (isinstance(l[0], (float, int)) or l[0] == None) and l[1] in ['and', 'or']: if isinstance(l[2], (float, int)) or l[2] == None: ris = computes(l[0], l[1], l[2], cn) elif isinstance(l[2], list): tmp = control(None, l[2], cn) if tmp is False: return False else: ris = computes(l[0], l[1], tmp, cn) else: return False l = l[3:] elif l[0] in ['and', 'or']: if isinstance(l[1], (float, int)) or l[1] == None: ris = computes(ris, l[0], l[1], cn) elif isinstance(l[1], list): tmp = control(None,l[1], cn) if tmp is False: return False else: ris = computes(ris, l[0], tmp, cn) else: return False l = l[2:] elif isinstance(l[0], list) and l[1] in ['and', 'or']: if isinstance(l[2], (float, int)) or l[2] == None: tmp = control(None, l[0], cn) if tmp is False: return False else: ris = computes(tmp, l[1], l[2], cn) elif isinstance(l[2], list): tmp = control(None, l[0], cn) tmp2 = control(None, l[2], cn) if tmp is False or tmp2 is False: return False else: ris = computes(tmp, l[1], tmp2, cn) else: return False l = l[3:] else: return False return ris ############################ make recon ####################################### def check_and_doWord(l): tmp = [] tmp_genes = [] count = 0 while l: if count >= 0: if l[0] == '(': count += 1 tmp.append(l[0]) l.pop(0) elif l[0] == ')': count -= 1 tmp.append(l[0]) l.pop(0) elif l[0] == ' ': l.pop(0) else: word = [] while l: if l[0] in [' ', '(', ')']: break else: word.append(l[0]) l.pop(0) word = ''.join(word) tmp.append(word) if not(word in ['or', 'and']): tmp_genes.append(word) else: return False if count == 0: return (tmp, tmp_genes) else: return False def brackets_to_list(l): tmp = [] while l: if l[0] == '(': l.pop(0) tmp.append(resolve_brackets(l)) else: tmp.append(l[0]) l.pop(0) return tmp def resolve_brackets(l): tmp = [] while l[0] != ')': if l[0] == '(': l.pop(0) tmp.append(resolve_brackets(l)) else: tmp.append(l[0]) l.pop(0) l.pop(0) return tmp def priorityAND(l): tmp = [] flag = True while l: if len(l) == 1: if isinstance(l[0], list): tmp.append(priorityAND(l[0])) else: tmp.append(l[0]) l = l[1:] elif l[0] == 'or': tmp.append(l[0]) flag = False l = l[1:] elif l[1] == 'or': if isinstance(l[0], list): tmp.append(priorityAND(l[0])) else: tmp.append(l[0]) tmp.append(l[1]) flag = False l = l[2:] elif l[1] == 'and': tmpAnd = [] if isinstance(l[0], list): tmpAnd.append(priorityAND(l[0])) else: tmpAnd.append(l[0]) tmpAnd.append(l[1]) if isinstance(l[2], list): tmpAnd.append(priorityAND(l[2])) else: tmpAnd.append(l[2]) l = l[3:] while l: if l[0] == 'and': tmpAnd.append(l[0]) if isinstance(l[1], list): tmpAnd.append(priorityAND(l[1])) else: tmpAnd.append(l[1]) l = l[2:] elif l[0] == 'or': flag = False break if flag == True: #when there are only AND in list tmp.extend(tmpAnd) elif flag == False: tmp.append(tmpAnd) return tmp def checkRule(l): if len(l) == 1: if isinstance(l[0], list): if checkRule(l[0]) is False: return False elif len(l) > 2: if checkRule2(l) is False: return False else: return False return True def checkRule2(l): while l: if len(l) == 1: return False elif isinstance(l[0], list) and l[1] in ['and', 'or']: if checkRule(l[0]) is False: return False if isinstance(l[2], list): if checkRule(l[2]) is False: return False l = l[3:] elif l[1] in ['and', 'or']: if isinstance(l[2], list): if checkRule(l[2]) is False: return False l = l[3:] elif l[0] in ['and', 'or']: if isinstance(l[1], list): if checkRule(l[1]) is False: return False l = l[2:] else: return False return True def do_rules(rules): split_rules = [] err_rules = [] tmp_gene_in_rule = [] for i in range(len(rules)): tmp = list(rules[i]) if tmp: tmp, tmp_genes = check_and_doWord(tmp) tmp_gene_in_rule.extend(tmp_genes) if tmp is False: split_rules.append([]) err_rules.append(rules[i]) else: tmp = brackets_to_list(tmp) if checkRule(tmp): split_rules.append(priorityAND(tmp)) else: split_rules.append([]) err_rules.append(rules[i]) else: split_rules.append([]) if err_rules: warning('Warning: wrong format rule in ' + str(err_rules) + '\n') return (split_rules, list(set(tmp_gene_in_rule))) def make_recon(data): try: import cobra as cb import warnings with warnings.catch_warnings(): warnings.simplefilter('ignore') recon = cb.io.read_sbml_model(data) react = recon.reactions rules = [react[i].gene_reaction_rule for i in range(len(react))] ids = [react[i].id for i in range(len(react))] except: try: data = (pd.read_csv(data, sep = '\t', dtype = str, engine='python')).fillna('') if len(data.columns) < 2: sys.exit('Execution aborted: wrong format of '+ 'custom datarules\n') if not len(data.columns) == 2: warning('Warning: more than 2 columns in custom datarules.\n' + 'Extra columns have been disregarded\n') ids = list(data.iloc[:, 0]) rules = list(data.iloc[:, 1]) except pd.errors.EmptyDataError: sys.exit('Execution aborted: wrong format of custom datarules\n') except pd.errors.ParserError: sys.exit('Execution aborted: wrong format of custom datarules\n') split_rules, tmp_genes = do_rules(rules) gene_in_rule = {} for i in tmp_genes: gene_in_rule[i] = 'ok' return (ids, split_rules, gene_in_rule) ############################ gene ############################################# def data_gene(gene, type_gene, name, gene_custom): args = process_args(sys.argv) for i in range(len(gene)): tmp = gene.iloc[i, 0] if tmp.startswith(' ') or tmp.endswith(' '): gene.iloc[i, 0] = (tmp.lstrip()).rstrip() gene_dup = [item for item, count in collections.Counter(gene[gene.columns[0]]).items() if count > 1] pat_dup = [item for item, count in collections.Counter(list(gene.columns)).items() if count > 1] if gene_dup: if gene_custom == None: if args.rules_selector == 'HMRcore': gene_in_rule = pk.load(open(args.tool_dir + '/local/HMRcore_genes.p', 'rb')) elif args.rules_selector == 'Recon': gene_in_rule = pk.load(open(args.tool_dir + '/local/Recon_genes.p', 'rb')) gene_in_rule = gene_in_rule.get(type_gene) else: gene_in_rule = gene_custom tmp = [] for i in gene_dup: if gene_in_rule.get(i) == 'ok': tmp.append(i) if tmp: sys.exit('Execution aborted because gene ID ' +str(tmp)+' in '+name+' is duplicated\n') if pat_dup: warning('Warning: duplicated label\n' + str(pat_dup) + 'in ' + name + '\n') return (gene.set_index(gene.columns[0])).to_dict() ############################ resolve ########################################## def resolve(genes, rules, ids, resolve_none, name): resolve_rules = {} not_found = [] flag = False for key, value in genes.items(): tmp_resolve = [] for i in range(len(rules)): tmp = rules[i] if tmp: tmp, err = replace_gene_value(tmp, value) if err: not_found.extend(err) ris = control(None, tmp, resolve_none) if ris is False or ris == None: tmp_resolve.append(None) else: tmp_resolve.append(ris) flag = True else: tmp_resolve.append(None) resolve_rules[key] = tmp_resolve if flag is False: warning('Warning: no computable score (due to missing gene values)' + 'for class ' + name + ', the class has been disregarded\n') return (None, None) return (resolve_rules, list(set(not_found))) ############################ split class ###################################### def split_class(classes, resolve_rules): class_pat = {} for i in range(len(classes)): classe = classes.iloc[i, 1] if not pd.isnull(classe): l = [] for j in range(i, len(classes)): if classes.iloc[j, 1] == classe: pat_id = classes.iloc[j, 0] if tmp != None: l.append(tmp) classes.iloc[j, 1] = None if l: class_pat[classe] = list(map(list, zip(*l))) else: warning('Warning: no sample found in class ' + classe + ', the class has been disregarded\n') return class_pat ############################ create_ras ####################################### def create_ras (resolve_rules, dataset_name, rules, ids, file): if resolve_rules == None: warning("Couldn't generate RAS for current dataset: " + dataset_name) for geni in resolve_rules.values(): for i, valori in enumerate(geni): if valori == None: geni[i] = 'None' output_ras = pd.DataFrame.from_dict(resolve_rules) output_ras.insert(0, 'Reactions', ids) output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False) text_file = open(file, "w") text_file.write(output_to_csv) text_file.close() ############################ MAIN ############################################# def main(): args = process_args(sys.argv) if args.rules_selector == 'HMRcore': recon = pk.load(open(args.tool_dir + '/local/HMRcore_rules.p', 'rb')) elif args.rules_selector == 'Recon': recon = pk.load(open(args.tool_dir + '/local/Recon_rules.p', 'rb')) elif args.rules_selector == 'Custom': ids, rules, gene_in_rule = make_recon(args.custom) resolve_none = check_bool(args.none) name = "RAS Dataset" dataset = read_dataset(args.input, "dataset") dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str) type_gene = gene_type(dataset.iloc[0, 0], name) if args.rules_selector != 'Custom': genes = data_gene(dataset, type_gene, name, None) ids, rules = load_id_rules(recon.get(type_gene)) elif args.rules_selector == 'Custom': genes = data_gene(dataset, type_gene, name, gene_in_rule) resolve_rules, err = resolve(genes, rules, ids, resolve_none, name) create_ras(resolve_rules, name, rules, ids, args.ras_output) if err != None and err: warning('Warning: gene\n' + str(err) + '\nnot found in class ' + name + ', the expression level for this gene ' + 'will be considered NaN\n') print('Execution succeded') return None ############################################################################### if __name__ == "__main__": main()