galaxy_irods_interface: comparison of irods_upload.py @ 2:0641ea2f75b1 (draft)
"planemo upload commit b2a00d9c24285fef0fb131d1832ecf4c337e5038-dirty"
| author | rhohensinner |
|---|---|
| date | Fri, 02 Jul 2021 09:40:25 +0000 |
| parents | |
| children | d2be2eb8350f |
The comparison is between revisions 1:19c1cecdfdfd and 2:0641ea2f75b1; the file contents rendered in the comparison view are reproduced below.
```python
#!/usr/bin/env python
# Processes uploads from the user.

# WARNING: Changes in this tool (particularly as related to parsing) may need
# to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools
from __future__ import print_function

import errno
import os
import shutil
import sys
sys.path.append("/home/richard/galaxy/lib")
from json import dump, load, loads

from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import handle_upload, UploadProblemException
from galaxy.util import (
    bunch,
    safe_makedirs,
    unicodify
)
from galaxy.util.compression_utils import CompressedFile

assert sys.version_info[:2] >= (2, 7)


_file_sources = None


def get_file_sources():
    global _file_sources
    if _file_sources is None:
        from galaxy.files import ConfiguredFileSources
        file_sources = None
        if os.path.exists("file_sources.json"):
            file_sources_as_dict = None
            with open("file_sources.json") as f:
                file_sources_as_dict = load(f)
            if file_sources_as_dict is not None:
                file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
        if file_sources is None:
            # fall back to an empty configuration instead of discarding the result
            file_sources = ConfiguredFileSources.from_dict([])
        _file_sources = file_sources
    return _file_sources


def file_err(msg, dataset):
    # never remove a server-side upload
    if dataset.type not in ('server_dir', 'path_paste'):
        try:
            os.remove(dataset.path)
        except Exception:
            pass
    return dict(type='dataset',
                ext='data',
                dataset_id=dataset.dataset_id,
                stderr=msg,
                failed=True)


def safe_dict(d):
    """Recursively clone JSON structure with unicode dictionary keys."""
    if isinstance(d, dict):
        return {unicodify(k): safe_dict(v) for k, v in d.items()}
    elif isinstance(d, list):
        return [safe_dict(x) for x in d]
    else:
        return d


def parse_outputs(args):
    rval = {}
    for arg in args:
        id, files_path, path = arg.split(':', 2)
        rval[int(id)] = (path, files_path)
    return rval
```
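For reference, the output-spec arguments consumed by `parse_outputs` take the form `<dataset_id>:<extra_files_path>:<output_path>`; a minimal sketch with hypothetical paths:

```python
# Illustration only (hypothetical job paths). Splitting on the first two colons
# means the output path itself may still contain colons.
specs = ["1:/jobdir/extra_files_1:/jobdir/outputs/dataset_1.dat"]
print(parse_outputs(specs))
# {1: ('/jobdir/outputs/dataset_1.dat', '/jobdir/extra_files_1')}
```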
```python
def add_file(dataset, registry, output_path):
    ext = None
    compression_type = None
    line_count = None
    link_data_only_str = dataset.get('link_data_only', 'copy_files')
    if link_data_only_str not in ['link_to_files', 'copy_files']:
        raise UploadProblemException("Invalid setting '%s' for option link_data_only - upload request misconfigured" % link_data_only_str)
    link_data_only = link_data_only_str == 'link_to_files'

    # run_as_real_user is inferred from the Galaxy config (whether an external chmod of the inputs is executed).
    # If this is True we always purge supplied upload inputs so they are cleaned up and we reuse their
    # paths during data conversions since this user already owns that path.
    # Older in_place check for upload jobs created before 18.01, TODO remove in 19.XX. xref #5206
    run_as_real_user = dataset.get('run_as_real_user', False) or dataset.get("in_place", False)

    # purge_source defaults to True unless this is an FTP import and
    # ftp_upload_purge has been overridden to False in Galaxy's config.
    # We set purge_source to False if:
    # - the job does not have write access to the file, e.g. when running as the
    #   real user
    # - the files are uploaded from external paths.
    purge_source = dataset.get('purge_source', True) and not run_as_real_user and dataset.type not in ('server_dir', 'path_paste')

    # in_place is True unless we are running as a real user or importing external paths (i.e.
    # this is a real upload and not a path paste or ftp import).
    # in_place should always be False if running as real user because the uploaded file will
    # be owned by Galaxy and not the user and it should be False for external paths so Galaxy doesn't
    # modify files not controlled by Galaxy.
    in_place = not run_as_real_user and dataset.type not in ('server_dir', 'path_paste', 'ftp_import')

    # Based on the check_upload_content Galaxy config option (on by default), this enables some
    # security-related checks on the uploaded content, but can prevent uploads from working in some cases.
    check_content = dataset.get('check_content', True)

    # auto_decompress is a request flag that can be swapped off to prevent Galaxy from automatically
    # decompressing archive files before sniffing.
    auto_decompress = dataset.get('auto_decompress', True)
    try:
        dataset.file_type
    except AttributeError:
        raise UploadProblemException('Unable to process uploaded file, missing file_type parameter.')

    if dataset.type == 'url':
        try:
            dataset.path = sniff.stream_url_to_file(dataset.path, file_sources=get_file_sources())
        except Exception as e:
            raise UploadProblemException('Unable to fetch %s\n%s' % (dataset.path, unicodify(e)))

    # See if we have an empty file
    if not os.path.exists(dataset.path):
        raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path)

    stdout, ext, datatype, is_binary, converted_path = handle_upload(
        registry=registry,
        path=dataset.path,
        requested_ext=dataset.file_type,
        name=dataset.name,
        tmp_prefix='data_id_%s_upload_' % dataset.dataset_id,
        tmp_dir=output_adjacent_tmpdir(output_path),
        check_content=check_content,
        link_data_only=link_data_only,
        in_place=in_place,
        auto_decompress=auto_decompress,
        convert_to_posix_lines=dataset.to_posix_lines,
        convert_spaces_to_tabs=dataset.space_to_tab,
    )

    # Strip compression extension from name
    if compression_type and not getattr(datatype, 'compressed', False) and dataset.name.endswith('.' + compression_type):
        dataset.name = dataset.name[:-len('.' + compression_type)]

    # Move dataset
    if link_data_only:
        # Never alter a file that will not be copied to Galaxy's local file store.
        if datatype.dataset_content_needs_grooming(dataset.path):
            err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
                      '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
            raise UploadProblemException(err_msg)
    if not link_data_only:
        # Move the dataset to its "real" path. converted_path is a tempfile so we move it even if purge_source is False.
        if purge_source or converted_path:
            try:
                # If the user has asked for the original file to be purged and we have a converted_path tempfile
                if purge_source and converted_path:
                    shutil.move(converted_path, output_path)
                    os.remove(dataset.path)
                else:
                    shutil.move(converted_path or dataset.path, output_path)
            except OSError as e:
                # We may not have permission to remove the input
                if e.errno != errno.EACCES:
                    raise
        else:
            shutil.copy(dataset.path, output_path)

    # Write the job info
    stdout = stdout or 'uploaded %s file' % ext
    info = dict(type='dataset',
                dataset_id=dataset.dataset_id,
                ext=ext,
                stdout=stdout,
                name=dataset.name,
                line_count=line_count)
    if dataset.get('uuid', None) is not None:
        info['uuid'] = dataset.get('uuid')
    # FIXME: does this belong here? also not output-adjacent-tmpdir aware =/
    if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path):
        # Groom the dataset content if necessary
        datatype.groom_dataset_content(output_path)
    return info
```
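To make the flags above concrete, here is a sketch of the kind of per-dataset entry `add_file` reads from the paramfile; the field names come from the accesses in the function, while the values (and the `type`) are hypothetical:

```python
# Hypothetical paramfile entry for a plain upload (illustrative values only).
example_dataset = {
    "type": "file",                     # placeholder; the script only special-cases 'url',
                                        # 'server_dir', 'path_paste', 'ftp_import' and 'composite'
    "dataset_id": 1,
    "name": "reads.fastq",
    "file_type": "fastqsanger",
    "path": "/tmp/upload_file_abc123",  # staged temporary file
    "to_posix_lines": True,             # accessed as attributes, so they must be present
    "space_to_tab": False,
    "link_data_only": "copy_files",     # or "link_to_files"
    "purge_source": True,
    "check_content": True,
    "auto_decompress": True,
}
```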
```python
def add_composite_file(dataset, registry, output_path, files_path):
    datatype = None

    # Find data type
    if dataset.file_type is not None:
        datatype = registry.get_datatype_by_extension(dataset.file_type)

    def to_path(path_or_url):
        is_url = path_or_url.find('://') != -1  # todo fixme
        if is_url:
            try:
                temp_name = sniff.stream_url_to_file(path_or_url, file_sources=get_file_sources())
            except Exception as e:
                raise UploadProblemException('Unable to fetch %s\n%s' % (path_or_url, unicodify(e)))

            return temp_name, is_url

        return path_or_url, is_url

    def make_files_path():
        safe_makedirs(files_path)

    def stage_file(name, composite_file_path, is_binary=False):
        dp = composite_file_path['path']
        path, is_url = to_path(dp)
        if is_url:
            dataset.path = path
            dp = path

        auto_decompress = composite_file_path.get('auto_decompress', True)
        if auto_decompress and not datatype.composite_type and CompressedFile.can_decompress(dp):
            # It isn't an explicitly composite datatype, so these are just extra files to attach
            # as composite data. It'd be better if Galaxy was communicating this to the tool
            # a little more explicitly so we didn't need to dispatch on the datatype and so we
            # could attach arbitrary extra composite data to an existing composite datatype
            # if need be? Perhaps that would be a mistake though.
            CompressedFile(dp).extract(files_path)
        else:
            tmpdir = output_adjacent_tmpdir(output_path)
            tmp_prefix = 'data_id_%s_convert_' % dataset.dataset_id
            sniff.handle_composite_file(
                datatype,
                dp,
                files_path,
                name,
                is_binary,
                tmpdir,
                tmp_prefix,
                composite_file_path,
            )

    # Do we have pre-defined composite files from the datatype definition?
    if dataset.composite_files:
        make_files_path()
        for name, value in dataset.composite_files.items():
            value = bunch.Bunch(**value)
            if value.name not in dataset.composite_file_paths:
                raise UploadProblemException("Failed to find file_path %s in %s" % (value.name, dataset.composite_file_paths))
            if dataset.composite_file_paths[value.name] is None and not value.optional:
                raise UploadProblemException('A required composite data file was not provided (%s)' % name)
            elif dataset.composite_file_paths[value.name] is not None:
                composite_file_path = dataset.composite_file_paths[value.name]
                stage_file(name, composite_file_path, value.is_binary)

    # Do we have ad-hoc user-supplied composite files?
    elif dataset.composite_file_paths:
        make_files_path()
        for key, composite_file in dataset.composite_file_paths.items():
            stage_file(key, composite_file)  # TODO: replace these defaults

    # Move the dataset to its "real" path
    primary_file_path, _ = to_path(dataset.primary_file)
    shutil.move(primary_file_path, output_path)

    # Write the job info
    return dict(type='dataset',
                dataset_id=dataset.dataset_id,
                stdout='uploaded %s file' % dataset.file_type)
```
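Similarly, a sketch of the structures `add_composite_file` and `stage_file` work with, assuming `composite_file_paths` maps each component name to a dict carrying at least a `path` (and optionally `auto_decompress`); names and values here are hypothetical:

```python
# Hypothetical composite paramfile entry (illustrative names and paths only).
example_composite_dataset = {
    "type": "composite",
    "dataset_id": 2,
    "file_type": "txt",                  # placeholder; a composite datatype extension in practice
    "primary_file": "/tmp/upload_primary_xyz",
    "composite_file_paths": {
        "component1": {"path": "/tmp/upload_component1_xyz", "auto_decompress": True},
        "component2": {"path": "/tmp/upload_component2_xyz"},
    },
}
```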
```python
def __read_paramfile(path):
    with open(path) as fh:
        obj = load(fh)
    # If there's a single dataset in an old-style paramfile it'll still parse, but it'll be a dict
    assert type(obj) == list
    return obj


def __read_old_paramfile(path):
    datasets = []
    with open(path) as fh:
        for line in fh:
            datasets.append(loads(line))
    return datasets


def __write_job_metadata(metadata):
    # TODO: make upload/set_metadata compatible with https://github.com/galaxyproject/galaxy/pull/4437
    with open('galaxy.json', 'w') as fh:
        for meta in metadata:
            dump(meta, fh)
            fh.write('\n')
```
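The two readers above correspond to two on-disk layouts; a short sketch of both with hypothetical content, noting that `__write_job_metadata` emits `galaxy.json` in the same line-oriented layout as the old-style paramfile:

```python
import json

# Hypothetical minimal dataset descriptions (illustrative fields only).
datasets = [{"dataset_id": 1, "type": "file", "file_type": "txt", "name": "a.txt"}]

# New-style paramfile: a single JSON list (what __read_paramfile expects).
with open("params_new.json", "w") as fh:
    json.dump(datasets, fh)

# Old-style paramfile: one JSON object per line (what __read_old_paramfile expects),
# which is also the layout __write_job_metadata uses for galaxy.json.
with open("params_old.json", "w") as fh:
    for d in datasets:
        json.dump(d, fh)
        fh.write("\n")
```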
```python
def output_adjacent_tmpdir(output_path):
    """ For temp files that will ultimately be moved to output_path anyway,
    just create the file directly in output_path's directory so shutil.move
    will work optimally.
    """
    return os.path.dirname(output_path)


def __main__():

    if len(sys.argv) < 4:
        print('usage: upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...', file=sys.stderr)
        sys.exit(1)

    output_paths = parse_outputs(sys.argv[4:])

    registry = Registry()
    registry.load_datatypes(root_dir=sys.argv[1], config=sys.argv[2])

    try:
        datasets = __read_paramfile(sys.argv[3])
    except (ValueError, AssertionError):
        datasets = __read_old_paramfile(sys.argv[3])

    metadata = []
    for dataset in datasets:
        dataset = bunch.Bunch(**safe_dict(dataset))
        try:
            output_path = output_paths[int(dataset.dataset_id)][0]
        except Exception:
            print('Output path for dataset %s not found on command line' % dataset.dataset_id, file=sys.stderr)
            sys.exit(1)
        try:
            if dataset.type == 'composite':
                files_path = output_paths[int(dataset.dataset_id)][1]
                metadata.append(add_composite_file(dataset, registry, output_path, files_path))
            else:
                metadata.append(add_file(dataset, registry, output_path))
        except UploadProblemException as e:
            metadata.append(file_err(unicodify(e), dataset))
    __write_job_metadata(metadata)


if __name__ == '__main__':
    __main__()
```
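Finally, a sketch of how the script is driven, matching the usage message and argument parsing above; program name, paths and IDs are placeholders:

```python
import subprocess

# Hypothetical invocation (illustrative paths only):
# argv[1] = Galaxy root, argv[2] = datatypes config, argv[3] = JSON paramfile,
# argv[4:] = one "<dataset_id>:<extra_files_path>:<output_path>" spec per output.
subprocess.check_call([
    "python", "irods_upload.py",
    "/path/to/galaxy",
    "/path/to/datatypes_conf.xml",
    "params.json",
    "1:/jobdir/extra_files_1:/jobdir/outputs/dataset_1.dat",
])
```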
