Mercurial > repos > rhohensinner > galaxy_irods_interface
diff irods_upload.py @ 2:0641ea2f75b1 draft
"planemo upload commit b2a00d9c24285fef0fb131d1832ecf4c337e5038-dirty"
author:   rhohensinner
date:     Fri, 02 Jul 2021 09:40:25 +0000
parents:
children: d2be2eb8350f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/irods_upload.py	Fri Jul 02 09:40:25 2021 +0000
@@ -0,0 +1,340 @@
+#!/usr/bin/env python
+# Processes uploads from the user.
+
+# WARNING: Changes in this tool (particularly as related to parsing) may need
+# to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools
+from __future__ import print_function
+
+import errno
+import os
+import shutil
+import sys
+sys.path.append("/home/richard/galaxy/lib")
+from json import dump, load, loads
+
+from galaxy.datatypes import sniff
+from galaxy.datatypes.registry import Registry
+from galaxy.datatypes.upload_util import handle_upload, UploadProblemException
+from galaxy.util import (
+    bunch,
+    safe_makedirs,
+    unicodify
+)
+from galaxy.util.compression_utils import CompressedFile
+
+assert sys.version_info[:2] >= (2, 7)
+
+
+_file_sources = None
+
+
+def get_file_sources():
+    global _file_sources
+    if _file_sources is None:
+        from galaxy.files import ConfiguredFileSources
+        file_sources = None
+        if os.path.exists("file_sources.json"):
+            file_sources_as_dict = None
+            with open("file_sources.json") as f:
+                file_sources_as_dict = load(f)
+            if file_sources_as_dict is not None:
+                file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
+        if file_sources is None:
+            # fall back to an empty configuration (assign the result so it is actually used)
+            file_sources = ConfiguredFileSources.from_dict([])
+        _file_sources = file_sources
+    return _file_sources
+
+
+def file_err(msg, dataset):
+    # never remove a server-side upload
+    if dataset.type not in ('server_dir', 'path_paste'):
+        try:
+            os.remove(dataset.path)
+        except Exception:
+            pass
+    return dict(type='dataset',
+                ext='data',
+                dataset_id=dataset.dataset_id,
+                stderr=msg,
+                failed=True)
+
+
+def safe_dict(d):
+    """Recursively clone JSON structure with unicode dictionary keys."""
+    if isinstance(d, dict):
+        return {unicodify(k): safe_dict(v) for k, v in d.items()}
+    elif isinstance(d, list):
+        return [safe_dict(x) for x in d]
+    else:
+        return d
+
+
+def parse_outputs(args):
+    rval = {}
+    for arg in args:
+        id, files_path, path = arg.split(':', 2)
+        rval[int(id)] = (path, files_path)
+    return rval
+
+
+def add_file(dataset, registry, output_path):
+    ext = None
+    compression_type = None
+    line_count = None
+    link_data_only_str = dataset.get('link_data_only', 'copy_files')
+    if link_data_only_str not in ['link_to_files', 'copy_files']:
+        raise UploadProblemException("Invalid setting '%s' for option link_data_only - upload request misconfigured" % link_data_only_str)
+    link_data_only = link_data_only_str == 'link_to_files'
+
+    # run_as_real_user is estimated from the Galaxy config (an external chmod of the inputs indicates
+    # execution as the real user).
+    # If this is True we always purge supplied upload inputs so they are cleaned up and we reuse their
+    # paths during data conversions since this user already owns that path.
+    # Older in_place check for upload jobs created before 18.01, TODO remove in 19.XX. xref #5206
+    run_as_real_user = dataset.get('run_as_real_user', False) or dataset.get("in_place", False)
+
+    # purge_source defaults to True unless this is an FTP import and
+    # ftp_upload_purge has been overridden to False in Galaxy's config.
+    # We set purge_source to False if:
+    # - the job does not have write access to the file, e.g. when running as the
+    #   real user
+    # - the files are uploaded from external paths.
+    purge_source = dataset.get('purge_source', True) and not run_as_real_user and dataset.type not in ('server_dir', 'path_paste')
+
+    # in_place is True unless we are running as a real user or importing external paths (i.e.
+    # this is a real upload and not a path paste or ftp import).
+    # in_place should always be False if running as real user because the uploaded file will
+    # be owned by Galaxy and not the user, and it should be False for external paths so Galaxy doesn't
+    # modify files not controlled by Galaxy.
+    in_place = not run_as_real_user and dataset.type not in ('server_dir', 'path_paste', 'ftp_import')
+
+    # Based on the check_upload_content Galaxy config option (on by default), this enables some
+    # security-related checks on the uploaded content, but can prevent uploads from working in some cases.
+    check_content = dataset.get('check_content', True)
+
+    # auto_decompress is a request flag that can be switched off to prevent Galaxy from automatically
+    # decompressing archive files before sniffing.
+    auto_decompress = dataset.get('auto_decompress', True)
+    try:
+        dataset.file_type
+    except AttributeError:
+        raise UploadProblemException('Unable to process uploaded file, missing file_type parameter.')
+
+    if dataset.type == 'url':
+        try:
+            dataset.path = sniff.stream_url_to_file(dataset.path, file_sources=get_file_sources())
+        except Exception as e:
+            raise UploadProblemException('Unable to fetch %s\n%s' % (dataset.path, unicodify(e)))
+
+    # See if we have an empty file
+    if not os.path.exists(dataset.path):
+        raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path)
+
+    stdout, ext, datatype, is_binary, converted_path = handle_upload(
+        registry=registry,
+        path=dataset.path,
+        requested_ext=dataset.file_type,
+        name=dataset.name,
+        tmp_prefix='data_id_%s_upload_' % dataset.dataset_id,
+        tmp_dir=output_adjacent_tmpdir(output_path),
+        check_content=check_content,
+        link_data_only=link_data_only,
+        in_place=in_place,
+        auto_decompress=auto_decompress,
+        convert_to_posix_lines=dataset.to_posix_lines,
+        convert_spaces_to_tabs=dataset.space_to_tab,
+    )
+
+    # Strip compression extension from name
+    if compression_type and not getattr(datatype, 'compressed', False) and dataset.name.endswith('.' + compression_type):
+        dataset.name = dataset.name[:-len('.' + compression_type)]
+
+    # Move dataset
+    if link_data_only:
+        # Never alter a file that will not be copied to Galaxy's local file store.
+        if datatype.dataset_content_needs_grooming(dataset.path):
+            err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
+                      '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
+            raise UploadProblemException(err_msg)
+    if not link_data_only:
+        # Move the dataset to its "real" path. converted_path is a tempfile so we move it even if purge_source is False.
+        if purge_source or converted_path:
+            try:
+                # If the user has indicated the original file should be purged and we have a converted_path tempfile
+                if purge_source and converted_path:
+                    shutil.move(converted_path, output_path)
+                    os.remove(dataset.path)
+                else:
+                    shutil.move(converted_path or dataset.path, output_path)
+            except OSError as e:
+                # We may not have permission to remove the input
+                if e.errno != errno.EACCES:
+                    raise
+        else:
+            shutil.copy(dataset.path, output_path)
+
+    # Write the job info
+    stdout = stdout or 'uploaded %s file' % ext
+    info = dict(type='dataset',
+                dataset_id=dataset.dataset_id,
+                ext=ext,
+                stdout=stdout,
+                name=dataset.name,
+                line_count=line_count)
+    if dataset.get('uuid', None) is not None:
+        info['uuid'] = dataset.get('uuid')
+    # FIXME: does this belong here? also not output-adjacent-tmpdir aware =/
+    if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path):
+        # Groom the dataset content if necessary
+        datatype.groom_dataset_content(output_path)
+    return info
+
+
+def add_composite_file(dataset, registry, output_path, files_path):
+    datatype = None
+
+    # Find data type
+    if dataset.file_type is not None:
+        datatype = registry.get_datatype_by_extension(dataset.file_type)
+
+    def to_path(path_or_url):
+        is_url = path_or_url.find('://') != -1  # todo fixme
+        if is_url:
+            try:
+                temp_name = sniff.stream_url_to_file(path_or_url, file_sources=get_file_sources())
+            except Exception as e:
+                raise UploadProblemException('Unable to fetch %s\n%s' % (path_or_url, unicodify(e)))
+
+            return temp_name, is_url
+
+        return path_or_url, is_url
+
+    def make_files_path():
+        safe_makedirs(files_path)
+
+    def stage_file(name, composite_file_path, is_binary=False):
+        dp = composite_file_path['path']
+        path, is_url = to_path(dp)
+        if is_url:
+            dataset.path = path
+            dp = path
+
+        auto_decompress = composite_file_path.get('auto_decompress', True)
+        if auto_decompress and not datatype.composite_type and CompressedFile.can_decompress(dp):
+            # It isn't an explicitly composite datatype, so these are just extra files to attach
+            # as composite data. It'd be better if Galaxy was communicating this to the tool
+            # a little more explicitly so we didn't need to dispatch on the datatype and so we
+            # could attach arbitrary extra composite data to an existing composite datatype
+            # if need be? Perhaps that would be a mistake though.
+            CompressedFile(dp).extract(files_path)
+        else:
+            tmpdir = output_adjacent_tmpdir(output_path)
+            tmp_prefix = 'data_id_%s_convert_' % dataset.dataset_id
+            sniff.handle_composite_file(
+                datatype,
+                dp,
+                files_path,
+                name,
+                is_binary,
+                tmpdir,
+                tmp_prefix,
+                composite_file_path,
+            )
+
+    # Do we have pre-defined composite files from the datatype definition.
+    if dataset.composite_files:
+        make_files_path()
+        for name, value in dataset.composite_files.items():
+            value = bunch.Bunch(**value)
+            if value.name not in dataset.composite_file_paths:
+                raise UploadProblemException("Failed to find file_path %s in %s" % (value.name, dataset.composite_file_paths))
+            if dataset.composite_file_paths[value.name] is None and not value.optional:
+                raise UploadProblemException('A required composite data file was not provided (%s)' % name)
+            elif dataset.composite_file_paths[value.name] is not None:
+                composite_file_path = dataset.composite_file_paths[value.name]
+                stage_file(name, composite_file_path, value.is_binary)
+
+    # Do we have ad-hoc user supplied composite files.
+    elif dataset.composite_file_paths:
+        make_files_path()
+        for key, composite_file in dataset.composite_file_paths.items():
+            stage_file(key, composite_file)  # TODO: replace these defaults
+
+    # Move the dataset to its "real" path
+    primary_file_path, _ = to_path(dataset.primary_file)
+    shutil.move(primary_file_path, output_path)
+
+    # Write the job info
+    return dict(type='dataset',
+                dataset_id=dataset.dataset_id,
+                stdout='uploaded %s file' % dataset.file_type)
+
+
+def __read_paramfile(path):
+    with open(path) as fh:
+        obj = load(fh)
+    # If there's a single dataset in an old-style paramfile it'll still parse, but it'll be a dict
+    assert type(obj) == list
+    return obj
+
+
+def __read_old_paramfile(path):
+    datasets = []
+    with open(path) as fh:
+        for line in fh:
+            datasets.append(loads(line))
+    return datasets
+
+
+def __write_job_metadata(metadata):
+    # TODO: make upload/set_metadata compatible with https://github.com/galaxyproject/galaxy/pull/4437
+    with open('galaxy.json', 'w') as fh:
+        for meta in metadata:
+            dump(meta, fh)
+            fh.write('\n')
+
+
+def output_adjacent_tmpdir(output_path):
+    """ For temp files that will ultimately be moved to output_path anyway
+    just create the file directly in output_path's directory so shutil.move
+    will work optimally.
+    """
+    return os.path.dirname(output_path)
+
+
+def __main__():
+
+    if len(sys.argv) < 4:
+        print('usage: upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...', file=sys.stderr)
+        sys.exit(1)
+
+    output_paths = parse_outputs(sys.argv[4:])
+
+    registry = Registry()
+    registry.load_datatypes(root_dir=sys.argv[1], config=sys.argv[2])
+
+    try:
+        datasets = __read_paramfile(sys.argv[3])
+    except (ValueError, AssertionError):
+        datasets = __read_old_paramfile(sys.argv[3])
+
+    metadata = []
+    for dataset in datasets:
+        dataset = bunch.Bunch(**safe_dict(dataset))
+        try:
+            output_path = output_paths[int(dataset.dataset_id)][0]
+        except Exception:
+            print('Output path for dataset %s not found on command line' % dataset.dataset_id, file=sys.stderr)
+            sys.exit(1)
+        try:
+            if dataset.type == 'composite':
+                files_path = output_paths[int(dataset.dataset_id)][1]
+                metadata.append(add_composite_file(dataset, registry, output_path, files_path))
+            else:
+                metadata.append(add_file(dataset, registry, output_path))
+        except UploadProblemException as e:
+            metadata.append(file_err(unicodify(e), dataset))
+    __write_job_metadata(metadata)
+
+
+if __name__ == '__main__':
+    __main__()
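
How the script is driven: as the argument handling in __main__ and parse_outputs shows, Galaxy invokes the tool with a root directory, a datatypes config, a JSON paramfile, and one output spec per dataset of the form <dataset_id>:<files_path>:<output_path> (the output path may itself contain colons, hence split(':', 2)). A purely illustrative invocation, in which every concrete path is made up:

    python irods_upload.py /home/richard/galaxy \
        /home/richard/galaxy/config/datatypes_conf.xml \
        params.json \
        1:/tmp/job1/dataset_1_files:/tmp/job1/galaxy_dataset_1.dat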
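
The paramfile read by __read_paramfile is a JSON list with one object per dataset. Below is a minimal sketch of such an entry, using only the keys that add_file actually reads above; a real Galaxy-generated paramfile carries additional keys, and the values here (the 'file' type, the name, and all paths) are assumptions for illustration only:

    from json import dump

    datasets = [
        {
            "type": "file",              # anything other than 'url'/'composite' is handled as a plain upload
            "dataset_id": 1,
            "name": "sample.tabular",
            "file_type": "tabular",      # requested datatype extension
            "path": "/tmp/upload_tmp/sample.dat",  # hypothetical staged upload path
            "to_posix_lines": True,
            "space_to_tab": False,
            "link_data_only": "copy_files",
        }
    ]

    with open("params.json", "w") as fh:
        dump(datasets, fh)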
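
For each dataset, __write_job_metadata appends one JSON object per line to galaxy.json. Based on the info dict built in add_file and on file_err, a successful upload and a failed one would look roughly like the lines below; the concrete values are illustrative:

    {"type": "dataset", "dataset_id": 1, "ext": "tabular", "stdout": "uploaded tabular file", "name": "sample.tabular", "line_count": null}
    {"type": "dataset", "ext": "data", "dataset_id": 2, "stderr": "Unable to fetch ...", "failed": true}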