devteam/data_manager_rsync_g2: data_manager/data_manager_rsync.py @ 2:e0329ab30f6d (draft, default, tip)
"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/data_managers/data_manager_rsync_g2 commit 57f71aa633a43ab02bbf05acd0c6d7f406e01f1e"
author:   iuc
date:     Thu, 28 Nov 2019 15:47:47 -0500
parents:  8ff92bd7e2a3
children: (none)
#!/usr/bin/env python
# Dan Blankenberg
from __future__ import print_function

import base64
import datetime
import logging
import optparse
import os
import shutil
import subprocess
import tempfile
from json import (
    dumps,
    loads
)
from os.path import basename
from xml.etree.ElementTree import tostring

try:
    # For Python 3.0 and later
    from urllib.request import urlopen
except ImportError:
    # Fall back to Python 2 imports
    from urllib2 import urlopen

_log_name = __name__
if _log_name == '__builtin__':
    _log_name = 'toolshed.installed.g2.rsync.data.manager'
log = logging.getLogger(_log_name)

# Get the data from the Galaxy Project rsync server
RSYNC_CMD = 'rsync'
RSYNC_SERVER = "rsync://datacache.g2.bx.psu.edu/"
LOCATION_DIR = "location"
INDEX_DIR = "indexes"

# Pull the Tool Data Table files from GitHub
# FIXME: These files should be accessible from the rsync server directly.
TOOL_DATA_TABLE_CONF_XML_URLS = {'main': "https://raw.githubusercontent.com/galaxyproject/usegalaxy-playbook/master/env/main/files/galaxy/config/tool_data_table_conf.xml",
                                 'test': "https://raw.githubusercontent.com/galaxyproject/usegalaxy-playbook/master/env/test/files/galaxy/config/tool_data_table_conf.xml"}

# Replace data table source entries with the local temporary location
GALAXY_DATA_CANONICAL_PATH = "/galaxy/data/"
TOOL_DATA_TABLE_CONF_XML_REPLACE_SOURCE = '<file path="%slocation/' % (GALAXY_DATA_CANONICAL_PATH)
TOOL_DATA_TABLE_CONF_XML_REPLACE_TARGET = '<file path="%s/'

# Some basic caching, so we don't have to reload and download everything every time
CACHE_TIME = datetime.timedelta(minutes=10)
TOOL_DATA_TABLES_LOADED_BY_URL = {}

# Entries will not be selected by default
DEFAULT_SELECTED = False

# Exclude data managers without a 'path' column or that are in the manual exclude list
PATH_COLUMN_NAMES = ['path']
EXCLUDE_DATA_TABLES = []
# TODO: Make additional handler actions available for tables that can't fit into the basic
# "take the value of path as a dir and copy contents" scheme.
# e.g. mafs. Although this maf table is goofy and doesn't have path defined in the <table> def,
# it does exist in the .loc.
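
# Illustration of the replacement above (file name hypothetical, not from the
# upstream tool): an entry in the downloaded tool_data_table_conf.xml such as
#   <file path="/galaxy/data/location/all_fasta.loc" />
# is rewritten to point at the temporary rsync checkout instead, e.g.
#   <file path="/tmp/rsync_g2_XXXXXX/location/all_fasta.loc" />
# See load_data_tables_from_url() below, where the replacement is applied.
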
# --- These methods are called by/within the Galaxy Application
def exec_before_job(app, inp_data, out_data, param_dict, tool=None, **kwd):
    # Look for any data tables that haven't been defined for this data manager before and dynamically add them to Galaxy
    param_dict = dict(**param_dict)
    param_dict['data_table_entries'] = param_dict.get('data_table_entries', [])
    if not isinstance(param_dict['data_table_entries'], list):
        param_dict['data_table_entries'] = [param_dict['data_table_entries']]
    param_dict['data_table_entries'] = ",".join(param_dict['data_table_entries'])
    if tool:
        tool_shed_repository = tool.tool_shed_repository
    else:
        tool_shed_repository = None
    tdtm = None
    data_manager = app.data_managers.get_manager(tool.data_manager_id, None)
    data_table_entries = get_data_table_entries(param_dict)
    data_tables = load_data_tables_from_url(data_table_class=app.tool_data_tables.__class__).get('data_tables')
    for data_table_name, entries in data_table_entries.items():
        # get the data table managed by this data manager
        has_data_table = app.tool_data_tables.get_tables().get(str(data_table_name))
        if has_data_table:
            has_data_table = bool(has_data_table.get_filename_for_source(data_manager, None))
        if not has_data_table:
            if tdtm is None:
                from tool_shed.tools import data_table_manager
                tdtm = data_table_manager.ToolDataTableManager(app)
                target_dir, tool_path, relative_target_dir = tdtm.get_target_install_dir(tool_shed_repository)
            # Dynamically add this data table
            log.debug("Attempting to dynamically create a missing Tool Data Table named %s." % data_table_name)
            data_table = data_tables[data_table_name]
            repo_info = tdtm.generate_repository_info_elem_from_repository(tool_shed_repository, parent_elem=None)
            if repo_info is not None:
                repo_info = tostring(repo_info)
            tmp_file = tempfile.NamedTemporaryFile(mode="w")
            tmp_file.write(get_new_xml_definition(app, data_table, data_manager, repo_info, target_dir))
            tmp_file.flush()
            app.tool_data_tables.add_new_entries_from_config_file(tmp_file.name, None, app.config.shed_tool_data_table_config, persist=True)
            tmp_file.close()


def galaxy_code_get_available_data_tables(trans):
    # list of data tables
    found_tables = get_available_tables(trans)
    rval = [(x, x, DEFAULT_SELECTED) for x in found_tables]
    return rval


def galaxy_code_get_available_data_tables_entries(trans, dbkey, data_table_names):
    # available entries, optionally filtered by dbkey and table names
    if dbkey in [None, '', '?']:
        dbkey = None
    if data_table_names in [None, '', '?']:
        data_table_names = None
    found_tables = get_available_tables_for_dbkey(trans, dbkey, data_table_names)
    dbkey_text = '(%s) ' % (dbkey) if dbkey else ''
    rval = [("%s%s" % (dbkey_text, x[0]), base64.b64encode(dumps(dict(name=x[0].split(': ')[0], entry=x[1]), sort_keys=True).rstrip().encode('utf-8')).decode('utf-8'), DEFAULT_SELECTED) for x in found_tables.items()]
    return rval
# --- End Galaxy called Methods ---


def rsync_urljoin(base, url):
    # urlparse.urljoin doesn't work correctly for our use-case,
    # probably because it doesn't recognize the rsync scheme
    base = base.rstrip('/')
    url = url.lstrip('/')
    return "%s/%s" % (base, url)
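
# For example (illustrative values):
#   rsync_urljoin("rsync://datacache.g2.bx.psu.edu/", "location")
# returns "rsync://datacache.g2.bx.psu.edu/location". Unlike urlparse.urljoin,
# it simply strips and rejoins the slashes, so the rsync:// scheme survives.
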
def rsync_list_dir(server, dir=None, skip_names=[]):
    # Example line of `rsync --list-only` output:
    # drwxr-xr-x          50 2014/05/16 20:58:11 .
    if dir:
        dir = rsync_urljoin(server, dir)
    else:
        dir = server
    rsync_response = tempfile.NamedTemporaryFile()
    rsync_stderr = tempfile.NamedTemporaryFile()
    rsync_cmd = [RSYNC_CMD, '--list-only', dir]
    return_code = subprocess.call(rsync_cmd, stdout=rsync_response, stderr=rsync_stderr)
    rsync_response.flush()
    rsync_response.seek(0)
    rsync_stderr.flush()
    rsync_stderr.seek(0)
    if return_code:
        msg = "stdout:\n%s\nstderr:\n%s" % (rsync_response.read(), rsync_stderr.read())
        rsync_response.close()
        rsync_stderr.close()
        raise Exception('Failed to execute rsync command (%s), returncode=%s. Rsync_output:\n%s' % (rsync_cmd, return_code, msg))
    rsync_stderr.close()
    rval = {}
    for line in rsync_response:
        # NamedTemporaryFile is opened in binary mode; decode so the parsed
        # fields are text on Python 3 as well
        line = line.decode('utf-8')
        perms, line = line.split(None, 1)
        line = line.strip()
        size, line = line.split(None, 1)
        line = line.strip()
        date, line = line.split(None, 1)
        line = line.strip()
        time, line = line.split(None, 1)
        name = line.strip()
        if name in skip_names:
            continue
        rval[name] = dict(name=name, permissions=perms, bytes=size, date=date, time=time)
    rsync_response.close()
    return rval


def rsync_sync_to_dir(source, target):
    rsync_response = tempfile.NamedTemporaryFile()
    rsync_stderr = tempfile.NamedTemporaryFile()
    rsync_cmd = [RSYNC_CMD, '-avzP', source, target]
    return_code = subprocess.call(rsync_cmd, stdout=rsync_response, stderr=rsync_stderr)
    rsync_response.flush()
    rsync_response.seek(0)
    rsync_stderr.flush()
    rsync_stderr.seek(0)
    if return_code:
        msg = "stdout:\n%s\nstderr:\n%s" % (rsync_response.read(), rsync_stderr.read())
        rsync_response.close()
        rsync_stderr.close()
        raise Exception('Failed to execute rsync command (%s), returncode=%s. Rsync_output:\n%s' % (rsync_cmd, return_code, msg))
    rsync_response.close()
    rsync_stderr.close()
    return return_code
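
# For reference, the two helpers above shell out to rsync roughly as follows
# (paths hypothetical, assuming `rsync` is on $PATH):
#   rsync --list-only rsync://datacache.g2.bx.psu.edu/location/
#   rsync -avzP rsync://datacache.g2.bx.psu.edu/indexes/hg19 /path/to/target
# rsync_list_dir() parses the --list-only output into a dict keyed by entry
# name; rsync_sync_to_dir() mirrors the source into the target directory.
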
def data_table_needs_refresh(cached_data_table, url):
    if cached_data_table is None:
        return True, {}
    if datetime.datetime.now() - cached_data_table.get('time_loaded') > CACHE_TIME:
        data_table_text = urlopen(url).read().decode('utf-8')
        if cached_data_table.get('data_table_text', None) != data_table_text:
            return True, {'data_table_text': data_table_text}
        loc_file_attrs = rsync_list_dir(RSYNC_SERVER, LOCATION_DIR)
        if cached_data_table.get('loc_file_attrs', None) != loc_file_attrs:
            return True, {'loc_file_attrs': loc_file_attrs}
    return False, {}


def load_data_tables_from_url(url=None, site='main', data_table_class=None):
    if not url:
        url = TOOL_DATA_TABLE_CONF_XML_URLS.get(site, None)
    assert url, ValueError('You must provide either a URL or a site=name.')
    cached_data_table = TOOL_DATA_TABLES_LOADED_BY_URL.get(url, None)
    refresh, attribs = data_table_needs_refresh(cached_data_table, url)
    if refresh:
        data_table_text = attribs.get('data_table_text') or urlopen(url).read().decode('utf-8')
        loc_file_attrs = attribs.get('loc_file_attrs') or rsync_list_dir(RSYNC_SERVER, LOCATION_DIR)
        tmp_dir = tempfile.mkdtemp(prefix='rsync_g2_')
        tmp_loc_dir = os.path.join(tmp_dir, 'location')
        os.mkdir(tmp_loc_dir)
        rsync_sync_to_dir(rsync_urljoin(RSYNC_SERVER, LOCATION_DIR), os.path.abspath(tmp_loc_dir))
        new_data_table_text = data_table_text.replace(TOOL_DATA_TABLE_CONF_XML_REPLACE_SOURCE, TOOL_DATA_TABLE_CONF_XML_REPLACE_TARGET % (tmp_loc_dir))
        data_table_fh = tempfile.NamedTemporaryFile(dir=tmp_dir, prefix='rsync_data_manager_data_table_conf_', mode="w")
        data_table_fh.write(new_data_table_text)
        data_table_fh.flush()
        tmp_data_dir = os.path.join(tmp_dir, 'tool-data')
        os.mkdir(tmp_data_dir)
        data_tables = data_table_class(tmp_data_dir, config_filename=data_table_fh.name)
        for name, data_table in list(data_tables.data_tables.items()):
            if name in EXCLUDE_DATA_TABLES or not data_table_has_path_column(data_table):
                log.debug('Removing data table "%s" because it is excluded by name or does not have a defined "path" column.', name)
                del data_tables.data_tables[name]
        cached_data_table = {'data_tables': data_tables, 'tmp_dir': tmp_dir, 'data_table_text': data_table_text, 'tmp_loc_dir': tmp_loc_dir, 'loc_file_attrs': loc_file_attrs, 'time_loaded': datetime.datetime.now()}
        TOOL_DATA_TABLES_LOADED_BY_URL[url] = cached_data_table
        # delete the files
        data_table_fh.close()
        cleanup_before_exit(tmp_dir)
    return cached_data_table


def data_table_has_path_column(data_table):
    col_names = data_table.get_column_name_list()
    for name in PATH_COLUMN_NAMES:
        if name in col_names:
            return True
    return False


def get_available_tables(trans):
    # list of data tables
    data_tables = load_data_tables_from_url(data_table_class=trans.app.tool_data_tables.__class__)
    return data_tables.get('data_tables').get_tables().keys()


def get_new_xml_definition(app, data_table, data_manager, repo_info=None, location_file_dir=None):
    sub_dict = {'table_name': data_table.name, 'comment_char': '', 'columns': '', 'file_path': ''}
    sub_dict.update(data_manager.get_tool_shed_repository_info_dict())
    if data_table.comment_char:
        sub_dict['comment_char'] = 'comment_char="%s"' % (data_table.comment_char)
    for i, name in enumerate(data_table.get_column_name_list()):
        if name is not None:
            sub_dict['columns'] = "%s\n%s" % (sub_dict['columns'], '<column name="%s" index="%s" />' % (name, i))
    location_file_dir = location_file_dir or app.config.galaxy_data_manager_data_path
    for filename in data_table.filenames.keys():
        sub_dict['file_path'] = basename(filename)
        sub_dict['file_path'] = os.path.join(location_file_dir, sub_dict['file_path'])  # os.path.abspath?
        if not os.path.exists(sub_dict['file_path']):
            # Create empty file
            open(sub_dict['file_path'], 'wb+').close()
        break
    sub_dict['repo_info'] = repo_info or ''
    return """
           <tables><table name="%(table_name)s" %(comment_char)s>
               %(columns)s
               <file path="%(file_path)s" />
               %(repo_info)s
           </table></tables>
           """ % sub_dict


def get_available_tables_for_dbkey(trans, dbkey, data_table_names):
    data_tables = load_data_tables_from_url(data_table_class=trans.app.tool_data_tables.__class__)
    rval = {}
    for name, data_table in data_tables.get('data_tables').get_tables().items():
        if (not data_table_names or name in data_table_names):
            # TODO: check that columns are similar
            if not dbkey:
                entry_getter = data_table.get_named_fields_list()
            else:
                entry_getter = data_table.get_entries('dbkey', dbkey, None, default=[])
            for entry in entry_getter:
                name = "%s: %s" % (data_table.name, dumps(entry))
                rval[name] = entry
    return rval


def split_path_all(path):
    rval = []
    path = path.rstrip('/')
    while True:
        head, tail = os.path.split(path)
        if tail:
            rval.append(tail)
            path = head
        elif head:
            rval.append(head)
            break
        else:
            break
    rval.reverse()
    return rval
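
# Example behavior of split_path_all (values illustrative):
#   split_path_all("hg19/bwa_index/")  ->  ['hg19', 'bwa_index']
#   split_path_all("/galaxy/data")     ->  ['/', 'galaxy', 'data']
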
def get_data_for_path(path, data_root_dir):
    # We list the dir with a trailing /, but copy data without it.
    # Listing with / gives a "." entry when it's a dir;
    # cloning without the / would copy the whole directory into the target,
    # instead of just the target's contents.
    if path.startswith(GALAXY_DATA_CANONICAL_PATH):
        path = path[len(GALAXY_DATA_CANONICAL_PATH):]
    make_path = path
    rsync_source = rsync_urljoin(rsync_urljoin(RSYNC_SERVER, INDEX_DIR), path)
    if rsync_source.endswith('/'):
        rsync_source = rsync_source[:-1]
    try:
        dir_list = rsync_list_dir(rsync_source + "/")
    except Exception:
        dir_list = None
    while not dir_list or '.' not in dir_list:
        head, tail = os.path.split(make_path)
        if not head:
            head = tail
        make_path = head
        rsync_source = rsync_urljoin(rsync_urljoin(RSYNC_SERVER, INDEX_DIR), head)  # if we error here, it is likely due to a connection issue
        if rsync_source.endswith('/'):
            rsync_source = rsync_source[:-1]
        dir_list = rsync_list_dir(rsync_source + "/")
    split_path = split_path_all(make_path)
    target_path = data_root_dir
    for p in split_path[:-1]:
        target_path = os.path.join(target_path, p)
        if not os.path.exists(target_path):
            os.mkdir(target_path)
    rsync_sync_to_dir(rsync_source, target_path)
    return path


def get_data_and_munge_path(data_table_name, data_table_entry, data_root_dir):
    path_cols = []
    for key, value in data_table_entry.items():
        if key in PATH_COLUMN_NAMES:
            path_cols.append((key, value))
    if path_cols:
        for col_name, value in path_cols:
            if value.startswith(GALAXY_DATA_CANONICAL_PATH):
                data_table_entry[col_name] = get_data_for_path(value, data_root_dir)
            else:
                print('unable to determine location of rsync data for', data_table_name, data_table_entry)
    return data_table_entry


def fulfill_data_table_entries(data_table_entries, data_manager_dict, data_root_dir):
    for data_table_name, entries in data_table_entries.items():
        for entry in entries:
            entry = get_data_and_munge_path(data_table_name, entry, data_root_dir)
            _add_data_table_entry(data_manager_dict, data_table_name, entry)
    return data_manager_dict


def _add_data_table_entry(data_manager_dict, data_table_name, data_table_entry):
    data_manager_dict['data_tables'] = data_manager_dict.get('data_tables', {})
    data_manager_dict['data_tables'][data_table_name] = data_manager_dict['data_tables'].get(data_table_name, [])
    data_manager_dict['data_tables'][data_table_name].append(data_table_entry)
    return data_manager_dict


def cleanup_before_exit(tmp_dir):
    if tmp_dir and os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)


def get_data_table_entries(params):
    rval = {}
    data_table_entries = params.get('data_table_entries', None)
    if data_table_entries:
        for entry_text in data_table_entries.split(','):
            entry_text = base64.b64decode(entry_text.strip().encode('utf-8'))
            entry_dict = loads(entry_text)
            data_table_name = entry_dict['name']
            data_table_entry = entry_dict['entry']
            rval[data_table_name] = rval.get(data_table_name, [])
            rval[data_table_name].append(data_table_entry)
    return rval


def main():
    parser = optparse.OptionParser()
    (options, args) = parser.parse_args()

    filename = args[0]

    params = loads(open(filename).read())
    target_directory = params['output_data'][0]['extra_files_path']
    os.mkdir(target_directory)
    data_manager_dict = {}

    data_table_entries = get_data_table_entries(params['param_dict'])

    # Populate the data tables
    data_manager_dict = fulfill_data_table_entries(data_table_entries, data_manager_dict, target_directory)

    # Save info to the JSON file
    open(filename, 'w').write(dumps(data_manager_dict, sort_keys=True))


if __name__ == "__main__":
    main()
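
# Sketch of the JSON params file that main() consumes (only the keys this
# script reads are shown; values are hypothetical):
#   {
#     "output_data": [{"extra_files_path": "/path/to/extra_files"}],
#     "param_dict": {"data_table_entries": "<base64 entry>,<base64 entry>"}
#   }
# Each comma-separated token is base64-encoded JSON of the form
#   {"name": <data table name>, "entry": {<column>: <value>, ...}}
# as produced by galaxy_code_get_available_data_tables_entries() above.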