comparison irods_upload.py @ 2:0641ea2f75b1 draft

"planemo upload commit b2a00d9c24285fef0fb131d1832ecf4c337e5038-dirty"
author rhohensinner
date Fri, 02 Jul 2021 09:40:25 +0000
parents
children d2be2eb8350f
comparison 1:19c1cecdfdfd → 2:0641ea2f75b1
1 #!/usr/bin/env python
2 # Processes uploads from the user.
3
4 # WARNING: Changes in this tool (particularly as related to parsing) may need
5 # to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools
6 from __future__ import print_function
7
8 import errno
9 import os
10 import shutil
11 import sys
12 sys.path.append("/home/richard/galaxy/lib")  # NOTE: hardcoded path to a local Galaxy lib directory; adjust for your deployment
13 from json import dump, load, loads
14
15 from galaxy.datatypes import sniff
16 from galaxy.datatypes.registry import Registry
17 from galaxy.datatypes.upload_util import handle_upload, UploadProblemException
18 from galaxy.util import (
19 bunch,
20 safe_makedirs,
21 unicodify
22 )
23 from galaxy.util.compression_utils import CompressedFile
24
25 assert sys.version_info[:2] >= (2, 7)
26
27
28 _file_sources = None
29
30
31 def get_file_sources():
32 global _file_sources
33 if _file_sources is None:
34 from galaxy.files import ConfiguredFileSources
35 file_sources = None
36 if os.path.exists("file_sources.json"):
37 file_sources_as_dict = None
38 with open("file_sources.json") as f:
39 file_sources_as_dict = load(f)
40 if file_sources_as_dict is not None:
41 file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
42 if file_sources is None:
43 file_sources = ConfiguredFileSources.from_dict([])
44 _file_sources = file_sources
45 return _file_sources
46
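# Minimal usage sketch (illustrative, mirroring the call made later in this
# script): the file sources are resolved lazily wherever a remote source may
# need to be fetched, e.g.
#   path = sniff.stream_url_to_file(url, file_sources=get_file_sources())
# If no "file_sources.json" was written into the working directory, an empty
# ConfiguredFileSources configuration is used as a fallback.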
47
48 def file_err(msg, dataset):
49 # never remove a server-side upload
50 if dataset.type not in ('server_dir', 'path_paste'):
51 try:
52 os.remove(dataset.path)
53 except Exception:
54 pass
55 return dict(type='dataset',
56 ext='data',
57 dataset_id=dataset.dataset_id,
58 stderr=msg,
59 failed=True)
60
61
62 def safe_dict(d):
63 """Recursively clone JSON structure with unicode dictionary keys."""
64 if isinstance(d, dict):
65 return {unicodify(k): safe_dict(v) for k, v in d.items()}
66 elif isinstance(d, list):
67 return [safe_dict(x) for x in d]
68 else:
69 return d
70
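# Example (illustrative): safe_dict({'name': 'f.txt', 'meta': [{'size': 1}]})
# returns an equivalent nested structure whose dictionary keys have all been
# passed through unicodify(), so downstream code can rely on unicode keys.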
71
72 def parse_outputs(args):
73 rval = {}
74 for arg in args:
75 id, files_path, path = arg.split(':', 2)
76 rval[int(id)] = (path, files_path)
77 return rval
78
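# Example (illustrative paths; only the "id:files_path:path" shape comes from
# the split above): the output spec
#   "7:/jobdir/extra_files:/galaxy/datasets/dataset_7.dat"
# is parsed so that
#   parse_outputs([spec])[7] == ('/galaxy/datasets/dataset_7.dat', '/jobdir/extra_files')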
79
80 def add_file(dataset, registry, output_path):
81 ext = None
82 compression_type = None
83 line_count = None
84 link_data_only_str = dataset.get('link_data_only', 'copy_files')
85 if link_data_only_str not in ['link_to_files', 'copy_files']:
86 raise UploadProblemException("Invalid setting '%s' for option link_data_only - upload request misconfigured" % link_data_only_str)
87 link_data_only = link_data_only_str == 'link_to_files'
88
89 # run_as_real_user is inferred from the Galaxy config (an external chmod applied to the inputs indicates the job is executed as the real user).
90 # If this is True we always purge the supplied upload inputs so they are cleaned up, and we reuse their
91 # paths during data conversions since this user already owns those paths.
92 # The older in_place check covers upload jobs created before 18.01; TODO: remove in 19.XX. xref #5206
93 run_as_real_user = dataset.get('run_as_real_user', False) or dataset.get("in_place", False)
94
95 # purge_source defaults to True unless this is an FTP import and
96 # ftp_upload_purge has been overridden to False in Galaxy's config.
97 # We set purge_source to False if:
98 # - the job does not have write access to the file, e.g. when running as the
99 # real user
100 # - the files are uploaded from external paths.
101 purge_source = dataset.get('purge_source', True) and not run_as_real_user and dataset.type not in ('server_dir', 'path_paste')
102
103 # in_place is True unless we are running as a real user or importing external paths (i.e.
104 # this is a real upload and not a path paste or ftp import).
105 # in_place should always be False if running as real user because the uploaded file will
106 # be owned by Galaxy and not the user and it should be False for external paths so Galaxy doesn't
107 # modify files not controlled by Galaxy.
108 in_place = not run_as_real_user and dataset.type not in ('server_dir', 'path_paste', 'ftp_import')
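# Worked example (follows directly from the two expressions above, with the
# default purge_source): a regular browser upload (dataset.type == 'file',
# run_as_real_user False) gets purge_source=True and in_place=True, while a
# 'server_dir' or 'path_paste' import gets both False, so files outside
# Galaxy's control are neither modified nor removed.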
109
110 # Based on the check_upload_content Galaxy config option (on by default), this enables some
111 # security-related checks on the uploaded content, but can prevent uploads from working in some cases.
112 check_content = dataset.get('check_content', True)
113
114 # auto_decompress is a request flag that can be swapped off to prevent Galaxy from automatically
115 # decompressing archive files before sniffing.
116 auto_decompress = dataset.get('auto_decompress', True)
117 try:
118 dataset.file_type
119 except AttributeError:
120 raise UploadProblemException('Unable to process uploaded file, missing file_type parameter.')
121
122 if dataset.type == 'url':
123 try:
124 dataset.path = sniff.stream_url_to_file(dataset.path, file_sources=get_file_sources())
125 except Exception as e:
126 raise UploadProblemException('Unable to fetch %s\n%s' % (dataset.path, unicodify(e)))
127
128 # Verify that the uploaded temporary file actually exists
129 if not os.path.exists(dataset.path):
130 raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path)
131
132 stdout, ext, datatype, is_binary, converted_path = handle_upload(
133 registry=registry,
134 path=dataset.path,
135 requested_ext=dataset.file_type,
136 name=dataset.name,
137 tmp_prefix='data_id_%s_upload_' % dataset.dataset_id,
138 tmp_dir=output_adjacent_tmpdir(output_path),
139 check_content=check_content,
140 link_data_only=link_data_only,
141 in_place=in_place,
142 auto_decompress=auto_decompress,
143 convert_to_posix_lines=dataset.to_posix_lines,
144 convert_spaces_to_tabs=dataset.space_to_tab,
145 )
146
147 # Strip compression extension from name
148 if compression_type and not getattr(datatype, 'compressed', False) and dataset.name.endswith('.' + compression_type):
149 dataset.name = dataset.name[:-len('.' + compression_type)]
150
151 # Move dataset
152 if link_data_only:
153 # Never alter a file that will not be copied to Galaxy's local file store.
154 if datatype.dataset_content_needs_grooming(dataset.path):
155 err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
156 '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
157 raise UploadProblemException(err_msg)
158 if not link_data_only:
159 # Move the dataset to its "real" path. converted_path is a tempfile so we move it even if purge_source is False.
160 if purge_source or converted_path:
161 try:
162 # If the user has indicated the original file should be purged and we have a converted_path tempfile
163 if purge_source and converted_path:
164 shutil.move(converted_path, output_path)
165 os.remove(dataset.path)
166 else:
167 shutil.move(converted_path or dataset.path, output_path)
168 except OSError as e:
169 # We may not have permission to remove the input
170 if e.errno != errno.EACCES:
171 raise
172 else:
173 shutil.copy(dataset.path, output_path)
174
175 # Write the job info
176 stdout = stdout or 'uploaded %s file' % ext
177 info = dict(type='dataset',
178 dataset_id=dataset.dataset_id,
179 ext=ext,
180 stdout=stdout,
181 name=dataset.name,
182 line_count=line_count)
183 if dataset.get('uuid', None) is not None:
184 info['uuid'] = dataset.get('uuid')
185 # FIXME: does this belong here? also not output-adjacent-tmpdir aware =/
186 if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path):
187 # Groom the dataset content if necessary
188 datatype.groom_dataset_content(output_path)
189 return info
190
191
192 def add_composite_file(dataset, registry, output_path, files_path):
193 datatype = None
194
195 # Find data type
196 if dataset.file_type is not None:
197 datatype = registry.get_datatype_by_extension(dataset.file_type)
198
199 def to_path(path_or_url):
200 is_url = '://' in path_or_url  # TODO: fixme
201 if is_url:
202 try:
203 temp_name = sniff.stream_url_to_file(path_or_url, file_sources=get_file_sources())
204 except Exception as e:
205 raise UploadProblemException('Unable to fetch %s\n%s' % (path_or_url, unicodify(e)))
206
207 return temp_name, is_url
208
209 return path_or_url, is_url
210
211 def make_files_path():
212 safe_makedirs(files_path)
213
214 def stage_file(name, composite_file_path, is_binary=False):
215 dp = composite_file_path['path']
216 path, is_url = to_path(dp)
217 if is_url:
218 dataset.path = path
219 dp = path
220
221 auto_decompress = composite_file_path.get('auto_decompress', True)
222 if auto_decompress and not datatype.composite_type and CompressedFile.can_decompress(dp):
223 # It isn't an explicitly composite datatype, so these are just extra files to attach
224 # as composite data. It'd be better if Galaxy were communicating this to the tool
225 # a little more explicitly so we didn't need to dispatch on the datatype, and so we
226 # could attach arbitrary extra composite data to an existing composite datatype
227 # if need be? Perhaps that would be a mistake though.
228 CompressedFile(dp).extract(files_path)
229 else:
230 tmpdir = output_adjacent_tmpdir(output_path)
231 tmp_prefix = 'data_id_%s_convert_' % dataset.dataset_id
232 sniff.handle_composite_file(
233 datatype,
234 dp,
235 files_path,
236 name,
237 is_binary,
238 tmpdir,
239 tmp_prefix,
240 composite_file_path,
241 )
242
243 # Do we have pre-defined composite files from the datatype definition?
244 if dataset.composite_files:
245 make_files_path()
246 for name, value in dataset.composite_files.items():
247 value = bunch.Bunch(**value)
248 if value.name not in dataset.composite_file_paths:
249 raise UploadProblemException("Failed to find file_path %s in %s" % (value.name, dataset.composite_file_paths))
250 if dataset.composite_file_paths[value.name] is None and not value.optional:
251 raise UploadProblemException('A required composite data file was not provided (%s)' % name)
252 elif dataset.composite_file_paths[value.name] is not None:
253 composite_file_path = dataset.composite_file_paths[value.name]
254 stage_file(name, composite_file_path, value.is_binary)
255
256 # Do we have ad-hoc, user-supplied composite files?
257 elif dataset.composite_file_paths:
258 make_files_path()
259 for key, composite_file in dataset.composite_file_paths.items():
260 stage_file(key, composite_file) # TODO: replace these defaults
261
262 # Move the dataset to its "real" path
263 primary_file_path, _ = to_path(dataset.primary_file)
264 shutil.move(primary_file_path, output_path)
265
266 # Write the job info
267 return dict(type='dataset',
268 dataset_id=dataset.dataset_id,
269 stdout='uploaded %s file' % dataset.file_type)
270
271
272 def __read_paramfile(path):
273 with open(path) as fh:
274 obj = load(fh)
275 # If there's a single dataset in an old-style paramfile it'll still parse, but it'll be a dict
276 assert type(obj) == list
277 return obj
278
279
280 def __read_old_paramfile(path):
281 datasets = []
282 with open(path) as fh:
283 for line in fh:
284 datasets.append(loads(line))
285 return datasets
286
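# Illustrative sketch of the two paramfile layouts handled by the readers
# above (field names mirror the dataset attributes accessed elsewhere in this
# script; the concrete values are made up):
#   new style -- a single JSON list:
#       [{"type": "file", "dataset_id": 7, "file_type": "auto", "name": "f.txt"}]
#   old style -- one JSON object per line:
#       {"type": "file", "dataset_id": 7, "file_type": "auto", "name": "f.txt"}
#       {"type": "url", "dataset_id": 8, "file_type": "auto", "name": "remote.txt"}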
287
288 def __write_job_metadata(metadata):
289 # TODO: make upload/set_metadata compatible with https://github.com/galaxyproject/galaxy/pull/4437
290 with open('galaxy.json', 'w') as fh:
291 for meta in metadata:
292 dump(meta, fh)
293 fh.write('\n')
294
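# Illustrative example of the resulting galaxy.json: one JSON object per line,
# mirroring the dicts returned by add_file(), add_composite_file() and
# file_err() above (values made up):
#   {"type": "dataset", "dataset_id": 7, "ext": "txt", "stdout": "uploaded txt file", "name": "f.txt", "line_count": null}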
295
296 def output_adjacent_tmpdir(output_path):
297 """ For temp files that will ultimately be moved to output_path anyway
298 just create the file directly in output_path's directory so shutil.move
299 will work optimally.
300 """
301 return os.path.dirname(output_path)
302
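# Example (illustrative): output_adjacent_tmpdir('/galaxy/datasets/dataset_7.dat')
# returns '/galaxy/datasets', so temp files land on the same filesystem as the
# final output and shutil.move degrades to a cheap rename.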
303
304 def __main__():
305
306 if len(sys.argv) < 4:
307 print('usage: irods_upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...', file=sys.stderr)
308 sys.exit(1)
309
310 output_paths = parse_outputs(sys.argv[4:])
311
312 registry = Registry()
313 registry.load_datatypes(root_dir=sys.argv[1], config=sys.argv[2])
314
315 try:
316 datasets = __read_paramfile(sys.argv[3])
317 except (ValueError, AssertionError):
318 datasets = __read_old_paramfile(sys.argv[3])
319
320 metadata = []
321 for dataset in datasets:
322 dataset = bunch.Bunch(**safe_dict(dataset))
323 try:
324 output_path = output_paths[int(dataset.dataset_id)][0]
325 except Exception:
326 print('Output path for dataset %s not found on command line' % dataset.dataset_id, file=sys.stderr)
327 sys.exit(1)
328 try:
329 if dataset.type == 'composite':
330 files_path = output_paths[int(dataset.dataset_id)][1]
331 metadata.append(add_composite_file(dataset, registry, output_path, files_path))
332 else:
333 metadata.append(add_file(dataset, registry, output_path))
334 except UploadProblemException as e:
335 metadata.append(file_err(unicodify(e), dataset))
336 __write_job_metadata(metadata)
337
338
339 if __name__ == '__main__':
340 __main__()
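
# Illustrative invocation (paths and IDs are made up; the argument order --
# <root> <datatypes_conf> <json paramfile> followed by one "id:files_path:path"
# spec per output dataset -- matches the parsing in __main__ above):
#
#   python irods_upload.py /srv/galaxy /srv/galaxy/config/datatypes_conf.xml \
#       upload_params.json 7:/jobdir/extra_files:/galaxy/datasets/dataset_7.dat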