planemo/lib/python3.7/site-packages/galaxy/tool_util/cwl/util.py @ 0:d30785e31577 (draft)
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author: guerler
date:   Fri, 31 Jul 2020 00:18:57 -0400
1 """Client-centric CWL-related utilities. | |
2 | |
3 Used to share code between the Galaxy test framework | |
4 and other Galaxy CWL clients (e.g. Planemo).""" | |
5 import hashlib | |
6 import json | |
7 import os | |
8 import tarfile | |
9 import tempfile | |
10 from collections import namedtuple | |
11 | |
12 import yaml | |
13 from six import ( | |
14 BytesIO, | |
15 iteritems, | |
16 python_2_unicode_compatible | |
17 ) | |
18 | |
19 from galaxy.util import unicodify | |
20 | |
21 STORE_SECONDARY_FILES_WITH_BASENAME = True | |
22 SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__" | |
23 SECONDARY_FILES_INDEX_PATH = "__secondary_files_index.json" | |
24 | |
25 | |
def set_basename_and_derived_properties(properties, basename):
    properties["basename"] = basename
    properties["nameroot"], properties["nameext"] = os.path.splitext(basename)
    return properties


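# Example (illustrative, not part of the module API): how name properties are
# derived from a basename.
#
#     set_basename_and_derived_properties({}, "reads.fastq.gz")
#     # -> {"basename": "reads.fastq.gz",
#     #     "nameroot": "reads.fastq", "nameext": ".gz"}

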
def output_properties(path=None, content=None, basename=None, pseudo_location=False):
    checksum = hashlib.sha1()
    properties = {
        "class": "File",
    }
    if path is not None:
        properties["path"] = path
        f = open(path, "rb")
    else:
        f = BytesIO(content)

    try:
        contents = f.read(1024 * 1024)
        filesize = 0
        while contents:
            checksum.update(contents)
            filesize += len(contents)
            contents = f.read(1024 * 1024)
    finally:
        f.close()
    properties["checksum"] = "sha1$%s" % checksum.hexdigest()
    properties["size"] = filesize
    set_basename_and_derived_properties(properties, basename)
    _handle_pseudo_location(properties, pseudo_location)
    return properties


def _handle_pseudo_location(properties, pseudo_location):
    if pseudo_location:
        properties["location"] = properties["basename"]


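# Example (illustrative; assumes /tmp/hello.txt contains the five bytes
# b"hello"):
#
#     output_properties(path="/tmp/hello.txt", basename="hello.txt",
#                       pseudo_location=True)
#     # -> {"class": "File", "path": "/tmp/hello.txt",
#     #     "checksum": "sha1$aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d",
#     #     "size": 5, "basename": "hello.txt", "nameroot": "hello",
#     #     "nameext": ".txt", "location": "hello.txt"}

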
def abs_path_or_uri(path_or_uri, relative_to):
    """Return an absolute path if this isn't a URI, otherwise keep the URI the same."""
    is_uri = "://" in path_or_uri
    if not is_uri and not os.path.isabs(path_or_uri):
        path_or_uri = os.path.join(relative_to, path_or_uri)
    if not is_uri:
        _ensure_file_exists(path_or_uri)
    return path_or_uri


def abs_path(path_or_uri, relative_to):
    path_or_uri = abs_path_or_uri(path_or_uri, relative_to)
    if path_or_uri.startswith("file://"):
        path_or_uri = path_or_uri[len("file://"):]

    return path_or_uri


def path_or_uri_to_uri(path_or_uri):
    if "://" not in path_or_uri:
        return "file://%s" % path_or_uri
    else:
        return path_or_uri


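# Example (sketch): URIs pass through unchanged, relative paths are anchored
# under ``relative_to`` (and must exist on disk).
#
#     abs_path_or_uri("http://example.org/x.txt", "/data")  # unchanged
#     abs_path_or_uri("x.txt", "/data")        # -> "/data/x.txt"
#     abs_path("file:///data/x.txt", "/data")  # -> "/data/x.txt"
#     path_or_uri_to_uri("/data/x.txt")        # -> "file:///data/x.txt"

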
def galactic_job_json(
    job, test_data_directory, upload_func, collection_create_func, tool_or_workflow="workflow"
):
    """Adapt a CWL job object to the Galaxy API.

    CWL-derived tools in Galaxy can consume a job description similar to a
    CWL job object via the API, but paths need to be replaced with datasets,
    and records and arrays with collection references. This function will
    stage files and modify the job description to adapt to these changes
    for Galaxy.
    """

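    # Sketch of the transformation performed below (IDs are illustrative): an
    # input such as
    #     {"input1": {"class": "File", "location": "whale.txt"}}
    # is staged with upload_func and rewritten to
    #     {"input1": {"src": "hda", "id": "f2db41e1fa331b3e"}}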
    datasets = []
    dataset_collections = []

    def response_to_hda(target, upload_response):
        assert isinstance(upload_response, dict), upload_response
        assert "outputs" in upload_response, upload_response
        assert len(upload_response["outputs"]) > 0, upload_response
        dataset = upload_response["outputs"][0]
        datasets.append((dataset, target))
        dataset_id = dataset["id"]
        return {"src": "hda", "id": dataset_id}

    def upload_file(file_path, secondary_files, **kwargs):
        file_path = abs_path_or_uri(file_path, test_data_directory)
        target = FileUploadTarget(file_path, secondary_files, **kwargs)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_file_literal(contents, **kwd):
        target = FileLiteralTarget(contents, **kwd)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_tar(file_path):
        file_path = abs_path_or_uri(file_path, test_data_directory)
        target = DirectoryUploadTarget(file_path)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_file_with_composite_data(file_path, composite_data, **kwargs):
        if file_path is not None:
            file_path = abs_path_or_uri(file_path, test_data_directory)
        composite_data_resolved = []
        for cd in composite_data:
            composite_data_resolved.append(abs_path_or_uri(cd, test_data_directory))
        target = FileUploadTarget(file_path, composite_data=composite_data_resolved, **kwargs)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def upload_object(the_object):
        target = ObjectUploadTarget(the_object)
        upload_response = upload_func(target)
        return response_to_hda(target, upload_response)

    def replacement_item(value, force_to_file=False):
        is_dict = isinstance(value, dict)
        item_class = None if not is_dict else value.get("class", None)
        is_file = item_class == "File"
        is_directory = item_class == "Directory"
        is_collection = item_class == "Collection"  # Galaxy extension.

        if force_to_file:
            if is_file:
                return replacement_file(value)
            else:
                return upload_object(value)

        if isinstance(value, list):
            return replacement_list(value)
        elif not isinstance(value, dict):
            if tool_or_workflow == "workflow":
                # All inputs represented as dataset or collection parameters
                return upload_object(value)
            else:
                return value

        if is_file:
            return replacement_file(value)
        elif is_directory:
            return replacement_directory(value)
        elif is_collection:
            return replacement_collection(value)
        else:
            return replacement_record(value)

    def replacement_file(value):
        file_path = value.get("location", None) or value.get("path", None)
        # format to match output definitions in tool, where did filetype come from?
        filetype = value.get("filetype", None) or value.get("format", None)
        composite_data_raw = value.get("composite_data", None)
        kwd = {}
        if "tags" in value:
            kwd["tags"] = value.get("tags")
        if composite_data_raw:
            composite_data = []
            for entry in composite_data_raw:
                path = None
                if isinstance(entry, dict):
                    path = entry.get("location", None) or entry.get("path", None)
                else:
                    path = entry
                composite_data.append(path)
            rval_c = upload_file_with_composite_data(None, composite_data, filetype=filetype, **kwd)
            return rval_c

        if file_path is None:
            contents = value.get("contents", None)
            if contents is not None:
                return upload_file_literal(contents, **kwd)

            return value

        secondary_files = value.get("secondaryFiles", [])
        secondary_files_tar_path = None
        if secondary_files:
            tmp = tempfile.NamedTemporaryFile(delete=False)
            tf = tarfile.open(fileobj=tmp, mode='w:')
            order = []
            index_contents = {
                "order": order
            }
            for secondary_file in secondary_files:
                secondary_file_path = secondary_file.get("location", None) or secondary_file.get("path", None)
                assert secondary_file_path, "Invalid secondaryFile entry found [%s]" % secondary_file
                full_secondary_file_path = os.path.join(test_data_directory, secondary_file_path)
                basename = secondary_file.get("basename") or os.path.basename(secondary_file_path)
                order.append(unicodify(basename))
                tf.add(full_secondary_file_path, os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename))
            tmp_index = tempfile.NamedTemporaryFile(delete=False, mode="w")
            json.dump(index_contents, tmp_index)
            tmp_index.close()
            tf.add(tmp_index.name, SECONDARY_FILES_INDEX_PATH)
            tf.close()
            secondary_files_tar_path = tmp.name
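            # Layout of the tar archive built above (illustrative, for a
            # single secondaryFile with basename "reads.bai"):
            #     __secondary_files__/reads.bai
            #     __secondary_files_index.json   # {"order": ["reads.bai"]}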

        return upload_file(file_path, secondary_files_tar_path, filetype=filetype, **kwd)

    def replacement_directory(value):
        file_path = value.get("location", None) or value.get("path", None)
        if file_path is None:
            return value

        if not os.path.isabs(file_path):
            file_path = os.path.join(test_data_directory, file_path)

        tmp = tempfile.NamedTemporaryFile(delete=False)
        tf = tarfile.open(fileobj=tmp, mode='w:')
        tf.add(file_path, '.')
        tf.close()

        return upload_tar(tmp.name)

    def replacement_list(value):
        collection_element_identifiers = []
        for i, item in enumerate(value):
            dataset = replacement_item(item, force_to_file=True)
            collection_element = dataset.copy()
            collection_element["name"] = str(i)
            collection_element_identifiers.append(collection_element)

        # TODO: handle nested lists/arrays
        collection = collection_create_func(collection_element_identifiers, "list")
        dataset_collections.append(collection)
        hdca_id = collection["id"]
        return {"src": "hdca", "id": hdca_id}

    def to_elements(value, rank_collection_type):
        collection_element_identifiers = []
        assert "elements" in value
        elements = value["elements"]

        is_nested_collection = ":" in rank_collection_type
        for element in elements:
            if not is_nested_collection:
                # flat collection
                dataset = replacement_item(element, force_to_file=True)
                collection_element = dataset.copy()
                collection_element["name"] = element["identifier"]
                collection_element_identifiers.append(collection_element)
            else:
                # nested collection
                sub_collection_type = rank_collection_type[rank_collection_type.find(":") + 1:]
                collection_element = {
                    "name": element["identifier"],
                    "src": "new_collection",
                    "collection_type": sub_collection_type,
                    "element_identifiers": to_elements(element, sub_collection_type)
                }
                collection_element_identifiers.append(collection_element)

        return collection_element_identifiers

    def replacement_collection(value):
        assert "collection_type" in value
        collection_type = value["collection_type"]
        elements = to_elements(value, collection_type)

        collection = collection_create_func(elements, collection_type)
        dataset_collections.append(collection)
        hdca_id = collection["id"]
        return {"src": "hdca", "id": hdca_id}

    def replacement_record(value):
        collection_element_identifiers = []
        for record_key, record_value in value.items():
            if not isinstance(record_value, dict) or record_value.get("class") != "File":
                dataset = replacement_item(record_value, force_to_file=True)
                collection_element = dataset.copy()
            else:
                dataset = upload_file(record_value["location"], [])
                collection_element = dataset.copy()

            collection_element["name"] = record_key
            collection_element_identifiers.append(collection_element)

        collection = collection_create_func(collection_element_identifiers, "record")
        dataset_collections.append(collection)
        hdca_id = collection["id"]
        return {"src": "hdca", "id": hdca_id}

    replace_keys = {}
    for key, value in iteritems(job):
        replace_keys[key] = replacement_item(value)

    job.update(replace_keys)
    return job, datasets


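# Example (illustrative sketch; ``upload`` and ``create_collection`` stand in
# for real Galaxy API client callbacks and are not part of this module):
#
#     def upload(target):
#         ...  # stage ``target`` via the Galaxy upload API
#         return {"outputs": [{"id": "f2db41e1fa331b3e"}]}
#
#     def create_collection(element_identifiers, collection_type):
#         ...  # build an HDCA from the staged elements
#         return {"id": "33b43b4e7093c91f"}
#
#     job = {"input1": {"class": "File", "location": "whale.txt"}}
#     job, datasets = galactic_job_json(job, "test-data", upload, create_collection)
#     # job == {"input1": {"src": "hda", "id": "f2db41e1fa331b3e"}}

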
def _ensure_file_exists(file_path):
    if not os.path.exists(file_path):
        template = "File [%s] does not exist - parent directory [%s] does %sexist, cwd is [%s]"
        parent_directory = os.path.dirname(file_path)
        message = template % (
            file_path,
            parent_directory,
            "" if os.path.exists(parent_directory) else "not ",
            os.getcwd(),
        )
        raise Exception(message)


@python_2_unicode_compatible
class FileLiteralTarget(object):

    def __init__(self, contents, **kwargs):
        self.contents = contents
        self.properties = kwargs

    def __str__(self):
        return "FileLiteralTarget[contents=%s] with %s" % (self.contents, self.properties)


@python_2_unicode_compatible
class FileUploadTarget(object):

    def __init__(self, path, secondary_files=None, **kwargs):
        self.path = path
        self.secondary_files = secondary_files
        self.composite_data = kwargs.get("composite_data", [])
        self.properties = kwargs

    def __str__(self):
        return "FileUploadTarget[path=%s] with %s" % (self.path, self.properties)


@python_2_unicode_compatible
class ObjectUploadTarget(object):

    def __init__(self, the_object):
        self.object = the_object

    def __str__(self):
        return "ObjectUploadTarget[object=%s]" % self.object


@python_2_unicode_compatible
class DirectoryUploadTarget(object):

    def __init__(self, tar_path):
        self.tar_path = tar_path

    def __str__(self):
        return "DirectoryUploadTarget[tar_path=%s]" % self.tar_path


GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id", "metadata"])


def tool_response_to_output(tool_response, history_id, output_id):
    for output in tool_response["outputs"]:
        if output["output_name"] == output_id:
            return GalaxyOutput(history_id, "dataset", output["id"], None)

    for output_collection in tool_response["output_collections"]:
        if output_collection["output_name"] == output_id:
            return GalaxyOutput(history_id, "dataset_collection", output_collection["id"], None)

    raise Exception("Failed to find output with label [%s]" % output_id)


def invocation_to_output(invocation, history_id, output_id):
    if output_id in invocation["outputs"]:
        dataset = invocation["outputs"][output_id]
        galaxy_output = GalaxyOutput(history_id, "dataset", dataset["id"], None)
    elif output_id in invocation["output_collections"]:
        collection = invocation["output_collections"][output_id]
        galaxy_output = GalaxyOutput(history_id, "dataset_collection", collection["id"], None)
    else:
        raise Exception("Failed to find output with label [%s] in [%s]" % (output_id, invocation))

    return galaxy_output


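# Example (sketch; response shapes abbreviated from the Galaxy API):
#
#     invocation = {"outputs": {"out1": {"id": "abc123"}}, "output_collections": {}}
#     invocation_to_output(invocation, "hist1", "out1")
#     # -> GalaxyOutput(history_id="hist1", history_content_type="dataset",
#     #                 history_content_id="abc123", metadata=None)

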
def output_to_cwl_json(
    galaxy_output, get_metadata, get_dataset, get_extra_files, pseudo_location=False,
):
    """Convert objects in a Galaxy history into a CWL object.

    Useful in running conformance tests and implementing the cwl-runner
    interface via Galaxy.
    """
    def element_to_cwl_json(element):
        object = element["object"]
        content_type = object.get("history_content_type")
        metadata = None
        if content_type is None:
            content_type = "dataset_collection"
            metadata = element["object"]
            metadata["history_content_type"] = content_type
        element_output = GalaxyOutput(
            galaxy_output.history_id,
            content_type,
            object["id"],
            metadata,
        )
        return output_to_cwl_json(element_output, get_metadata, get_dataset, get_extra_files, pseudo_location=pseudo_location)

    output_metadata = galaxy_output.metadata
    if output_metadata is None:
        output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)

    def dataset_dict_to_json_content(dataset_dict):
        if "content" in dataset_dict:
            return json.loads(dataset_dict["content"])
        else:
            with open(dataset_dict["path"]) as f:
                return json.load(f)

    if output_metadata["history_content_type"] == "dataset":
        ext = output_metadata["file_ext"]
        assert output_metadata["state"] == "ok"
        if ext == "expression.json":
            dataset_dict = get_dataset(output_metadata)
            return dataset_dict_to_json_content(dataset_dict)
        else:
            file_or_directory = "Directory" if ext == "directory" else "File"
            secondary_files = []

            if file_or_directory == "File":
                dataset_dict = get_dataset(output_metadata)
                properties = output_properties(pseudo_location=pseudo_location, **dataset_dict)
                basename = properties["basename"]
                extra_files = get_extra_files(output_metadata)
                found_index = False
                for extra_file in extra_files:
                    if extra_file["class"] == "File":
                        path = extra_file["path"]
                        if path == SECONDARY_FILES_INDEX_PATH:
                            found_index = True

                if found_index:
                    ec = get_dataset(output_metadata, filename=SECONDARY_FILES_INDEX_PATH)
                    index = dataset_dict_to_json_content(ec)

                    def dir_listing(dir_path):
                        listing = []
                        for extra_file in extra_files:
                            path = extra_file["path"]
                            extra_file_class = extra_file["class"]
                            extra_file_basename = os.path.basename(path)
                            if os.path.join(dir_path, extra_file_basename) != path:
                                continue

                            if extra_file_class == "File":
                                ec = get_dataset(output_metadata, filename=path)
                                ec["basename"] = extra_file_basename
                                ec_properties = output_properties(pseudo_location=pseudo_location, **ec)
                            elif extra_file_class == "Directory":
                                ec_properties = {}
                                ec_properties["class"] = "Directory"
                                ec_properties["location"] = extra_file_basename
                                ec_properties["listing"] = dir_listing(path)
                            else:
                                raise Exception("Unknown output type encountered....")
                            listing.append(ec_properties)
                        return listing

                    for basename in index["order"]:
                        for extra_file in extra_files:
                            path = extra_file["path"]
                            if path != os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename):
                                continue

                            extra_file_class = extra_file["class"]

                            # This is wrong...
                            if not STORE_SECONDARY_FILES_WITH_BASENAME:
                                ec_basename = basename + os.path.basename(path)
                            else:
                                ec_basename = os.path.basename(path)

                            if extra_file_class == "File":
                                ec = get_dataset(output_metadata, filename=path)
                                ec["basename"] = ec_basename
                                ec_properties = output_properties(pseudo_location=pseudo_location, **ec)
                            elif extra_file_class == "Directory":
                                ec_properties = {}
                                ec_properties["class"] = "Directory"
                                ec_properties["location"] = ec_basename
                                ec_properties["listing"] = dir_listing(path)
                            else:
                                raise Exception("Unknown output type encountered....")
                            secondary_files.append(ec_properties)

            else:
                basename = output_metadata.get("created_from_basename")
                if not basename:
                    basename = output_metadata.get("name")

                listing = []
                properties = {
                    "class": "Directory",
                    "basename": basename,
                    "listing": listing,
                }

                extra_files = get_extra_files(output_metadata)
                for extra_file in extra_files:
                    if extra_file["class"] == "File":
                        path = extra_file["path"]
                        ec = get_dataset(output_metadata, filename=path)
                        ec["basename"] = os.path.basename(path)
                        ec_properties = output_properties(pseudo_location=pseudo_location, **ec)
                        listing.append(ec_properties)

            if secondary_files:
                properties["secondaryFiles"] = secondary_files
            return properties

    elif output_metadata["history_content_type"] == "dataset_collection":
        rval = None
        collection_type = output_metadata["collection_type"].split(":", 1)[0]
        if collection_type in ["list", "paired"]:
            rval = []
            for element in output_metadata["elements"]:
                rval.append(element_to_cwl_json(element))
        elif collection_type == "record":
            rval = {}
            for element in output_metadata["elements"]:
                rval[element["element_identifier"]] = element_to_cwl_json(element)
        return rval
    else:
        raise NotImplementedError("Unknown history content type encountered")


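# Example (illustrative sketch; the ``get_*`` callbacks wrap Galaxy API calls
# and the metadata shown is abbreviated):
#
#     def get_metadata(history_content_type, content_id):
#         return {"history_content_type": "dataset", "state": "ok",
#                 "file_ext": "txt", "id": content_id}
#
#     def get_dataset(metadata, filename=None):
#         return {"path": "/tmp/out.txt", "basename": "out.txt"}
#
#     def get_extra_files(metadata):
#         return []
#
#     output = GalaxyOutput("hist1", "dataset", "abc123", None)
#     output_to_cwl_json(output, get_metadata, get_dataset, get_extra_files)
#     # -> {"class": "File", "basename": "out.txt", "checksum": "sha1$...", ...}

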
def download_output(galaxy_output, get_metadata, get_dataset, get_extra_files, output_path):
    output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
    dataset_dict = get_dataset(output_metadata)
    with open(output_path, 'wb') as fh:
        fh.write(dataset_dict['content'])


def guess_artifact_type(path):
    # TODO: Handle IDs within files.
    tool_or_workflow = "workflow"
    try:
        with open(path, "r") as f:
            artifact = yaml.safe_load(f)

        tool_or_workflow = "tool" if artifact["class"] != "Workflow" else "workflow"

    except Exception as e:
        print(e)

    return tool_or_workflow
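

# Example (sketch): a CWL document whose top-level ``class`` is ``Workflow``
# is reported as a workflow; any other class (e.g. ``CommandLineTool``) as a
# tool, and unreadable files fall back to "workflow".
#
#     guess_artifact_type("wf.cwl")  # -> "workflow" when wf.cwl has class: Workflow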