Mercurial > repos > bgruening > get_online_data
view get_online_data.py @ 0:2538366eb8fb draft default tip
planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/chemicaltoolbox/data_source/get_online_data commit aed18d7d09e332efe57d00b33c2b8249abefaedb
author | bgruening |
---|---|
date | Wed, 22 May 2019 07:43:41 -0400 |
parents | |
children |
line wrap: on
line source
"""Download files from a list of URLs and concatenate them into one output file.

Each downloaded payload is inspected by its magic number:

* gzip payloads are decompressed and appended to the output,
* zip payloads are unpacked and every member whose extension is on the
  whitelist is appended to the output,
* anything else is appended unchanged.
"""
import argparse
import gzip
import os
import shutil
import subprocess  # unused here; kept because other code may rely on it
import tempfile
import urllib.request
import zipfile
from io import BytesIO

# Placeholder tokens that stand in for special characters when the command
# line input has been escaped (presumably by the calling framework).
MAPPED_CHARS = {
    '>': '__gt__',
    '<': '__lt__',
    "'": '__sq__',
    '"': '__dq__',
    '[': '__ob__',
    ']': '__cb__',
    '{': '__oc__',
    '}': '__cc__',
    '@': '__at__',
    '\n': '__cn__',
    '\r': '__cr__',
    '\t': '__tc__',
}

# Extensions extracted from zip archives when no whitelist is supplied.
DEFAULT_EXTENSIONS = ['.sdf', '.smi', '.inchi', '.mol']

GZIP_MAGIC = b'\x1f\x8b'
ZIP_MAGIC = b'PK'


def unescape(cond_text):
    """Return *cond_text* with every escape token replaced by its character.

    Tokens such as ``__gt__`` or ``__cn__`` are substituted with the character
    they represent (``>``, newline, ...). Text without tokens is returned
    unchanged.
    """
    for char, token in MAPPED_CHARS.items():
        cond_text = cond_text.replace(token, char)
    return cond_text


def _append_zip_members(payload, allowed_extensions, out):
    """Append the whitelisted members of the zip archive *payload* to *out*.

    The archive is unpacked once into a temporary directory that is removed
    again even if copying fails. Members are visited in sorted order so the
    concatenation is reproducible across filesystems. An empty
    *allowed_extensions* list accepts every member.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(BytesIO(payload), allowZip64=True) as archive:
            archive.extractall(tmpdir)
        for root, dirs, files in os.walk(tmpdir):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for filename in sorted(files):
                extension = os.path.splitext(filename)[-1].lower()
                if not allowed_extensions or extension in allowed_extensions:
                    with open(os.path.join(root, filename), 'rb') as member:
                        shutil.copyfileobj(member, out)


def get_files(options):
    """Download every URL in ``options.url`` and write the data to ``options.out``.

    ``options`` must provide three attributes:

    * ``url`` -- newline separated (possibly escaped) list of URLs,
    * ``whitelist`` -- newline separated (possibly escaped) list of file
      extensions to take from zip archives, or a falsy value to use
      ``DEFAULT_EXTENSIONS``,
    * ``out`` -- path of the output file, which is truncated first.

    Blank lines in the URL list are ignored. Errors raised by ``urllib``,
    ``gzip`` or ``zipfile`` propagate to the caller.
    """
    urls = unescape(options.url)
    if options.whitelist:
        allowed_extensions = [
            ext.strip() for ext in unescape(options.whitelist).split('\n')
        ]
    else:
        allowed_extensions = list(DEFAULT_EXTENSIONS)

    with open(options.out, 'wb') as out:
        for url in urls.split('\n'):
            url = url.strip()
            if not url:
                # A trailing newline or empty line is not a download request.
                continue
            # NOTE(review): urlopen accepts any scheme it has a handler for,
            # including file://. If the URL list is untrusted, consider
            # restricting the allowed schemes.
            request = urllib.request.Request(url)
            with urllib.request.urlopen(request) as response:
                payload = response.read()

            magic = payload[:2]
            if magic == GZIP_MAGIC:
                out.write(gzip.decompress(payload))
            elif magic == ZIP_MAGIC:
                _append_zip_members(payload, allowed_extensions, out)
            else:
                out.write(payload)


def main():
    """Parse the command line and run the download."""
    parser = argparse.ArgumentParser(
        description="Download compressed files and extract files of with chosen extensions"
    )
    parser.add_argument('--url', dest='url', required=True, help='URL')
    parser.add_argument('--whitelist', dest='whitelist', default=None, help='whitelist')
    parser.add_argument('--out', dest='out', required=True, help='output')
    options = parser.parse_args()
    get_files(options)


if __name__ == "__main__":
    main()