planemo/lib/python3.7/site-packages/galaxy/tool_util/cwl/parser.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author: guerler
date: Fri, 31 Jul 2020 00:18:57 -0400
1 """ This module provides proxy objects around objects from the common | |
2 workflow language reference implementation library cwltool. These proxies | |
3 adapt cwltool to Galaxy features and abstract the library away from the rest | |
4 of the framework. | |
5 """ | |
from __future__ import absolute_import

import base64
import copy
import json
import logging
import os
import pickle
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from uuid import uuid4

import six

from galaxy.exceptions import MessageException
from galaxy.util import (
    listify,
    safe_makedirs,
    unicodify,
)
from galaxy.util.bunch import Bunch
from .cwltool_deps import (
    beta_relaxed_fmt_check,
    ensure_cwltool_available,
    getdefault,
    pathmapper,
    process,
    ref_resolver,
    relink_initialworkdir,
    RuntimeContext,
    sourceline,
    StdFsAccess,
)
from .representation import (
    field_to_field_type,
    FIELD_TYPE_REPRESENTATION,
    INPUT_TYPE,
    type_descriptions_for_field_types,
    USE_FIELD_TYPES,
    USE_STEP_PARAMETERS,
)
from .schema import (
    non_strict_non_validating_schema_loader,
    schema_loader,
)
from .util import SECONDARY_FILES_EXTRA_PREFIX

log = logging.getLogger(__name__)

JOB_JSON_FILE = ".cwl_job.json"

DOCKER_REQUIREMENT = "DockerRequirement"
SUPPORTED_TOOL_REQUIREMENTS = [
    "CreateFileRequirement",
    "DockerRequirement",
    "EnvVarRequirement",
    "InitialWorkDirRequirement",
    "InlineJavascriptRequirement",
    "ResourceRequirement",
    "ShellCommandRequirement",
    "ScatterFeatureRequirement",
    "SchemaDefRequirement",
    "SubworkflowFeatureRequirement",
    "StepInputExpressionRequirement",
    "MultipleInputFeatureRequirement",
]


SUPPORTED_WORKFLOW_REQUIREMENTS = SUPPORTED_TOOL_REQUIREMENTS + [
]

PERSISTED_REPRESENTATION = "cwl_tool_object"


def tool_proxy(tool_path=None, tool_object=None, strict_cwl_validation=True, tool_directory=None, uuid=None):
    """Provide a proxy object around cwltool data structures that grabs
    just the relevant data.
    """
    ensure_cwltool_available()
    # if uuid is None:
    #     raise Exception("tool_proxy must be called with non-None uuid")
    tool = _to_cwl_tool_object(
        tool_path=tool_path,
        tool_object=tool_object,
        strict_cwl_validation=strict_cwl_validation,
        tool_directory=tool_directory,
        uuid=uuid
    )
    return tool

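# Usage sketch (hypothetical path; assumes cwltool is installed and the file
# is a valid CWL CommandLineTool):
#
#     proxy = tool_proxy(tool_path="/path/to/tool.cwl")
#     print(proxy.galaxy_id())
#     for input_instance in proxy.input_instances():
#         print(input_instance.to_dict())
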

def tool_proxy_from_persistent_representation(persisted_tool, strict_cwl_validation=True, tool_directory=None):
    """Load a ToolProxy from a previously persisted representation."""
    ensure_cwltool_available()
    if PERSISTED_REPRESENTATION == "cwl_tool_object":
        kwds = {"cwl_tool_object": ToolProxy.from_persistent_representation(persisted_tool)}
    else:
        raw_process_reference = persisted_tool  # ???
        kwds = {"raw_process_reference": ToolProxy.from_persistent_representation(raw_process_reference)}
    uuid = persisted_tool["uuid"]
    tool = _to_cwl_tool_object(uuid=uuid, strict_cwl_validation=strict_cwl_validation, tool_directory=tool_directory, **kwds)
    return tool


def workflow_proxy(workflow_path, strict_cwl_validation=True):
    ensure_cwltool_available()
    workflow = _to_cwl_workflow_object(workflow_path, strict_cwl_validation=strict_cwl_validation)
    return workflow


def load_job_proxy(job_directory, strict_cwl_validation=True):
    ensure_cwltool_available()
    job_objects_path = os.path.join(job_directory, JOB_JSON_FILE)
    with open(job_objects_path, "r") as f:
        job_objects = json.load(f)
    job_inputs = job_objects["job_inputs"]
    output_dict = job_objects["output_dict"]
    # Any reason to retain older tool_path variant of this? Probably not?
    if "tool_path" in job_objects:
        tool_path = job_objects["tool_path"]
        cwl_tool = tool_proxy(tool_path, strict_cwl_validation=strict_cwl_validation)
    else:
        persisted_tool = job_objects["tool_representation"]
        cwl_tool = tool_proxy_from_persistent_representation(persisted_tool=persisted_tool, strict_cwl_validation=strict_cwl_validation)
    cwl_job = cwl_tool.job_proxy(job_inputs, output_dict, job_directory=job_directory)
    return cwl_job

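# The .cwl_job.json file loaded above is written by JobProxy.save_job() and,
# in the newer tool_representation variant, looks roughly like this (sketch,
# values abbreviated):
#
#     {
#         "tool_representation": {"class": "...", "pickle": "...", "uuid": "..."},
#         "job_inputs": {...},
#         "output_dict": {...}
#     }
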

def _to_cwl_tool_object(tool_path=None, tool_object=None, cwl_tool_object=None, raw_process_reference=None, strict_cwl_validation=False, tool_directory=None, uuid=None):
    if uuid is None:
        uuid = str(uuid4())
    schema_loader = _schema_loader(strict_cwl_validation)
    if raw_process_reference is None and tool_path is not None:
        assert cwl_tool_object is None
        assert tool_object is None

        raw_process_reference = schema_loader.raw_process_reference(tool_path)
        cwl_tool = schema_loader.tool(
            raw_process_reference=raw_process_reference,
        )
    elif tool_object is not None:
        assert raw_process_reference is None
        assert cwl_tool_object is None

        # Allow loading tools from YAML...
        from ruamel import yaml as ryaml
        as_str = json.dumps(tool_object)
        tool_object = ryaml.round_trip_load(as_str)
        path = tool_directory
        if path is None:
            path = os.getcwd()
        uri = ref_resolver.file_uri(path) + "/"
        sourceline.add_lc_filename(tool_object, uri)
        raw_process_reference = schema_loader.raw_process_reference_for_object(
            tool_object,
            uri=uri
        )
        cwl_tool = schema_loader.tool(
            raw_process_reference=raw_process_reference,
        )
    else:
        cwl_tool = cwl_tool_object

    if isinstance(cwl_tool, int):
        raise Exception("Failed to load tool.")

    raw_tool = cwl_tool.tool
    # Apply Galaxy hacks to CWL tool representation to bridge semantic differences
    # between Galaxy and cwltool.
    _hack_cwl_requirements(cwl_tool)
    check_requirements(raw_tool)
    return _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=raw_process_reference, tool_path=tool_path)


def _cwl_tool_object_to_proxy(cwl_tool, uuid, raw_process_reference=None, tool_path=None):
    raw_tool = cwl_tool.tool
    if "class" not in raw_tool:
        raise Exception("File does not declare a class, not a valid Draft 3+ CWL tool.")

    process_class = raw_tool["class"]
    if process_class == "CommandLineTool":
        proxy_class = CommandLineToolProxy
    elif process_class == "ExpressionTool":
        proxy_class = ExpressionToolProxy
    else:
        raise Exception("File not a CWL CommandLineTool or ExpressionTool.")
    top_level_object = tool_path is not None
    if top_level_object and ("cwlVersion" not in raw_tool):
        raise Exception("File does not declare a CWL version, pre-draft 3 CWL tools are not supported.")

    proxy = proxy_class(cwl_tool, uuid, raw_process_reference, tool_path)
    return proxy


def _to_cwl_workflow_object(workflow_path, strict_cwl_validation=None):
    proxy_class = WorkflowProxy
    cwl_workflow = _schema_loader(strict_cwl_validation).tool(path=workflow_path)
    raw_workflow = cwl_workflow.tool
    check_requirements(raw_workflow, tool=False)

    proxy = proxy_class(cwl_workflow, workflow_path)
    return proxy


def _schema_loader(strict_cwl_validation):
    target_schema_loader = schema_loader if strict_cwl_validation else non_strict_non_validating_schema_loader
    return target_schema_loader


def _hack_cwl_requirements(cwl_tool):
    # Collect indices of DockerRequirements in reverse order so that deleting
    # by index below does not shift the remaining targets.
    move_to_hints = []
    for i, requirement in enumerate(cwl_tool.requirements):
        if requirement["class"] == DOCKER_REQUIREMENT:
            move_to_hints.insert(0, i)

    # Demote each DockerRequirement from a hard requirement to a hint.
    for i in move_to_hints:
        requirement = cwl_tool.requirements[i]
        del cwl_tool.requirements[i]
        cwl_tool.hints.append(requirement)


def check_requirements(rec, tool=True):
    if isinstance(rec, dict):
        if "requirements" in rec:
            for r in rec["requirements"]:
                if tool:
                    possible = SUPPORTED_TOOL_REQUIREMENTS
                else:
                    possible = SUPPORTED_WORKFLOW_REQUIREMENTS
                if r["class"] not in possible:
                    raise Exception("Unsupported requirement %s" % r["class"])
        for d in rec:
            check_requirements(rec[d], tool=tool)
    if isinstance(rec, list):
        for d in rec:
            check_requirements(d, tool=tool)

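# For example (sketch): a process whose raw dict contains
# {"requirements": [{"class": "NetworkAccess"}]} raises
# "Unsupported requirement NetworkAccess" here, since that class is not in
# SUPPORTED_TOOL_REQUIREMENTS above.
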

@six.add_metaclass(ABCMeta)
class ToolProxy(object):

    def __init__(self, tool, uuid, raw_process_reference=None, tool_path=None):
        self._tool = tool
        self._uuid = uuid
        self._tool_path = tool_path
        self._raw_process_reference = raw_process_reference

    def job_proxy(self, input_dict, output_dict, job_directory="."):
        """Build a cwltool.job.Job describing this computation, using an
        input_json that Galaxy will generate to map the Galaxy description
        of the inputs into a cwltool-compatible variant.
        """
        return JobProxy(self, input_dict, output_dict, job_directory=job_directory)

    @property
    def id(self):
        raw_id = self._tool.tool.get("id", None)
        return raw_id

    def galaxy_id(self):
        raw_id = self.id
        tool_id = None
        # don't reduce "search.cwl#index" to "search"
        if raw_id:
            tool_id = os.path.basename(raw_id)
            # tool_id = os.path.splitext(os.path.basename(raw_id))[0]
        if not tool_id:
            return self._uuid
        assert tool_id
        if tool_id.startswith("#"):
            tool_id = tool_id[1:]
        return tool_id

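    # Example (sketch): a raw id of "#main" yields "main", while an id like
    # "search.cwl#index" is kept whole (per the comment above) rather than
    # being reduced to "search".
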
    @abstractmethod
    def input_instances(self):
        """Return InputInstance objects describing the mapping to Galaxy inputs."""

    @abstractmethod
    def output_instances(self):
        """Return OutputInstance objects describing the mapping to Galaxy outputs."""

    @abstractmethod
    def docker_identifier(self):
        """Return the Docker identifier for embedding in the tool description."""

    @abstractmethod
    def description(self):
        """Return a description of the tool."""

    @abstractmethod
    def label(self):
        """Return a label for the tool."""

    def to_persistent_representation(self):
        """Return a JSON representation of this tool. Not for serialization
        over the wire, but for serialization in a database."""
        # TODO: Replace this with some more readable serialization,
        # I really don't like using pickle here.
        if PERSISTED_REPRESENTATION == "cwl_tool_object":
            persisted_obj = remove_pickle_problems(self._tool)
        else:
            persisted_obj = self._raw_process_reference
        return {
            "class": self._class,
            "pickle": unicodify(base64.b64encode(pickle.dumps(persisted_obj, pickle.HIGHEST_PROTOCOL))),
            "uuid": self._uuid,
        }

    @staticmethod
    def from_persistent_representation(as_object):
        """Recover an object serialized with to_persistent_representation."""
        if "class" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no class found.")
        if "pickle" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no pickle representation found.")
        if "uuid" not in as_object:
            raise Exception("Failed to deserialize tool proxy from JSON object - no uuid found.")
        to_unpickle = base64.b64decode(as_object["pickle"])
        loaded_object = pickle.loads(to_unpickle)
        return loaded_object

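# Round-trip sketch (hypothetical; assumes `proxy` was built by tool_proxy()
# above):
#
#     persisted = proxy.to_persistent_representation()
#     restored = tool_proxy_from_persistent_representation(persisted)
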

class CommandLineToolProxy(ToolProxy):
    _class = "CommandLineTool"

    def description(self):
        # Don't use description - typically too verbose.
        return ''

    def doc(self):
        # TODO: parse multiple lines and merge - valid in cwl-1.1
        doc = self._tool.tool.get('doc')
        return doc

    def label(self):
        label = self._tool.tool.get('label')

        if label is not None:
            return label.partition(":")[0]  # return substring before ':'
        else:
            return ''

    def input_fields(self):
        input_records_schema = self._eval_schema(self._tool.inputs_record_schema)
        if input_records_schema["type"] != "record":
            raise Exception("Unhandled CWL tool input structure")

        # TODO: handle this somewhere else?
        # schemadef_req_tool_param
        rval = []
        for input in input_records_schema["fields"]:
            input_copy = copy.deepcopy(input)
            input_type = input.get("type")
            if isinstance(input_type, (list, dict)):
                rval.append(input_copy)
                continue

            if input_type in self._tool.schemaDefs:
                input_copy["type"] = self._tool.schemaDefs[input_type]

            rval.append(input_copy)
        return rval

    def _eval_schema(self, io_schema):
        schema_type = io_schema.get("type")
        if schema_type in self._tool.schemaDefs:
            io_schema = self._tool.schemaDefs[schema_type]
        return io_schema

    def input_instances(self):
        return [_outer_field_to_input_instance(_) for _ in self.input_fields()]

    def output_instances(self):
        outputs_schema = self._eval_schema(self._tool.outputs_record_schema)
        if outputs_schema["type"] != "record":
            raise Exception("Unhandled CWL tool output structure")

        rval = []
        for output in outputs_schema["fields"]:
            rval.append(_simple_field_to_output(output))

        return rval

    def docker_identifier(self):
        for hint in self.hints_or_requirements_of_class("DockerRequirement"):
            if "dockerImageId" in hint:
                return hint["dockerImageId"]
            else:
                return hint["dockerPull"]

        return None

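    # Example (sketch): a tool carrying a hint such as
    # {"class": "DockerRequirement", "dockerPull": "ubuntu:18.04"} yields
    # "ubuntu:18.04" from docker_identifier() above.
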
    def hints_or_requirements_of_class(self, class_name):
        tool = self._tool.tool
        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
        for hint in reqs_and_hints:
            if hint["class"] == class_name:
                yield hint

    def software_requirements(self):
        # Roughest imaginable pass at parsing requirements, really need to take in specs, handle
        # multiple versions, etc...
        tool = self._tool.tool
        reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
        requirements = []
        for hint in reqs_and_hints:
            if hint["class"] == "SoftwareRequirement":
                packages = hint.get("packages", [])
                for package in packages:
                    versions = package.get("version", [])
                    first_version = None if not versions else versions[0]
                    requirements.append((package["package"], first_version))
        return requirements

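    # Example (sketch): a SoftwareRequirement hint with
    # packages: [{"package": "samtools", "version": ["1.9"]}] yields
    # [("samtools", "1.9")] from software_requirements() above.
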
    @property
    def requirements(self):
        return getattr(self._tool, "requirements", [])


class ExpressionToolProxy(CommandLineToolProxy):
    _class = "ExpressionTool"


class JobProxy(object):

    def __init__(self, tool_proxy, input_dict, output_dict, job_directory):
        self._tool_proxy = tool_proxy
        self._input_dict = input_dict
        self._output_dict = output_dict
        self._job_directory = job_directory

        self._final_output = None
        self._ok = True
        self._cwl_job = None
        self._is_command_line_job = None

        self._normalize_job()

    def cwl_job(self):
        self._ensure_cwl_job_initialized()
        return self._cwl_job

    @property
    def is_command_line_job(self):
        self._ensure_cwl_job_initialized()
        assert self._is_command_line_job is not None
        return self._is_command_line_job

    def _ensure_cwl_job_initialized(self):
        if self._cwl_job is None:
            job_args = dict(
                basedir=self._job_directory,
                select_resources=self._select_resources,
                outdir=os.path.join(self._job_directory, "working"),
                tmpdir=os.path.join(self._job_directory, "cwltmp"),
                stagedir=os.path.join(self._job_directory, "cwlstagedir"),
                use_container=False,
                beta_relaxed_fmt_check=beta_relaxed_fmt_check,
            )

            args = []
            kwargs = {}
            if RuntimeContext is not None:
                args.append(RuntimeContext(job_args))
            else:
                kwargs = job_args
            self._cwl_job = next(self._tool_proxy._tool.job(
                self._input_dict,
                self._output_callback,
                *args, **kwargs
            ))
            self._is_command_line_job = hasattr(self._cwl_job, "command_line")

    def _normalize_job(self):
        # Somehow reuse whatever causes validate in cwltool... maybe?
        def pathToLoc(p):
            if "location" not in p and "path" in p:
                p["location"] = p["path"]
                del p["path"]

        runtime_context = RuntimeContext({})
        make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
        fs_access = make_fs_access(runtime_context.basedir)
        process.fill_in_defaults(self._tool_proxy._tool.tool["inputs"], self._input_dict, fs_access)
        process.visit_class(self._input_dict, ("File", "Directory"), pathToLoc)
        # TODO: Why doesn't fillInDefault fill in locations instead of paths?
        process.normalizeFilesDirs(self._input_dict)
        # TODO: validate like cwltool process _init_job.
        #  validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job,
        #                       strict=False, logger=_logger_validation_warnings)

    def rewrite_inputs_for_staging(self):
        if hasattr(self._cwl_job, "pathmapper"):
            pass
            # DO SOMETHING LIKE THE FOLLOWING?
            # path_rewrites = {}
            # for f, p in self._cwl_job.pathmapper.items():
            #     if not p.staged:
            #         continue
            #     if p.type in ("File", "Directory"):
            #         path_rewrites[p.resolved] = p.target
            # for key, value in self._input_dict.items():
            #     if key in path_rewrites:
            #         self._input_dict[key]["location"] = path_rewrites[value]
        else:
            stagedir = os.path.join(self._job_directory, "cwlstagedir")
            safe_makedirs(stagedir)

            def stage_recursive(value):
                is_list = isinstance(value, list)
                is_dict = isinstance(value, dict)
                log.info("handling value %s, is_list %s, is_dict %s", value, is_list, is_dict)
                if is_list:
                    for val in value:
                        stage_recursive(val)
                elif is_dict:
                    if "location" in value and "basename" in value:
                        location = value["location"]
                        basename = value["basename"]
                        if not location.endswith(basename):  # TODO: sep()[-1]
                            staged_loc = os.path.join(stagedir, basename)
                            if not os.path.exists(staged_loc):
                                os.symlink(location, staged_loc)
                            value["location"] = staged_loc
                    for key, dict_value in value.items():
                        stage_recursive(dict_value)
                else:
                    log.info("skipping simple value...")
            stage_recursive(self._input_dict)

    def _select_resources(self, request, runtime_context=None):
        new_request = request.copy()
        new_request["cores"] = "$GALAXY_SLOTS"
        return new_request

    @property
    def command_line(self):
        if self.is_command_line_job:
            return self.cwl_job().command_line
        else:
            return ["true"]

    @property
    def stdin(self):
        if self.is_command_line_job:
            return self.cwl_job().stdin
        else:
            return None

    @property
    def stdout(self):
        if self.is_command_line_job:
            return self.cwl_job().stdout
        else:
            return None

    @property
    def stderr(self):
        if self.is_command_line_job:
            return self.cwl_job().stderr
        else:
            return None

    @property
    def environment(self):
        if self.is_command_line_job:
            return self.cwl_job().environment
        else:
            return {}

    @property
    def generate_files(self):
        if self.is_command_line_job:
            return self.cwl_job().generatefiles
        else:
            return {}

    def _output_callback(self, out, process_status):
        self._process_status = process_status
        if process_status == "success":
            self._final_output = out
        else:
            self._ok = False

        log.info("Outputs are %s, status is %s", out, process_status)

    def collect_outputs(self, tool_working_directory, rcode):
        if not self.is_command_line_job:
            cwl_job = self.cwl_job()
            if RuntimeContext is not None:
                cwl_job.run(
                    RuntimeContext({})
                )
            else:
                cwl_job.run()
            if not self._ok:
                raise Exception("Final process state not ok, [%s]" % self._process_status)
            return self._final_output
        else:
            return self.cwl_job().collect_outputs(tool_working_directory, rcode)

    def save_job(self):
        job_file = JobProxy._job_file(self._job_directory)
        job_objects = {
            # "tool_path": os.path.abspath(self._tool_proxy._tool_path),
            "tool_representation": self._tool_proxy.to_persistent_representation(),
            "job_inputs": self._input_dict,
            "output_dict": self._output_dict,
        }
        with open(job_file, "w") as f:
            json.dump(job_objects, f)

    def _output_extra_files_dir(self, output_name):
        output_id = self.output_id(output_name)
        return os.path.join(self._job_directory, "dataset_%s_files" % output_id)

    def output_id(self, output_name):
        output_id = self._output_dict[output_name]["id"]
        return output_id

    def output_path(self, output_name):
        output_path = self._output_dict[output_name]["path"]
        return output_path

    def output_directory_contents_dir(self, output_name, create=False):
        extra_files_dir = self._output_extra_files_dir(output_name)
        return extra_files_dir

    def output_secondary_files_dir(self, output_name, create=False):
        extra_files_dir = self._output_extra_files_dir(output_name)
        secondary_files_dir = os.path.join(extra_files_dir, SECONDARY_FILES_EXTRA_PREFIX)
        if create and not os.path.exists(secondary_files_dir):
            safe_makedirs(secondary_files_dir)
        return secondary_files_dir

    def stage_files(self):
        cwl_job = self.cwl_job()

        def stageFunc(resolved_path, target_path):
            log.info("resolving %s to %s", resolved_path, target_path)
            try:
                os.symlink(resolved_path, target_path)
            except OSError:
                pass

        if hasattr(cwl_job, "pathmapper"):
            process.stage_files(cwl_job.pathmapper, stageFunc, ignore_writable=True, symlink=False)

        if hasattr(cwl_job, "generatefiles"):
            outdir = os.path.join(self._job_directory, "working")
            # TODO: Why doesn't cwl_job.generatemapper work?
            generate_mapper = pathmapper.PathMapper(cwl_job.generatefiles["listing"],
                                                    outdir, outdir, separateDirs=False)
            # TODO: figure out what inplace_update should be.
            inplace_update = getattr(cwl_job, "inplace_update")
            process.stage_files(generate_mapper, stageFunc, ignore_writable=inplace_update, symlink=False)
            relink_initialworkdir(generate_mapper, outdir, outdir, inplace_update=inplace_update)
        # else: expression tools do not have a path mapper.

    @staticmethod
    def _job_file(job_directory):
        return os.path.join(job_directory, JOB_JSON_FILE)


class WorkflowProxy(object):

    def __init__(self, workflow, workflow_path=None):
        self._workflow = workflow
        self._workflow_path = workflow_path
        self._step_proxies = None

    @property
    def cwl_id(self):
        return self._workflow.tool["id"]

    def get_outputs_for_label(self, label):
        outputs = []
        for output in self._workflow.tool['outputs']:
            step, output_name = split_step_references(
                output["outputSource"],
                multiple=False,
                workflow_id=self.cwl_id,
            )
            if step == label:
                output_id = output["id"]
                if "#" not in self.cwl_id:
                    _, output_label = output_id.rsplit("#", 1)
                else:
                    _, output_label = output_id.rsplit("/", 1)

                outputs.append({
                    "output_name": output_name,
                    "label": output_label,
                })
        return outputs

    def tool_reference_proxies(self):
        """Fetch tool source definitions for all referenced tools."""
        references = []
        for step in self.step_proxies():
            references.extend(step.tool_reference_proxies())
        return references

    def step_proxies(self):
        if self._step_proxies is None:
            proxies = []
            num_input_steps = len(self._workflow.tool['inputs'])
            for i, step in enumerate(self._workflow.steps):
                proxies.append(build_step_proxy(self, step, i + num_input_steps))
            self._step_proxies = proxies
        return self._step_proxies

    @property
    def runnables(self):
        runnables = []
        for step in self._workflow.steps:
            if "run" in step.tool:
                runnables.append(step.tool["run"])
        return runnables

    def cwl_ids_to_index(self, step_proxies):
        index = 0
        cwl_ids_to_index = {}
        for input_dict in self._workflow.tool['inputs']:
            cwl_ids_to_index[input_dict["id"]] = index
            index += 1

        for step_proxy in step_proxies:
            cwl_ids_to_index[step_proxy.cwl_id] = index
            index += 1

        return cwl_ids_to_index

    @property
    def output_labels(self):
        return [self.jsonld_id_to_label(o['id']) for o in self._workflow.tool['outputs']]

    def input_connections_by_step(self, step_proxies):
        cwl_ids_to_index = self.cwl_ids_to_index(step_proxies)
        input_connections_by_step = []
        for step_proxy in step_proxies:
            input_connections_step = {}
            for input_proxy in step_proxy.input_proxies:
                cwl_source_id = input_proxy.cwl_source_id
                input_name = input_proxy.input_name
                # Consider only allowing multiple sources if MultipleInputFeatureRequirement is enabled.
                for (output_step_name, output_name) in split_step_references(cwl_source_id, workflow_id=self.cwl_id):
                    if "#" in self.cwl_id:
                        sep_on = "/"
                    else:
                        sep_on = "#"
                    output_step_id = self.cwl_id + sep_on + output_step_name

                    if output_step_id not in cwl_ids_to_index:
                        template = "Output [%s] does not appear in ID-to-index map [%s]."
                        msg = template % (output_step_id, cwl_ids_to_index.keys())
                        raise AssertionError(msg)

                    if input_name not in input_connections_step:
                        input_connections_step[input_name] = []

                    input_connections_step[input_name].append({
                        "id": cwl_ids_to_index[output_step_id],
                        "output_name": output_name,
                        "input_type": "dataset"
                    })

            input_connections_by_step.append(input_connections_step)

        return input_connections_by_step

    def to_dict(self):
        name = os.path.basename(self._workflow.tool.get('label') or self._workflow_path or 'TODO - derive a name from ID')
        steps = {}

        step_proxies = self.step_proxies()
        input_connections_by_step = self.input_connections_by_step(step_proxies)
        index = 0
        for i, input_dict in enumerate(self._workflow.tool['inputs']):
            steps[index] = self.cwl_input_to_galaxy_step(input_dict, i)
            index += 1

        for i, step_proxy in enumerate(step_proxies):
            input_connections = input_connections_by_step[i]
            steps[index] = step_proxy.to_dict(input_connections)
            index += 1

        return {
            'name': name,
            'steps': steps,
            'annotation': self.cwl_object_to_annotation(self._workflow.tool),
        }

    def find_inputs_step_index(self, label):
        for i, input in enumerate(self._workflow.tool['inputs']):
            if self.jsonld_id_to_label(input["id"]) == label:
                return i

        raise Exception("Failed to find index for label %s" % label)

    def jsonld_id_to_label(self, id):
        if "#" in self.cwl_id:
            return id.rsplit("/", 1)[-1]
        else:
            return id.rsplit("#", 1)[-1]

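    # Example (sketch): when cwl_id contains "#" (a packed/graph workflow),
    # "workflow.cwl#main/step1" maps to label "step1"; otherwise
    # "workflow.cwl#input1" maps to "input1".
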
    def cwl_input_to_galaxy_step(self, input, i):
        input_type = input["type"]
        label = self.jsonld_id_to_label(input["id"])
        input_as_dict = {
            "id": i,
            "label": label,
            "position": {"left": 0, "top": 0},
            "annotation": self.cwl_object_to_annotation(input),
            "input_connections": {},  # Should the Galaxy API really require this? - Seems to.
            "workflow_outputs": self.get_outputs_for_label(label),
        }

        if input_type == "File" and "default" not in input:
            input_as_dict["type"] = "data_input"
        elif isinstance(input_type, dict) and input_type.get("type") == "array":
            input_as_dict["type"] = "data_collection_input"
            input_as_dict["collection_type"] = "list"
        elif isinstance(input_type, dict) and input_type.get("type") == "record":
            input_as_dict["type"] = "data_collection_input"
            input_as_dict["collection_type"] = "record"
        else:
            if USE_STEP_PARAMETERS:
                input_as_dict["type"] = "parameter_input"
                # TODO: dispatch on actual type so this doesn't always need
                # to be field - simpler types could be simpler inputs.
                tool_state = {}
                tool_state["parameter_type"] = "field"
                default_value = input.get("default")
                optional = False
                if isinstance(input_type, list) and "null" in input_type:
                    optional = True
                if not optional and isinstance(input_type, dict) and "type" in input_type:
                    assert False
                tool_state["default_value"] = {"src": "json", "value": default_value}
                tool_state["optional"] = optional
                input_as_dict["tool_state"] = tool_state
            else:
                input_as_dict["type"] = "data_input"
                # TODO: format = expression.json

        return input_as_dict

    def cwl_object_to_annotation(self, cwl_obj):
        return cwl_obj.get("doc", None)


def split_step_references(step_references, workflow_id=None, multiple=True):
    """Split a CWL step input or output reference into step id and name."""
    # Trim off the workflow id part of the reference.
    step_references = listify(step_references)
    split_references = []

    for step_reference in step_references:
        if workflow_id is None:
            # This path works fine for some simple workflows - but not so much
            # for subworkflows (maybe same for $graph workflows?)
            assert "#" in step_reference
            _, step_reference = step_reference.split("#", 1)
        else:
            if "#" in workflow_id:
                sep_on = "/"
            else:
                sep_on = "#"
            expected_prefix = workflow_id + sep_on
            if not step_reference.startswith(expected_prefix):
                raise AssertionError("step_reference [%s] doesn't start with %s" % (step_reference, expected_prefix))
            step_reference = step_reference[len(expected_prefix):]

        # Now just grab the step name and input/output name.
        assert "#" not in step_reference
        if "/" in step_reference:
            step_name, io_name = step_reference.split("/", 1)
        else:
            # Referencing an input, not a step.
            # In Galaxy workflows input steps have an implicit output named
            # "output" for consistency with tools - in cwl land
            # just the input name is referenced.
            step_name = step_reference
            io_name = "output"
        split_references.append((step_name, io_name))

    if multiple:
        return split_references
    else:
        assert len(split_references) == 1
        return split_references[0]

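# Example (sketch): with workflow_id "main", the reference "main#step1/out1"
# splits into ("step1", "out1"), while a plain input reference "main#input1"
# becomes ("input1", "output") per the implicit-output convention above.
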

def build_step_proxy(workflow_proxy, step, index):
    step_type = step.embedded_tool.tool["class"]
    if step_type == "Workflow":
        return SubworkflowStepProxy(workflow_proxy, step, index)
    else:
        return ToolStepProxy(workflow_proxy, step, index)


class BaseStepProxy(object):

    def __init__(self, workflow_proxy, step, index):
        self._workflow_proxy = workflow_proxy
        self._step = step
        self._index = index
        self._uuid = str(uuid4())
        self._input_proxies = None

    @property
    def step_class(self):
        return self.cwl_tool_object.tool["class"]

    @property
    def cwl_id(self):
        return self._step.id

    @property
    def cwl_workflow_id(self):
        return self._workflow_proxy.cwl_id

    @property
    def requirements(self):
        return self._step.requirements

    @property
    def hints(self):
        return self._step.hints

    @property
    def label(self):
        label = self._workflow_proxy.jsonld_id_to_label(self._step.id)
        return label

    def galaxy_workflow_outputs_list(self):
        return self._workflow_proxy.get_outputs_for_label(self.label)

    @property
    def cwl_tool_object(self):
        return self._step.embedded_tool

    @property
    def input_proxies(self):
        if self._input_proxies is None:
            input_proxies = []
            cwl_inputs = self._step.tool["inputs"]
            for cwl_input in cwl_inputs:
                input_proxies.append(InputProxy(self, cwl_input))
            self._input_proxies = input_proxies
        return self._input_proxies

    def inputs_to_dicts(self):
        inputs_as_dicts = []
        for input_proxy in self.input_proxies:
            inputs_as_dicts.append(input_proxy.to_dict())
        return inputs_as_dicts


class InputProxy(object):

    def __init__(self, step_proxy, cwl_input):
        self._cwl_input = cwl_input
        self.step_proxy = step_proxy
        self.workflow_proxy = step_proxy._workflow_proxy

        cwl_input_id = cwl_input["id"]
        cwl_source_id = cwl_input.get("source", None)
        if cwl_source_id is None:
            if "valueFrom" not in cwl_input and "default" not in cwl_input:
                msg = "Workflow step input must define a source, a valueFrom, or a default value. Obtained [%s]." % cwl_input
                raise MessageException(msg)

        assert cwl_input_id
        step_name, input_name = split_step_references(
            cwl_input_id,
            multiple=False,
            workflow_id=step_proxy.cwl_workflow_id
        )
        self.step_name = step_name
        self.input_name = input_name

        self.cwl_input_id = cwl_input_id
        self.cwl_source_id = cwl_source_id

        scatter_inputs = [split_step_references(
            i, multiple=False, workflow_id=step_proxy.cwl_workflow_id
        )[1] for i in listify(step_proxy._step.tool.get("scatter", []))]
        scatter = self.input_name in scatter_inputs
        self.scatter = scatter

    def to_dict(self):
        as_dict = {
            "name": self.input_name,
        }
        if "linkMerge" in self._cwl_input:
            as_dict["merge_type"] = self._cwl_input["linkMerge"]
        if "scatterMethod" in self.step_proxy._step.tool:
            as_dict["scatter_type"] = self.step_proxy._step.tool.get("scatterMethod", "dotproduct")
        else:
            as_dict["scatter_type"] = "dotproduct" if self.scatter else "disabled"
        if "valueFrom" in self._cwl_input:
            # TODO: Add a table for expressions - mark the type as CWL 1.0 JavaScript.
            as_dict["value_from"] = self._cwl_input["valueFrom"]
        if "default" in self._cwl_input:
            as_dict["default"] = self._cwl_input["default"]
        return as_dict


class ToolStepProxy(BaseStepProxy):

    def __init__(self, workflow_proxy, step, index):
        super(ToolStepProxy, self).__init__(workflow_proxy, step, index)
        self._tool_proxy = None

    @property
    def tool_proxy(self):
        # Needs to be cached so the UUID that is loaded matches the UUID generated with to_dict.
        if self._tool_proxy is None:
            self._tool_proxy = _cwl_tool_object_to_proxy(self.cwl_tool_object, uuid=str(uuid4()))
        return self._tool_proxy

    def tool_reference_proxies(self):
        # Return a list so we can handle subworkflows recursively.
        return [self.tool_proxy]

    def to_dict(self, input_connections):
        # We need to stub out null entries for things getting replaced by
        # connections. This doesn't seem ideal - consider just making Galaxy
        # handle this.
        tool_state = {}
        for input_name in input_connections.keys():
            tool_state[input_name] = None

        outputs = self.galaxy_workflow_outputs_list()
        return {
            "id": self._index,
            "tool_uuid": self.tool_proxy._uuid,  # TODO: make sure this is respected...
            "label": self.label,
            "position": {"left": 0, "top": 0},
            "type": "tool",
            "annotation": self._workflow_proxy.cwl_object_to_annotation(self._step.tool),
            "input_connections": input_connections,
            "inputs": self.inputs_to_dicts(),
            "workflow_outputs": outputs,
        }


class SubworkflowStepProxy(BaseStepProxy):

    def __init__(self, workflow_proxy, step, index):
        super(SubworkflowStepProxy, self).__init__(workflow_proxy, step, index)
        self._subworkflow_proxy = None

    def to_dict(self, input_connections):
        outputs = self.galaxy_workflow_outputs_list()
        for key, input_connection_list in input_connections.items():
            input_subworkflow_step_id = self.subworkflow_proxy.find_inputs_step_index(
                key
            )
            for input_connection in input_connection_list:
                input_connection["input_subworkflow_step_id"] = input_subworkflow_step_id

        return {
            "id": self._index,
            "label": self.label,
            "position": {"left": 0, "top": 0},
            "type": "subworkflow",
            "subworkflow": self.subworkflow_proxy.to_dict(),
            "annotation": self.subworkflow_proxy.cwl_object_to_annotation(self._step.tool),
            "input_connections": input_connections,
            "inputs": self.inputs_to_dicts(),
            "workflow_outputs": outputs,
        }

    def tool_reference_proxies(self):
        return self.subworkflow_proxy.tool_reference_proxies()

    @property
    def subworkflow_proxy(self):
        if self._subworkflow_proxy is None:
            self._subworkflow_proxy = WorkflowProxy(self.cwl_tool_object)
        return self._subworkflow_proxy


def remove_pickle_problems(obj):
    """doc_loader does not pickle correctly"""
    if hasattr(obj, "doc_loader"):
        obj.doc_loader = None
    if hasattr(obj, "embedded_tool"):
        obj.embedded_tool = remove_pickle_problems(obj.embedded_tool)
    if hasattr(obj, "steps"):
        obj.steps = [remove_pickle_problems(s) for s in obj.steps]
    return obj


@six.add_metaclass(ABCMeta)
class WorkflowToolReference(object):
    pass


class EmbeddedWorkflowToolReference(WorkflowToolReference):
    pass


class ExternalWorkflowToolReference(WorkflowToolReference):
    pass


def _outer_field_to_input_instance(field):
    field_type = field_to_field_type(field)  # Must be a list if in here?
    if not isinstance(field_type, list):
        field_type = [field_type]

    name, label, description = _field_metadata(field)

    case_name = "_cwl__type_"
    case_label = "Specify Parameter %s As" % label

    def value_input(type_description):
        value_name = "_cwl__value_"
        value_label = label
        value_description = description
        return InputInstance(
            value_name,
            value_label,
            value_description,
            input_type=type_description.galaxy_param_type,
            collection_type=type_description.collection_type,
        )

    select_options = []
    case_options = []
    type_descriptions = type_descriptions_for_field_types(field_type)
    for type_description in type_descriptions:
        select_options.append({"value": type_description.name, "label": type_description.label})
        input_instances = []
        if type_description.uses_param():
            input_instances.append(value_input(type_description))
        case_options.append((type_description.name, input_instances))

    # If there is more than one way to represent this parameter - produce a conditional
    # asking the user what form they want to submit the data in, else just map
    # a simple Galaxy parameter.
    if len(case_options) > 1 and not USE_FIELD_TYPES:
        case_input = SelectInputInstance(
            name=case_name,
            label=case_label,
            description=False,
            options=select_options,
        )

        return ConditionalInstance(name, case_input, case_options)
    else:
        if len(case_options) > 1:
            only_type_description = FIELD_TYPE_REPRESENTATION
        else:
            only_type_description = type_descriptions[0]

        return InputInstance(
            name, label, description, input_type=only_type_description.galaxy_param_type, collection_type=only_type_description.collection_type
        )

    # Older array to repeat handling, now we are just representing arrays as
    # dataset collections - we should offer a blended approach in the future.
    # if field_type in simple_map_type_map.keys():
    #     input_type = simple_map_type_map[field_type]
    #     return {"input_type": input_type, "array": False}
    # elif field_type == "array":
    #     if isinstance(field["type"], dict):
    #         array_type = field["type"]["items"]
    #     else:
    #         array_type = field["items"]
    #     if array_type in simple_map_type_map.keys():
    #         input_type = simple_map_type_map[array_type]
    #         return {"input_type": input_type, "array": True}
    #     else:
    #         raise Exception("Unhandled simple field type encountered - [%s]." % field_type)


def _field_metadata(field):
    name = field["name"]
    label = field.get("label", None)
    description = field.get("doc", None)
    return name, label, description


def _simple_field_to_output(field):
    name = field["name"]
    output_data_class = field["type"]
    output_instance = OutputInstance(
        name,
        output_data_type=output_data_class,
        output_type=OUTPUT_TYPE.GLOB
    )
    return output_instance


class ConditionalInstance(object):

    def __init__(self, name, case, whens):
        self.input_type = INPUT_TYPE.CONDITIONAL
        self.name = name
        self.case = case
        self.whens = whens

    def to_dict(self):
        as_dict = dict(
            name=self.name,
            type=INPUT_TYPE.CONDITIONAL,
            test=self.case.to_dict(),
            when=OrderedDict(),
        )
        for value, block in self.whens:
            as_dict["when"][value] = [i.to_dict() for i in block]

        return as_dict


class SelectInputInstance(object):

    def __init__(self, name, label, description, options):
        self.input_type = INPUT_TYPE.SELECT
        self.name = name
        self.label = label
        self.description = description
        self.options = options

    def to_dict(self):
        # TODO: serialize options...
        as_dict = dict(
            name=self.name,
            label=self.label or self.name,
            help=self.description,
            type=self.input_type,
            options=self.options,
        )
        return as_dict

1244 | |
1245 class InputInstance(object): | |
1246 | |
1247 def __init__(self, name, label, description, input_type, array=False, area=False, collection_type=None): | |
1248 self.input_type = input_type | |
1249 self.collection_type = collection_type | |
1250 self.name = name | |
1251 self.label = label | |
1252 self.description = description | |
1253 self.required = True | |
1254 self.array = array | |
1255 self.area = area | |
1256 | |
1257 def to_dict(self, itemwise=True): | |
1258 if itemwise and self.array: | |
1259 as_dict = dict( | |
1260 type="repeat", | |
1261 name="%s_repeat" % self.name, | |
1262 title="%s" % self.name, | |
1263 blocks=[ | |
1264 self.to_dict(itemwise=False) | |
1265 ] | |
1266 ) | |
1267 else: | |
1268 as_dict = dict( | |
1269 name=self.name, | |
1270 label=self.label or self.name, | |
1271 help=self.description, | |
1272 type=self.input_type, | |
1273 optional=not self.required, | |
1274 ) | |
1275 if self.area: | |
1276 as_dict["area"] = True | |
1277 | |
1278 if self.input_type == INPUT_TYPE.INTEGER: | |
1279 as_dict["value"] = "0" | |
1280 if self.input_type == INPUT_TYPE.FLOAT: | |
1281 as_dict["value"] = "0.0" | |
1282 elif self.input_type == INPUT_TYPE.DATA_COLLECTON: | |
1283 as_dict["collection_type"] = self.collection_type | |
1284 | |
1285 return as_dict | |
1286 | |
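# Example (sketch): an InputInstance with name "reads" and array=True
# serializes itemwise as a Galaxy repeat block:
#
#     {"type": "repeat", "name": "reads_repeat", "title": "reads",
#      "blocks": [<the itemwise=False dict for "reads">]}
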

OUTPUT_TYPE = Bunch(
    GLOB="glob",
    STDOUT="stdout",
)


# TODO: Different subclasses - this is representing different types of things.
class OutputInstance(object):

    def __init__(self, name, output_data_type, output_type, path=None, fields=None):
        self.name = name
        self.output_data_type = output_data_type
        self.output_type = output_type
        self.path = path
        self.fields = fields


__all__ = (
    'tool_proxy',
    'load_job_proxy',
)