Mercurial > repos > iuc > data_manager_cat
view data_manager/data_manager_cat.py @ 2:472dabcb03bf draft default tip
planemo upload commit 09b56ef3e09ad6c5923c88616fea5cbd77d87616
author | iuc |
---|---|
date | Mon, 18 Dec 2023 09:36:10 +0000 |
parents | 74af283d8ebd |
children |
line wrap: on
line source
#!/usr/bin/env python from __future__ import print_function import argparse import json import os.path import subprocess import sys import tarfile import tempfile import zipfile try: # For Python 3.0 and later from urllib.request import urlopen except ImportError: # Fall back to Python 2 imports from urllib2 import urlopen def url_download(url, workdir): file_path = os.path.join(workdir, 'download.dat') src = None dst = None try: src = urlopen(url) with open(file_path, 'wb') as dst: while True: chunk = src.read(2**10) if chunk: dst.write(chunk) else: break finally: if src: src.close() if tarfile.is_tarfile(file_path): fh = tarfile.open(file_path, 'r:*') elif zipfile.is_zipfile(file_path): fh = zipfile.ZipFile(file_path, 'r') else: return fh.extractall(workdir) os.remove(file_path) def cat_prepare(install_dir, db_dir=None, tax_dir=None): if db_dir and tax_dir: cmd = ['CAT', 'prepare', '--existing', '-d', db_dir, '-t', tax_dir] else: cmd = ['CAT', 'prepare', '--fresh', '-q'] cmd_stdout = tempfile.NamedTemporaryFile() cmd_stderr = tempfile.NamedTemporaryFile() return_code = subprocess.call(cmd, shell=False, cwd=install_dir, stdout=cmd_stdout, stderr=cmd_stderr) if return_code: msg = "stdout:\n%s\nstderr:\n%s" % (cmd_stdout.read(), cmd_stderr.read()) cmd_stdout.close() cmd_stderr.close() raise Exception('Error: (%s), returncode=%s %s' % (' '.join(cmd), return_code, msg)) def main(): parser = argparse.ArgumentParser() parser.add_argument('--config_file', required=True) parser.add_argument('--install_path', default=None) parser.add_argument('--db_url', default=None) parser.add_argument('--database_folder', default=None) parser.add_argument('--taxonomy_folder', default=None) args = parser.parse_args() cat_path = None cat_db = None tax_db = None if args.database_folder and args.taxonomy_folder: cat_path = os.path.dirname(args.database_folder) cat_db = os.path.basename(args.database_folder) tax_db = os.path.basename(args.taxonomy_folder) cat_prepare(os.getcwd(), db_dir=args.database_folder, tax_dir=args.taxonomy_folder) elif not args.install_path: sys.exit(1) else: if not os.path.exists(args.install_path): os.makedirs(args.install_path) if args.db_url: url_download(args.db_url, args.install_path) else: cat_prepare(args.install_path) for root, dirs, _ in os.walk(args.install_path): for dname in dirs: if dname.endswith('CAT_database'): cat_db = dname elif dname.endswith('taxonomy'): tax_db = dname if cat_db and tax_db: cat_path = root break cat_dir = os.path.basename(cat_path) dm_dict = {} dm_dict['data_tables'] = dm_dict.get('data_tables', {}) data_table = 'cat_database' dm_dict['data_tables'][data_table]\ = dm_dict['data_tables'].get(data_table, []) data_table_entry = dict(value=cat_dir, name=cat_dir, database_folder=os.path.join(cat_dir, cat_db), taxonomy_folder=os.path.join(cat_dir, tax_db)) dm_dict['data_tables'][data_table].append(data_table_entry) # save info to json file with open(args.config_file, 'w') as fh: json.dump(dm_dict, fh, sort_keys=True) if __name__ == "__main__": main()