Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/main.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 (2021-03-22) |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 #!/usr/bin/env python3 | |
2 # PYTHON_ARGCOMPLETE_OK | |
3 """Entry point for cwltool.""" | |
4 | |
5 import argparse | |
6 import functools | |
7 import io | |
8 import logging | |
9 import os | |
10 import signal | |
11 import subprocess # nosec | |
12 import sys | |
13 import time | |
14 import urllib | |
15 from codecs import StreamWriter, getwriter | |
16 from collections.abc import MutableMapping, MutableSequence | |
17 from typing import ( | |
18 IO, | |
19 Any, | |
20 Callable, | |
21 Dict, | |
22 List, | |
23 Mapping, | |
24 MutableMapping, | |
25 MutableSequence, | |
26 Optional, | |
27 Sized, | |
28 TextIO, | |
29 Tuple, | |
30 Union, | |
31 cast, | |
32 ) | |
33 | |
34 import argcomplete | |
35 import coloredlogs | |
36 import pkg_resources # part of setuptools | |
37 from ruamel import yaml | |
38 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
39 from schema_salad.exceptions import ValidationException | |
40 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path | |
41 from schema_salad.sourceline import strip_dup_lineno | |
42 from schema_salad.utils import ContextType, FetcherCallableType, json_dumps | |
43 | |
44 from . import CWL_CONTENT_TYPES, workflow | |
45 from .argparser import arg_parser, generate_parser, get_default_args | |
46 from .builder import HasReqsHints | |
47 from .context import LoadingContext, RuntimeContext, getdefault | |
48 from .cwlrdf import printdot, printrdf | |
49 from .errors import UnsupportedRequirement, WorkflowException | |
50 from .executors import JobExecutor, MultithreadedJobExecutor, SingleJobExecutor | |
51 from .load_tool import ( | |
52 default_loader, | |
53 fetch_document, | |
54 jobloaderctx, | |
55 load_overrides, | |
56 make_tool, | |
57 resolve_and_validate_document, | |
58 resolve_overrides, | |
59 resolve_tool_uri, | |
60 ) | |
61 from .loghandler import _logger, defaultStreamHandler | |
62 from .mpi import MpiConfig | |
63 from .mutation import MutationManager | |
64 from .pack import pack | |
65 from .process import ( | |
66 CWL_IANA, | |
67 Process, | |
68 add_sizes, | |
69 scandeps, | |
70 shortname, | |
71 use_custom_schema, | |
72 use_standard_schema, | |
73 ) | |
74 from .procgenerator import ProcessGenerator | |
75 from .provenance import ResearchObject | |
76 from .resolver import ga4gh_tool_registries, tool_resolver | |
77 from .secrets import SecretStore | |
78 from .software_requirements import ( | |
79 DependenciesConfiguration, | |
80 get_container_from_software_requirements, | |
81 ) | |
82 from .stdfsaccess import StdFsAccess | |
83 from .subgraph import get_step, get_subgraph | |
84 from .update import ALLUPDATES, UPDATES | |
85 from .utils import ( | |
86 DEFAULT_TMP_PREFIX, | |
87 CWLObjectType, | |
88 CWLOutputAtomType, | |
89 CWLOutputType, | |
90 adjustDirObjs, | |
91 normalizeFilesDirs, | |
92 onWindows, | |
93 processes_to_kill, | |
94 trim_listing, | |
95 versionstring, | |
96 visit_class, | |
97 windows_default_container_id, | |
98 ) | |
99 from .workflow import Workflow | |
100 | |
101 | |
def _terminate_processes() -> None:
    """Kill every process recorded in `utils.processes_to_kill`.

    Spawned processes must be appended to that deque as they are created.

    Caveat: Python offers no supported way to kill another thread, so other
    threads keep running while their child processes are killed here; that
    can occasionally produce surprising behaviour.
    """
    # Another thread may enqueue more work while we run, so drain the
    # deque by emptiness instead of iterating over it.
    while processes_to_kill:
        proc = processes_to_kill.popleft()
        cidfiles = [
            str(a).split("=")[1] for a in proc.args if "--cidfile" in str(a)
        ]
        if not cidfiles:
            continue
        try:
            with open(cidfiles[0]) as cid_stream:
                killer = subprocess.Popen(  # nosec
                    ["docker", "kill", cid_stream.read()], shell=False  # nosec
                )
                try:
                    killer.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    killer.kill()
        except FileNotFoundError:
            # Either the cidfile vanished or the docker binary is absent;
            # in both cases there is nothing more we can do.
            pass
132 | |
133 | |
def _signal_handler(signum: int, _: Any) -> None:
    """Terminate all spawned processes, then exit with the signal number.

    A process spawned after the sweep finishes but before the interpreter
    exits will be missed; see _terminate_processes() for further caveats.
    """
    _terminate_processes()
    sys.exit(signum)
144 | |
145 | |
def generate_example_input(
    inptype: Optional[CWLOutputType],
    default: Optional[CWLOutputType],
) -> Tuple[Any, str]:
    """Convert a single input schema into an example.

    Returns a (value, comment) pair: an example value for the input and a
    human-readable comment describing its type and/or optionality.
    """
    example = None
    comment = ""
    # Canned example values for the CWL primitive and file-ish types.
    defaults = {
        "null": "null",
        "Any": "null",
        "boolean": False,
        "int": 0,
        "long": 0,
        "float": 0.1,
        "double": 0.1,
        "string": "a_string",
        "File": yaml.comments.CommentedMap(
            [("class", "File"), ("path", "a/file/path")]
        ),
        "Directory": yaml.comments.CommentedMap(
            [("class", "Directory"), ("path", "a/directory/path")]
        ),
    }  # type: CWLObjectType
    if isinstance(inptype, MutableSequence):
        # A list of types is a union; "null" in the union marks it optional.
        optional = False
        if "null" in inptype:
            # NOTE(review): this mutates the caller's type list — confirm no
            # caller relies on "null" still being present afterwards.
            inptype.remove("null")
            optional = True
        if len(inptype) == 1:
            # Union collapsed to a single type: recurse on it directly.
            example, comment = generate_example_input(inptype[0], default)
            if optional:
                if comment:
                    comment = f"{comment} (optional)"
                else:
                    comment = "optional"
        else:
            # A genuine union: emit one example per member type, each with
            # its own end-of-line comment.
            example = CommentedSeq()
            for index, entry in enumerate(inptype):
                value, e_comment = generate_example_input(entry, default)
                example.append(value)
                example.yaml_add_eol_comment(e_comment, index)
            if optional:
                comment = "optional"
    elif isinstance(inptype, Mapping) and "type" in inptype:
        if inptype["type"] == "array":
            first_item = cast(MutableSequence[CWLObjectType], inptype["items"])[0]
            items_len = len(cast(Sized, inptype["items"]))
            if items_len == 1 and "type" in first_item and first_item["type"] == "enum":
                # array of just an enum then list all the options
                example = first_item["symbols"]
                if "name" in first_item:
                    comment = 'array of type "{}".'.format(first_item["name"])
            else:
                # Recurse on the item type; wrap in a list only when there
                # is a single item schema.
                value, comment = generate_example_input(inptype["items"], None)
                comment = "array of " + comment
                if items_len == 1:
                    example = [value]
                else:
                    example = value
            if default is not None:
                example = default
        elif inptype["type"] == "enum":
            symbols = cast(List[str], inptype["symbols"])
            if default is not None:
                example = default
            elif "default" in inptype:
                example = inptype["default"]
            elif len(cast(Sized, inptype["symbols"])) == 1:
                # Only one valid value, so use it.
                example = symbols[0]
            else:
                example = "{}_enum_value".format(inptype.get("name", "valid"))
            comment = 'enum; valid values: "{}"'.format('", "'.join(symbols))
        elif inptype["type"] == "record":
            example = yaml.comments.CommentedMap()
            if "name" in inptype:
                comment = '"{}" record type.'.format(inptype["name"])
            # Build an example entry (with comment) for every record field.
            for field in cast(List[CWLObjectType], inptype["fields"]):
                value, f_comment = generate_example_input(field["type"], None)
                example.insert(0, shortname(cast(str, field["name"])), value, f_comment)
        elif "default" in inptype:
            example = inptype["default"]
            comment = 'default value of type "{}".'.format(inptype["type"])
        else:
            example = defaults.get(cast(str, inptype["type"]), str(inptype))
            comment = 'type "{}".'.format(inptype["type"])
    else:
        # A bare type name (or anything unrecognized): fall back to the
        # canned example table, or the stringified type itself.
        if not default:
            example = defaults.get(str(inptype), str(inptype))
            comment = f'type "{inptype}"'
        else:
            example = default
            comment = f'default value of type "{inptype}".'
    return example, comment
239 | |
240 | |
def realize_input_schema(
    input_types: MutableSequence[CWLObjectType],
    schema_defs: MutableMapping[str, CWLObjectType],
) -> MutableSequence[CWLObjectType]:
    """Replace references to named typed with the actual types."""
    # NOTE: entries of input_types are rewritten in place and the same
    # sequence is returned.
    for index, entry in enumerate(input_types):
        if isinstance(entry, str):
            # A bare string is a reference to a named schema definition,
            # possibly qualified as a "document#fragment" URI.
            if "#" in entry:
                _, input_type_name = entry.split("#")
            else:
                input_type_name = entry
            if input_type_name in schema_defs:
                entry = input_types[index] = schema_defs[input_type_name]
        if isinstance(entry, Mapping):
            # "type" may itself be a reference, a union list, or a nested
            # mapping; each shape is resolved recursively below.
            if isinstance(entry["type"], str) and "#" in entry["type"]:
                _, input_type_name = entry["type"].split("#")
                if input_type_name in schema_defs:
                    input_types[index]["type"] = cast(
                        CWLOutputAtomType,
                        realize_input_schema(
                            cast(
                                MutableSequence[CWLObjectType],
                                schema_defs[input_type_name],
                            ),
                            schema_defs,
                        ),
                    )
            if isinstance(entry["type"], MutableSequence):
                input_types[index]["type"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], entry["type"]), schema_defs
                    ),
                )
            if isinstance(entry["type"], Mapping):
                input_types[index]["type"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        [cast(CWLObjectType, input_types[index]["type"])], schema_defs
                    ),
                )
            if entry["type"] == "array":
                # Normalize "items" to a list before resolving it.
                items = (
                    entry["items"]
                    if not isinstance(entry["items"], str)
                    else [entry["items"]]
                )
                input_types[index]["items"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], items), schema_defs
                    ),
                )
            if entry["type"] == "record":
                # Record fields are themselves typed inputs; resolve each.
                input_types[index]["fields"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], entry["fields"]),
                        schema_defs,
                    ),
                )
    return input_types
303 | |
304 | |
def generate_input_template(tool: Process) -> CWLObjectType:
    """Generate an example input object for the given CWL process."""
    template = yaml.comments.CommentedMap()
    # Resolve any named schema references first, then build one commented
    # example entry per input.
    realized = realize_input_schema(tool.tool["inputs"], tool.schemaDefs)
    for inp in realized:
        shortid = shortname(cast(str, inp["id"]))
        example, note = generate_example_input(inp["type"], inp.get("default", None))
        template.insert(0, shortid, example, note)
    return template
313 | |
314 | |
def load_job_order(
    args: argparse.Namespace,
    stdin: IO[Any],
    fetcher_constructor: Optional[FetcherCallableType],
    overrides_list: List[CWLObjectType],
    tool_file_uri: str,
) -> Tuple[Optional[CWLObjectType], str, Loader]:
    """Load the CWL job order (input object) from a file or stdin.

    :param args: parsed command-line options; ``args.job_order`` holds the
        remaining positional arguments
    :param stdin: stream read when the job order argument is "-"
    :param overrides_list: extended in place with any cwltool overrides
        embedded in the job order
    :return: (job order object or None, input base directory, the Loader
        used to resolve it)
    """
    job_order_object = None
    job_order_file = None

    _jobloaderctx = jobloaderctx.copy()
    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)

    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
        job_order_file = args.job_order[0]
    elif len(args.job_order) == 1 and args.job_order[0] == "-":
        # "-" means: read a YAML/JSON job order from standard input.
        job_order_object = yaml.main.round_trip_load(stdin)
        job_order_object, _ = loader.resolve_all(
            job_order_object, file_uri(os.getcwd()) + "/"
        )
    else:
        job_order_file = None

    if job_order_object is not None:
        input_basedir = args.basedir if args.basedir else os.getcwd()
    elif job_order_file is not None:
        input_basedir = (
            args.basedir
            if args.basedir
            else os.path.abspath(os.path.dirname(job_order_file))
        )
        job_order_object, _ = loader.resolve_ref(
            job_order_file,
            checklinks=False,
            content_types=CWL_CONTENT_TYPES,
        )

    if (
        job_order_object is not None
        and "http://commonwl.org/cwltool#overrides" in job_order_object
    ):
        # Pull any cwltool overrides embedded in the job order itself into
        # the caller's overrides list, then strip them from the object.
        ov_uri = file_uri(job_order_file or input_basedir)
        overrides_list.extend(
            resolve_overrides(job_order_object, ov_uri, tool_file_uri)
        )
        del job_order_object["http://commonwl.org/cwltool#overrides"]

    if job_order_object is None:
        input_basedir = args.basedir if args.basedir else os.getcwd()

    if job_order_object is not None and not isinstance(
        job_order_object, MutableMapping
    ):
        # Fix: error message previously misspelled "dictionary" as
        # "dictionay".
        _logger.error(
            "CWL input object at %s is not formatted correctly, it should be a "
            "JSON/YAML dictionary, not %s.\n"
            "Raw input object:\n%s",
            job_order_file or "stdin",
            type(job_order_object),
            job_order_object,
        )
        sys.exit(1)
    return (job_order_object, input_basedir, loader)
379 | |
380 | |
def init_job_order(
    job_order_object: Optional[CWLObjectType],
    args: argparse.Namespace,
    process: Process,
    loader: Loader,
    stdout: Union[TextIO, StreamWriter],
    print_input_deps: bool = False,
    relative_deps: str = "primary",
    make_fs_access: Callable[[str], StdFsAccess] = StdFsAccess,
    input_basedir: str = "",
    secret_store: Optional[SecretStore] = None,
    input_required: bool = True,
) -> CWLObjectType:
    """Build and normalize the final input object for a CWL process run.

    When no job order was supplied, one is assembled by parsing the
    remaining command-line arguments with a parser generated from the
    tool's inputs.  Input defaults, secrets, file sizes, formats and
    path/location fields are then resolved in place.
    """
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    if job_order_object is None:
        namemap = {}  # type: Dict[str, str]
        records = []  # type: List[str]
        toolparser = generate_parser(
            argparse.ArgumentParser(prog=args.workflow),
            process,
            namemap,
            records,
            input_required,
        )
        if args.tool_help:
            toolparser.print_help()
            exit(0)
        cmd_line = vars(toolparser.parse_args(args.job_order))
        for record_name in records:
            # Re-assemble record fields from their flattened CLI options
            # ("<record>_<field>" keys become a nested dict).
            record = {}
            record_items = {
                k: v for k, v in cmd_line.items() if k.startswith(record_name)
            }
            for key, value in record_items.items():
                record[key[len(record_name) + 1 :]] = value
                del cmd_line[key]
            cmd_line[str(record_name)] = record
        if "job_order" in cmd_line and cmd_line["job_order"]:
            try:
                job_order_object = cast(
                    CWLObjectType,
                    loader.resolve_ref(cmd_line["job_order"])[0],
                )
            except Exception:
                # Fix: message previously read "Failed to resolv".
                _logger.exception(
                    "Failed to resolve job_order: %s", cmd_line["job_order"]
                )
                exit(1)
        else:
            job_order_object = {"id": args.workflow}

        del cmd_line["job_order"]

        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})

        if secret_store and secrets_req:
            secret_store.store(
                [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
                job_order_object,
            )

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "Parsed job order from command line: %s",
                json_dumps(job_order_object, indent=4),
            )

    # Fill in declared input defaults that the job order did not provide.
    for inp in process.tool["inputs"]:
        if "default" in inp and (
            not job_order_object or shortname(inp["id"]) not in job_order_object
        ):
            if not job_order_object:
                job_order_object = {}
            job_order_object[shortname(inp["id"])] = inp["default"]

    if job_order_object is None:
        if process.tool["inputs"]:
            if toolparser is not None:
                print(f"\nOptions for {args.workflow} ")
                toolparser.print_help()
            _logger.error("")
            _logger.error("Input object required, use --help for details")
            exit(1)
        else:
            job_order_object = {}

    if print_input_deps:
        basedir = None  # type: Optional[str]
        uri = cast(str, job_order_object["id"])
        if uri == args.workflow:
            basedir = os.path.dirname(uri)
            uri = ""
        printdeps(
            job_order_object,
            loader,
            stdout,
            relative_deps,
            uri,
            basedir=basedir,
            nestdirs=False,
        )
        exit(0)

    def path_to_loc(p: CWLObjectType) -> None:
        # Normalize legacy "path" fields into "location".
        if "location" not in p and "path" in p:
            p["location"] = p["path"]
            del p["path"]

    ns = {}  # type: ContextType
    ns.update(cast(ContextType, job_order_object.get("$namespaces", {})))
    ns.update(cast(ContextType, process.metadata.get("$namespaces", {})))
    ld = Loader(ns)

    def expand_formats(p: CWLObjectType) -> None:
        # Expand format short names / CURIEs into full URIs.
        if "format" in p:
            p["format"] = ld.expand_url(cast(str, p["format"]), "")

    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
    visit_class(
        job_order_object,
        ("File",),
        functools.partial(add_sizes, make_fs_access(input_basedir)),
    )
    visit_class(job_order_object, ("File",), expand_formats)
    adjustDirObjs(job_order_object, trim_listing)
    normalizeFilesDirs(job_order_object)

    if secret_store and secrets_req:
        secret_store.store(
            [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
            job_order_object,
        )

    if "cwl:tool" in job_order_object:
        del job_order_object["cwl:tool"]
    if "id" in job_order_object:
        del job_order_object["id"]
    return job_order_object
519 | |
520 | |
def make_relative(base: str, obj: CWLObjectType) -> None:
    """Relativize the location URI of a File or Directory object."""
    uri = cast(str, obj.get("location", obj.get("path")))
    has_scheme = ":" in uri.split("/")[0]
    if has_scheme and not uri.startswith("file://"):
        # Non-file URIs (http://, s3://, ...) are left untouched.
        return
    if uri.startswith("file://"):
        uri = uri_file_path(uri)
    obj["location"] = os.path.relpath(uri, base)
530 | |
531 | |
def printdeps(
    obj: CWLObjectType,
    document_loader: Loader,
    stdout: Union[TextIO, StreamWriter],
    relative_deps: str,
    uri: str,
    basedir: Optional[str] = None,
    nestdirs: bool = True,
) -> None:
    """Write a JSON listing of the CWL document's dependencies to *stdout*."""
    dependencies = find_deps(
        obj, document_loader, uri, basedir=basedir, nestdirs=nestdirs
    )
    # NOTE(review): a relative_deps value other than "primary"/"cwd" would
    # leave `base` unbound; presumably the argument parser restricts the
    # choices — confirm.
    if relative_deps == "primary":
        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
    elif relative_deps == "cwd":
        base = os.getcwd()
    visit_class(
        dependencies, ("File", "Directory"), functools.partial(make_relative, base)
    )
    stdout.write(json_dumps(dependencies, indent=4))
549 | |
550 | |
def prov_deps(
    obj: CWLObjectType,
    document_loader: Loader,
    uri: str,
    basedir: Optional[str] = None,
) -> CWLObjectType:
    """Return the document's dependencies restricted to CWL files.

    Any secondaryFiles entry whose ``format`` is not ``CWL_IANA`` is
    pruned from the result, recursively.
    """
    deps = find_deps(obj, document_loader, uri, basedir=basedir)

    def remove_non_cwl(deps: CWLObjectType) -> None:
        # Recursively drop secondaryFiles entries that are not CWL docs.
        if "secondaryFiles" in deps:
            sec_files = cast(List[CWLObjectType], deps["secondaryFiles"])
            # Fix: the previous implementation did `del sec_files[index]`
            # while iterating with enumerate(), which skips the element
            # following each deletion (and can delete the wrong element as
            # indices shift).  Build the kept list first, then replace the
            # list's contents in place.
            kept = [
                entry
                for entry in sec_files
                if "format" in entry and entry["format"] == CWL_IANA
            ]
            for entry in kept:
                remove_non_cwl(entry)
            sec_files[:] = kept

    remove_non_cwl(deps)
    return deps
570 | |
571 | |
def find_deps(
    obj: CWLObjectType,
    document_loader: Loader,
    uri: str,
    basedir: Optional[str] = None,
    nestdirs: bool = True,
) -> CWLObjectType:
    """Find the dependencies of the CWL document."""

    def loadref(base: str, uri: str) -> Union[CommentedMap, CommentedSeq, str, None]:
        # Fetch a referenced document through the supplied loader.
        return document_loader.fetch(document_loader.fetcher.urljoin(base, uri))

    deps = {
        "class": "File",
        "location": uri,
        "format": CWL_IANA,
    }  # type: CWLObjectType

    scanned = scandeps(
        basedir if basedir else uri,
        obj,
        {"$import", "run"},
        {"$include", "$schemas", "location"},
        loadref,
        nestdirs=nestdirs,
    )
    if scanned is not None:
        deps["secondaryFiles"] = cast(MutableSequence[CWLOutputAtomType], scanned)

    return deps
601 | |
602 | |
def print_pack(
    loadingContext: LoadingContext,
    uri: str,
) -> str:
    """Return a CWL serialization of the CWL document in JSON."""
    packed = pack(loadingContext, uri)
    graph = cast(MutableSequence[CWLObjectType], packed["$graph"])
    # A single-process document is emitted bare, not wrapped in $graph.
    if len(graph) > 1:
        return json_dumps(packed, indent=4)
    return json_dumps(graph[0], indent=4)
614 | |
615 | |
def supported_cwl_versions(enable_dev: bool) -> List[str]:
    """Return the sorted list of supported CWL version strings."""
    # ALLUPDATES additionally contains the development versions; both are
    # dicts keyed by version string.
    source = ALLUPDATES if enable_dev else UPDATES
    return sorted(source)
624 | |
625 | |
def configure_logging(
    args: argparse.Namespace,
    stderr_handler: logging.Handler,
    runtimeContext: RuntimeContext,
) -> None:
    """Set logging levels and the stderr formatter from the parsed options."""
    rdflib_logger = logging.getLogger("rdflib.term")
    rdflib_logger.addHandler(stderr_handler)
    rdflib_logger.setLevel(logging.ERROR)
    if args.quiet:
        # Only quiet STDERR; an eventual provenance log file keeps its level.
        stderr_handler.setLevel(logging.WARN)
    if runtimeContext.debug:
        # Raise to debug for stderr, the provenance log, and rdflib alike.
        for target in (_logger, stderr_handler, rdflib_logger):
            target.setLevel(logging.DEBUG)
    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
    if args.timestamps:
        formatter = fmtclass(
            "[%(asctime)s] %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S"
        )
    else:
        formatter = fmtclass("%(levelname)s %(message)s")
    stderr_handler.setFormatter(formatter)
649 | |
650 | |
def setup_schema(
    args: argparse.Namespace, custom_schema_callback: Optional[Callable[[], None]]
) -> None:
    """Activate the appropriate CWL schemas: custom, extended, or standard."""
    if custom_schema_callback is not None:
        custom_schema_callback()
        return
    if args.enable_ext:
        # Load the cwltool extension schemas bundled with the package.
        with pkg_resources.resource_stream(__name__, "extensions.yml") as res:
            ext10 = res.read().decode("utf-8")
        with pkg_resources.resource_stream(__name__, "extensions-v1.1.yml") as res:
            ext11 = res.read().decode("utf-8")
        use_custom_schema("v1.0", "http://commonwl.org/cwltool", ext10)
        for version in ("v1.1", "v1.2.0-dev1", "v1.2.0-dev2", "v1.2.0-dev3"):
            use_custom_schema(version, "http://commonwl.org/cwltool", ext11)
    else:
        for version in ("v1.0", "v1.1", "v1.2.0-dev1", "v1.2.0-dev2", "v1.2.0-dev3"):
            use_standard_schema(version)
672 | |
673 | |
class ProvLogFormatter(logging.Formatter):
    """Enforce ISO8601 with both T and Z."""

    def __init__(self) -> None:
        """Install the fixed provenance log format string."""
        super().__init__("[%(asctime)sZ] %(message)s")

    def formatTime(
        self, record: logging.LogRecord, datefmt: Optional[str] = None
    ) -> str:
        """Render the record's creation time as UTC ISO8601 with a T separator."""
        seconds_part = time.strftime(
            "%Y-%m-%dT%H:%M:%S", time.gmtime(float(record.created))
        )
        return f"{seconds_part},{record.msecs:03f}"
689 | |
690 | |
def setup_provenance(
    args: argparse.Namespace,
    argsl: List[str],
    runtimeContext: RuntimeContext,
) -> Optional[int]:
    """Create the research object and attach the provenance log handler.

    :return: 1 when options are incompatible, otherwise None.
    """
    if not args.compute_checksum:
        _logger.error("--provenance incompatible with --no-compute-checksum")
        return 1
    research_obj = ResearchObject(
        getdefault(runtimeContext.make_fs_access, StdFsAccess)(""),
        temp_prefix_ro=args.tmpdir_prefix,
        orcid=args.orcid,
        full_name=args.cwl_full_name,
    )
    runtimeContext.research_obj = research_obj
    log_file_io = research_obj.open_log_file_for_activity(research_obj.engine_uuid)
    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
    prov_log_handler.setFormatter(ProvLogFormatter())
    _logger.addHandler(prov_log_handler)
    _logger.debug("[provenance] Logging to %s", log_file_io)
    if argsl is not None:
        # Record the full cwltool invocation in the provenance log.
        _logger.info("[cwltool] %s %s", sys.argv[0], " ".join(argsl))
    _logger.debug("[cwltool] Arguments: %s", args)
    return None
717 | |
718 | |
def setup_loadingContext(
    loadingContext: Optional[LoadingContext],
    runtimeContext: RuntimeContext,
    args: argparse.Namespace,
) -> LoadingContext:
    """Create or copy a LoadingContext and fill in derived settings."""
    if loadingContext is not None:
        loadingContext = loadingContext.copy()
    else:
        loadingContext = LoadingContext(vars(args))
    loadingContext.loader = default_loader(
        loadingContext.fetcher_constructor,
        enable_dev=args.enable_dev,
        doc_cache=args.doc_cache,
    )
    loadingContext.research_obj = runtimeContext.research_obj
    # JS validation is skipped when disabled explicitly or when validation
    # as a whole is turned off.
    loadingContext.disable_js_validation = args.disable_js_validation or (
        not args.do_validate
    )
    loadingContext.construct_tool_object = getdefault(
        loadingContext.construct_tool_object, workflow.default_make_tool
    )
    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
    if loadingContext.do_update is None:
        # Packing and subgraph printing need the document left un-updated.
        loadingContext.do_update = not (args.pack or args.print_subgraph)

    return loadingContext
745 | |
746 | |
def make_template(
    tool: Process,
) -> None:
    """Print a YAML template input object for the given Process to stdout."""

    def _represent_none(
        self: Any, data: Any
    ) -> Any:  # pylint: disable=unused-argument
        # Emit None as a bare "null" instead of ruamel's default "~".
        return self.represent_scalar("tag:yaml.org,2002:null", "null")

    yaml.representer.RoundTripRepresenter.add_representer(type(None), _represent_none)
    yaml.main.round_trip_dump(
        generate_input_template(tool),
        sys.stdout,
        default_flow_style=False,
        indent=4,
        block_seq_indent=2,
    )
766 | |
767 | |
def choose_target(
    args: argparse.Namespace,
    tool: Process,
    loadingContext: LoadingContext,
) -> Optional[Process]:
    """Walk the Workflow, extract the subset matches all the args.targets."""
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if not isinstance(tool, Workflow):
        _logger.error("Can only use --target on Workflows")
        return None

    parsed = urllib.parse.urlparse(tool.tool["id"])
    if parsed.fragment:
        # The tool id already carries a fragment: append step names directly.
        step_ids = [tool.tool["id"] + "/" + r for r in args.target]
    else:
        step_ids = [
            loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
            for r in args.target
        ]
    extracted = get_subgraph(step_ids, tool)

    if not isinstance(loadingContext.loader.idx, MutableMapping):
        raise Exception("Missing loadingContext.loader.idx!")
    loadingContext.loader.idx[extracted["id"]] = extracted
    return make_tool(extracted["id"], loadingContext)
801 | |
802 | |
def choose_step(
    args: argparse.Namespace,
    tool: Process,
    loadingContext: LoadingContext,
) -> Optional[Process]:
    """Walk the given Workflow and extract just args.single_step.

    :return: the extracted step as a Process, or None when *tool* is not a
        Workflow.
    """
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if isinstance(tool, Workflow):
        url = urllib.parse.urlparse(tool.tool["id"])
        if url.fragment:
            # Fix: this previously read `args.singe_step` (a typo), which
            # raised AttributeError whenever the tool id had a fragment;
            # the attribute is spelled `single_step` elsewhere.
            extracted = get_step(tool, tool.tool["id"] + "/" + args.single_step)
        else:
            extracted = get_step(
                tool,
                loadingContext.loader.fetcher.urljoin(
                    tool.tool["id"], "#" + args.single_step
                ),
            )
    else:
        _logger.error("Can only use --single-step on Workflows")
        return None
    if isinstance(loadingContext.loader.idx, MutableMapping):
        loadingContext.loader.idx[extracted["id"]] = extracted
        tool = make_tool(extracted["id"], loadingContext)
    else:
        raise Exception("Missing loadingContext.loader.idx!")

    return tool
833 | |
834 | |
def check_working_directories(
    runtimeContext: RuntimeContext,
) -> Optional[int]:
    """Make any needed working directories."""
    for attr in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
        prefix = getattr(runtimeContext, attr)
        if prefix and prefix != DEFAULT_TMP_PREFIX:
            # cachedir names a directory proper (not a filename prefix), so
            # it always gets a trailing slash.
            suffix = "/" if prefix.endswith("/") or attr == "cachedir" else ""
            setattr(runtimeContext, attr, os.path.abspath(prefix) + suffix)
            parent = os.path.dirname(getattr(runtimeContext, attr))
            if not os.path.exists(parent):
                try:
                    os.makedirs(parent)
                except Exception:
                    _logger.exception("Failed to create directory.")
                    return 1
    return None
862 | |
863 | |
def main(
    argsl: Optional[List[str]] = None,
    args: Optional[argparse.Namespace] = None,
    job_order_object: Optional[CWLObjectType] = None,
    stdin: IO[Any] = sys.stdin,
    stdout: Optional[Union[TextIO, StreamWriter]] = None,
    stderr: IO[Any] = sys.stderr,
    versionfunc: Callable[[], str] = versionstring,
    logger_handler: Optional[logging.Handler] = None,
    custom_schema_callback: Optional[Callable[[], None]] = None,
    executor: Optional[JobExecutor] = None,
    loadingContext: Optional[LoadingContext] = None,
    runtimeContext: Optional[RuntimeContext] = None,
    input_required: bool = True,
) -> int:
    """Run cwltool end to end and return a process exit code.

    This is the programmatic entry point: it parses arguments (unless the
    caller supplies a pre-parsed ``args``), loads and validates the CWL
    document, handles the various ``--print-*`` / ``--validate`` /
    ``--pack`` early-exit modes, builds the runtime and loading contexts,
    executes the workflow, and writes the output object to ``stdout``.

    :param argsl: raw command-line argument strings (defaults to
        ``sys.argv[1:]`` when both ``argsl`` and ``args`` are ``None``)
    :param args: pre-parsed namespace; missing cwltool options are filled
        in with defaults so partial namespaces from callers do not crash
    :param job_order_object: pre-loaded input object, bypassing stdin/file
    :param stdin: stream used when the job order is read from stdin
    :param stdout: output stream; when ``None`` a UTF-8 wrapper around
        ``sys.stdout`` is used regardless of console encoding
    :param stderr: stream used for colored log output when no
        ``logger_handler`` is given
    :param versionfunc: callable producing the version banner string
    :param logger_handler: custom logging handler replacing the default
        colored stderr logging
    :param custom_schema_callback: hook to register custom CWL schemas
    :param executor: custom job executor; otherwise single-threaded or
        multithreaded execution is chosen from ``--parallel``
    :param loadingContext: optional pre-built document-loading context
    :param runtimeContext: optional pre-built runtime context
    :param input_required: whether a job order must be provided
    :return: 0 on success, 1 on most failures, 33 when the workflow needs
        an unsupported feature
    """
    if not stdout:  # force UTF-8 even if the console is configured differently
        if hasattr(sys.stdout, "encoding") and sys.stdout.encoding.upper() not in (
            "UTF-8",
            "UTF8",
        ):
            if hasattr(sys.stdout, "detach"):
                stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
            else:
                stdout = getwriter("utf-8")(sys.stdout)  # type: ignore
        else:
            stdout = sys.stdout

    # Route cwltool's logger either to the caller-supplied handler or to
    # freshly installed colored logging on ``stderr``; the default handler
    # is restored in the ``finally`` block below.
    _logger.removeHandler(defaultStreamHandler)
    stderr_handler = logger_handler
    if stderr_handler is not None:
        _logger.addHandler(stderr_handler)
    else:
        coloredlogs.install(logger=_logger, stream=stderr)
        stderr_handler = _logger.handlers[-1]
    workflowobj = None
    prov_log_handler = None  # type: Optional[logging.StreamHandler]
    try:
        if args is None:
            if argsl is None:
                argsl = sys.argv[1:]
            # Options from the CWLTOOL_OPTIONS environment variable are
            # prepended so explicit command-line arguments win.
            addl = []  # type: List[str]
            if "CWLTOOL_OPTIONS" in os.environ:
                addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
            parser = arg_parser()
            argcomplete.autocomplete(parser)
            args = parser.parse_args(addl + argsl)
            if args.record_container_id:
                if not args.cidfile_dir:
                    args.cidfile_dir = os.getcwd()
                del args.record_container_id

        if runtimeContext is None:
            runtimeContext = RuntimeContext(vars(args))
        else:
            runtimeContext = runtimeContext.copy()

        # If on Windows platform, a default Docker Container is used if not
        # explicitely provided by user
        if onWindows() and not runtimeContext.default_container:
            # This docker image is a minimal alpine image with bash installed
            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
            runtimeContext.default_container = windows_default_container_id

        # If caller parsed its own arguments, it may not include every
        # cwltool option, so fill in defaults to avoid crashing when
        # dereferencing them in args.
        for key, val in get_default_args().items():
            if not hasattr(args, key):
                setattr(args, key, val)

        configure_logging(args, stderr_handler, runtimeContext)

        if args.version:
            print(versionfunc())
            return 0
        _logger.info(versionfunc())

        if args.print_supported_versions:
            print("\n".join(supported_cwl_versions(args.enable_dev)))
            return 0

        # No workflow given: fall back to a file literally named "CWLFile"
        # in the current directory, else this is a usage error.
        if not args.workflow:
            if os.path.isfile("CWLFile"):
                args.workflow = "CWLFile"
            else:
                _logger.error("CWL document required, no input file was provided")
                parser.print_help()
                return 1

        if args.ga4gh_tool_registries:
            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
        if not args.enable_ga4gh_tool_registry:
            del ga4gh_tool_registries[:]

        if args.mpi_config_file is not None:
            runtimeContext.mpi_config = MpiConfig.load(args.mpi_config_file)

        setup_schema(args, custom_schema_callback)

        if args.provenance:
            # Provenance capture needs the original argv to record how the
            # run was invoked, so a caller-parsed ``args`` alone won't do.
            if argsl is None:
                raise Exception("argsl cannot be None")
            if setup_provenance(args, argsl, runtimeContext) is not None:
                return 1

        loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)

        uri, tool_file_uri = resolve_tool_uri(
            args.workflow,
            resolver=loadingContext.resolver,
            fetcher_constructor=loadingContext.fetcher_constructor,
        )

        try_again_msg = (
            "" if args.debug else ", try again with --debug for more information"
        )

        try:
            job_order_object, input_basedir, jobloader = load_job_order(
                args,
                stdin,
                loadingContext.fetcher_constructor,
                loadingContext.overrides_list,
                tool_file_uri,
            )

            if args.overrides:
                loadingContext.overrides_list.extend(
                    load_overrides(
                        file_uri(os.path.abspath(args.overrides)), tool_file_uri
                    )
                )

            loadingContext, workflowobj, uri = fetch_document(uri, loadingContext)

            if args.print_deps and loadingContext.loader:
                printdeps(
                    workflowobj, loadingContext.loader, stdout, args.relative_deps, uri
                )
                return 0

            loadingContext, uri = resolve_and_validate_document(
                loadingContext,
                workflowobj,
                uri,
                preprocess_only=(args.print_pre or args.pack),
                skip_schemas=args.skip_schemas,
            )

            if loadingContext.loader is None:
                raise Exception("Impossible code path.")
            processobj, metadata = loadingContext.loader.resolve_ref(uri)
            processobj = cast(CommentedMap, processobj)
            if args.pack:
                stdout.write(print_pack(loadingContext, uri))
                return 0

            if args.provenance and runtimeContext.research_obj:
                # Can't really be combined with args.pack at same time
                runtimeContext.research_obj.packed_workflow(
                    print_pack(loadingContext, uri)
                )

            if args.print_pre:
                stdout.write(
                    json_dumps(
                        processobj, indent=4, sort_keys=True, separators=(",", ": ")
                    )
                )
                return 0

            tool = make_tool(uri, loadingContext)
            if args.make_template:
                make_template(tool)
                return 0

            if args.validate:
                # Reaching this point means the document already passed
                # resolve_and_validate_document above.
                print(f"{args.workflow} is valid CWL.")
                return 0

            if args.print_rdf:
                stdout.write(
                    printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer)
                )
                return 0

            if args.print_dot:
                printdot(tool, loadingContext.loader.ctx, stdout)
                return 0

            if args.print_targets:
                # f[1:-1] drops the plural "s" for the log label
                # (e.g. "Output targets:").
                for f in ("outputs", "steps", "inputs"):
                    if tool.tool[f]:
                        _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
                        stdout.write(
                            " "
                            + "\n ".join([shortname(t["id"]) for t in tool.tool[f]])
                            + "\n"
                        )
                return 0

            # --target / --single-step narrow the workflow to a subgraph;
            # a None result means the requested element was not found.
            if args.target:
                ctool = choose_target(args, tool, loadingContext)
                if ctool is None:
                    return 1
                else:
                    tool = ctool

            elif args.single_step:
                ctool = choose_step(args, tool, loadingContext)
                if ctool is None:
                    return 1
                else:
                    tool = ctool

            if args.print_subgraph:
                if "name" in tool.tool:
                    del tool.tool["name"]
                stdout.write(
                    json_dumps(
                        tool.tool, indent=4, sort_keys=True, separators=(",", ": ")
                    )
                )
                return 0

        except (ValidationException) as exc:
            _logger.error(
                "Tool definition failed validation:\n%s", str(exc), exc_info=args.debug
            )
            return 1
        except (RuntimeError, WorkflowException) as exc:
            _logger.error(
                "Tool definition failed initialization:\n%s",
                str(exc),
                exc_info=args.debug,
            )
            return 1
        except Exception as exc:
            _logger.error(
                "I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
                try_again_msg,
                str(exc) if not args.debug else "",
                exc_info=args.debug,
            )
            return 1

        if isinstance(tool, int):
            return tool

        # If on MacOS platform, TMPDIR must be set to be under one of the
        # shared volumes in Docker for Mac
        # More info: https://dockstore.org/docs/faq
        if sys.platform == "darwin":
            default_mac_path = "/private/tmp/docker_tmp"
            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmp_outdir_prefix = default_mac_path
            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmpdir_prefix = default_mac_path

        if check_working_directories(runtimeContext) is not None:
            return 1

        if args.cachedir:
            # Outputs must be copied (not moved) out of the cache so cached
            # results stay intact for later reuse.
            if args.move_outputs == "move":
                runtimeContext.move_outputs = "copy"
            runtimeContext.tmp_outdir_prefix = args.cachedir

        runtimeContext.secret_store = getdefault(
            runtimeContext.secret_store, SecretStore()
        )
        runtimeContext.make_fs_access = getdefault(
            runtimeContext.make_fs_access, StdFsAccess
        )

        if not executor:
            if args.parallel:
                temp_executor = MultithreadedJobExecutor()
                runtimeContext.select_resources = temp_executor.select_resources
                real_executor = temp_executor  # type: JobExecutor
            else:
                real_executor = SingleJobExecutor()
        else:
            real_executor = executor

        try:
            runtimeContext.basedir = input_basedir

            if isinstance(tool, ProcessGenerator):
                # A ProcessGenerator first runs its embedded tool to
                # produce the real workflow (and possibly a new job order).
                tfjob_order = {}  # type: CWLObjectType
                if loadingContext.jobdefaults:
                    tfjob_order.update(loadingContext.jobdefaults)
                if job_order_object:
                    tfjob_order.update(job_order_object)
                tfout, tfstatus = real_executor(
                    tool.embedded_tool, tfjob_order, runtimeContext
                )
                if not tfout or tfstatus != "success":
                    raise WorkflowException(
                        "ProcessGenerator failed to generate workflow"
                    )
                tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
                if not job_order_object:
                    job_order_object = None

            try:
                initialized_job_order_object = init_job_order(
                    job_order_object,
                    args,
                    tool,
                    jobloader,
                    stdout,
                    print_input_deps=args.print_input_deps,
                    relative_deps=args.relative_deps,
                    make_fs_access=runtimeContext.make_fs_access,
                    input_basedir=input_basedir,
                    secret_store=runtimeContext.secret_store,
                    input_required=input_required,
                )
            except SystemExit as err:
                # init_job_order may exit directly (e.g. after printing
                # input deps); propagate its code as our return value.
                return err.code

            del args.workflow
            del args.job_order

            conf_file = getattr(
                args, "beta_dependency_resolvers_configuration", None
            )  # str
            use_conda_dependencies = getattr(
                args, "beta_conda_dependencies", None
            )  # str

            if conf_file or use_conda_dependencies:
                runtimeContext.job_script_provider = DependenciesConfiguration(args)
            else:
                runtimeContext.find_default_container = functools.partial(
                    find_default_container,
                    default_container=runtimeContext.default_container,
                    use_biocontainers=args.beta_use_biocontainers,
                )

            (out, status) = real_executor(
                tool, initialized_job_order_object, runtimeContext, logger=_logger
            )

            if out is not None:
                if runtimeContext.research_obj is not None:
                    runtimeContext.research_obj.create_job(out, True)

                def remove_at_id(doc: CWLObjectType) -> None:
                    """Recursively delete ``@id`` keys from the output object."""
                    for key in list(doc.keys()):
                        if key == "@id":
                            del doc[key]
                        else:
                            value = doc[key]
                            if isinstance(value, MutableMapping):
                                remove_at_id(value)
                            elif isinstance(value, MutableSequence):
                                for entry in value:
                                    if isinstance(entry, MutableMapping):
                                        remove_at_id(entry)

                remove_at_id(out)
                visit_class(
                    out,
                    ("File",),
                    functools.partial(add_sizes, runtimeContext.make_fs_access("")),
                )

                def loc_to_path(obj: CWLObjectType) -> None:
                    """Replace derived path fields with one from ``location``."""
                    for field in ("path", "nameext", "nameroot", "dirname"):
                        if field in obj:
                            del obj[field]
                    if cast(str, obj["location"]).startswith("file://"):
                        obj["path"] = uri_file_path(cast(str, obj["location"]))

                visit_class(out, ("File", "Directory"), loc_to_path)

                # Unsetting the Generation from final output object
                visit_class(out, ("File",), MutationManager().unset_generation)

                if isinstance(out, str):
                    stdout.write(out)
                else:
                    stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
                stdout.write("\n")
                if hasattr(stdout, "flush"):
                    stdout.flush()

            if status != "success":
                _logger.warning("Final process status is %s", status)
                return 1
            _logger.info("Final process status is %s", status)
            return 0

        except (ValidationException) as exc:
            _logger.error(
                "Input object failed validation:\n%s", str(exc), exc_info=args.debug
            )
            return 1
        except UnsupportedRequirement as exc:
            # 33 is cwltool's documented exit code for unsupported features.
            _logger.error(
                "Workflow or tool uses unsupported feature:\n%s",
                str(exc),
                exc_info=args.debug,
            )
            return 33
        except WorkflowException as exc:
            _logger.error(
                "Workflow error%s:\n%s",
                try_again_msg,
                strip_dup_lineno(str(exc)),
                exc_info=args.debug,
            )
            return 1
        except Exception as exc:  # pylint: disable=broad-except
            _logger.error(
                "Unhandled error%s:\n %s",
                try_again_msg,
                str(exc),
                exc_info=args.debug,
            )
            return 1

    finally:
        # Finalize the provenance research object (snapshot the workflow
        # sources and close the provenance log) regardless of how we exit.
        if (
            args
            and runtimeContext
            and runtimeContext.research_obj
            and workflowobj
            and loadingContext
        ):
            research_obj = runtimeContext.research_obj
            if loadingContext.loader is not None:
                research_obj.generate_snapshot(
                    prov_deps(workflowobj, loadingContext.loader, uri)
                )
            else:
                _logger.warning(
                    "Unable to generate provenance snapshot "
                    " due to missing loadingContext.loader."
                )
            if prov_log_handler is not None:
                # Stop logging so we won't half-log adding ourself to RO
                _logger.debug(
                    "[provenance] Closing provenance log file %s", prov_log_handler
                )
                _logger.removeHandler(prov_log_handler)
                # Ensure last log lines are written out
                prov_log_handler.flush()
                # Underlying WritableBagFile will add the tagfile to the manifest
                prov_log_handler.stream.close()
                prov_log_handler.close()
            research_obj.close(args.provenance)

        # Restore the default logging configuration for subsequent callers.
        _logger.removeHandler(stderr_handler)
        _logger.addHandler(defaultStreamHandler)
1320 | |
1321 | |
def find_default_container(
    builder: HasReqsHints,
    default_container: Optional[str] = None,
    use_biocontainers: Optional[bool] = None,
) -> Optional[str]:
    """Pick the container image to use when a tool does not name one.

    An explicitly supplied ``default_container`` always wins. Otherwise,
    when biocontainers lookup is enabled, derive an image from the tool's
    SoftwareRequirement hints; failing both, return ``None``.
    """
    # Guard clause: nothing to look up when a container is already chosen
    # or biocontainers resolution is disabled.
    if default_container or not use_biocontainers:
        return default_container
    return get_container_from_software_requirements(use_biocontainers, builder)
1333 | |
1334 | |
def run(*args, **kwargs):
    # type: (*Any, **Any) -> None
    """Run cwltool as a console script and exit with its return code.

    Installs a SIGTERM handler so in-flight child processes are signalled,
    and always reaps outstanding processes before the interpreter exits.
    """
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        exit_code = main(*args, **kwargs)
    finally:
        _terminate_processes()
    sys.exit(exit_code)
1343 | |
1344 | |
# Script entry point: forward the CLI arguments (minus the program name)
# to run(), which exits the interpreter with main()'s return code.
if __name__ == "__main__":
    run(sys.argv[1:])