Mercurial > repos > computational-metabolomics > cfmid
view cfmid.py @ 0:41c4de0ed4ec draft default tip
planemo upload for repository https://github.com/computational-metabolomics/cfm-galaxy/tree/master/tools/cfm commit f0157bb3b01871411f27c1d5bd4ccee2039335d0
author | computational-metabolomics |
---|---|
date | Wed, 15 Nov 2023 16:28:04 +0000 (17 months ago) |
parents | |
children |
line wrap: on
line source
import argparse import csv import os import shutil parser = argparse.ArgumentParser() parser.add_argument("--input") parser.add_argument("--db_local") parser.add_argument("--num_highest") parser.add_argument("--ppm_db") parser.add_argument("--ppm_mass_tol") parser.add_argument("--abs_mass_tol") parser.add_argument("--polarity") parser.add_argument("--score_type") parser.add_argument("--results") parser.add_argument("--tool_directory") args = parser.parse_args() print(args) # Example # python cfmid.py --abs_mass_tol='0.01' --db_local='test-data/demo_db.tsv' --input='test-data/input.msp' --num_highest='10' --polarity='pos' --ppm_db='10.0' --ppm_mass_tol='10.0' --results='results.txt' --score_type='Jaccard' --tool_directory='/home/rw/Documents/cfm-galaxy/tools/cfm' id2info = {} mz2id = [] # store DB in dicts with open(args.db_local) as csvfile: csvreader = csv.DictReader(csvfile, delimiter="\t") for row in csvreader: id2info[row["Identifier"]] = row mz2id.append((float(row["MonoisotopicMass"]), row["Identifier"])) name_tmp = "tmp" if os.path.isdir(name_tmp): shutil.rmtree(name_tmp) os.makedirs(name_tmp) with open(args.input, "r") as infile: numlines = 0 for line in infile: line = line.strip() if numlines == 0: print(line) if "NAME" in line: featid = line.split("NAME: ")[1] if "PRECURSORMZ" in line: mz = float(line.split("PRECURSORMZ: ")[1]) if args.polarity == "pos": mz2 = mz - 1.007276 else: mz2 = mz + 1.007276 if "Num Peaks" in line: numlines = int(line.split("Num Peaks: ")[1]) linesread = 0 peaklist = [] else: if linesread == numlines: numlines = 0 cand_id_list = [] mz_ranges = (float(args.ppm_db) * mz2) / 1e6 mz_ranges = (mz2 - mz_ranges, mz2 + mz_ranges) # check hits for t in mz2id: if (t[0] > mz_ranges[0]) and (t[0] < mz_ranges[1]): cand_id_list.append(t[1]) # run only if we got candidates if len(cand_id_list) > 0: # write spec file with open("./tmpspec.txt", "w") as outfile: for e in ["low", "mid", "high"]: outfile.write(e + "\n") for p in peaklist: outfile.write(p[0] + "\t" + p[1] + "\n") # write candidates file with open("./tmpcand.txt", "w") as outfile: for c in cand_id_list: outfile.write( "{0} {1}\n".format(c, id2info[c]["InChI"]) ) # TODO: Use InChI or SMILES # create command line call outi = os.path.join(name_tmp, "cfm_" + featid + ".txt") cmd_command = "cfm-id tmpspec.txt {0} tmpcand.txt ".format(featid) cmd_command += "{0} {1} {2} {3} ".format( args.num_highest, args.ppm_db, args.ppm_mass_tol, args.abs_mass_tol, ) if args.polarity == "pos": cmd_command += ( os.path.join( args.tool_directory, "data", "positive_metab_se_cfm", "param_output0.log", ) + " " ) cmd_command += ( os.path.join( args.tool_directory, "data", "positive_metab_se_cfm", "param_config.txt", ) + " " ) else: cmd_command += ( os.path.join( args.tool_directory, "data", "negative_metab_se_cfm/param_output0.log", ) + " " ) cmd_command += ( os.path.join( args.tool_directory, "data", "negative_metab_se_cfm/param_config.txt", ) + " " ) cmd_command += "{0} 1 {1}".format(args.score_type, outi) # run print(cmd_command) os.system(cmd_command) else: line = tuple(line.split("\t")) linesread += 1 peaklist.append(line) # merge outputs outfiles = os.listdir(name_tmp) outfiles.sort(key=lambda x: os.path.getmtime(os.path.join(name_tmp, x))) with open(args.results, "w") as outfile: outfile.write("UID\tRank\tScore\tIdentifier\tInChI\n") for fname in outfiles: fileid = os.path.basename(fname) fileid = fileid.split("_")[1] fileid = fileid.split(".txt")[0] with open(os.path.join(name_tmp, fname)) as infile: for line in infile: line = line.replace(" ", "\t") outfile.write(fileid + "\t" + line)