galaxy_irods_interface: irods_upload.py @ 2:0641ea2f75b1 (draft)
"planemo upload commit b2a00d9c24285fef0fb131d1832ecf4c337e5038-dirty"

author:    rhohensinner
date:      Fri, 02 Jul 2021 09:40:25 +0000
parents:
children:  d2be2eb8350f
comparison: 1:19c1cecdfdfd vs 2:0641ea2f75b1
#!/usr/bin/env python
# Processes uploads from the user.

# WARNING: Changes in this tool (particularly as related to parsing) may need
# to be reflected in galaxy.web.controllers.tool_runner and galaxy.tools
from __future__ import print_function

import errno
import os
import shutil
import sys
sys.path.append("/home/richard/galaxy/lib")
from json import dump, load, loads

from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import handle_upload, UploadProblemException
from galaxy.util import (
    bunch,
    safe_makedirs,
    unicodify
)
from galaxy.util.compression_utils import CompressedFile

assert sys.version_info[:2] >= (2, 7)


_file_sources = None

def get_file_sources():
    global _file_sources
    if _file_sources is None:
        from galaxy.files import ConfiguredFileSources
        file_sources = None
        if os.path.exists("file_sources.json"):
            file_sources_as_dict = None
            with open("file_sources.json") as f:
                file_sources_as_dict = load(f)
            if file_sources_as_dict is not None:
                file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
        if file_sources is None:
            # fall back to an empty configuration (keep the result instead of discarding it)
            file_sources = ConfiguredFileSources.from_dict([])
        _file_sources = file_sources
    return _file_sources

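# The exact layout of file_sources.json is defined by
# galaxy.files.ConfiguredFileSources; this script only checks that the file
# exists, loads it as JSON, and hands the parsed structure to
# ConfiguredFileSources.from_dict(), so no particular schema is assumed here.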

def file_err(msg, dataset):
    # never remove a server-side upload
    if dataset.type not in ('server_dir', 'path_paste'):
        try:
            os.remove(dataset.path)
        except Exception:
            pass
    return dict(type='dataset',
                ext='data',
                dataset_id=dataset.dataset_id,
                stderr=msg,
                failed=True)


def safe_dict(d):
    """Recursively clone JSON structure with unicode dictionary keys."""
    if isinstance(d, dict):
        return {unicodify(k): safe_dict(v) for k, v in d.items()}
    elif isinstance(d, list):
        return [safe_dict(x) for x in d]
    else:
        return d


def parse_outputs(args):
    rval = {}
    for arg in args:
        id, files_path, path = arg.split(':', 2)
        rval[int(id)] = (path, files_path)
    return rval

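# Sketch of the expected "<output spec>" command-line format (the example
# values below are invented, not taken from a real job): each argument is
# "<dataset_id>:<files_path>:<path>", and because of the maxsplit above the
# path portion may itself contain colons. For instance
#   "1:/tmp/job_wd/extra_files:/galaxy/datasets/dataset_1.dat"
# parses to
#   {1: ("/galaxy/datasets/dataset_1.dat", "/tmp/job_wd/extra_files")}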

def add_file(dataset, registry, output_path):
    ext = None
    compression_type = None
    line_count = None
    link_data_only_str = dataset.get('link_data_only', 'copy_files')
    if link_data_only_str not in ['link_to_files', 'copy_files']:
        raise UploadProblemException("Invalid setting '%s' for option link_data_only - upload request misconfigured" % link_data_only_str)
    link_data_only = link_data_only_str == 'link_to_files'

    # run_as_real_user is estimated from the Galaxy config (indicated by an external chmod having been executed on the inputs).
    # If this is True we always purge supplied upload inputs so they are cleaned up and we reuse their
    # paths during data conversions since this user already owns that path.
    # Older in_place check for upload jobs created before 18.01, TODO remove in 19.XX. xref #5206
    run_as_real_user = dataset.get('run_as_real_user', False) or dataset.get("in_place", False)

    # purge_source defaults to True unless this is an FTP import and
    # ftp_upload_purge has been overridden to False in Galaxy's config.
    # We set purge_source to False if:
    # - the job does not have write access to the file, e.g. when running as the
    #   real user
    # - the files are uploaded from external paths.
    purge_source = dataset.get('purge_source', True) and not run_as_real_user and dataset.type not in ('server_dir', 'path_paste')

    # in_place is True unless we are running as a real user or importing external paths (i.e.
    # this is a real upload and not a path paste or ftp import).
    # in_place should always be False if running as real user because the uploaded file will
    # be owned by Galaxy and not the user and it should be False for external paths so Galaxy doesn't
    # modify files not controlled by Galaxy.
    in_place = not run_as_real_user and dataset.type not in ('server_dir', 'path_paste', 'ftp_import')

    # Based on the check_upload_content Galaxy config option (on by default), this enables some
    # security-related checks on the uploaded content, but can prevent uploads from working in some cases.
    check_content = dataset.get('check_content', True)

    # auto_decompress is a request flag that can be swapped off to prevent Galaxy from automatically
    # decompressing archive files before sniffing.
    auto_decompress = dataset.get('auto_decompress', True)
    try:
        dataset.file_type
    except AttributeError:
        raise UploadProblemException('Unable to process uploaded file, missing file_type parameter.')

    if dataset.type == 'url':
        try:
            dataset.path = sniff.stream_url_to_file(dataset.path, file_sources=get_file_sources())
        except Exception as e:
            raise UploadProblemException('Unable to fetch %s\n%s' % (dataset.path, unicodify(e)))

    # Make sure the uploaded temporary file actually exists
    if not os.path.exists(dataset.path):
        raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path)

    stdout, ext, datatype, is_binary, converted_path = handle_upload(
        registry=registry,
        path=dataset.path,
        requested_ext=dataset.file_type,
        name=dataset.name,
        tmp_prefix='data_id_%s_upload_' % dataset.dataset_id,
        tmp_dir=output_adjacent_tmpdir(output_path),
        check_content=check_content,
        link_data_only=link_data_only,
        in_place=in_place,
        auto_decompress=auto_decompress,
        convert_to_posix_lines=dataset.to_posix_lines,
        convert_spaces_to_tabs=dataset.space_to_tab,
    )

    # Strip compression extension from name
    if compression_type and not getattr(datatype, 'compressed', False) and dataset.name.endswith('.' + compression_type):
        dataset.name = dataset.name[:-len('.' + compression_type)]

    # Move dataset
    if link_data_only:
        # Never alter a file that will not be copied to Galaxy's local file store.
        if datatype.dataset_content_needs_grooming(dataset.path):
            err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
                      '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
            raise UploadProblemException(err_msg)
    if not link_data_only:
        # Move the dataset to its "real" path. converted_path is a tempfile so we move it even if purge_source is False.
        if purge_source or converted_path:
            try:
                # If the user indicated the original file should be purged and we have a converted_path tempfile
                if purge_source and converted_path:
                    shutil.move(converted_path, output_path)
                    os.remove(dataset.path)
                else:
                    shutil.move(converted_path or dataset.path, output_path)
            except OSError as e:
                # We may not have permission to remove the input
                if e.errno != errno.EACCES:
                    raise
        else:
            shutil.copy(dataset.path, output_path)

    # Write the job info
    stdout = stdout or 'uploaded %s file' % ext
    info = dict(type='dataset',
                dataset_id=dataset.dataset_id,
                ext=ext,
                stdout=stdout,
                name=dataset.name,
                line_count=line_count)
    if dataset.get('uuid', None) is not None:
        info['uuid'] = dataset.get('uuid')
    # FIXME: does this belong here? also not output-adjacent-tmpdir aware =/
    if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path):
        # Groom the dataset content if necessary
        datatype.groom_dataset_content(output_path)
    return info

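# Rough sketch of the per-dataset record that add_file() consumes once it has
# been wrapped in a Bunch (all values below are invented and only some of the
# keys read above are shown):
#   {
#       "type": "url",                          # or e.g. "server_dir", "path_paste", "ftp_import"
#       "path": "https://example.org/data.tsv",
#       "name": "data.tsv",
#       "dataset_id": 1,
#       "file_type": "tabular",
#       "to_posix_lines": True,
#       "space_to_tab": False,
#       "link_data_only": "copy_files",
#   }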

def add_composite_file(dataset, registry, output_path, files_path):
    datatype = None

    # Find data type
    if dataset.file_type is not None:
        datatype = registry.get_datatype_by_extension(dataset.file_type)

    def to_path(path_or_url):
        is_url = path_or_url.find('://') != -1  # todo fixme
        if is_url:
            try:
                temp_name = sniff.stream_url_to_file(path_or_url, file_sources=get_file_sources())
            except Exception as e:
                raise UploadProblemException('Unable to fetch %s\n%s' % (path_or_url, unicodify(e)))

            return temp_name, is_url

        return path_or_url, is_url

    def make_files_path():
        safe_makedirs(files_path)

    def stage_file(name, composite_file_path, is_binary=False):
        dp = composite_file_path['path']
        path, is_url = to_path(dp)
        if is_url:
            dataset.path = path
            dp = path

        auto_decompress = composite_file_path.get('auto_decompress', True)
        if auto_decompress and not datatype.composite_type and CompressedFile.can_decompress(dp):
            # It isn't an explicitly composite datatype, so these are just extra files to attach
            # as composite data. It'd be better if Galaxy was communicating this to the tool
            # a little more explicitly so we didn't need to dispatch on the datatype and so we
            # could attach arbitrary extra composite data to an existing composite datatype
            # if need be? Perhaps that would be a mistake though.
            CompressedFile(dp).extract(files_path)
        else:
            tmpdir = output_adjacent_tmpdir(output_path)
            tmp_prefix = 'data_id_%s_convert_' % dataset.dataset_id
            sniff.handle_composite_file(
                datatype,
                dp,
                files_path,
                name,
                is_binary,
                tmpdir,
                tmp_prefix,
                composite_file_path,
            )

    # Do we have pre-defined composite files from the datatype definition.
    if dataset.composite_files:
        make_files_path()
        for name, value in dataset.composite_files.items():
            value = bunch.Bunch(**value)
            if value.name not in dataset.composite_file_paths:
                raise UploadProblemException("Failed to find file_path %s in %s" % (value.name, dataset.composite_file_paths))
            if dataset.composite_file_paths[value.name] is None and not value.optional:
                raise UploadProblemException('A required composite data file was not provided (%s)' % name)
            elif dataset.composite_file_paths[value.name] is not None:
                composite_file_path = dataset.composite_file_paths[value.name]
                stage_file(name, composite_file_path, value.is_binary)

    # Do we have ad-hoc user supplied composite files.
    elif dataset.composite_file_paths:
        make_files_path()
        for key, composite_file in dataset.composite_file_paths.items():
            stage_file(key, composite_file)  # TODO: replace these defaults

    # Move the dataset to its "real" path
    primary_file_path, _ = to_path(dataset.primary_file)
    shutil.move(primary_file_path, output_path)

    # Write the job info
    return dict(type='dataset',
                dataset_id=dataset.dataset_id,
                stdout='uploaded %s file' % dataset.file_type)


def __read_paramfile(path):
    with open(path) as fh:
        obj = load(fh)
    # If there's a single dataset in an old-style paramfile it'll still parse, but it'll be a dict
    assert type(obj) == list
    return obj


def __read_old_paramfile(path):
    datasets = []
    with open(path) as fh:
        for line in fh:
            datasets.append(loads(line))
    return datasets

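# For orientation, invented examples of the two paramfile flavours handled
# above:
#   new style - a single JSON list, e.g.
#       [{"dataset_id": 1, "type": "file", "file_type": "auto"}]
#   old style - one JSON object per line, e.g.
#       {"dataset_id": 1, "type": "file", "file_type": "auto"}
#       {"dataset_id": 2, "type": "url", "file_type": "auto"}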

def __write_job_metadata(metadata):
    # TODO: make upload/set_metadata compatible with https://github.com/galaxyproject/galaxy/pull/4437
    with open('galaxy.json', 'w') as fh:
        for meta in metadata:
            dump(meta, fh)
            fh.write('\n')


def output_adjacent_tmpdir(output_path):
    """ For temp files that will ultimately be moved to output_path anyway
    just create the file directly in output_path's directory so shutil.move
    will work optimally.
    """
    return os.path.dirname(output_path)

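# Creating temp files next to the final output_path keeps them on the same
# filesystem, so the later shutil.move() can complete as a cheap rename rather
# than a copy-and-delete across devices.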

def __main__():

    if len(sys.argv) < 4:
        print('usage: upload.py <root> <datatypes_conf> <json paramfile> <output spec> ...', file=sys.stderr)
        sys.exit(1)

    output_paths = parse_outputs(sys.argv[4:])

    registry = Registry()
    registry.load_datatypes(root_dir=sys.argv[1], config=sys.argv[2])

    try:
        datasets = __read_paramfile(sys.argv[3])
    except (ValueError, AssertionError):
        datasets = __read_old_paramfile(sys.argv[3])

    metadata = []
    for dataset in datasets:
        dataset = bunch.Bunch(**safe_dict(dataset))
        try:
            output_path = output_paths[int(dataset.dataset_id)][0]
        except Exception:
            print('Output path for dataset %s not found on command line' % dataset.dataset_id, file=sys.stderr)
            sys.exit(1)
        try:
            if dataset.type == 'composite':
                files_path = output_paths[int(dataset.dataset_id)][1]
                metadata.append(add_composite_file(dataset, registry, output_path, files_path))
            else:
                metadata.append(add_file(dataset, registry, output_path))
        except UploadProblemException as e:
            metadata.append(file_err(unicodify(e), dataset))
    __write_job_metadata(metadata)


if __name__ == '__main__':
    __main__()
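# Example invocation (all argument values are illustrative only):
#   python irods_upload.py /srv/galaxy /srv/galaxy/config/datatypes_conf.xml \
#       upload_params.json "1:/tmp/job_wd/extra_files:/galaxy/datasets/dataset_1.dat"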