irods_upload.py @ 2:0641ea2f75b1 (draft)
"planemo upload commit b2a00d9c24285fef0fb131d1832ecf4c337e5038-dirty"

author:   rhohensinner
date:     Fri, 02 Jul 2021 09:40:25 +0000
parents:
children: d2be2eb8350f
#!/usr/bin/env python
# Processes uploads from the user.

# WARNING: Changes in this tool (particularly as related to parsing) may need
# to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools
from __future__ import print_function

import errno
import os
import shutil
import sys

sys.path.append("/home/richard/galaxy/lib")

from json import dump, load, loads

from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import handle_upload, UploadProblemException
from galaxy.util import (
    bunch,
    safe_makedirs,
    unicodify
)
from galaxy.util.compression_utils import CompressedFile

assert sys.version_info[:2] >= (2, 7)

_file_sources = None


def get_file_sources():
    global _file_sources
    if _file_sources is None:
        from galaxy.files import ConfiguredFileSources
        file_sources = None
        if os.path.exists("file_sources.json"):
            file_sources_as_dict = None
            with open("file_sources.json") as f:
                file_sources_as_dict = load(f)
            if file_sources_as_dict is not None:
                file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
        if file_sources is None:
            # Fall back to an empty configuration so callers always get a
            # ConfiguredFileSources instance (the original discarded this return value).
            file_sources = ConfiguredFileSources.from_dict([])
        _file_sources = file_sources
    return _file_sources


def file_err(msg, dataset):
    # never remove a server-side upload
    if dataset.type not in ('server_dir', 'path_paste'):
        try:
            os.remove(dataset.path)
        except Exception:
            pass
    return dict(type='dataset',
                ext='data',
                dataset_id=dataset.dataset_id,
                stderr=msg,
                failed=True)


def safe_dict(d):
    """Recursively clone JSON structure with unicode dictionary keys."""
    if isinstance(d, dict):
        return {unicodify(k): safe_dict(v) for k, v in d.items()}
    elif isinstance(d, list):
        return [safe_dict(x) for x in d]
    else:
        return d


def parse_outputs(args):
    rval = {}
    for arg in args:
        id, files_path, path = arg.split(':', 2)
        rval[int(id)] = (path, files_path)
    return rval


def add_file(dataset, registry, output_path):
    ext = None
    compression_type = None
    line_count = None

    link_data_only_str = dataset.get('link_data_only', 'copy_files')
    if link_data_only_str not in ['link_to_files', 'copy_files']:
        raise UploadProblemException("Invalid setting '%s' for option link_data_only - upload request misconfigured" % link_data_only_str)

    link_data_only = link_data_only_str == 'link_to_files'

    # run_as_real_user is estimated from galaxy config (external chmod indicated of inputs executed)
    # If this is True we always purge supplied upload inputs so they are cleaned up and we reuse their
    # paths during data conversions since this user already owns that path.
    # Older in_place check for upload jobs created before 18.01, TODO remove in 19.XX. xref #5206
    run_as_real_user = dataset.get('run_as_real_user', False) or dataset.get("in_place", False)

    # purge_source defaults to True unless this is an FTP import and
    # ftp_upload_purge has been overridden to False in Galaxy's config.
    # We set purge_source to False if:
    # - the job does not have write access to the file, e.g. when running as the
    #   real user
    # - the files are uploaded from external paths.
    purge_source = dataset.get('purge_source', True) and not run_as_real_user and dataset.type not in ('server_dir', 'path_paste')

    # in_place is True unless we are running as a real user or importing external paths (i.e.
    # this is a real upload and not a path paste or ftp import).
    # in_place should always be False if running as real user because the uploaded file will
    # be owned by Galaxy and not the user and it should be False for external paths so Galaxy doesn't
    # modify files not controlled by Galaxy.
    in_place = not run_as_real_user and dataset.type not in ('server_dir', 'path_paste', 'ftp_import')

    # Based on the check_upload_content Galaxy config option and on by default, this enables some
    # security related checks on the uploaded content, but can prevent uploads from working in some cases.
    check_content = dataset.get('check_content', True)

    # auto_decompress is a request flag that can be swapped off to prevent Galaxy from automatically
    # decompressing archive files before sniffing.
    auto_decompress = dataset.get('auto_decompress', True)
    try:
        dataset.file_type
    except AttributeError:
        raise UploadProblemException('Unable to process uploaded file, missing file_type parameter.')

    if dataset.type == 'url':
        try:
            dataset.path = sniff.stream_url_to_file(dataset.path, file_sources=get_file_sources())
        except Exception as e:
            raise UploadProblemException('Unable to fetch %s\n%s' % (dataset.path, unicodify(e)))

    # See if we have an empty file
    if not os.path.exists(dataset.path):
        raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path)

    stdout, ext, datatype, is_binary, converted_path = handle_upload(
        registry=registry,
        path=dataset.path,
        requested_ext=dataset.file_type,
        name=dataset.name,
        tmp_prefix='data_id_%s_upload_' % dataset.dataset_id,
        tmp_dir=output_adjacent_tmpdir(output_path),
        check_content=check_content,
        link_data_only=link_data_only,
        in_place=in_place,
        auto_decompress=auto_decompress,
        convert_to_posix_lines=dataset.to_posix_lines,
        convert_spaces_to_tabs=dataset.space_to_tab,
    )

    # Strip compression extension from name
    if compression_type and not getattr(datatype, 'compressed', False) and dataset.name.endswith('.' + compression_type):
        dataset.name = dataset.name[:-len('.' + compression_type)]

    # Move dataset
    if link_data_only:
        # Never alter a file that will not be copied to Galaxy's local file store.
        if datatype.dataset_content_needs_grooming(dataset.path):
            err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
                      '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
            raise UploadProblemException(err_msg)
    if not link_data_only:
        # Move the dataset to its "real" path. converted_path is a tempfile so we move it even if purge_source is False.
        if purge_source or converted_path:
            try:
                # If the user has indicated that the original file should be purged and we have a converted_path tempfile
                if purge_source and converted_path:
                    shutil.move(converted_path, output_path)
                    os.remove(dataset.path)
                else:
                    shutil.move(converted_path or dataset.path, output_path)
            except OSError as e:
                # We may not have permission to remove the input
                if e.errno != errno.EACCES:
                    raise
        else:
            shutil.copy(dataset.path, output_path)

    # Write the job info
    stdout = stdout or 'uploaded %s file' % ext
    info = dict(type='dataset',
                dataset_id=dataset.dataset_id,
                ext=ext,
                stdout=stdout,
                name=dataset.name,
                line_count=line_count)
    if dataset.get('uuid', None) is not None:
        info['uuid'] = dataset.get('uuid')
    # FIXME: does this belong here? also not output-adjacent-tmpdir aware =/
    if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path):
        # Groom the dataset content if necessary
        datatype.groom_dataset_content(output_path)

    return info


def add_composite_file(dataset, registry, output_path, files_path):
    datatype = None

    # Find data type
    if dataset.file_type is not None:
        datatype = registry.get_datatype_by_extension(dataset.file_type)

    def to_path(path_or_url):
        is_url = path_or_url.find('://') != -1  # todo fixme
        if is_url:
            try:
                temp_name = sniff.stream_url_to_file(path_or_url, file_sources=get_file_sources())
            except Exception as e:
                raise UploadProblemException('Unable to fetch %s\n%s' % (path_or_url, unicodify(e)))
            return temp_name, is_url

        return path_or_url, is_url

    def make_files_path():
        safe_makedirs(files_path)

    def stage_file(name, composite_file_path, is_binary=False):
        dp = composite_file_path['path']
        path, is_url = to_path(dp)
        if is_url:
            dataset.path = path
            dp = path

        auto_decompress = composite_file_path.get('auto_decompress', True)
        if auto_decompress and not datatype.composite_type and CompressedFile.can_decompress(dp):
            # It isn't an explicitly composite datatype, so these are just extra files to attach
            # as composite data. It'd be better if Galaxy was communicating this to the tool
            # a little more explicitly so we didn't need to dispatch on the datatype and so we
            # could attach arbitrary extra composite data to an existing composite datatype if
            # need be? Perhaps that would be a mistake though.
            CompressedFile(dp).extract(files_path)
        else:
            tmpdir = output_adjacent_tmpdir(output_path)
            tmp_prefix = 'data_id_%s_convert_' % dataset.dataset_id
            sniff.handle_composite_file(
                datatype,
                dp,
                files_path,
                name,
                is_binary,
                tmpdir,
                tmp_prefix,
                composite_file_path,
            )

    # Do we have pre-defined composite files from the datatype definition.
    if dataset.composite_files:
        make_files_path()
        for name, value in dataset.composite_files.items():
            value = bunch.Bunch(**value)
            if value.name not in dataset.composite_file_paths:
                raise UploadProblemException("Failed to find file_path %s in %s" % (value.name, dataset.composite_file_paths))
            if dataset.composite_file_paths[value.name] is None and not value.optional:
                raise UploadProblemException('A required composite data file was not provided (%s)' % name)
            elif dataset.composite_file_paths[value.name] is not None:
                composite_file_path = dataset.composite_file_paths[value.name]
                stage_file(name, composite_file_path, value.is_binary)
    # Do we have ad-hoc user supplied composite files.
    elif dataset.composite_file_paths:
        make_files_path()
        for key, composite_file in dataset.composite_file_paths.items():
            stage_file(key, composite_file)  # TODO: replace these defaults

    # Move the dataset to its "real" path
    primary_file_path, _ = to_path(dataset.primary_file)
    shutil.move(primary_file_path, output_path)

    # Write the job info
    return dict(type='dataset',
                dataset_id=dataset.dataset_id,
                stdout='uploaded %s file' % dataset.file_type)


def __read_paramfile(path):
    with open(path) as fh:
        obj = load(fh)
    # If there's a single dataset in an old-style paramfile it'll still parse, but it'll be a dict
    assert type(obj) == list
    return obj


def __read_old_paramfile(path):
    datasets = []
    with open(path) as fh:
        for line in fh:
            datasets.append(loads(line))
    return datasets


def __write_job_metadata(metadata):
    # TODO: make upload/set_metadata compatible with https://github.com/galaxyproject/galaxy/pull/4437
    with open('galaxy.json', 'w') as fh:
        for meta in metadata:
            dump(meta, fh)
            fh.write('\n')


def output_adjacent_tmpdir(output_path):
    """ For temp files that will ultimately be moved to output_path anyway
    just create the file directly in output_path's directory so shutil.move
    will work optimally.
    """
    return os.path.dirname(output_path)


def __main__():
    if len(sys.argv) < 4:
        print('usage: upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...', file=sys.stderr)
        sys.exit(1)

    output_paths = parse_outputs(sys.argv[4:])

    registry = Registry()
    registry.load_datatypes(root_dir=sys.argv[1], config=sys.argv[2])

    try:
        datasets = __read_paramfile(sys.argv[3])
    except (ValueError, AssertionError):
        datasets = __read_old_paramfile(sys.argv[3])

    metadata = []
    for dataset in datasets:
        dataset = bunch.Bunch(**safe_dict(dataset))
        try:
            output_path = output_paths[int(dataset.dataset_id)][0]
        except Exception:
            print('Output path for dataset %s not found on command line' % dataset.dataset_id, file=sys.stderr)
            sys.exit(1)
        try:
            if dataset.type == 'composite':
                files_path = output_paths[int(dataset.dataset_id)][1]
                metadata.append(add_composite_file(dataset, registry, output_path, files_path))
            else:
                metadata.append(add_file(dataset, registry, output_path))
        except UploadProblemException as e:
            metadata.append(file_err(unicodify(e), dataset))
    __write_job_metadata(metadata)


if __name__ == '__main__':
    __main__()
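
Usage sketch (not part of the tool itself): __main__ above expects the Galaxy root, a datatypes configuration, a JSON paramfile, and one dataset_id:files_path:path output spec per dataset, and it writes one JSON metadata record per line to galaxy.json in the working directory. The snippet below is a minimal illustration of how a driver might assemble those arguments; every path and dataset value is hypothetical, and the dataset dict only mirrors the fields this script actually reads.

# Hypothetical driver for irods_upload.py -- all paths and values are illustrative only.
import json
import subprocess

dataset = {
    "type": "file",                    # anything other than 'url' or 'composite' is handled as a plain upload
    "dataset_id": 1,                   # must match the id used in the output spec below
    "file_type": "txt",                # requested Galaxy datatype extension
    "path": "/tmp/example_input.txt",  # staged input file (hypothetical)
    "name": "example_input.txt",
    "to_posix_lines": True,
    "space_to_tab": False,
}

# New-style paramfile: a JSON list of dataset dicts (see __read_paramfile above).
with open("paramfile.json", "w") as fh:
    json.dump([dataset], fh)

subprocess.check_call([
    "python", "irods_upload.py",
    "/home/richard/galaxy",              # <root>: Galaxy root directory (hypothetical)
    "datatypes_conf.xml",                # <datatypes_conf>: datatypes registry config (hypothetical)
    "paramfile.json",                    # <json paramfile>
    "1:/tmp/job_files:/tmp/output.dat",  # <output spec>: dataset_id:files_path:path
])

On success the resulting galaxy.json contains one line per dataset with the fields built in add_file (type, dataset_id, ext, stdout, name, line_count); on failure the record from file_err carries stderr and failed=True instead.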