comparison planemo/lib/python3.7/site-packages/galaxy/tool_util/cwl/util.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
1 """Client-centric CWL-related utilities.
2
3 Used to share code between the Galaxy test framework
4 and other Galaxy CWL clients (e.g. Planemo)."""
5 import hashlib
6 import json
7 import os
8 import tarfile
9 import tempfile
10 from collections import namedtuple
11
12 import yaml
13 from six import (
14 BytesIO,
15 iteritems,
16 python_2_unicode_compatible
17 )
18
19 from galaxy.util import unicodify
20
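# Layout of the tarball used to ship CWL secondaryFiles alongside a primary
# file; see the packing logic in galactic_job_json and the unpacking logic in
# output_to_cwl_json below.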
21 STORE_SECONDARY_FILES_WITH_BASENAME = True
22 SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"
23 SECONDARY_FILES_INDEX_PATH = "__secondary_files_index.json"
24
25
26 def set_basename_and_derived_properties(properties, basename):
27 properties["basename"] = basename
28 properties["nameroot"], properties["nameext"] = os.path.splitext(basename)
29 return properties
30
31
32 def output_properties(path=None, content=None, basename=None, pseduo_location=False):
33 checksum = hashlib.sha1()
34 properties = {
35 "class": "File",
36 }
37 if path is not None:
38 properties["path"] = path
39 f = open(path, "rb")
40 else:
41 f = BytesIO(content)
42
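# Stream the contents in 1 MiB chunks, accumulating the SHA-1 checksum and the
# total size as we go.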
43 try:
44 contents = f.read(1024 * 1024)
45 filesize = 0
46 while contents:
47 checksum.update(contents)
48 filesize += len(contents)
49 contents = f.read(1024 * 1024)
50 finally:
51 f.close()
52 properties["checksum"] = "sha1$%s" % checksum.hexdigest()
53 properties["size"] = filesize
54 set_basename_and_derived_properties(properties, basename)
55 _handle_pseudo_location(properties, pseduo_location)
56 return properties
57
58
59 def _handle_pseudo_location(properties, pseduo_location):
60 if pseduo_location:
61 properties["location"] = properties["basename"]
62
63
64 def abs_path_or_uri(path_or_uri, relative_to):
65 """Return an absolute path if this isn't a URI, otherwise keep the URI the same.
66 """
67 is_uri = "://" in path_or_uri
68 if not is_uri and not os.path.isabs(path_or_uri):
69 path_or_uri = os.path.join(relative_to, path_or_uri)
70 if not is_uri:
71 _ensure_file_exists(path_or_uri)
72 return path_or_uri
73
74
75 def abs_path(path_or_uri, relative_to):
76 path_or_uri = abs_path_or_uri(path_or_uri, relative_to)
77 if path_or_uri.startswith("file://"):
78 path_or_uri = path_or_uri[len("file://"):]
79
80 return path_or_uri
81
82
83 def path_or_uri_to_uri(path_or_uri):
84 if "://" not in path_or_uri:
85 return "file://%s" % path_or_uri
86 else:
87 return path_or_uri
88
89
90 def galactic_job_json(
91 job, test_data_directory, upload_func, collection_create_func, tool_or_workflow="workflow"
92 ):
93 """Adapt a CWL job object to the Galaxy API.
94
95 CWL-derived tools in Galaxy can consume a job description much like a
96 CWL job object via the API, but file paths need to be replaced with
97 datasets, and records and arrays with collection references. This
98 function stages the required files and modifies the job description
99 accordingly for Galaxy.
100 """
101
102 datasets = []
103 dataset_collections = []
104
105 def response_to_hda(target, upload_response):
106 assert isinstance(upload_response, dict), upload_response
107 assert "outputs" in upload_response, upload_response
108 assert len(upload_response["outputs"]) > 0, upload_response
109 dataset = upload_response["outputs"][0]
110 datasets.append((dataset, target))
111 dataset_id = dataset["id"]
112 return {"src": "hda", "id": dataset_id}
113
114 def upload_file(file_path, secondary_files, **kwargs):
115 file_path = abs_path_or_uri(file_path, test_data_directory)
116 target = FileUploadTarget(file_path, secondary_files, **kwargs)
117 upload_response = upload_func(target)
118 return response_to_hda(target, upload_response)
119
120 def upload_file_literal(contents, **kwd):
121 target = FileLiteralTarget(contents, **kwd)
122 upload_response = upload_func(target)
123 return response_to_hda(target, upload_response)
124
125 def upload_tar(file_path):
126 file_path = abs_path_or_uri(file_path, test_data_directory)
127 target = DirectoryUploadTarget(file_path)
128 upload_response = upload_func(target)
129 return response_to_hda(target, upload_response)
130
131 def upload_file_with_composite_data(file_path, composite_data, **kwargs):
132 if file_path is not None:
133 file_path = abs_path_or_uri(file_path, test_data_directory)
134 composite_data_resolved = []
135 for cd in composite_data:
136 composite_data_resolved.append(abs_path_or_uri(cd, test_data_directory))
137 target = FileUploadTarget(file_path, composite_data=composite_data_resolved, **kwargs)
138 upload_response = upload_func(target)
139 return response_to_hda(target, upload_response)
140
141 def upload_object(the_object):
142 target = ObjectUploadTarget(the_object)
143 upload_response = upload_func(target)
144 return response_to_hda(target, upload_response)
145
146 def replacement_item(value, force_to_file=False):
147 is_dict = isinstance(value, dict)
148 item_class = None if not is_dict else value.get("class", None)
149 is_file = item_class == "File"
150 is_directory = item_class == "Directory"
151 is_collection = item_class == "Collection" # Galaxy extension.
152
153 if force_to_file:
154 if is_file:
155 return replacement_file(value)
156 else:
157 return upload_object(value)
158
159 if isinstance(value, list):
160 return replacement_list(value)
161 elif not isinstance(value, dict):
162 if tool_or_workflow == "workflow":
163 # All inputs represented as dataset or collection parameters
164 return upload_object(value)
165 else:
166 return value
167
168 if is_file:
169 return replacement_file(value)
170 elif is_directory:
171 return replacement_directory(value)
172 elif is_collection:
173 return replacement_collection(value)
174 else:
175 return replacement_record(value)
176
177 def replacement_file(value):
178 file_path = value.get("location", None) or value.get("path", None)
179 # format to match output definitions in tool, where did filetype come from?
180 filetype = value.get("filetype", None) or value.get("format", None)
181 composite_data_raw = value.get("composite_data", None)
182 kwd = {}
183 if "tags" in value:
184 kwd["tags"] = value.get("tags")
185 if composite_data_raw:
186 composite_data = []
187 for entry in composite_data_raw:
188 path = None
189 if isinstance(entry, dict):
190 path = entry.get("location", None) or entry.get("path", None)
191 else:
192 path = entry
193 composite_data.append(path)
194 rval_c = upload_file_with_composite_data(None, composite_data, filetype=filetype, **kwd)
195 return rval_c
196
197 if file_path is None:
198 contents = value.get("contents", None)
199 if contents is not None:
200 return upload_file_literal(contents, **kwd)
201
202 return value
203
204 secondary_files = value.get("secondaryFiles", [])
205 secondary_files_tar_path = None
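# Pack any secondaryFiles into an uncompressed tarball: each file is added
# under the SECONDARY_FILES_EXTRA_PREFIX directory, and an index JSON recording
# the original order is written at SECONDARY_FILES_INDEX_PATH.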
206 if secondary_files:
207 tmp = tempfile.NamedTemporaryFile(delete=False)
208 tf = tarfile.open(fileobj=tmp, mode='w:')
209 order = []
210 index_contents = {
211 "order": order
212 }
213 for secondary_file in secondary_files:
214 secondary_file_path = secondary_file.get("location", None) or secondary_file.get("path", None)
215 assert secondary_file_path, "Invalid secondaryFile entry found [%s]" % secondary_file
216 full_secondary_file_path = os.path.join(test_data_directory, secondary_file_path)
217 basename = secondary_file.get("basename") or os.path.basename(secondary_file_path)
218 order.append(unicodify(basename))
219 tf.add(full_secondary_file_path, os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename))
220 tmp_index = tempfile.NamedTemporaryFile(delete=False, mode="w")
221 json.dump(index_contents, tmp_index)
222 tmp_index.close()
223 tf.add(tmp_index.name, SECONDARY_FILES_INDEX_PATH)
224 tf.close()
225 secondary_files_tar_path = tmp.name
226
227 return upload_file(file_path, secondary_files_tar_path, filetype=filetype, **kwd)
228
229 def replacement_directory(value):
230 file_path = value.get("location", None) or value.get("path", None)
231 if file_path is None:
232 return value
233
234 if not os.path.isabs(file_path):
235 file_path = os.path.join(test_data_directory, file_path)
236
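# A Directory input is packed into an uncompressed tarball and uploaded as a
# DirectoryUploadTarget; the receiving side is presumably expected to unpack it.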
237 tmp = tempfile.NamedTemporaryFile(delete=False)
238 tf = tarfile.open(fileobj=tmp, mode='w:')
239 tf.add(file_path, '.')
240 tf.close()
241
242 return upload_tar(tmp.name)
243
244 def replacement_list(value):
245 collection_element_identifiers = []
246 for i, item in enumerate(value):
247 dataset = replacement_item(item, force_to_file=True)
248 collection_element = dataset.copy()
249 collection_element["name"] = str(i)
250 collection_element_identifiers.append(collection_element)
251
252 # TODO: handle nested lists/arrays
253 collection = collection_create_func(collection_element_identifiers, "list")
254 dataset_collections.append(collection)
255 hdca_id = collection["id"]
256 return {"src": "hdca", "id": hdca_id}
257
258 def to_elements(value, rank_collection_type):
259 collection_element_identifiers = []
260 assert "elements" in value
261 elements = value["elements"]
262
263 is_nested_collection = ":" in rank_collection_type
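# A rank collection type such as "list:paired" means each element is itself a
# collection; in that case recurse with the remainder of the type string.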
264 for element in elements:
265 if not is_nested_collection:
266 # flat collection
267 dataset = replacement_item(element, force_to_file=True)
268 collection_element = dataset.copy()
269 collection_element["name"] = element["identifier"]
270 collection_element_identifiers.append(collection_element)
271 else:
272 # nested collection
273 sub_collection_type = rank_collection_type[rank_collection_type.find(":") + 1:]
274 collection_element = {
275 "name": element["identifier"],
276 "src": "new_collection",
277 "collection_type": sub_collection_type,
278 "element_identifiers": to_elements(element, sub_collection_type)
279 }
280 collection_element_identifiers.append(collection_element)
281
282 return collection_element_identifiers
283
284 def replacement_collection(value):
285 assert "collection_type" in value
286 collection_type = value["collection_type"]
287 elements = to_elements(value, collection_type)
288
289 collection = collection_create_func(elements, collection_type)
290 dataset_collections.append(collection)
291 hdca_id = collection["id"]
292 return {"src": "hdca", "id": hdca_id}
293
294 def replacement_record(value):
295 collection_element_identifiers = []
296 for record_key, record_value in value.items():
297 if not isinstance(record_value, dict) or record_value.get("class") != "File":
298 dataset = replacement_item(record_value, force_to_file=True)
299 collection_element = dataset.copy()
300 else:
301 dataset = upload_file(record_value["location"], [])
302 collection_element = dataset.copy()
303
304 collection_element["name"] = record_key
305 collection_element_identifiers.append(collection_element)
306
307 collection = collection_create_func(collection_element_identifiers, "record")
308 dataset_collections.append(collection)
309 hdca_id = collection["id"]
310 return {"src": "hdca", "id": hdca_id}
311
312 replace_keys = {}
313 for key, value in iteritems(job):
314 replace_keys[key] = replacement_item(value)
315
316 job.update(replace_keys)
317 return job, datasets
318
319
320 def _ensure_file_exists(file_path):
321 if not os.path.exists(file_path):
322 template = "File [%s] does not exist - parent directory [%s] does %sexist, cwd is [%s]"
323 parent_directory = os.path.dirname(file_path)
324 message = template % (
325 file_path,
326 parent_directory,
327 "" if os.path.exists(parent_directory) else "not ",
328 os.getcwd(),
329 )
330 raise Exception(message)
331
332
333 @python_2_unicode_compatible
334 class FileLiteralTarget(object):
335
336 def __init__(self, contents, **kwargs):
337 self.contents = contents
338 self.properties = kwargs
339
340 def __str__(self):
341 return "FileLiteralTarget[path=%s] with %s" % (self.path, self.properties)
342
343
344 @python_2_unicode_compatible
345 class FileUploadTarget(object):
346
347 def __init__(self, path, secondary_files=None, **kwargs):
348 self.path = path
349 self.secondary_files = secondary_files
350 self.composite_data = kwargs.get("composite_data", [])
351 self.properties = kwargs
352
353 def __str__(self):
354 return "FileUploadTarget[path=%s] with %s" % (self.path, self.properties)
355
356
357 @python_2_unicode_compatible
358 class ObjectUploadTarget(object):
359
360 def __init__(self, the_object):
361 self.object = the_object
362
363 def __str__(self):
364 return "ObjectUploadTarget[object=%s]" % self.object
365
366
367 @python_2_unicode_compatible
368 class DirectoryUploadTarget(object):
369
370 def __init__(self, tar_path):
371 self.tar_path = tar_path
372
373 def __str__(self):
374 return "DirectoryUploadTarget[tar_path=%s]" % self.tar_path
375
376
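# A lightweight reference to a Galaxy history item. history_content_type is
# either "dataset" or "dataset_collection"; metadata may be None, in which case
# output_to_cwl_json fetches it via get_metadata.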
377 GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id", "metadata"])
378
379
380 def tool_response_to_output(tool_response, history_id, output_id):
381 for output in tool_response["outputs"]:
382 if output["output_name"] == output_id:
383 return GalaxyOutput(history_id, "dataset", output["id"], None)
384
385 for output_collection in tool_response["output_collections"]:
386 if output_collection["output_name"] == output_id:
387 return GalaxyOutput(history_id, "dataset_collection", output_collection["id"], None)
388
389 raise Exception("Failed to find output with label [%s]" % output_id)
390
391
392 def invocation_to_output(invocation, history_id, output_id):
393 if output_id in invocation["outputs"]:
394 dataset = invocation["outputs"][output_id]
395 galaxy_output = GalaxyOutput(history_id, "dataset", dataset["id"], None)
396 elif output_id in invocation["output_collections"]:
397 collection = invocation["output_collections"][output_id]
398 galaxy_output = GalaxyOutput(history_id, "dataset_collection", collection["id"], None)
399 else:
400 raise Exception("Failed to find output with label [%s] in [%s]" % (output_id, invocation))
401
402 return galaxy_output
403
404
405 def output_to_cwl_json(
406 galaxy_output, get_metadata, get_dataset, get_extra_files, pseduo_location=False,
407 ):
408 """Convert objects in a Galaxy history into a CWL object.
409
410 Useful in running conformance tests and implementing the cwl-runner
411 interface via Galaxy.
412 """
413 def element_to_cwl_json(element):
414 object = element["object"]
415 content_type = object.get("history_content_type")
416 metadata = None
417 if content_type is None:
418 content_type = "dataset_collection"
419 metadata = element["object"]
420 metadata["history_content_type"] = content_type
421 element_output = GalaxyOutput(
422 galaxy_output.history_id,
423 content_type,
424 object["id"],
425 metadata,
426 )
427 return output_to_cwl_json(element_output, get_metadata, get_dataset, get_extra_files, pseduo_location=pseduo_location)
428
429 output_metadata = galaxy_output.metadata
430 if output_metadata is None:
431 output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
432
433 def dataset_dict_to_json_content(dataset_dict):
434 if "content" in dataset_dict:
435 return json.loads(dataset_dict["content"])
436 else:
437 with open(dataset_dict["path"]) as f:
438 return json.load(f)
439
440 if output_metadata["history_content_type"] == "dataset":
441 ext = output_metadata["file_ext"]
442 assert output_metadata["state"] == "ok"
443 if ext == "expression.json":
444 dataset_dict = get_dataset(output_metadata)
445 return dataset_dict_to_json_content(dataset_dict)
446 else:
447 file_or_directory = "Directory" if ext == "directory" else "File"
448 secondary_files = []
449
450 if file_or_directory == "File":
451 dataset_dict = get_dataset(output_metadata)
452 properties = output_properties(pseduo_location=pseduo_location, **dataset_dict)
453 basename = properties["basename"]
454 extra_files = get_extra_files(output_metadata)
455 found_index = False
456 for extra_file in extra_files:
457 if extra_file["class"] == "File":
458 path = extra_file["path"]
459 if path == SECONDARY_FILES_INDEX_PATH:
460 found_index = True
461
462 if found_index:
463 ec = get_dataset(output_metadata, filename=SECONDARY_FILES_INDEX_PATH)
464 index = dataset_dict_to_json_content(ec)
465
466 def dir_listing(dir_path):
467 listing = []
468 for extra_file in extra_files:
469 path = extra_file["path"]
470 extra_file_class = extra_file["class"]
471 extra_file_basename = os.path.basename(path)
472 if os.path.join(dir_path, extra_file_basename) != path:
473 continue
474
475 if extra_file_class == "File":
476 ec = get_dataset(output_metadata, filename=path)
477 ec["basename"] = extra_file_basename
478 ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
479 elif extra_file_class == "Directory":
480 ec_properties = {}
481 ec_properties["class"] = "Directory"
482 ec_properties["location"] = extra_file_basename
483 ec_properties["listing"] = dir_listing(path)
484 else:
485 raise Exception("Unknown output type encountered....")
486 listing.append(ec_properties)
487 return listing
488
489 for basename in index["order"]:
490 for extra_file in extra_files:
491 path = extra_file["path"]
492 if path != os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename):
493 continue
494
495 extra_file_class = extra_file["class"]
496
497 # This is wrong...
498 if not STORE_SECONDARY_FILES_WITH_BASENAME:
499 ec_basename = basename + os.path.basename(path)
500 else:
501 ec_basename = os.path.basename(path)
502
503 if extra_file_class == "File":
504 ec = get_dataset(output_metadata, filename=path)
505 ec["basename"] = ec_basename
506 ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
507 elif extra_file_class == "Directory":
508 ec_properties = {}
509 ec_properties["class"] = "Directory"
510 ec_properties["location"] = ec_basename
511 ec_properties["listing"] = dir_listing(path)
512 else:
513 raise Exception("Unknown output type encountered....")
514 secondary_files.append(ec_properties)
515
516 else:
517 basename = output_metadata.get("created_from_basename")
518 if not basename:
519 basename = output_metadata.get("name")
520
521 listing = []
522 properties = {
523 "class": "Directory",
524 "basename": basename,
525 "listing": listing,
526 }
527
528 extra_files = get_extra_files(output_metadata)
529 for extra_file in extra_files:
530 if extra_file["class"] == "File":
531 path = extra_file["path"]
532 ec = get_dataset(output_metadata, filename=path)
533 ec["basename"] = os.path.basename(path)
534 ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
535 listing.append(ec_properties)
536
537 if secondary_files:
538 properties["secondaryFiles"] = secondary_files
539 return properties
540
541 elif output_metadata["history_content_type"] == "dataset_collection":
542 rval = None
543 collection_type = output_metadata["collection_type"].split(":", 1)[0]
544 if collection_type in ["list", "paired"]:
545 rval = []
546 for element in output_metadata["elements"]:
547 rval.append(element_to_cwl_json(element))
548 elif collection_type == "record":
549 rval = {}
550 for element in output_metadata["elements"]:
551 rval[element["element_identifier"]] = element_to_cwl_json(element)
552 return rval
553 else:
554 raise NotImplementedError("Unknown history content type encountered")
555
556
557 def download_output(galaxy_output, get_metadata, get_dataset, get_extra_files, output_path):
558 output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
559 dataset_dict = get_dataset(output_metadata)
560 with open(output_path, 'wb') as fh:
561 fh.write(dataset_dict['content'])
562
563
564 def guess_artifact_type(path):
565 # TODO: Handle IDs within files.
566 tool_or_workflow = "workflow"
567 try:
568 with open(path, "r") as f:
569 artifact = yaml.safe_load(f)
570
571 tool_or_workflow = "tool" if artifact["class"] != "Workflow" else "workflow"
572
573 except Exception as e:
574 print(e)
575
576 return tool_or_workflow