Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/main.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 # PYTHON_ARGCOMPLETE_OK | |
| 3 """Entry point for cwltool.""" | |
| 4 | |
| 5 import argparse | |
| 6 import functools | |
| 7 import io | |
| 8 import logging | |
| 9 import os | |
| 10 import signal | |
| 11 import subprocess # nosec | |
| 12 import sys | |
| 13 import time | |
| 14 import urllib | |
| 15 from codecs import StreamWriter, getwriter | |
| 16 from collections.abc import MutableMapping, MutableSequence | |
| 17 from typing import ( | |
| 18 IO, | |
| 19 Any, | |
| 20 Callable, | |
| 21 Dict, | |
| 22 List, | |
| 23 Mapping, | |
| 24 MutableMapping, | |
| 25 MutableSequence, | |
| 26 Optional, | |
| 27 Sized, | |
| 28 TextIO, | |
| 29 Tuple, | |
| 30 Union, | |
| 31 cast, | |
| 32 ) | |
| 33 | |
| 34 import argcomplete | |
| 35 import coloredlogs | |
| 36 import pkg_resources # part of setuptools | |
| 37 from ruamel import yaml | |
| 38 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
| 39 from schema_salad.exceptions import ValidationException | |
| 40 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path | |
| 41 from schema_salad.sourceline import strip_dup_lineno | |
| 42 from schema_salad.utils import ContextType, FetcherCallableType, json_dumps | |
| 43 | |
| 44 from . import CWL_CONTENT_TYPES, workflow | |
| 45 from .argparser import arg_parser, generate_parser, get_default_args | |
| 46 from .builder import HasReqsHints | |
| 47 from .context import LoadingContext, RuntimeContext, getdefault | |
| 48 from .cwlrdf import printdot, printrdf | |
| 49 from .errors import UnsupportedRequirement, WorkflowException | |
| 50 from .executors import JobExecutor, MultithreadedJobExecutor, SingleJobExecutor | |
| 51 from .load_tool import ( | |
| 52 default_loader, | |
| 53 fetch_document, | |
| 54 jobloaderctx, | |
| 55 load_overrides, | |
| 56 make_tool, | |
| 57 resolve_and_validate_document, | |
| 58 resolve_overrides, | |
| 59 resolve_tool_uri, | |
| 60 ) | |
| 61 from .loghandler import _logger, defaultStreamHandler | |
| 62 from .mpi import MpiConfig | |
| 63 from .mutation import MutationManager | |
| 64 from .pack import pack | |
| 65 from .process import ( | |
| 66 CWL_IANA, | |
| 67 Process, | |
| 68 add_sizes, | |
| 69 scandeps, | |
| 70 shortname, | |
| 71 use_custom_schema, | |
| 72 use_standard_schema, | |
| 73 ) | |
| 74 from .procgenerator import ProcessGenerator | |
| 75 from .provenance import ResearchObject | |
| 76 from .resolver import ga4gh_tool_registries, tool_resolver | |
| 77 from .secrets import SecretStore | |
| 78 from .software_requirements import ( | |
| 79 DependenciesConfiguration, | |
| 80 get_container_from_software_requirements, | |
| 81 ) | |
| 82 from .stdfsaccess import StdFsAccess | |
| 83 from .subgraph import get_step, get_subgraph | |
| 84 from .update import ALLUPDATES, UPDATES | |
| 85 from .utils import ( | |
| 86 DEFAULT_TMP_PREFIX, | |
| 87 CWLObjectType, | |
| 88 CWLOutputAtomType, | |
| 89 CWLOutputType, | |
| 90 adjustDirObjs, | |
| 91 normalizeFilesDirs, | |
| 92 onWindows, | |
| 93 processes_to_kill, | |
| 94 trim_listing, | |
| 95 versionstring, | |
| 96 visit_class, | |
| 97 windows_default_container_id, | |
| 98 ) | |
| 99 from .workflow import Workflow | |
| 100 | |
| 101 | |
def _terminate_processes() -> None:
    """Kill all spawned processes.

    Processes to be killed must be appended to `utils.processes_to_kill`
    as they are spawned.

    An important caveat: since there's no supported way to kill another
    thread in Python, this function cannot stop other threads from
    continuing to execute while it kills the processes that they've
    spawned. This may occasionally lead to unexpected behaviour.
    """
    # It's possible that another thread will spawn a new task while
    # we're executing, so it's not safe to use a for loop here.
    while processes_to_kill:
        process = processes_to_kill.popleft()
        # Recover the container ID file path from a "--cidfile=<path>"
        # argument, if present.  Guard against a bare "--cidfile" with a
        # separate value argument, which previously raised IndexError.
        cidfile = [
            str(arg).split("=")[1]
            for arg in process.args
            if "--cidfile" in str(arg) and "=" in str(arg)
        ]
        if cidfile:
            try:
                # Ask Docker to stop the container itself, not just the
                # client process we spawned.
                with open(cidfile[0]) as inp_stream:
                    p = subprocess.Popen(  # nosec
                        ["docker", "kill", inp_stream.read()], shell=False  # nosec
                    )
                    try:
                        p.wait(timeout=10)
                    except subprocess.TimeoutExpired:
                        p.kill()
            except FileNotFoundError:
                # cidfile never materialized; nothing to kill.
                pass
| 132 | |
| 133 | |
def _signal_handler(signum: int, _: Any) -> None:
    """Terminate every tracked child process, then exit with the signal number.

    A race remains: another thread may spawn a new process after the
    cleanup pass but before the interpreter exits.  See
    _terminate_processes() for further caveats.
    """
    _terminate_processes()
    sys.exit(signum)
| 144 | |
| 145 | |
def generate_example_input(
    inptype: Optional[CWLOutputType],
    default: Optional[CWLOutputType],
) -> Tuple[Any, str]:
    """Convert a single input schema into an example.

    Returns a (example value, comment) pair; the comment describes the
    type and is meant to be attached to the YAML output as an
    end-of-line comment.
    """
    example = None
    comment = ""
    # Placeholder example values for the CWL primitive types.
    defaults = {
        "null": "null",
        "Any": "null",
        "boolean": False,
        "int": 0,
        "long": 0,
        "float": 0.1,
        "double": 0.1,
        "string": "a_string",
        "File": yaml.comments.CommentedMap(
            [("class", "File"), ("path", "a/file/path")]
        ),
        "Directory": yaml.comments.CommentedMap(
            [("class", "Directory"), ("path", "a/directory/path")]
        ),
    }  # type: CWLObjectType
    if isinstance(inptype, MutableSequence):
        # A list of types is a union; "null" in the union means optional.
        optional = False
        if "null" in inptype:
            # NOTE(review): this mutates the caller's schema list in place.
            inptype.remove("null")
            optional = True
        if len(inptype) == 1:
            # Union collapsed to a single type: recurse directly.
            example, comment = generate_example_input(inptype[0], default)
            if optional:
                if comment:
                    comment = f"{comment} (optional)"
                else:
                    comment = "optional"
        else:
            # Real union: show one example per alternative.
            example = CommentedSeq()
            for index, entry in enumerate(inptype):
                value, e_comment = generate_example_input(entry, default)
                example.append(value)
                example.yaml_add_eol_comment(e_comment, index)
            if optional:
                comment = "optional"
    elif isinstance(inptype, Mapping) and "type" in inptype:
        if inptype["type"] == "array":
            # NOTE(review): assumes "items" is indexable; a bare string
            # items value would be indexed character-wise here — confirm
            # callers always pass a realized (list-form) schema.
            first_item = cast(MutableSequence[CWLObjectType], inptype["items"])[0]
            items_len = len(cast(Sized, inptype["items"]))
            if items_len == 1 and "type" in first_item and first_item["type"] == "enum":
                # array of just an enum then list all the options
                example = first_item["symbols"]
                if "name" in first_item:
                    comment = 'array of type "{}".'.format(first_item["name"])
            else:
                value, comment = generate_example_input(inptype["items"], None)
                comment = "array of " + comment
                if items_len == 1:
                    example = [value]
                else:
                    example = value
            if default is not None:
                # An explicit default overrides the synthesized example.
                example = default
        elif inptype["type"] == "enum":
            symbols = cast(List[str], inptype["symbols"])
            if default is not None:
                example = default
            elif "default" in inptype:
                example = inptype["default"]
            elif len(cast(Sized, inptype["symbols"])) == 1:
                # Only one valid value; use it.
                example = symbols[0]
            else:
                example = "{}_enum_value".format(inptype.get("name", "valid"))
            comment = 'enum; valid values: "{}"'.format('", "'.join(symbols))
        elif inptype["type"] == "record":
            example = yaml.comments.CommentedMap()
            if "name" in inptype:
                comment = '"{}" record type.'.format(inptype["name"])
            # One example entry per record field, each with its comment.
            for field in cast(List[CWLObjectType], inptype["fields"]):
                value, f_comment = generate_example_input(field["type"], None)
                example.insert(0, shortname(cast(str, field["name"])), value, f_comment)
        elif "default" in inptype:
            example = inptype["default"]
            comment = 'default value of type "{}".'.format(inptype["type"])
        else:
            # Fall back to the primitive-type placeholder table.
            example = defaults.get(cast(str, inptype["type"]), str(inptype))
            comment = 'type "{}".'.format(inptype["type"])
    else:
        # Plain type name (or anything unrecognized): placeholder or default.
        if not default:
            example = defaults.get(str(inptype), str(inptype))
            comment = f'type "{inptype}"'
        else:
            example = default
            comment = f'default value of type "{inptype}".'
    return example, comment
| 239 | |
| 240 | |
def realize_input_schema(
    input_types: MutableSequence[CWLObjectType],
    schema_defs: MutableMapping[str, CWLObjectType],
) -> MutableSequence[CWLObjectType]:
    """Replace references to named types with the actual types.

    Mutates `input_types` in place (and also returns it): every string
    or "#name" reference found in `schema_defs` is expanded, recursing
    into union lists, nested type maps, array items and record fields.
    """
    for index, entry in enumerate(input_types):
        if isinstance(entry, str):
            # A bare string is a type reference, possibly "<doc>#<name>".
            if "#" in entry:
                _, input_type_name = entry.split("#")
            else:
                input_type_name = entry
            if input_type_name in schema_defs:
                entry = input_types[index] = schema_defs[input_type_name]
        if isinstance(entry, Mapping):
            if isinstance(entry["type"], str) and "#" in entry["type"]:
                # The "type" field itself is a reference to a schema def.
                _, input_type_name = entry["type"].split("#")
                if input_type_name in schema_defs:
                    input_types[index]["type"] = cast(
                        CWLOutputAtomType,
                        realize_input_schema(
                            cast(
                                MutableSequence[CWLObjectType],
                                schema_defs[input_type_name],
                            ),
                            schema_defs,
                        ),
                    )
            if isinstance(entry["type"], MutableSequence):
                # Union type: realize each alternative.
                input_types[index]["type"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], entry["type"]), schema_defs
                    ),
                )
            if isinstance(entry["type"], Mapping):
                # Nested single type map: wrap in a list so we can recurse.
                input_types[index]["type"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        [cast(CWLObjectType, input_types[index]["type"])], schema_defs
                    ),
                )
            if entry["type"] == "array":
                # "items" may be a single type name; normalize to a list.
                items = (
                    entry["items"]
                    if not isinstance(entry["items"], str)
                    else [entry["items"]]
                )
                input_types[index]["items"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], items), schema_defs
                    ),
                )
            if entry["type"] == "record":
                input_types[index]["fields"] = cast(
                    CWLOutputAtomType,
                    realize_input_schema(
                        cast(MutableSequence[CWLObjectType], entry["fields"]),
                        schema_defs,
                    ),
                )
    return input_types
| 303 | |
| 304 | |
def generate_input_template(tool: Process) -> CWLObjectType:
    """Generate an example input object for the given CWL process."""
    # Expand all named-type references first, then synthesize one
    # example entry (value plus end-of-line comment) per input.
    realized_inputs = realize_input_schema(tool.tool["inputs"], tool.schemaDefs)
    template = yaml.comments.CommentedMap()
    for schema in realized_inputs:
        field_name = shortname(cast(str, schema["id"]))
        example, note = generate_example_input(
            schema["type"], schema.get("default", None)
        )
        template.insert(0, field_name, example, note)
    return template
| 313 | |
| 314 | |
def load_job_order(
    args: argparse.Namespace,
    stdin: IO[Any],
    fetcher_constructor: Optional[FetcherCallableType],
    overrides_list: List[CWLObjectType],
    tool_file_uri: str,
) -> Tuple[Optional[CWLObjectType], str, Loader]:
    """Load the job order (input object) from a file, stdin, or neither.

    Returns (job order object or None, input basedir, the Loader used).
    Exits with status 1 if the loaded object is not a mapping.
    """
    job_order_object = None
    job_order_file = None

    _jobloaderctx = jobloaderctx.copy()
    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)

    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
        job_order_file = args.job_order[0]
    elif len(args.job_order) == 1 and args.job_order[0] == "-":
        # A single "-" argument means: read the input object from stdin.
        job_order_object = yaml.main.round_trip_load(stdin)
        job_order_object, _ = loader.resolve_all(
            job_order_object, file_uri(os.getcwd()) + "/"
        )
    else:
        job_order_file = None

    if job_order_object is not None:
        input_basedir = args.basedir if args.basedir else os.getcwd()
    elif job_order_file is not None:
        input_basedir = (
            args.basedir
            if args.basedir
            else os.path.abspath(os.path.dirname(job_order_file))
        )
        job_order_object, _ = loader.resolve_ref(
            job_order_file,
            checklinks=False,
            content_types=CWL_CONTENT_TYPES,
        )

    if (
        job_order_object is not None
        and "http://commonwl.org/cwltool#overrides" in job_order_object
    ):
        # Extract any overrides embedded in the input object itself.
        ov_uri = file_uri(job_order_file or input_basedir)
        overrides_list.extend(
            resolve_overrides(job_order_object, ov_uri, tool_file_uri)
        )
        del job_order_object["http://commonwl.org/cwltool#overrides"]

    if job_order_object is None:
        input_basedir = args.basedir if args.basedir else os.getcwd()

    if job_order_object is not None and not isinstance(
        job_order_object, MutableMapping
    ):
        # Fixed typo in the user-facing message: "dictionay" -> "dictionary".
        _logger.error(
            "CWL input object at %s is not formatted correctly, it should be a "
            "JSON/YAML dictionary, not %s.\n"
            "Raw input object:\n%s",
            job_order_file or "stdin",
            type(job_order_object),
            job_order_object,
        )
        sys.exit(1)
    return (job_order_object, input_basedir, loader)
| 379 | |
| 380 | |
def init_job_order(
    job_order_object: Optional[CWLObjectType],
    args: argparse.Namespace,
    process: Process,
    loader: Loader,
    stdout: Union[TextIO, StreamWriter],
    print_input_deps: bool = False,
    relative_deps: str = "primary",
    make_fs_access: Callable[[str], StdFsAccess] = StdFsAccess,
    input_basedir: str = "",
    secret_store: Optional[SecretStore] = None,
    input_required: bool = True,
) -> CWLObjectType:
    """Build and normalize the final job order (input object).

    When no job order was pre-loaded, one is parsed from the remaining
    command line arguments using a parser generated from the process
    inputs.  Declared input defaults are merged in, File/Directory
    entries are normalized, format ids are expanded, and any declared
    secrets are moved into the secret store.  May call exit() for
    --tool-help, --print-input-deps, an unresolvable reference, or a
    missing required input object.
    """
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    if job_order_object is None:
        namemap = {}  # type: Dict[str, str]
        records = []  # type: List[str]
        toolparser = generate_parser(
            argparse.ArgumentParser(prog=args.workflow),
            process,
            namemap,
            records,
            input_required,
        )
        if args.tool_help:
            toolparser.print_help()
            exit(0)
        cmd_line = vars(toolparser.parse_args(args.job_order))
        # Re-assemble record-typed inputs from their flattened
        # "<record>_<field>" command line options.
        for record_name in records:
            record = {}
            record_items = {
                k: v for k, v in cmd_line.items() if k.startswith(record_name)
            }
            for key, value in record_items.items():
                record[key[len(record_name) + 1 :]] = value
                del cmd_line[key]
            cmd_line[str(record_name)] = record
        if "job_order" in cmd_line and cmd_line["job_order"]:
            try:
                job_order_object = cast(
                    CWLObjectType,
                    loader.resolve_ref(cmd_line["job_order"])[0],
                )
            except Exception:
                # Fixed typo in the log message: "resolv" -> "resolve".
                _logger.exception(
                    "Failed to resolve job_order: %s", cmd_line["job_order"]
                )
                exit(1)
        else:
            job_order_object = {"id": args.workflow}

        del cmd_line["job_order"]

        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})

        if secret_store and secrets_req:
            # Replace secret values with opaque placeholders right away.
            secret_store.store(
                [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
                job_order_object,
            )

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "Parsed job order from command line: %s",
                json_dumps(job_order_object, indent=4),
            )

    # Merge in defaults declared on the process inputs for any missing keys.
    for inp in process.tool["inputs"]:
        if "default" in inp and (
            not job_order_object or shortname(inp["id"]) not in job_order_object
        ):
            if not job_order_object:
                job_order_object = {}
            job_order_object[shortname(inp["id"])] = inp["default"]

    if job_order_object is None:
        if process.tool["inputs"]:
            if toolparser is not None:
                print(f"\nOptions for {args.workflow} ")
                toolparser.print_help()
            _logger.error("")
            _logger.error("Input object required, use --help for details")
            exit(1)
        else:
            job_order_object = {}

    if print_input_deps:
        basedir = None  # type: Optional[str]
        uri = cast(str, job_order_object["id"])
        if uri == args.workflow:
            basedir = os.path.dirname(uri)
            uri = ""
        printdeps(
            job_order_object,
            loader,
            stdout,
            relative_deps,
            uri,
            basedir=basedir,
            nestdirs=False,
        )
        exit(0)

    def path_to_loc(p: CWLObjectType) -> None:
        # Older CWL drafts used "path"; normalize to "location".
        if "location" not in p and "path" in p:
            p["location"] = p["path"]
            del p["path"]

    ns = {}  # type: ContextType
    ns.update(cast(ContextType, job_order_object.get("$namespaces", {})))
    ns.update(cast(ContextType, process.metadata.get("$namespaces", {})))
    ld = Loader(ns)

    def expand_formats(p: CWLObjectType) -> None:
        # Expand prefixed format identifiers to full URIs.
        if "format" in p:
            p["format"] = ld.expand_url(cast(str, p["format"]), "")

    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
    visit_class(
        job_order_object,
        ("File",),
        functools.partial(add_sizes, make_fs_access(input_basedir)),
    )
    visit_class(job_order_object, ("File",), expand_formats)
    adjustDirObjs(job_order_object, trim_listing)
    normalizeFilesDirs(job_order_object)

    if secret_store and secrets_req:
        secret_store.store(
            [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])],
            job_order_object,
        )

    # Strip bookkeeping keys that are not actual inputs.
    if "cwl:tool" in job_order_object:
        del job_order_object["cwl:tool"]
    if "id" in job_order_object:
        del job_order_object["id"]
    return job_order_object
| 519 | |
| 520 | |
def make_relative(base: str, obj: CWLObjectType) -> None:
    """Relativize the location URI of a File or Directory object.

    Remote URIs (any scheme other than file://) are left untouched;
    local ones are rewritten relative to `base`.
    """
    uri = cast(str, obj.get("location", obj.get("path")))
    has_scheme = ":" in uri.split("/")[0]
    if has_scheme and not uri.startswith("file://"):
        return  # non-local URI: nothing to relativize
    if uri.startswith("file://"):
        uri = uri_file_path(uri)
    obj["location"] = os.path.relpath(uri, base)
| 530 | |
| 531 | |
def printdeps(
    obj: CWLObjectType,
    document_loader: Loader,
    stdout: Union[TextIO, StreamWriter],
    relative_deps: str,
    uri: str,
    basedir: Optional[str] = None,
    nestdirs: bool = True,
) -> None:
    """Print a JSON representation of the dependencies of the CWL document.

    File/Directory locations are relativized against either the primary
    document's directory ("primary") or the current directory ("cwd").
    """
    deps = find_deps(obj, document_loader, uri, basedir=basedir, nestdirs=nestdirs)
    if relative_deps == "primary":
        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
    elif relative_deps == "cwd":
        base = os.getcwd()
    # NOTE(review): `base` is unbound if relative_deps is anything other
    # than "primary" or "cwd"; presumably the argument parser restricts
    # it to those two values — confirm before passing other inputs.
    visit_class(deps, ("File", "Directory"), functools.partial(make_relative, base))
    stdout.write(json_dumps(deps, indent=4))
| 549 | |
| 550 | |
def prov_deps(
    obj: CWLObjectType,
    document_loader: Loader,
    uri: str,
    basedir: Optional[str] = None,
) -> CWLObjectType:
    """Find the dependencies of the CWL document, keeping only CWL files.

    Any secondary file whose "format" is not CWL_IANA is pruned from the
    dependency listing.
    """
    deps = find_deps(obj, document_loader, uri, basedir=basedir)

    def remove_non_cwl(deps: CWLObjectType) -> None:
        # Rebuild the list instead of deleting entries while iterating:
        # `del sec_files[index]` inside enumerate() shifts the remaining
        # elements and skips the one following each deletion, so some
        # non-CWL entries could survive the original pass.
        if "secondaryFiles" in deps:
            sec_files = cast(List[CWLObjectType], deps["secondaryFiles"])
            kept = []  # type: List[CWLObjectType]
            for entry in sec_files:
                if "format" in entry and entry["format"] == CWL_IANA:
                    remove_non_cwl(entry)
                    kept.append(entry)
            deps["secondaryFiles"] = cast(MutableSequence[CWLOutputAtomType], kept)

    remove_non_cwl(deps)
    return deps
| 570 | |
| 571 | |
def find_deps(
    obj: CWLObjectType,
    document_loader: Loader,
    uri: str,
    basedir: Optional[str] = None,
    nestdirs: bool = True,
) -> CWLObjectType:
    """Find the dependencies of the CWL document.

    Returns a File object for `uri` whose "secondaryFiles" lists every
    document reachable through $import/run references and
    $include/$schemas/location fields.
    """
    deps = {
        "class": "File",
        "location": uri,
        "format": CWL_IANA,
    }  # type: CWLObjectType

    def loadref(base: str, uri: str) -> Union[CommentedMap, CommentedSeq, str, None]:
        # Fetch a referenced document, resolving `uri` relative to `base`.
        return document_loader.fetch(document_loader.fetcher.urljoin(base, uri))

    sfs = scandeps(
        basedir if basedir else uri,
        obj,
        {"$import", "run"},
        {"$include", "$schemas", "location"},
        loadref,
        nestdirs=nestdirs,
    )
    if sfs is not None:
        deps["secondaryFiles"] = cast(MutableSequence[CWLOutputAtomType], sfs)

    return deps
| 601 | |
| 602 | |
def print_pack(
    loadingContext: LoadingContext,
    uri: str,
) -> str:
    """Return a CWL serialization of the CWL document in JSON.

    A document packed to a single process is unwrapped from its $graph
    list; multi-process documents are emitted whole.
    """
    packed = pack(loadingContext, uri)
    # Guard the $graph access: previously a pack result without a
    # "$graph" key raised KeyError, and an empty graph raised IndexError.
    if "$graph" in packed and len(cast(Sized, packed["$graph"])) == 1:
        return json_dumps(
            cast(MutableSequence[CWLObjectType], packed["$graph"])[0], indent=4
        )
    return json_dumps(packed, indent=4)
| 614 | |
| 615 | |
def supported_cwl_versions(enable_dev: bool) -> List[str]:
    """Return the sorted list of supported CWL version strings.

    ALLUPDATES and UPDATES are dicts keyed by version; ALLUPDATES also
    includes the development versions.
    """
    source = ALLUPDATES if enable_dev else UPDATES
    return sorted(source)
| 624 | |
| 625 | |
def configure_logging(
    args: argparse.Namespace,
    stderr_handler: logging.Handler,
    runtimeContext: RuntimeContext,
) -> None:
    """Configure logging levels, colors, and timestamps from the CLI options."""
    # rdflib is noisy; route its messages through our handler at ERROR.
    rdflib_logger = logging.getLogger("rdflib.term")
    rdflib_logger.addHandler(stderr_handler)
    rdflib_logger.setLevel(logging.ERROR)
    if args.quiet:
        # Silence STDERR, not an eventual provenance log file
        stderr_handler.setLevel(logging.WARN)
    if runtimeContext.debug:
        # Increase to debug for both stderr and provenance log file
        # (checked after --quiet, so debug wins for the stderr handler).
        _logger.setLevel(logging.DEBUG)
        stderr_handler.setLevel(logging.DEBUG)
        rdflib_logger.setLevel(logging.DEBUG)
    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
    formatter = fmtclass("%(levelname)s %(message)s")
    if args.timestamps:
        formatter = fmtclass(
            "[%(asctime)s] %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S"
        )
    stderr_handler.setFormatter(formatter)
| 649 | |
| 650 | |
def setup_schema(
    args: argparse.Namespace, custom_schema_callback: Optional[Callable[[], None]]
) -> None:
    """Activate the CWL schemas, optionally with the cwltool extensions.

    A caller-provided callback takes precedence; otherwise --enable-ext
    selects the extension schemas packaged with this module.
    """
    if custom_schema_callback is not None:
        custom_schema_callback()
    elif args.enable_ext:
        # Load the extension schema files shipped inside this package.
        with pkg_resources.resource_stream(__name__, "extensions.yml") as res:
            ext10 = res.read().decode("utf-8")
        with pkg_resources.resource_stream(__name__, "extensions-v1.1.yml") as res:
            ext11 = res.read().decode("utf-8")
        use_custom_schema("v1.0", "http://commonwl.org/cwltool", ext10)
        use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11)
        use_custom_schema("v1.2.0-dev1", "http://commonwl.org/cwltool", ext11)
        use_custom_schema("v1.2.0-dev2", "http://commonwl.org/cwltool", ext11)
        use_custom_schema("v1.2.0-dev3", "http://commonwl.org/cwltool", ext11)
    else:
        use_standard_schema("v1.0")
        use_standard_schema("v1.1")
        use_standard_schema("v1.2.0-dev1")
        use_standard_schema("v1.2.0-dev2")
        use_standard_schema("v1.2.0-dev3")
| 672 | |
| 673 | |
class ProvLogFormatter(logging.Formatter):
    """Enforce ISO8601 with both T and Z."""

    def __init__(self) -> None:
        """Use the default formatter with our custom formatstring."""
        super().__init__("[%(asctime)sZ] %(message)s")

    def formatTime(
        self, record: logging.LogRecord, datefmt: Optional[str] = None
    ) -> str:
        """Render the record time as UTC ISO8601 with millisecond suffix.

        The previous format spec ``{record.msecs:03f}`` printed the
        millisecond value as a float with six decimal places (e.g.
        ``...,7.000000``) instead of a zero-padded 3-digit integer;
        mirror the stdlib default of ``"%s,%03d"`` instead.
        """
        formatted_time = time.strftime(
            "%Y-%m-%dT%H:%M:%S", time.gmtime(float(record.created))
        )
        return f"{formatted_time},{int(record.msecs):03d}"
| 689 | |
| 690 | |
def setup_provenance(
    args: argparse.Namespace,
    argsl: List[str],
    runtimeContext: RuntimeContext,
) -> Optional[int]:
    """Create a ResearchObject and attach a provenance log handler.

    Returns 1 on a fatal option conflict, None on success.
    """
    if not args.compute_checksum:
        # Provenance requires checksums to identify data artifacts.
        _logger.error("--provenance incompatible with --no-compute-checksum")
        return 1
    ro = ResearchObject(
        getdefault(runtimeContext.make_fs_access, StdFsAccess)(""),
        temp_prefix_ro=args.tmpdir_prefix,
        orcid=args.orcid,
        full_name=args.cwl_full_name,
    )
    runtimeContext.research_obj = ro
    log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))

    prov_log_handler.setFormatter(ProvLogFormatter())
    _logger.addHandler(prov_log_handler)
    _logger.debug("[provenance] Logging to %s", log_file_io)
    if argsl is not None:
        # Log cwltool command line options to provenance file
        _logger.info("[cwltool] %s %s", sys.argv[0], " ".join(argsl))
    _logger.debug("[cwltool] Arguments: %s", args)
    return None
| 717 | |
| 718 | |
def setup_loadingContext(
    loadingContext: Optional[LoadingContext],
    runtimeContext: RuntimeContext,
    args: argparse.Namespace,
) -> LoadingContext:
    """Prepare a LoadingContext from the parsed arguments, filling defaults."""
    if loadingContext is None:
        loadingContext = LoadingContext(vars(args))
    else:
        # Work on a copy so the caller's context is not mutated.
        loadingContext = loadingContext.copy()
    loadingContext.loader = default_loader(
        loadingContext.fetcher_constructor,
        enable_dev=args.enable_dev,
        doc_cache=args.doc_cache,
    )
    loadingContext.research_obj = runtimeContext.research_obj
    loadingContext.disable_js_validation = args.disable_js_validation or (
        not args.do_validate
    )
    loadingContext.construct_tool_object = getdefault(
        loadingContext.construct_tool_object, workflow.default_make_tool
    )
    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
    if loadingContext.do_update is None:
        # --pack and --print-subgraph need the document left as-is.
        loadingContext.do_update = not (args.pack or args.print_subgraph)

    return loadingContext
| 745 | |
| 746 | |
def make_template(
    tool: Process,
) -> None:
    """Make a template CWL input object for the given Process and print it to stdout."""

    def my_represent_none(
        self: Any, data: Any
    ) -> Any:  # pylint: disable=unused-argument
        """Force clean representation of 'null'."""
        return self.represent_scalar("tag:yaml.org,2002:null", "null")

    # NOTE(review): this registers on the global RoundTripRepresenter, so
    # every later round_trip_dump in this process renders None as "null".
    yaml.representer.RoundTripRepresenter.add_representer(type(None), my_represent_none)
    yaml.main.round_trip_dump(
        generate_input_template(tool),
        sys.stdout,
        default_flow_style=False,
        indent=4,
        block_seq_indent=2,
    )
| 766 | |
| 767 | |
def choose_target(
    args: argparse.Namespace,
    tool: Process,
    loadingContext: LoadingContext,
) -> Optional[Process]:
    """Walk the given Workflow and extract the subgraph matching all args.target."""
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if isinstance(tool, Workflow):
        url = urllib.parse.urlparse(tool.tool["id"])
        if url.fragment:
            # The tool id already carries a fragment: target ids are
            # built by appending "/<target>".
            extracted = get_subgraph(
                [tool.tool["id"] + "/" + r for r in args.target], tool
            )
        else:
            extracted = get_subgraph(
                [
                    loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
                    for r in args.target
                ],
                tool,
            )
    else:
        _logger.error("Can only use --target on Workflows")
        return None
    if isinstance(loadingContext.loader.idx, MutableMapping):
        # Register the extracted subgraph so make_tool can resolve it.
        loadingContext.loader.idx[extracted["id"]] = extracted
        tool = make_tool(extracted["id"], loadingContext)
    else:
        raise Exception("Missing loadingContext.loader.idx!")

    return tool
| 801 | |
| 802 | |
def choose_step(
    args: argparse.Namespace,
    tool: Process,
    loadingContext: LoadingContext,
) -> Optional[Process]:
    """Walk the given Workflow and extract just args.single_step."""
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if isinstance(tool, Workflow):
        url = urllib.parse.urlparse(tool.tool["id"])
        if url.fragment:
            # Fixed: was `args.singe_step`, which raised AttributeError
            # whenever the tool id already carried a fragment.
            extracted = get_step(tool, tool.tool["id"] + "/" + args.single_step)
        else:
            extracted = get_step(
                tool,
                loadingContext.loader.fetcher.urljoin(
                    tool.tool["id"], "#" + args.single_step
                ),
            )
    else:
        _logger.error("Can only use --single-step on Workflows")
        return None
    if isinstance(loadingContext.loader.idx, MutableMapping):
        # Register the extracted step so make_tool can resolve it.
        loadingContext.loader.idx[extracted["id"]] = extracted
        tool = make_tool(extracted["id"], loadingContext)
    else:
        raise Exception("Missing loadingContext.loader.idx!")

    return tool
| 833 | |
| 834 | |
def check_working_directories(
    runtimeContext: RuntimeContext,
) -> Optional[int]:
    """Make any needed working directories.

    Returns 1 if a directory could not be created, None on success.
    """
    for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
        if (
            getattr(runtimeContext, dirprefix)
            and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX
        ):
            # Ensure cachedir (a directory, not a prefix) keeps a trailing
            # slash so os.path.dirname below yields the directory itself.
            sl = (
                "/"
                if getattr(runtimeContext, dirprefix).endswith("/")
                or dirprefix == "cachedir"
                else ""
            )
            setattr(
                runtimeContext,
                dirprefix,
                os.path.abspath(getattr(runtimeContext, dirprefix)) + sl,
            )
            if not os.path.exists(os.path.dirname(getattr(runtimeContext, dirprefix))):
                try:
                    os.makedirs(os.path.dirname(getattr(runtimeContext, dirprefix)))
                except Exception:
                    _logger.exception("Failed to create directory.")
                    return 1
    return None
| 862 | |
| 863 | |
def main(
    argsl: Optional[List[str]] = None,
    args: Optional[argparse.Namespace] = None,
    job_order_object: Optional[CWLObjectType] = None,
    stdin: IO[Any] = sys.stdin,
    stdout: Optional[Union[TextIO, StreamWriter]] = None,
    stderr: IO[Any] = sys.stderr,
    versionfunc: Callable[[], str] = versionstring,
    logger_handler: Optional[logging.Handler] = None,
    custom_schema_callback: Optional[Callable[[], None]] = None,
    executor: Optional[JobExecutor] = None,
    loadingContext: Optional[LoadingContext] = None,
    runtimeContext: Optional[RuntimeContext] = None,
    input_required: bool = True,
) -> int:
    """Load, validate, and (unless a --print-*/--validate flag exits earlier) execute a CWL document.

    Returns a POSIX-style exit code: 0 on success, 33 when the workflow uses
    an unsupported CWL feature, 1 on every other failure.
    """
    if not stdout:  # force UTF-8 even if the console is configured differently
        if hasattr(sys.stdout, "encoding") and sys.stdout.encoding.upper() not in (
            "UTF-8",
            "UTF8",
        ):
            if hasattr(sys.stdout, "detach"):
                stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
            else:
                stdout = getwriter("utf-8")(sys.stdout)  # type: ignore
        else:
            stdout = sys.stdout

    # Swap the module's default log handler for either the caller-supplied
    # one or a colored console handler on ``stderr``; restored in the
    # ``finally`` block at the end of this function.
    _logger.removeHandler(defaultStreamHandler)
    stderr_handler = logger_handler
    if stderr_handler is not None:
        _logger.addHandler(stderr_handler)
    else:
        coloredlogs.install(logger=_logger, stream=stderr)
        stderr_handler = _logger.handlers[-1]
    workflowobj = None
    prov_log_handler = None  # type: Optional[logging.StreamHandler]
    try:
        # Parse argv (plus any CWLTOOL_OPTIONS from the environment) only
        # when the caller did not hand us a pre-parsed namespace.
        if args is None:
            if argsl is None:
                argsl = sys.argv[1:]
            addl = []  # type: List[str]
            if "CWLTOOL_OPTIONS" in os.environ:
                addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
            parser = arg_parser()
            argcomplete.autocomplete(parser)
            args = parser.parse_args(addl + argsl)
            if args.record_container_id:
                if not args.cidfile_dir:
                    args.cidfile_dir = os.getcwd()
                del args.record_container_id

        if runtimeContext is None:
            runtimeContext = RuntimeContext(vars(args))
        else:
            # Copy so the caller's context object is not mutated below.
            runtimeContext = runtimeContext.copy()

        # If on Windows platform, a default Docker Container is used if not
        # explicitely provided by user
        if onWindows() and not runtimeContext.default_container:
            # This docker image is a minimal alpine image with bash installed
            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
            runtimeContext.default_container = windows_default_container_id

        # If caller parsed its own arguments, it may not include every
        # cwltool option, so fill in defaults to avoid crashing when
        # dereferencing them in args.
        for key, val in get_default_args().items():
            if not hasattr(args, key):
                setattr(args, key, val)

        configure_logging(args, stderr_handler, runtimeContext)

        if args.version:
            print(versionfunc())
            return 0
        _logger.info(versionfunc())

        if args.print_supported_versions:
            print("\n".join(supported_cwl_versions(args.enable_dev)))
            return 0

        if not args.workflow:
            # Fall back to a file literally named "CWLFile" in the cwd.
            if os.path.isfile("CWLFile"):
                args.workflow = "CWLFile"
            else:
                _logger.error("CWL document required, no input file was provided")
                parser.print_help()
                return 1

        if args.ga4gh_tool_registries:
            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
        if not args.enable_ga4gh_tool_registry:
            del ga4gh_tool_registries[:]

        if args.mpi_config_file is not None:
            runtimeContext.mpi_config = MpiConfig.load(args.mpi_config_file)

        setup_schema(args, custom_schema_callback)

        if args.provenance:
            if argsl is None:
                raise Exception("argsl cannot be None")
            if setup_provenance(args, argsl, runtimeContext) is not None:
                return 1

        loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)

        uri, tool_file_uri = resolve_tool_uri(
            args.workflow,
            resolver=loadingContext.resolver,
            fetcher_constructor=loadingContext.fetcher_constructor,
        )

        try_again_msg = (
            "" if args.debug else ", try again with --debug for more information"
        )

        try:
            job_order_object, input_basedir, jobloader = load_job_order(
                args,
                stdin,
                loadingContext.fetcher_constructor,
                loadingContext.overrides_list,
                tool_file_uri,
            )

            if args.overrides:
                loadingContext.overrides_list.extend(
                    load_overrides(
                        file_uri(os.path.abspath(args.overrides)), tool_file_uri
                    )
                )

            loadingContext, workflowobj, uri = fetch_document(uri, loadingContext)

            if args.print_deps and loadingContext.loader:
                printdeps(
                    workflowobj, loadingContext.loader, stdout, args.relative_deps, uri
                )
                return 0

            loadingContext, uri = resolve_and_validate_document(
                loadingContext,
                workflowobj,
                uri,
                preprocess_only=(args.print_pre or args.pack),
                skip_schemas=args.skip_schemas,
            )

            if loadingContext.loader is None:
                raise Exception("Impossible code path.")
            processobj, metadata = loadingContext.loader.resolve_ref(uri)
            processobj = cast(CommentedMap, processobj)
            if args.pack:
                stdout.write(print_pack(loadingContext, uri))
                return 0

            if args.provenance and runtimeContext.research_obj:
                # Can't really be combined with args.pack at same time
                runtimeContext.research_obj.packed_workflow(
                    print_pack(loadingContext, uri)
                )

            if args.print_pre:
                stdout.write(
                    json_dumps(
                        processobj, indent=4, sort_keys=True, separators=(",", ": ")
                    )
                )
                return 0

            tool = make_tool(uri, loadingContext)
            if args.make_template:
                make_template(tool)
                return 0

            if args.validate:
                # Reaching this point means validation already succeeded.
                print(f"{args.workflow} is valid CWL.")
                return 0

            if args.print_rdf:
                stdout.write(
                    printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer)
                )
                return 0

            if args.print_dot:
                printdot(tool, loadingContext.loader.ctx, stdout)
                return 0

            if args.print_targets:
                for f in ("outputs", "steps", "inputs"):
                    if tool.tool[f]:
                        # f[1:-1] drops the plural "s" for the log label.
                        _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
                        stdout.write(
                            "  "
                            + "\n  ".join([shortname(t["id"]) for t in tool.tool[f]])
                            + "\n"
                        )
                return 0

            if args.target:
                ctool = choose_target(args, tool, loadingContext)
                if ctool is None:
                    return 1
                else:
                    tool = ctool

            elif args.single_step:
                ctool = choose_step(args, tool, loadingContext)
                if ctool is None:
                    return 1
                else:
                    tool = ctool

            if args.print_subgraph:
                if "name" in tool.tool:
                    del tool.tool["name"]
                stdout.write(
                    json_dumps(
                        tool.tool, indent=4, sort_keys=True, separators=(",", ": ")
                    )
                )
                return 0

        except (ValidationException) as exc:
            _logger.error(
                "Tool definition failed validation:\n%s", str(exc), exc_info=args.debug
            )
            return 1
        except (RuntimeError, WorkflowException) as exc:
            _logger.error(
                "Tool definition failed initialization:\n%s",
                str(exc),
                exc_info=args.debug,
            )
            return 1
        except Exception as exc:
            _logger.error(
                "I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
                try_again_msg,
                str(exc) if not args.debug else "",
                exc_info=args.debug,
            )
            return 1

        if isinstance(tool, int):
            # NOTE(review): presumably an early exit code propagated through
            # make_tool/choose_* — confirm which path yields an int here.
            return tool

        # If on MacOS platform, TMPDIR must be set to be under one of the
        # shared volumes in Docker for Mac
        # More info: https://dockstore.org/docs/faq
        if sys.platform == "darwin":
            default_mac_path = "/private/tmp/docker_tmp"
            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmp_outdir_prefix = default_mac_path
            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmpdir_prefix = default_mac_path

        if check_working_directories(runtimeContext) is not None:
            return 1

        if args.cachedir:
            if args.move_outputs == "move":
                # Cached intermediates must not be moved out of the cache.
                runtimeContext.move_outputs = "copy"
            runtimeContext.tmp_outdir_prefix = args.cachedir

        runtimeContext.secret_store = getdefault(
            runtimeContext.secret_store, SecretStore()
        )
        runtimeContext.make_fs_access = getdefault(
            runtimeContext.make_fs_access, StdFsAccess
        )

        if not executor:
            if args.parallel:
                temp_executor = MultithreadedJobExecutor()
                runtimeContext.select_resources = temp_executor.select_resources
                real_executor = temp_executor  # type: JobExecutor
            else:
                real_executor = SingleJobExecutor()
        else:
            real_executor = executor

        try:
            runtimeContext.basedir = input_basedir

            if isinstance(tool, ProcessGenerator):
                # Run the generator first; it produces the real tool and
                # (possibly) an updated job order to execute below.
                tfjob_order = {}  # type: CWLObjectType
                if loadingContext.jobdefaults:
                    tfjob_order.update(loadingContext.jobdefaults)
                if job_order_object:
                    tfjob_order.update(job_order_object)
                tfout, tfstatus = real_executor(
                    tool.embedded_tool, tfjob_order, runtimeContext
                )
                if not tfout or tfstatus != "success":
                    raise WorkflowException(
                        "ProcessGenerator failed to generate workflow"
                    )
                tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
                if not job_order_object:
                    job_order_object = None

            try:
                initialized_job_order_object = init_job_order(
                    job_order_object,
                    args,
                    tool,
                    jobloader,
                    stdout,
                    print_input_deps=args.print_input_deps,
                    relative_deps=args.relative_deps,
                    make_fs_access=runtimeContext.make_fs_access,
                    input_basedir=input_basedir,
                    secret_store=runtimeContext.secret_store,
                    input_required=input_required,
                )
            except SystemExit as err:
                # init_job_order may call sys.exit (e.g. after printing input
                # deps); propagate its code as our return value.
                return err.code

            del args.workflow
            del args.job_order

            conf_file = getattr(
                args, "beta_dependency_resolvers_configuration", None
            )  # str
            use_conda_dependencies = getattr(
                args, "beta_conda_dependencies", None
            )  # str

            if conf_file or use_conda_dependencies:
                runtimeContext.job_script_provider = DependenciesConfiguration(args)
            else:
                runtimeContext.find_default_container = functools.partial(
                    find_default_container,
                    default_container=runtimeContext.default_container,
                    use_biocontainers=args.beta_use_biocontainers,
                )

            (out, status) = real_executor(
                tool, initialized_job_order_object, runtimeContext, logger=_logger
            )

            if out is not None:
                if runtimeContext.research_obj is not None:
                    runtimeContext.research_obj.create_job(out, True)

                def remove_at_id(doc: CWLObjectType) -> None:
                    """Recursively strip JSON-LD "@id" keys from the output object."""
                    for key in list(doc.keys()):
                        if key == "@id":
                            del doc[key]
                        else:
                            value = doc[key]
                            if isinstance(value, MutableMapping):
                                remove_at_id(value)
                            elif isinstance(value, MutableSequence):
                                for entry in value:
                                    if isinstance(entry, MutableMapping):
                                        remove_at_id(entry)

                remove_at_id(out)
                visit_class(
                    out,
                    ("File",),
                    functools.partial(add_sizes, runtimeContext.make_fs_access("")),
                )

                def loc_to_path(obj: CWLObjectType) -> None:
                    """Replace derived path fields with a "path" from "location"."""
                    for field in ("path", "nameext", "nameroot", "dirname"):
                        if field in obj:
                            del obj[field]
                    if cast(str, obj["location"]).startswith("file://"):
                        obj["path"] = uri_file_path(cast(str, obj["location"]))

                visit_class(out, ("File", "Directory"), loc_to_path)

                # Unsetting the Generation from final output object
                visit_class(out, ("File",), MutationManager().unset_generation)

                if isinstance(out, str):
                    stdout.write(out)
                else:
                    stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
                stdout.write("\n")
                if hasattr(stdout, "flush"):
                    stdout.flush()

            if status != "success":
                _logger.warning("Final process status is %s", status)
                return 1
            _logger.info("Final process status is %s", status)
            return 0

        except (ValidationException) as exc:
            _logger.error(
                "Input object failed validation:\n%s", str(exc), exc_info=args.debug
            )
            return 1
        except UnsupportedRequirement as exc:
            _logger.error(
                "Workflow or tool uses unsupported feature:\n%s",
                str(exc),
                exc_info=args.debug,
            )
            # 33 is cwltool's distinct exit code for UnsupportedRequirement.
            return 33
        except WorkflowException as exc:
            _logger.error(
                "Workflow error%s:\n%s",
                try_again_msg,
                strip_dup_lineno(str(exc)),
                exc_info=args.debug,
            )
            return 1
        except Exception as exc:  # pylint: disable=broad-except
            _logger.error(
                "Unhandled error%s:\n  %s",
                try_again_msg,
                str(exc),
                exc_info=args.debug,
            )
            return 1

    finally:
        # Finalize the provenance research object (if any) and restore the
        # module-level logging handlers, no matter how we exited above.
        if (
            args
            and runtimeContext
            and runtimeContext.research_obj
            and workflowobj
            and loadingContext
        ):
            research_obj = runtimeContext.research_obj
            if loadingContext.loader is not None:
                research_obj.generate_snapshot(
                    prov_deps(workflowobj, loadingContext.loader, uri)
                )
            else:
                _logger.warning(
                    "Unable to generate provenance snapshot "
                    " due to missing loadingContext.loader."
                )
            if prov_log_handler is not None:
                # Stop logging so we won't half-log adding ourself to RO
                _logger.debug(
                    "[provenance] Closing provenance log file %s", prov_log_handler
                )
                _logger.removeHandler(prov_log_handler)
                # Ensure last log lines are written out
                prov_log_handler.flush()
                # Underlying WritableBagFile will add the tagfile to the manifest
                prov_log_handler.stream.close()
                prov_log_handler.close()
            research_obj.close(args.provenance)

        _logger.removeHandler(stderr_handler)
        _logger.addHandler(defaultStreamHandler)
| 1320 | |
| 1321 | |
def find_default_container(
    builder: HasReqsHints,
    default_container: Optional[str] = None,
    use_biocontainers: Optional[bool] = None,
) -> Optional[str]:
    """Find a container.

    An explicitly supplied default container wins; otherwise, when
    biocontainers lookup is enabled, derive one from the tool's software
    requirements. Returns None when neither applies.
    """
    if default_container or not use_biocontainers:
        # Nothing to look up: either a container was given, or the
        # biocontainers fallback is switched off.
        return default_container
    return get_container_from_software_requirements(use_biocontainers, builder)
| 1333 | |
| 1334 | |
def run(*args, **kwargs):
    # type: (*Any, **Any) -> None
    """Run cwltool and exit the interpreter with main()'s return code."""
    # Install the module's SIGTERM handler before running (its behavior is
    # defined by _signal_handler elsewhere in this file).
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        sys.exit(main(*args, **kwargs))
    finally:
        # Clean up any remaining child processes even when exiting via
        # sys.exit above or an exception from main().
        _terminate_processes()
| 1343 | |
| 1344 | |
| 1345 if __name__ == "__main__": | |
| 1346 run(sys.argv[1:]) |
