Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/main.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 # PYTHON_ARGCOMPLETE_OK """Entry point for cwltool.""" import argparse import functools import io import logging import os import signal import subprocess # nosec import sys import time import urllib from codecs import StreamWriter, getwriter from collections.abc import MutableMapping, MutableSequence from typing import ( IO, Any, Callable, Dict, List, Mapping, MutableMapping, MutableSequence, Optional, Sized, TextIO, Tuple, Union, cast, ) import argcomplete import coloredlogs import pkg_resources # part of setuptools from ruamel import yaml from ruamel.yaml.comments import CommentedMap, CommentedSeq from schema_salad.exceptions import ValidationException from schema_salad.ref_resolver import Loader, file_uri, uri_file_path from schema_salad.sourceline import strip_dup_lineno from schema_salad.utils import ContextType, FetcherCallableType, json_dumps from . import CWL_CONTENT_TYPES, workflow from .argparser import arg_parser, generate_parser, get_default_args from .builder import HasReqsHints from .context import LoadingContext, RuntimeContext, getdefault from .cwlrdf import printdot, printrdf from .errors import UnsupportedRequirement, WorkflowException from .executors import JobExecutor, MultithreadedJobExecutor, SingleJobExecutor from .load_tool import ( default_loader, fetch_document, jobloaderctx, load_overrides, make_tool, resolve_and_validate_document, resolve_overrides, resolve_tool_uri, ) from .loghandler import _logger, defaultStreamHandler from .mpi import MpiConfig from .mutation import MutationManager from .pack import pack from .process import ( CWL_IANA, Process, add_sizes, scandeps, shortname, use_custom_schema, use_standard_schema, ) from .procgenerator import ProcessGenerator from .provenance import ResearchObject from .resolver import ga4gh_tool_registries, tool_resolver from .secrets import SecretStore from .software_requirements import ( DependenciesConfiguration, get_container_from_software_requirements, ) from .stdfsaccess import StdFsAccess from .subgraph import get_step, get_subgraph from .update import ALLUPDATES, UPDATES from .utils import ( DEFAULT_TMP_PREFIX, CWLObjectType, CWLOutputAtomType, CWLOutputType, adjustDirObjs, normalizeFilesDirs, onWindows, processes_to_kill, trim_listing, versionstring, visit_class, windows_default_container_id, ) from .workflow import Workflow def _terminate_processes() -> None: """Kill all spawned processes. Processes to be killed must be appended to `utils.processes_to_kill` as they are spawned. An important caveat: since there's no supported way to kill another thread in Python, this function cannot stop other threads from continuing to execute while it kills the processes that they've spawned. This may occasionally lead to unexpected behaviour. """ # It's possible that another thread will spawn a new task while # we're executing, so it's not safe to use a for loop here. while processes_to_kill: process = processes_to_kill.popleft() cidfile = [ str(arg).split("=")[1] for arg in process.args if "--cidfile" in str(arg) ] if cidfile: try: with open(cidfile[0]) as inp_stream: p = subprocess.Popen( # nosec ["docker", "kill", inp_stream.read()], shell=False # nosec ) try: p.wait(timeout=10) except subprocess.TimeoutExpired: p.kill() except FileNotFoundError: pass def _signal_handler(signum: int, _: Any) -> None: """Kill all spawned processes and exit. Note that it's possible for another thread to spawn a process after all processes have been killed, but before Python exits. Refer to the docstring for _terminate_processes() for other caveats. """ _terminate_processes() sys.exit(signum) def generate_example_input( inptype: Optional[CWLOutputType], default: Optional[CWLOutputType], ) -> Tuple[Any, str]: """Convert a single input schema into an example.""" example = None comment = "" defaults = { "null": "null", "Any": "null", "boolean": False, "int": 0, "long": 0, "float": 0.1, "double": 0.1, "string": "a_string", "File": yaml.comments.CommentedMap( [("class", "File"), ("path", "a/file/path")] ), "Directory": yaml.comments.CommentedMap( [("class", "Directory"), ("path", "a/directory/path")] ), } # type: CWLObjectType if isinstance(inptype, MutableSequence): optional = False if "null" in inptype: inptype.remove("null") optional = True if len(inptype) == 1: example, comment = generate_example_input(inptype[0], default) if optional: if comment: comment = f"{comment} (optional)" else: comment = "optional" else: example = CommentedSeq() for index, entry in enumerate(inptype): value, e_comment = generate_example_input(entry, default) example.append(value) example.yaml_add_eol_comment(e_comment, index) if optional: comment = "optional" elif isinstance(inptype, Mapping) and "type" in inptype: if inptype["type"] == "array": first_item = cast(MutableSequence[CWLObjectType], inptype["items"])[0] items_len = len(cast(Sized, inptype["items"])) if items_len == 1 and "type" in first_item and first_item["type"] == "enum": # array of just an enum then list all the options example = first_item["symbols"] if "name" in first_item: comment = 'array of type "{}".'.format(first_item["name"]) else: value, comment = generate_example_input(inptype["items"], None) comment = "array of " + comment if items_len == 1: example = [value] else: example = value if default is not None: example = default elif inptype["type"] == "enum": symbols = cast(List[str], inptype["symbols"]) if default is not None: example = default elif "default" in inptype: example = inptype["default"] elif len(cast(Sized, inptype["symbols"])) == 1: example = symbols[0] else: example = "{}_enum_value".format(inptype.get("name", "valid")) comment = 'enum; valid values: "{}"'.format('", "'.join(symbols)) elif inptype["type"] == "record": example = yaml.comments.CommentedMap() if "name" in inptype: comment = '"{}" record type.'.format(inptype["name"]) for field in cast(List[CWLObjectType], inptype["fields"]): value, f_comment = generate_example_input(field["type"], None) example.insert(0, shortname(cast(str, field["name"])), value, f_comment) elif "default" in inptype: example = inptype["default"] comment = 'default value of type "{}".'.format(inptype["type"]) else: example = defaults.get(cast(str, inptype["type"]), str(inptype)) comment = 'type "{}".'.format(inptype["type"]) else: if not default: example = defaults.get(str(inptype), str(inptype)) comment = f'type "{inptype}"' else: example = default comment = f'default value of type "{inptype}".' return example, comment def realize_input_schema( input_types: MutableSequence[CWLObjectType], schema_defs: MutableMapping[str, CWLObjectType], ) -> MutableSequence[CWLObjectType]: """Replace references to named typed with the actual types.""" for index, entry in enumerate(input_types): if isinstance(entry, str): if "#" in entry: _, input_type_name = entry.split("#") else: input_type_name = entry if input_type_name in schema_defs: entry = input_types[index] = schema_defs[input_type_name] if isinstance(entry, Mapping): if isinstance(entry["type"], str) and "#" in entry["type"]: _, input_type_name = entry["type"].split("#") if input_type_name in schema_defs: input_types[index]["type"] = cast( CWLOutputAtomType, realize_input_schema( cast( MutableSequence[CWLObjectType], schema_defs[input_type_name], ), schema_defs, ), ) if isinstance(entry["type"], MutableSequence): input_types[index]["type"] = cast( CWLOutputAtomType, realize_input_schema( cast(MutableSequence[CWLObjectType], entry["type"]), schema_defs ), ) if isinstance(entry["type"], Mapping): input_types[index]["type"] = cast( CWLOutputAtomType, realize_input_schema( [cast(CWLObjectType, input_types[index]["type"])], schema_defs ), ) if entry["type"] == "array": items = ( entry["items"] if not isinstance(entry["items"], str) else [entry["items"]] ) input_types[index]["items"] = cast( CWLOutputAtomType, realize_input_schema( cast(MutableSequence[CWLObjectType], items), schema_defs ), ) if entry["type"] == "record": input_types[index]["fields"] = cast( CWLOutputAtomType, realize_input_schema( cast(MutableSequence[CWLObjectType], entry["fields"]), schema_defs, ), ) return input_types def generate_input_template(tool: Process) -> CWLObjectType: """Generate an example input object for the given CWL process.""" template = yaml.comments.CommentedMap() for inp in realize_input_schema(tool.tool["inputs"], tool.schemaDefs): name = shortname(cast(str, inp["id"])) value, comment = generate_example_input(inp["type"], inp.get("default", None)) template.insert(0, name, value, comment) return template def load_job_order( args: argparse.Namespace, stdin: IO[Any], fetcher_constructor: Optional[FetcherCallableType], overrides_list: List[CWLObjectType], tool_file_uri: str, ) -> Tuple[Optional[CWLObjectType], str, Loader]: job_order_object = None job_order_file = None _jobloaderctx = jobloaderctx.copy() loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor) if len(args.job_order) == 1 and args.job_order[0][0] != "-": job_order_file = args.job_order[0] elif len(args.job_order) == 1 and args.job_order[0] == "-": job_order_object = yaml.main.round_trip_load(stdin) job_order_object, _ = loader.resolve_all( job_order_object, file_uri(os.getcwd()) + "/" ) else: job_order_file = None if job_order_object is not None: input_basedir = args.basedir if args.basedir else os.getcwd() elif job_order_file is not None: input_basedir = ( args.basedir if args.basedir else os.path.abspath(os.path.dirname(job_order_file)) ) job_order_object, _ = loader.resolve_ref( job_order_file, checklinks=False, content_types=CWL_CONTENT_TYPES, ) if ( job_order_object is not None and "http://commonwl.org/cwltool#overrides" in job_order_object ): ov_uri = file_uri(job_order_file or input_basedir) overrides_list.extend( resolve_overrides(job_order_object, ov_uri, tool_file_uri) ) del job_order_object["http://commonwl.org/cwltool#overrides"] if job_order_object is None: input_basedir = args.basedir if args.basedir else os.getcwd() if job_order_object is not None and not isinstance( job_order_object, MutableMapping ): _logger.error( "CWL input object at %s is not formatted correctly, it should be a " "JSON/YAML dictionay, not %s.\n" "Raw input object:\n%s", job_order_file or "stdin", type(job_order_object), job_order_object, ) sys.exit(1) return (job_order_object, input_basedir, loader) def init_job_order( job_order_object: Optional[CWLObjectType], args: argparse.Namespace, process: Process, loader: Loader, stdout: Union[TextIO, StreamWriter], print_input_deps: bool = False, relative_deps: str = "primary", make_fs_access: Callable[[str], StdFsAccess] = StdFsAccess, input_basedir: str = "", secret_store: Optional[SecretStore] = None, input_required: bool = True, ) -> CWLObjectType: secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets") if job_order_object is None: namemap = {} # type: Dict[str, str] records = [] # type: List[str] toolparser = generate_parser( argparse.ArgumentParser(prog=args.workflow), process, namemap, records, input_required, ) if args.tool_help: toolparser.print_help() exit(0) cmd_line = vars(toolparser.parse_args(args.job_order)) for record_name in records: record = {} record_items = { k: v for k, v in cmd_line.items() if k.startswith(record_name) } for key, value in record_items.items(): record[key[len(record_name) + 1 :]] = value del cmd_line[key] cmd_line[str(record_name)] = record if "job_order" in cmd_line and cmd_line["job_order"]: try: job_order_object = cast( CWLObjectType, loader.resolve_ref(cmd_line["job_order"])[0], ) except Exception: _logger.exception( "Failed to resolv job_order: %s", cmd_line["job_order"] ) exit(1) else: job_order_object = {"id": args.workflow} del cmd_line["job_order"] job_order_object.update({namemap[k]: v for k, v in cmd_line.items()}) if secret_store and secrets_req: secret_store.store( [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])], job_order_object, ) if _logger.isEnabledFor(logging.DEBUG): _logger.debug( "Parsed job order from command line: %s", json_dumps(job_order_object, indent=4), ) for inp in process.tool["inputs"]: if "default" in inp and ( not job_order_object or shortname(inp["id"]) not in job_order_object ): if not job_order_object: job_order_object = {} job_order_object[shortname(inp["id"])] = inp["default"] if job_order_object is None: if process.tool["inputs"]: if toolparser is not None: print(f"\nOptions for {args.workflow} ") toolparser.print_help() _logger.error("") _logger.error("Input object required, use --help for details") exit(1) else: job_order_object = {} if print_input_deps: basedir = None # type: Optional[str] uri = cast(str, job_order_object["id"]) if uri == args.workflow: basedir = os.path.dirname(uri) uri = "" printdeps( job_order_object, loader, stdout, relative_deps, uri, basedir=basedir, nestdirs=False, ) exit(0) def path_to_loc(p: CWLObjectType) -> None: if "location" not in p and "path" in p: p["location"] = p["path"] del p["path"] ns = {} # type: ContextType ns.update(cast(ContextType, job_order_object.get("$namespaces", {}))) ns.update(cast(ContextType, process.metadata.get("$namespaces", {}))) ld = Loader(ns) def expand_formats(p: CWLObjectType) -> None: if "format" in p: p["format"] = ld.expand_url(cast(str, p["format"]), "") visit_class(job_order_object, ("File", "Directory"), path_to_loc) visit_class( job_order_object, ("File",), functools.partial(add_sizes, make_fs_access(input_basedir)), ) visit_class(job_order_object, ("File",), expand_formats) adjustDirObjs(job_order_object, trim_listing) normalizeFilesDirs(job_order_object) if secret_store and secrets_req: secret_store.store( [shortname(sc) for sc in cast(List[str], secrets_req["secrets"])], job_order_object, ) if "cwl:tool" in job_order_object: del job_order_object["cwl:tool"] if "id" in job_order_object: del job_order_object["id"] return job_order_object def make_relative(base: str, obj: CWLObjectType) -> None: """Relativize the location URI of a File or Directory object.""" uri = cast(str, obj.get("location", obj.get("path"))) if ":" in uri.split("/")[0] and not uri.startswith("file://"): pass else: if uri.startswith("file://"): uri = uri_file_path(uri) obj["location"] = os.path.relpath(uri, base) def printdeps( obj: CWLObjectType, document_loader: Loader, stdout: Union[TextIO, StreamWriter], relative_deps: str, uri: str, basedir: Optional[str] = None, nestdirs: bool = True, ) -> None: """Print a JSON representation of the dependencies of the CWL document.""" deps = find_deps(obj, document_loader, uri, basedir=basedir, nestdirs=nestdirs) if relative_deps == "primary": base = basedir if basedir else os.path.dirname(uri_file_path(str(uri))) elif relative_deps == "cwd": base = os.getcwd() visit_class(deps, ("File", "Directory"), functools.partial(make_relative, base)) stdout.write(json_dumps(deps, indent=4)) def prov_deps( obj: CWLObjectType, document_loader: Loader, uri: str, basedir: Optional[str] = None, ) -> CWLObjectType: deps = find_deps(obj, document_loader, uri, basedir=basedir) def remove_non_cwl(deps: CWLObjectType) -> None: if "secondaryFiles" in deps: sec_files = cast(List[CWLObjectType], deps["secondaryFiles"]) for index, entry in enumerate(sec_files): if not ("format" in entry and entry["format"] == CWL_IANA): del sec_files[index] else: remove_non_cwl(entry) remove_non_cwl(deps) return deps def find_deps( obj: CWLObjectType, document_loader: Loader, uri: str, basedir: Optional[str] = None, nestdirs: bool = True, ) -> CWLObjectType: """Find the dependencies of the CWL document.""" deps = { "class": "File", "location": uri, "format": CWL_IANA, } # type: CWLObjectType def loadref(base: str, uri: str) -> Union[CommentedMap, CommentedSeq, str, None]: return document_loader.fetch(document_loader.fetcher.urljoin(base, uri)) sfs = scandeps( basedir if basedir else uri, obj, {"$import", "run"}, {"$include", "$schemas", "location"}, loadref, nestdirs=nestdirs, ) if sfs is not None: deps["secondaryFiles"] = cast(MutableSequence[CWLOutputAtomType], sfs) return deps def print_pack( loadingContext: LoadingContext, uri: str, ) -> str: """Return a CWL serialization of the CWL document in JSON.""" packed = pack(loadingContext, uri) if len(cast(Sized, packed["$graph"])) > 1: return json_dumps(packed, indent=4) return json_dumps( cast(MutableSequence[CWLObjectType], packed["$graph"])[0], indent=4 ) def supported_cwl_versions(enable_dev: bool) -> List[str]: # ALLUPDATES and UPDATES are dicts if enable_dev: versions = list(ALLUPDATES) else: versions = list(UPDATES) versions.sort() return versions def configure_logging( args: argparse.Namespace, stderr_handler: logging.Handler, runtimeContext: RuntimeContext, ) -> None: rdflib_logger = logging.getLogger("rdflib.term") rdflib_logger.addHandler(stderr_handler) rdflib_logger.setLevel(logging.ERROR) if args.quiet: # Silence STDERR, not an eventual provenance log file stderr_handler.setLevel(logging.WARN) if runtimeContext.debug: # Increase to debug for both stderr and provenance log file _logger.setLevel(logging.DEBUG) stderr_handler.setLevel(logging.DEBUG) rdflib_logger.setLevel(logging.DEBUG) fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter formatter = fmtclass("%(levelname)s %(message)s") if args.timestamps: formatter = fmtclass( "[%(asctime)s] %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S" ) stderr_handler.setFormatter(formatter) def setup_schema( args: argparse.Namespace, custom_schema_callback: Optional[Callable[[], None]] ) -> None: if custom_schema_callback is not None: custom_schema_callback() elif args.enable_ext: with pkg_resources.resource_stream(__name__, "extensions.yml") as res: ext10 = res.read().decode("utf-8") with pkg_resources.resource_stream(__name__, "extensions-v1.1.yml") as res: ext11 = res.read().decode("utf-8") use_custom_schema("v1.0", "http://commonwl.org/cwltool", ext10) use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11) use_custom_schema("v1.2.0-dev1", "http://commonwl.org/cwltool", ext11) use_custom_schema("v1.2.0-dev2", "http://commonwl.org/cwltool", ext11) use_custom_schema("v1.2.0-dev3", "http://commonwl.org/cwltool", ext11) else: use_standard_schema("v1.0") use_standard_schema("v1.1") use_standard_schema("v1.2.0-dev1") use_standard_schema("v1.2.0-dev2") use_standard_schema("v1.2.0-dev3") class ProvLogFormatter(logging.Formatter): """Enforce ISO8601 with both T and Z.""" def __init__(self) -> None: """Use the default formatter with our custom formatstring.""" super().__init__("[%(asctime)sZ] %(message)s") def formatTime( self, record: logging.LogRecord, datefmt: Optional[str] = None ) -> str: formatted_time = time.strftime( "%Y-%m-%dT%H:%M:%S", time.gmtime(float(record.created)) ) with_msecs = f"{formatted_time},{record.msecs:03f}" return with_msecs def setup_provenance( args: argparse.Namespace, argsl: List[str], runtimeContext: RuntimeContext, ) -> Optional[int]: if not args.compute_checksum: _logger.error("--provenance incompatible with --no-compute-checksum") return 1 ro = ResearchObject( getdefault(runtimeContext.make_fs_access, StdFsAccess)(""), temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid, full_name=args.cwl_full_name, ) runtimeContext.research_obj = ro log_file_io = ro.open_log_file_for_activity(ro.engine_uuid) prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io)) prov_log_handler.setFormatter(ProvLogFormatter()) _logger.addHandler(prov_log_handler) _logger.debug("[provenance] Logging to %s", log_file_io) if argsl is not None: # Log cwltool command line options to provenance file _logger.info("[cwltool] %s %s", sys.argv[0], " ".join(argsl)) _logger.debug("[cwltool] Arguments: %s", args) return None def setup_loadingContext( loadingContext: Optional[LoadingContext], runtimeContext: RuntimeContext, args: argparse.Namespace, ) -> LoadingContext: if loadingContext is None: loadingContext = LoadingContext(vars(args)) else: loadingContext = loadingContext.copy() loadingContext.loader = default_loader( loadingContext.fetcher_constructor, enable_dev=args.enable_dev, doc_cache=args.doc_cache, ) loadingContext.research_obj = runtimeContext.research_obj loadingContext.disable_js_validation = args.disable_js_validation or ( not args.do_validate ) loadingContext.construct_tool_object = getdefault( loadingContext.construct_tool_object, workflow.default_make_tool ) loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver) if loadingContext.do_update is None: loadingContext.do_update = not (args.pack or args.print_subgraph) return loadingContext def make_template( tool: Process, ) -> None: """Make a template CWL input object for the give Process.""" def my_represent_none( self: Any, data: Any ) -> Any: # pylint: disable=unused-argument """Force clean representation of 'null'.""" return self.represent_scalar("tag:yaml.org,2002:null", "null") yaml.representer.RoundTripRepresenter.add_representer(type(None), my_represent_none) yaml.main.round_trip_dump( generate_input_template(tool), sys.stdout, default_flow_style=False, indent=4, block_seq_indent=2, ) def choose_target( args: argparse.Namespace, tool: Process, loadingContext: LoadingContext, ) -> Optional[Process]: """Walk the Workflow, extract the subset matches all the args.targets.""" if loadingContext.loader is None: raise Exception("loadingContext.loader cannot be None") if isinstance(tool, Workflow): url = urllib.parse.urlparse(tool.tool["id"]) if url.fragment: extracted = get_subgraph( [tool.tool["id"] + "/" + r for r in args.target], tool ) else: extracted = get_subgraph( [ loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r) for r in args.target ], tool, ) else: _logger.error("Can only use --target on Workflows") return None if isinstance(loadingContext.loader.idx, MutableMapping): loadingContext.loader.idx[extracted["id"]] = extracted tool = make_tool(extracted["id"], loadingContext) else: raise Exception("Missing loadingContext.loader.idx!") return tool def choose_step( args: argparse.Namespace, tool: Process, loadingContext: LoadingContext, ) -> Optional[Process]: """Walk the given Workflow and extract just args.single_step.""" if loadingContext.loader is None: raise Exception("loadingContext.loader cannot be None") if isinstance(tool, Workflow): url = urllib.parse.urlparse(tool.tool["id"]) if url.fragment: extracted = get_step(tool, tool.tool["id"] + "/" + args.singe_step) else: extracted = get_step( tool, loadingContext.loader.fetcher.urljoin( tool.tool["id"], "#" + args.single_step ), ) else: _logger.error("Can only use --single-step on Workflows") return None if isinstance(loadingContext.loader.idx, MutableMapping): loadingContext.loader.idx[extracted["id"]] = extracted tool = make_tool(extracted["id"], loadingContext) else: raise Exception("Missing loadingContext.loader.idx!") return tool def check_working_directories( runtimeContext: RuntimeContext, ) -> Optional[int]: """Make any needed working directories.""" for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"): if ( getattr(runtimeContext, dirprefix) and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX ): sl = ( "/" if getattr(runtimeContext, dirprefix).endswith("/") or dirprefix == "cachedir" else "" ) setattr( runtimeContext, dirprefix, os.path.abspath(getattr(runtimeContext, dirprefix)) + sl, ) if not os.path.exists(os.path.dirname(getattr(runtimeContext, dirprefix))): try: os.makedirs(os.path.dirname(getattr(runtimeContext, dirprefix))) except Exception: _logger.exception("Failed to create directory.") return 1 return None def main( argsl: Optional[List[str]] = None, args: Optional[argparse.Namespace] = None, job_order_object: Optional[CWLObjectType] = None, stdin: IO[Any] = sys.stdin, stdout: Optional[Union[TextIO, StreamWriter]] = None, stderr: IO[Any] = sys.stderr, versionfunc: Callable[[], str] = versionstring, logger_handler: Optional[logging.Handler] = None, custom_schema_callback: Optional[Callable[[], None]] = None, executor: Optional[JobExecutor] = None, loadingContext: Optional[LoadingContext] = None, runtimeContext: Optional[RuntimeContext] = None, input_required: bool = True, ) -> int: if not stdout: # force UTF-8 even if the console is configured differently if hasattr(sys.stdout, "encoding") and sys.stdout.encoding.upper() not in ( "UTF-8", "UTF8", ): if hasattr(sys.stdout, "detach"): stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") else: stdout = getwriter("utf-8")(sys.stdout) # type: ignore else: stdout = sys.stdout _logger.removeHandler(defaultStreamHandler) stderr_handler = logger_handler if stderr_handler is not None: _logger.addHandler(stderr_handler) else: coloredlogs.install(logger=_logger, stream=stderr) stderr_handler = _logger.handlers[-1] workflowobj = None prov_log_handler = None # type: Optional[logging.StreamHandler] try: if args is None: if argsl is None: argsl = sys.argv[1:] addl = [] # type: List[str] if "CWLTOOL_OPTIONS" in os.environ: addl = os.environ["CWLTOOL_OPTIONS"].split(" ") parser = arg_parser() argcomplete.autocomplete(parser) args = parser.parse_args(addl + argsl) if args.record_container_id: if not args.cidfile_dir: args.cidfile_dir = os.getcwd() del args.record_container_id if runtimeContext is None: runtimeContext = RuntimeContext(vars(args)) else: runtimeContext = runtimeContext.copy() # If on Windows platform, a default Docker Container is used if not # explicitely provided by user if onWindows() and not runtimeContext.default_container: # This docker image is a minimal alpine image with bash installed # (size 6 mb). source: https://github.com/frol/docker-alpine-bash runtimeContext.default_container = windows_default_container_id # If caller parsed its own arguments, it may not include every # cwltool option, so fill in defaults to avoid crashing when # dereferencing them in args. for key, val in get_default_args().items(): if not hasattr(args, key): setattr(args, key, val) configure_logging(args, stderr_handler, runtimeContext) if args.version: print(versionfunc()) return 0 _logger.info(versionfunc()) if args.print_supported_versions: print("\n".join(supported_cwl_versions(args.enable_dev))) return 0 if not args.workflow: if os.path.isfile("CWLFile"): args.workflow = "CWLFile" else: _logger.error("CWL document required, no input file was provided") parser.print_help() return 1 if args.ga4gh_tool_registries: ga4gh_tool_registries[:] = args.ga4gh_tool_registries if not args.enable_ga4gh_tool_registry: del ga4gh_tool_registries[:] if args.mpi_config_file is not None: runtimeContext.mpi_config = MpiConfig.load(args.mpi_config_file) setup_schema(args, custom_schema_callback) if args.provenance: if argsl is None: raise Exception("argsl cannot be None") if setup_provenance(args, argsl, runtimeContext) is not None: return 1 loadingContext = setup_loadingContext(loadingContext, runtimeContext, args) uri, tool_file_uri = resolve_tool_uri( args.workflow, resolver=loadingContext.resolver, fetcher_constructor=loadingContext.fetcher_constructor, ) try_again_msg = ( "" if args.debug else ", try again with --debug for more information" ) try: job_order_object, input_basedir, jobloader = load_job_order( args, stdin, loadingContext.fetcher_constructor, loadingContext.overrides_list, tool_file_uri, ) if args.overrides: loadingContext.overrides_list.extend( load_overrides( file_uri(os.path.abspath(args.overrides)), tool_file_uri ) ) loadingContext, workflowobj, uri = fetch_document(uri, loadingContext) if args.print_deps and loadingContext.loader: printdeps( workflowobj, loadingContext.loader, stdout, args.relative_deps, uri ) return 0 loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri, preprocess_only=(args.print_pre or args.pack), skip_schemas=args.skip_schemas, ) if loadingContext.loader is None: raise Exception("Impossible code path.") processobj, metadata = loadingContext.loader.resolve_ref(uri) processobj = cast(CommentedMap, processobj) if args.pack: stdout.write(print_pack(loadingContext, uri)) return 0 if args.provenance and runtimeContext.research_obj: # Can't really be combined with args.pack at same time runtimeContext.research_obj.packed_workflow( print_pack(loadingContext, uri) ) if args.print_pre: stdout.write( json_dumps( processobj, indent=4, sort_keys=True, separators=(",", ": ") ) ) return 0 tool = make_tool(uri, loadingContext) if args.make_template: make_template(tool) return 0 if args.validate: print(f"{args.workflow} is valid CWL.") return 0 if args.print_rdf: stdout.write( printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer) ) return 0 if args.print_dot: printdot(tool, loadingContext.loader.ctx, stdout) return 0 if args.print_targets: for f in ("outputs", "steps", "inputs"): if tool.tool[f]: _logger.info("%s%s targets:", f[0].upper(), f[1:-1]) stdout.write( " " + "\n ".join([shortname(t["id"]) for t in tool.tool[f]]) + "\n" ) return 0 if args.target: ctool = choose_target(args, tool, loadingContext) if ctool is None: return 1 else: tool = ctool elif args.single_step: ctool = choose_step(args, tool, loadingContext) if ctool is None: return 1 else: tool = ctool if args.print_subgraph: if "name" in tool.tool: del tool.tool["name"] stdout.write( json_dumps( tool.tool, indent=4, sort_keys=True, separators=(",", ": ") ) ) return 0 except (ValidationException) as exc: _logger.error( "Tool definition failed validation:\n%s", str(exc), exc_info=args.debug ) return 1 except (RuntimeError, WorkflowException) as exc: _logger.error( "Tool definition failed initialization:\n%s", str(exc), exc_info=args.debug, ) return 1 except Exception as exc: _logger.error( "I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s", try_again_msg, str(exc) if not args.debug else "", exc_info=args.debug, ) return 1 if isinstance(tool, int): return tool # If on MacOS platform, TMPDIR must be set to be under one of the # shared volumes in Docker for Mac # More info: https://dockstore.org/docs/faq if sys.platform == "darwin": default_mac_path = "/private/tmp/docker_tmp" if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX: runtimeContext.tmp_outdir_prefix = default_mac_path if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX: runtimeContext.tmpdir_prefix = default_mac_path if check_working_directories(runtimeContext) is not None: return 1 if args.cachedir: if args.move_outputs == "move": runtimeContext.move_outputs = "copy" runtimeContext.tmp_outdir_prefix = args.cachedir runtimeContext.secret_store = getdefault( runtimeContext.secret_store, SecretStore() ) runtimeContext.make_fs_access = getdefault( runtimeContext.make_fs_access, StdFsAccess ) if not executor: if args.parallel: temp_executor = MultithreadedJobExecutor() runtimeContext.select_resources = temp_executor.select_resources real_executor = temp_executor # type: JobExecutor else: real_executor = SingleJobExecutor() else: real_executor = executor try: runtimeContext.basedir = input_basedir if isinstance(tool, ProcessGenerator): tfjob_order = {} # type: CWLObjectType if loadingContext.jobdefaults: tfjob_order.update(loadingContext.jobdefaults) if job_order_object: tfjob_order.update(job_order_object) tfout, tfstatus = real_executor( tool.embedded_tool, tfjob_order, runtimeContext ) if not tfout or tfstatus != "success": raise WorkflowException( "ProcessGenerator failed to generate workflow" ) tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext) if not job_order_object: job_order_object = None try: initialized_job_order_object = init_job_order( job_order_object, args, tool, jobloader, stdout, print_input_deps=args.print_input_deps, relative_deps=args.relative_deps, make_fs_access=runtimeContext.make_fs_access, input_basedir=input_basedir, secret_store=runtimeContext.secret_store, input_required=input_required, ) except SystemExit as err: return err.code del args.workflow del args.job_order conf_file = getattr( args, "beta_dependency_resolvers_configuration", None ) # str use_conda_dependencies = getattr( args, "beta_conda_dependencies", None ) # str if conf_file or use_conda_dependencies: runtimeContext.job_script_provider = DependenciesConfiguration(args) else: runtimeContext.find_default_container = functools.partial( find_default_container, default_container=runtimeContext.default_container, use_biocontainers=args.beta_use_biocontainers, ) (out, status) = real_executor( tool, initialized_job_order_object, runtimeContext, logger=_logger ) if out is not None: if runtimeContext.research_obj is not None: runtimeContext.research_obj.create_job(out, True) def remove_at_id(doc: CWLObjectType) -> None: for key in list(doc.keys()): if key == "@id": del doc[key] else: value = doc[key] if isinstance(value, MutableMapping): remove_at_id(value) elif isinstance(value, MutableSequence): for entry in value: if isinstance(entry, MutableMapping): remove_at_id(entry) remove_at_id(out) visit_class( out, ("File",), functools.partial(add_sizes, runtimeContext.make_fs_access("")), ) def loc_to_path(obj: CWLObjectType) -> None: for field in ("path", "nameext", "nameroot", "dirname"): if field in obj: del obj[field] if cast(str, obj["location"]).startswith("file://"): obj["path"] = uri_file_path(cast(str, obj["location"])) visit_class(out, ("File", "Directory"), loc_to_path) # Unsetting the Generation from final output object visit_class(out, ("File",), MutationManager().unset_generation) if isinstance(out, str): stdout.write(out) else: stdout.write(json_dumps(out, indent=4, ensure_ascii=False)) stdout.write("\n") if hasattr(stdout, "flush"): stdout.flush() if status != "success": _logger.warning("Final process status is %s", status) return 1 _logger.info("Final process status is %s", status) return 0 except (ValidationException) as exc: _logger.error( "Input object failed validation:\n%s", str(exc), exc_info=args.debug ) return 1 except UnsupportedRequirement as exc: _logger.error( "Workflow or tool uses unsupported feature:\n%s", str(exc), exc_info=args.debug, ) return 33 except WorkflowException as exc: _logger.error( "Workflow error%s:\n%s", try_again_msg, strip_dup_lineno(str(exc)), exc_info=args.debug, ) return 1 except Exception as exc: # pylint: disable=broad-except _logger.error( "Unhandled error%s:\n %s", try_again_msg, str(exc), exc_info=args.debug, ) return 1 finally: if ( args and runtimeContext and runtimeContext.research_obj and workflowobj and loadingContext ): research_obj = runtimeContext.research_obj if loadingContext.loader is not None: research_obj.generate_snapshot( prov_deps(workflowobj, loadingContext.loader, uri) ) else: _logger.warning( "Unable to generate provenance snapshot " " due to missing loadingContext.loader." ) if prov_log_handler is not None: # Stop logging so we won't half-log adding ourself to RO _logger.debug( "[provenance] Closing provenance log file %s", prov_log_handler ) _logger.removeHandler(prov_log_handler) # Ensure last log lines are written out prov_log_handler.flush() # Underlying WritableBagFile will add the tagfile to the manifest prov_log_handler.stream.close() prov_log_handler.close() research_obj.close(args.provenance) _logger.removeHandler(stderr_handler) _logger.addHandler(defaultStreamHandler) def find_default_container( builder: HasReqsHints, default_container: Optional[str] = None, use_biocontainers: Optional[bool] = None, ) -> Optional[str]: """Find a container.""" if not default_container and use_biocontainers: default_container = get_container_from_software_requirements( use_biocontainers, builder ) return default_container def run(*args, **kwargs): # type: (*Any, **Any) -> None """Run cwltool.""" signal.signal(signal.SIGTERM, _signal_handler) try: sys.exit(main(*args, **kwargs)) finally: _terminate_processes() if __name__ == "__main__": run(sys.argv[1:])