Mercurial > repos > bgruening > openbabel_remduplicates
view cheminfolib.py @ 12:50ca8845e7f5 draft
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/chemicaltoolbox/openbabel commit 944ea4bb8a9cd4244152a4a4fecd0485fabc2ad0"
author | bgruening |
---|---|
date | Tue, 28 Jul 2020 08:38:56 -0400 |
parents | 75d6c2b7907a |
children | 12aca74f07d7 |
line wrap: on
line source
#!/usr/bin/env python """ Small library with cheminformatic functions based on openbabel and pgchem. Copyright 2012, Bjoern Gruening and Xavier Lucas """ import os, sys try: from galaxy import eggs eggs.require('psycopg2') except: print('psycopg2 is not available. It is currently used in the pgchem wrappers, that are not shipped with default CTB') try: from openbabel import openbabel, pybel openbabel.obErrorLog.StopLogging() except: print('OpenBabel could not be found. A few functions are not available without OpenBabel.') from multiprocessing import Pool import glob, tempfile, re import subprocess def CountLines( path ): out = subprocess.Popen(['wc', '-l', path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT ).communicate()[0] return int(out.partition(b' ')[0]) def grep(pattern, file_obj): grepper = re.compile(pattern) for line in file_obj: if grepper.search(line): return True return False def check_filetype(filepath): mol = False possible_inchi = True for line_counter, line in enumerate(open(filepath)): if line_counter > 10000: break if line.find('$$$$') != -1: return 'sdf' elif line.find('@<TRIPOS>MOLECULE') != -1: return 'mol2' elif line.find('ligand id') != -1: return 'drf' elif possible_inchi and re.findall('^InChI=', line): return 'inchi' elif re.findall('^M\s+END', line): mol = True # first line is not an InChI, so it can't be an InChI file possible_inchi = False if mol: # END can occures before $$$$, so and SDF file will # be recognised as mol, if you not using this hack' return 'mol' return 'smi' def db_connect(args): try: db_conn = psycopg2.connect("dbname=%s user=%s host=%s password=%s" % (args.dbname, args.dbuser, args.dbhost, args.dbpasswd)); return db_conn except: sys.exit('Unable to connect to the db') ColumnNames = { 'can_smiles' : 'Canonical SMILES', 'can' : 'Canonical SMILES', 'inchi' : 'InChI', 'inchi_key' : 'InChI key', 'inchi_key_first' : 'InChI key first', 'inchi_key_last' : 'InChI key last', 'molwt' : 'Molecular weight', 'hbd' : 'Hydrogen-bond donors', 'donors' : 'Hydrogen-bond donors', 'hba' : 'Hydrogen-bond acceptors', 'acceptors' : 'Hydrogen-bond acceptors', 'rotbonds' : 'Rotatable bonds', 'logp' : 'logP', 'psa' : 'Polar surface area', 'mr' : 'Molecular refractivity', 'atoms' : 'Number of heavy atoms', 'rings' : 'Number of rings', 'set_bits' : 'FP2 bits', 'id' : 'Internal identifier', 'tani' : 'Tanimoto coefficient', 'spectrophore' : 'Spectrophores(TM)', 'dist_spectrophore' : 'Spectrophores(TM) distance to target', 'synonym' : 'Entry id', } OBDescriptor = { 'atoms': ["atoms","Number of atoms"], 'hatoms': ["hatoms","Number of heavy atoms"], # self defined tag hatoms in plugindefines.txt 'can_smiles' : ["cansmi","Canonical SMILES"], 'can_smilesNS' : ["cansmiNS","Canonical SMILES without isotopes or stereo"], #["abonds","Number of aromatic bonds"], #["bonds","Number of bonds"], #["dbonds","Number of double bonds"], #["formula","Chemical formula"], 'hba': ["HBA1","Number of Hydrogen Bond Acceptors 1 (JoelLib)"], 'hba2': ["HBA2","Number of Hydrogen Bond Acceptors 2 (JoelLib)"], 'hbd': ["HBD","Number of Hydrogen Bond Donors (JoelLib)"], 'inchi': ["InChI","IUPAC InChI identifier"], 'inchi_key': ["InChIKey","InChIKey"], #["L5","Lipinski Rule of Five"], 'logp': ["logP","octanol/water partition coefficient"], 'mr': ["MR","molar refractivity"], 'molwt': ["MW","Molecular Weight filter"], #["nF","Number of Fluorine Atoms"], #["s","SMARTS filter"], #["sbonds","Number of single bonds"], #["smarts","SMARTS filter"], #["tbonds","Number of triple bonds"], #["title","For comparing a molecule's title"], 'psa': ["TPSA","topological polar surface area"], 'rotbonds' : ['ROTATABLE_BOND', 'rotatable bonds'], } def print_output(args, rows): if args.oformat == 'table': outfile = open(args.output, 'w') requested_fields = (filter(lambda x: x not in ["[", "]", "'"], args.fetch)).split(', ') if args.header: outfile.write( 'Identifier\t' + '\t'.join( [ColumnNames[key] for key in requested_fields] ) + '\n' ) for row in rows: outfile.write( row['synonym'] + '\t' + '\t'.join( [str(row[key]) for key in requested_fields] ) + '\n' ) elif args.oformat in ['sdf', 'mol2']: outfile = pybel.Outputfile(args.oformat, args.output, overwrite=True) for row in rows: try: mol = pybel.readstring('sdf', row['mol']) if args.oformat == 'sdf': keys = filter(lambda x: x not in ["[", "]", "'"], args.fetch).split(', ') mol.data.update( { ColumnNames['synonym'] : row['synonym'] } ) if 'inchi_key' in keys: keys = (', '.join(keys).replace( "inchi_key", "inchi_key_first, inchi_key_last" )).split(', ') [ mol.data.update( { ColumnNames[key] : row[key] } ) for key in keys if key] outfile.write(mol) except: pass else: outfile = open(args.output, 'w') outfile.write( '\n'.join( [ '%s\t%s' % (row[args.oformat], row['synonym'] ) for row in rows ] ) ) outfile.close() def pybel_stop_logging(): openbabel.obErrorLog.StopLogging() def get_properties_ext(mol): HBD = pybel.Smarts("[!#6;!H0]") HBA = pybel.Smarts("[$([$([#8,#16]);!$(*=N~O);" + "!$(*~N=O);X1,X2]),$([#7;v3;" + "!$([nH]);!$(*(-a)-a)])]" ) calc_desc_dict = mol.calcdesc() try: logp = calc_desc_dict['logP'] except: logp = calc_desc_dict['LogP'] return {"molwt": mol.molwt, "logp": logp, "donors": len(HBD.findall(mol)), "acceptors": len(HBA.findall(mol)), "psa": calc_desc_dict['TPSA'], "mr": calc_desc_dict['MR'], "rotbonds": mol.OBMol.NumRotors(), "can": mol.write("can").split()[0].strip(), ### tthis one works fine for both zinc and chembl (no ZINC code added after can descriptor string) "inchi": mol.write("inchi").strip(), "inchi_key": get_inchikey(mol).strip(), "rings": len(mol.sssr), "atoms": mol.OBMol.NumHvyAtoms(), "spectrophore" : OBspectrophore(mol), } def get_inchikey(mol): conv = openbabel.OBConversion() conv.SetInAndOutFormats("mol", "inchi") conv.SetOptions("K", conv.OUTOPTIONS) inchikey = conv.WriteString( mol.OBMol ) return inchikey def OBspectrophore(mol): spectrophore = pybel.ob.OBSpectrophore() # Parameters: rotation angle = 20, normalization for mean and sd, accuracy = 3.0 A and non-stereospecific cages. spectrophore.SetNormalization( spectrophore.NormalizationTowardsZeroMeanAndUnitStd ) return ', '.join( [ "%.3f" % value for value in spectrophore.GetSpectrophore( mol.OBMol ) ] ) def squared_euclidean_distance(a, b): try: return ((np.asarray( a ) - np.asarray( b ))**2).sum() except ValueError: return 0 def split_library( lib_path, lib_format = 'sdf', package_size = None ): """ Split a library of compounds. Usage: split_library( lib_path, lib_format, package_size ) IT currently ONLY WORKS FOR SD-Files """ pack = 1 mol_counter = 0 outfile = open('/%s/%s_pack_%i.%s' % ( '/'.join(lib_path.split('/')[:-1]), lib_path.split('/')[-1].split('.')[0], pack, 'sdf'), 'w' ) for line in open(lib_path, 'r'): outfile.write( line ) if line.strip() == '$$$$': mol_counter += 1 if mol_counter % package_size == 0: outfile.close() pack += 1 outfile = open('/%s/%s_pack_%i.%s' % ( '/'.join(lib_path.split('/')[:-1]), lib_path.split('/')[-1].split('.')[0], pack, 'sdf'), 'w' ) if mol_counter*10 % package_size == 0: print('%i molecules parsed, starting pack nr. %i' % ( mol_counter, pack - 1 )) outfile.close() return True def split_smi_library( smiles_file, structures_in_one_file ): """ Split a file with SMILES to several files for multiprocessing usage. Usage: split_smi_library( smiles_file, 10 ) """ output_files = [] tfile = tempfile.NamedTemporaryFile(delete=False) smiles_handle = open(smiles_file, 'r') for count, line in enumerate( smiles_handle ): if count % structures_in_one_file == 0 and count != 0: tfile.close() output_files.append(tfile.name) tfile = tempfile.NamedTemporaryFile(delete=False) tfile.write(line) tfile.close() output_files.append(tfile.name) smiles_handle.close() return output_files def mp_run(input_path, regex, PROCESSES, function_to_call ): paths = [] [ paths.append(compound_file) for compound_file in glob.glob(str(input_path) + str(regex)) ] paths.sort() pool = Pool(processes=PROCESSES) print('Process initialized with', PROCESSES, 'processors') result = pool.map_async(function_to_call, paths) result.get() return paths if __name__ == '__main__': print(check_filetype(sys.argv[1]))