diff planemo/lib/python3.7/site-packages/cwltool/main.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler
---|---
date | Fri, 31 Jul 2020 00:18:57 -0400
parents |
children |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/cwltool/main.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,1025 @@
+#!/usr/bin/env python
+"""Entry point for cwltool."""
+from __future__ import absolute_import, print_function
+
+import argparse
+import copy
+import functools
+import io
+import logging
+import os
+import signal
+import sys
+import time
+from codecs import StreamWriter, getwriter  # pylint: disable=unused-import
+from six.moves import urllib
+from typing import (IO, Any, Callable, Dict, Iterable, List, Mapping,
+                    MutableMapping, MutableSequence, Optional, TextIO, Tuple,
+                    Union, cast)
+
+import pkg_resources  # part of setuptools
+from ruamel import yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
+from schema_salad import validate
+from schema_salad.ref_resolver import Fetcher, Loader, file_uri, uri_file_path
+from schema_salad.sourceline import strip_dup_lineno, cmap
+from six import string_types, iteritems, PY3
+from typing_extensions import Text
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+if PY3:
+    from collections.abc import Iterable, Sequence, MutableSequence
+else:  # Needed for Py3.8
+    from collections import Iterable, Sequence, MutableSequence
+
+from . import command_line_tool, workflow
+from .argparser import arg_parser, generate_parser, get_default_args
+from .builder import HasReqsHints  # pylint: disable=unused-import
+from .context import LoadingContext, RuntimeContext, getdefault
+from .cwlrdf import printdot, printrdf
+from .errors import UnsupportedRequirement, WorkflowException
+from .executors import MultithreadedJobExecutor, SingleJobExecutor, JobExecutor
+from .load_tool import (FetcherConstructorType,  # pylint: disable=unused-import
+                        fetch_document, jobloaderctx, load_overrides,
+                        make_tool, resolve_overrides, resolve_tool_uri,
+                        resolve_and_validate_document, default_loader)
+from .loghandler import _logger, defaultStreamHandler
+from .mutation import MutationManager
+from .pack import pack
+from .pathmapper import adjustDirObjs, normalizeFilesDirs, trim_listing
+from .process import (Process, add_sizes,  # pylint: disable=unused-import
+                      scandeps, shortname, use_custom_schema,
+                      use_standard_schema, CWL_IANA)
+from .workflow import Workflow
+from .procgenerator import ProcessGenerator
+from .provenance import ResearchObject
+from .resolver import ga4gh_tool_registries, tool_resolver
+from .secrets import SecretStore
+from .software_requirements import (DependenciesConfiguration,
+                                    get_container_from_software_requirements)
+from .stdfsaccess import StdFsAccess
+from .update import ALLUPDATES, UPDATES
+from .utils import (DEFAULT_TMP_PREFIX, json_dumps, onWindows,
+                    processes_to_kill, versionstring, visit_class,
+                    windows_default_container_id)
+from .subgraph import get_subgraph
+
+import coloredlogs
+
+
+def _terminate_processes():
+    # type: () -> None
+    """Kill all spawned processes.
+
+    Processes to be killed must be appended to `utils.processes_to_kill`
+    as they are spawned.
+
+    An important caveat: since there's no supported way to kill another
+    thread in Python, this function cannot stop other threads from
+    continuing to execute while it kills the processes that they've
+    spawned. This may occasionally lead to unexpected behaviour.
+    """
+    # It's possible that another thread will spawn a new task while
+    # we're executing, so it's not safe to use a for loop here.
+    while processes_to_kill:
+        processes_to_kill.popleft().kill()
+
+
+def _signal_handler(signum, _):
+    # type: (int, Any) -> None
+    """Kill all spawned processes and exit.
+
+    Note that it's possible for another thread to spawn a process after
+    all processes have been killed, but before Python exits.
+
+    Refer to the docstring for _terminate_processes() for other caveats.
+    """
+    _terminate_processes()
+    sys.exit(signum)
+
+
+def generate_example_input(inptype,  # type: Any
+                           default  # type: Optional[Any]
+                           ):  # type: (...) -> Tuple[Any, Text]
+    """Convert a single input schema into an example."""
+    example = None
+    comment = u""
+    defaults = {u'null': 'null',
+                u'Any': 'null',
+                u'boolean': False,
+                u'int': 0,
+                u'long': 0,
+                u'float': 0.1,
+                u'double': 0.1,
+                u'string': 'a_string',
+                u'File': yaml.comments.CommentedMap([
+                    ('class', 'File'), ('path', 'a/file/path')]),
+                u'Directory': yaml.comments.CommentedMap([
+                    ('class', 'Directory'), ('path', 'a/directory/path')])
+                }  # type: Dict[Text, Any]
+    if isinstance(inptype, MutableSequence):
+        optional = False
+        if 'null' in inptype:
+            inptype.remove('null')
+            optional = True
+        if len(inptype) == 1:
+            example, comment = generate_example_input(inptype[0], default)
+            if optional:
+                if comment:
+                    comment = u"{} (optional)".format(comment)
+                else:
+                    comment = u"optional"
+        else:
+            example = yaml.comments.CommentedSeq()
+            for index, entry in enumerate(inptype):
+                value, e_comment = generate_example_input(entry, default)
+                example.append(value)
+                example.yaml_add_eol_comment(e_comment, index)
+            if optional:
+                comment = u"optional"
+    elif isinstance(inptype, Mapping) and 'type' in inptype:
+        if inptype['type'] == 'array':
+            if len(inptype['items']) == 1 and 'type' in inptype['items'][0] \
+                    and inptype['items'][0]['type'] == 'enum':
+                # array of just an enum then list all the options
+                example = inptype['items'][0]['symbols']
+                if 'name' in inptype['items'][0]:
+                    comment = u'array of type "{}".'.format(inptype['items'][0]['name'])
+            else:
+                value, comment = generate_example_input(inptype['items'], None)
+                comment = u"array of " + comment
+                if len(inptype['items']) == 1:
+                    example = [value]
+                else:
+                    example = value
+            if default is not None:
+                example = default
+        elif inptype['type'] == 'enum':
+            if default is not None:
+                example = default
+            elif 'default' in inptype:
+                example = inptype['default']
+            elif len(inptype['symbols']) == 1:
+                example = inptype['symbols'][0]
+            else:
+                example = '{}_enum_value'.format(inptype.get('name', 'valid'))
+            comment = u'enum; valid values: "{}"'.format(
+                '", "'.join(inptype['symbols']))
+        elif inptype['type'] == 'record':
+            example = yaml.comments.CommentedMap()
+            if 'name' in inptype:
+                comment = u'"{}" record type.'.format(inptype['name'])
+            for field in inptype['fields']:
+                value, f_comment = generate_example_input(field['type'], None)
+                example.insert(0, shortname(field['name']), value, f_comment)
+        elif 'default' in inptype:
+            example = inptype['default']
+            comment = u'default value of type "{}".'.format(inptype['type'])
+        else:
+            example = defaults.get(inptype['type'], Text(inptype))
+            comment = u'type "{}".'.format(inptype['type'])
+    else:
+        if not default:
+            example = defaults.get(Text(inptype), Text(inptype))
+            comment = u'type "{}"'.format(inptype)
+        else:
+            example = default
+            comment = u'default value of type "{}".'.format(inptype)
+    return example, comment
+
+
+def realize_input_schema(input_types,  # type: MutableSequence[Dict[Text, Any]]
+                         schema_defs  # type: Dict[Text, Any]
+                         ):  # type: (...) -> MutableSequence[Dict[Text, Any]]
+    """Replace references to named types with the actual types."""
+    for index, entry in enumerate(input_types):
+        if isinstance(entry, string_types):
+            if '#' in entry:
+                _, input_type_name = entry.split('#')
+            else:
+                input_type_name = entry
+            if input_type_name in schema_defs:
+                entry = input_types[index] = schema_defs[input_type_name]
+        if isinstance(entry, Mapping):
+            if isinstance(entry['type'], string_types) and '#' in entry['type']:
+                _, input_type_name = entry['type'].split('#')
+                if input_type_name in schema_defs:
+                    input_types[index]['type'] = realize_input_schema(
+                        schema_defs[input_type_name], schema_defs)
+            if isinstance(entry['type'], MutableSequence):
+                input_types[index]['type'] = realize_input_schema(
+                    entry['type'], schema_defs)
+            if isinstance(entry['type'], Mapping):
+                input_types[index]['type'] = realize_input_schema(
+                    [input_types[index]['type']], schema_defs)
+            if entry['type'] == 'array':
+                items = entry['items'] if \
+                    not isinstance(entry['items'], string_types) else [entry['items']]
+                input_types[index]['items'] = realize_input_schema(items, schema_defs)
+            if entry['type'] == 'record':
+                input_types[index]['fields'] = realize_input_schema(
+                    entry['fields'], schema_defs)
+    return input_types
+
+
+def generate_input_template(tool):
+    # type: (Process) -> Dict[Text, Any]
+    """Generate an example input object for the given CWL process."""
+    template = yaml.comments.CommentedMap()
+    for inp in realize_input_schema(tool.tool["inputs"], tool.schemaDefs):
+        name = shortname(inp["id"])
+        value, comment = generate_example_input(
+            inp['type'], inp.get('default', None))
+        template.insert(0, name, value, comment)
+    return template
+
+
+def load_job_order(args,  # type: argparse.Namespace
+                   stdin,  # type: IO[Any]
+                   fetcher_constructor,  # type: Optional[Fetcher]
+                   overrides_list,  # type: List[Dict[Text, Any]]
+                   tool_file_uri  # type: Text
+                   ):  # type: (...) -> Tuple[Optional[MutableMapping[Text, Any]], Text, Loader]
+
+    job_order_object = None
+    job_order_file = None
+
+    _jobloaderctx = jobloaderctx.copy()
+    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)  # type: ignore
+
+    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
+        job_order_file = args.job_order[0]
+    elif len(args.job_order) == 1 and args.job_order[0] == "-":
+        job_order_object = yaml.round_trip_load(stdin)
+        job_order_object, _ = loader.resolve_all(job_order_object, file_uri(os.getcwd()) + "/")
+    else:
+        job_order_file = None
+
+    if job_order_object is not None:
+        input_basedir = args.basedir if args.basedir else os.getcwd()
+    elif job_order_file is not None:
+        input_basedir = args.basedir if args.basedir \
+            else os.path.abspath(os.path.dirname(job_order_file))
+        job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)
+
+    if job_order_object is not None and "http://commonwl.org/cwltool#overrides" in job_order_object:
+        ov_uri = file_uri(job_order_file or input_basedir)
+        overrides_list.extend(
+            resolve_overrides(job_order_object, ov_uri, tool_file_uri))
+        del job_order_object["http://commonwl.org/cwltool#overrides"]
+
+    if job_order_object is None:
+        input_basedir = args.basedir if args.basedir else os.getcwd()
+
+    if job_order_object is not None and not isinstance(job_order_object, MutableMapping):
+        _logger.error(
+            'CWL input object at %s is not formatted correctly, it should be a '
+            'JSON/YAML dictionary, not %s.\n'
+            'Raw input object:\n%s', job_order_file or "stdin",
+            type(job_order_object), job_order_object)
+        sys.exit(1)
+    return (job_order_object, input_basedir, loader)
+
+
+def init_job_order(job_order_object,  # type: Optional[MutableMapping[Text, Any]]
+                   args,  # type: argparse.Namespace
+                   process,  # type: Process
+                   loader,  # type: Loader
+                   stdout,  # type: Union[TextIO, StreamWriter]
+                   print_input_deps=False,  # type: bool
+                   relative_deps=False,  # type: bool
+                   make_fs_access=StdFsAccess,  # type: Callable[[Text], StdFsAccess]
+                   input_basedir="",  # type: Text
+                   secret_store=None,  # type: Optional[SecretStore]
+                   input_required=True  # type: bool
+                   ):  # type: (...) -> MutableMapping[Text, Any]
+    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
+    if job_order_object is None:
+        namemap = {}  # type: Dict[Text, Text]
+        records = []  # type: List[Text]
+        toolparser = generate_parser(
+            argparse.ArgumentParser(prog=args.workflow), process, namemap, records, input_required)
+        if args.tool_help:
+            toolparser.print_help()
+            exit(0)
+        cmd_line = vars(toolparser.parse_args(args.job_order))
+        for record_name in records:
+            record = {}
+            record_items = {
+                k: v for k, v in iteritems(cmd_line)
+                if k.startswith(record_name)}
+            for key, value in iteritems(record_items):
+                record[key[len(record_name) + 1:]] = value
+                del cmd_line[key]
+            cmd_line[str(record_name)] = record
+        if 'job_order' in cmd_line and cmd_line["job_order"]:
+            try:
+                job_order_object = cast(
+                    MutableMapping[Text, Any],
+                    loader.resolve_ref(cmd_line["job_order"])[0])
+            except Exception as err:
+                _logger.error(Text(err), exc_info=args.debug)
+                exit(1)
+        else:
+            job_order_object = {"id": args.workflow}
+
+        del cmd_line["job_order"]
+
+        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})
+
+        if secret_store and secrets_req:
+            secret_store.store(
+                [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)
+
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(u"Parsed job order from command line: %s",
+                          json_dumps(job_order_object, indent=4))
+
+    for inp in process.tool["inputs"]:
+        if "default" in inp and (
+                not job_order_object or shortname(inp["id"]) not in job_order_object):
+            if not job_order_object:
+                job_order_object = {}
+            job_order_object[shortname(inp["id"])] = inp["default"]
+
+    if job_order_object is None:
+        if process.tool["inputs"]:
+            if toolparser is not None:
+                print(u"\nOptions for {} ".format(args.workflow))
+                toolparser.print_help()
+            _logger.error("")
+            _logger.error("Input object required, use --help for details")
+            exit(1)
+        else:
+            job_order_object = {}
+
+    if print_input_deps:
+        basedir = None  # type: Optional[Text]
+        uri = job_order_object["id"]
+        if uri == args.workflow:
+            basedir = os.path.dirname(uri)
+            uri = ""
+        printdeps(job_order_object, loader, stdout, relative_deps, uri,
+                  basedir=basedir, nestdirs=False)
+        exit(0)
+
+    def path_to_loc(p):  # type: (Dict[Text, Any]) -> None
+        if "location" not in p and "path" in p:
+            p["location"] = p["path"]
+            del p["path"]
+
+    ns = {}  # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
+    ns.update(job_order_object.get("$namespaces", {}))
+    ns.update(process.metadata.get("$namespaces", {}))
+    ld = Loader(ns)
+
+    def expand_formats(p):  # type: (Dict[Text, Any]) -> None
+        if "format" in p:
+            p["format"] = ld.expand_url(p["format"], "")
+
+    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
+    visit_class(job_order_object, ("File",), functools.partial(add_sizes, make_fs_access(input_basedir)))
+    visit_class(job_order_object, ("File",), expand_formats)
+    adjustDirObjs(job_order_object, trim_listing)
+    normalizeFilesDirs(job_order_object)
+
+    if secret_store and secrets_req:
+        secret_store.store(
+            [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)
+
+    if "cwl:tool" in job_order_object:
+        del job_order_object["cwl:tool"]
+    if "id" in job_order_object:
+        del job_order_object["id"]
+    return job_order_object
+
+
+def make_relative(base, obj):  # type: (Text, Dict[Text, Any]) -> None
+    """Relativize the location URI of a File or Directory object."""
+    uri = obj.get("location", obj.get("path"))
+    if ":" in uri.split("/")[0] and not uri.startswith("file://"):
+        pass
+    else:
+        if uri.startswith("file://"):
+            uri = uri_file_path(uri)
+        obj["location"] = os.path.relpath(uri, base)
+
+
+def printdeps(obj,  # type: Mapping[Text, Any]
+              document_loader,  # type: Loader
+              stdout,  # type: Union[TextIO, StreamWriter]
+              relative_deps,  # type: bool
+              uri,  # type: Text
+              basedir=None,  # type: Optional[Text]
+              nestdirs=True  # type: bool
+              ):  # type: (...) -> None
+    """Print a JSON representation of the dependencies of the CWL document."""
+    deps = find_deps(obj, document_loader, uri, basedir=basedir,
+                     nestdirs=nestdirs)
+    if relative_deps == "primary":
+        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
+    elif relative_deps == "cwd":
+        base = os.getcwd()
+    visit_class(deps, ("File", "Directory"), functools.partial(
+        make_relative, base))
+    stdout.write(json_dumps(deps, indent=4))
+
+
+def prov_deps(obj,  # type: Mapping[Text, Any]
+              document_loader,  # type: Loader
+              uri,  # type: Text
+              basedir=None  # type: Optional[Text]
+              ):  # type: (...) -> MutableMapping[Text, Any]
+    deps = find_deps(obj, document_loader, uri, basedir=basedir)
+
+    def remove_non_cwl(deps):  # type: (MutableMapping[Text, Any]) -> None
+        if 'secondaryFiles' in deps:
+            sec_files = deps['secondaryFiles']
+            for index, entry in enumerate(sec_files):
+                if not ('format' in entry and entry['format'] == CWL_IANA):
+                    del sec_files[index]
+                else:
+                    remove_non_cwl(entry)
+
+    remove_non_cwl(deps)
+    return deps
+
+
+def find_deps(obj,  # type: Mapping[Text, Any]
+              document_loader,  # type: Loader
+              uri,  # type: Text
+              basedir=None,  # type: Optional[Text]
+              nestdirs=True  # type: bool
+              ):  # type: (...) -> Dict[Text, Any]
+    """Find the dependencies of the CWL document."""
+    deps = {"class": "File", "location": uri, "format": CWL_IANA}  # type: Dict[Text, Any]
+
+    def loadref(base, uri):  # type: (Text, Text) -> Any
+        return document_loader.fetch(document_loader.fetcher.urljoin(base, uri))
+
+    sfs = scandeps(
+        basedir if basedir else uri, obj, {"$import", "run"},
+        {"$include", "$schemas", "location"}, loadref, nestdirs=nestdirs)
+    if sfs is not None:
+        deps["secondaryFiles"] = sfs
+
+    return deps
+
+
+def print_pack(document_loader,  # type: Loader
+               processobj,  # type: CommentedMap
+               uri,  # type: Text
+               metadata  # type: Dict[Text, Any]
+               ):  # type: (...) -> Text
+    """Return a CWL serialization of the CWL document in JSON."""
+    packed = pack(document_loader, processobj, uri, metadata)
+    if len(packed["$graph"]) > 1:
+        return json_dumps(packed, indent=4)
+    return json_dumps(packed["$graph"][0], indent=4)
+
+
+def supported_cwl_versions(enable_dev):  # type: (bool) -> List[Text]
+    # ALLUPDATES and UPDATES are dicts
+    if enable_dev:
+        versions = list(ALLUPDATES)
+    else:
+        versions = list(UPDATES)
+    versions.sort()
+    return versions
+
+
+def configure_logging(args,  # type: argparse.Namespace
+                      stderr_handler,  # type: logging.Handler
+                      runtimeContext  # type: RuntimeContext
+                      ):  # type: (...) -> None
+    # Configure logging
+    rdflib_logger = logging.getLogger("rdflib.term")
+    rdflib_logger.addHandler(stderr_handler)
+    rdflib_logger.setLevel(logging.ERROR)
+    if args.quiet:
+        # Silence STDERR, not an eventual provenance log file
+        stderr_handler.setLevel(logging.WARN)
+    if runtimeContext.debug:
+        # Increase to debug for both stderr and provenance log file
+        _logger.setLevel(logging.DEBUG)
+        stderr_handler.setLevel(logging.DEBUG)
+        rdflib_logger.setLevel(logging.DEBUG)
+    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
+    formatter = fmtclass("%(levelname)s %(message)s")
+    if args.timestamps:
+        formatter = fmtclass(
+            "[%(asctime)s] %(levelname)s %(message)s",
+            "%Y-%m-%d %H:%M:%S")
+    stderr_handler.setFormatter(formatter)
+
+
+def setup_schema(args,  # type: argparse.Namespace
+                 custom_schema_callback  # type: Optional[Callable[[], None]]
+                 ):  # type: (...) -> None
+    if custom_schema_callback is not None:
+        custom_schema_callback()
+    elif args.enable_ext:
+        res = pkg_resources.resource_stream(__name__, 'extensions.yml')
+        use_custom_schema("v1.0", "http://commonwl.org/cwltool", res.read())
+        res.close()
+    else:
+        use_standard_schema("v1.0")
+
+
+def setup_provenance(args,  # type: argparse.Namespace
+                     argsl,  # type: List[str]
+                     runtimeContext  # type: RuntimeContext
+                     ):  # type: (...) -> Optional[int]
+    if not args.compute_checksum:
+        _logger.error("--provenance incompatible with --no-compute-checksum")
+        return 1
+    ro = ResearchObject(
+        getdefault(runtimeContext.make_fs_access, StdFsAccess),
+        temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid,
+        full_name=args.cwl_full_name)
+    runtimeContext.research_obj = ro
+    log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
+    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
+
+    class ProvLogFormatter(logging.Formatter):
+        """Enforce ISO8601 with both T and Z."""
+
+        def __init__(self):  # type: () -> None
+            super(ProvLogFormatter, self).__init__(
+                "[%(asctime)sZ] %(message)s")
+
+        def formatTime(self, record, datefmt=None):
+            # type: (logging.LogRecord, Optional[str]) -> str
+            record_time = time.gmtime(record.created)
+            formatted_time = time.strftime("%Y-%m-%dT%H:%M:%S", record_time)
+            with_msecs = "%s,%03d" % (formatted_time, record.msecs)
+            return with_msecs
+    prov_log_handler.setFormatter(ProvLogFormatter())
+    _logger.addHandler(prov_log_handler)
+    _logger.debug(u"[provenance] Logging to %s", log_file_io)
+    if argsl is not None:
+        # Log cwltool command line options to provenance file
+        _logger.info("[cwltool] %s %s", sys.argv[0], u" ".join(argsl))
+    _logger.debug(u"[cwltool] Arguments: %s", args)
+    return None
+
+
+def setup_loadingContext(loadingContext,  # type: Optional[LoadingContext]
+                         runtimeContext,  # type: RuntimeContext
+                         args  # type: argparse.Namespace
+                         ):  # type: (...) -> LoadingContext
+    if loadingContext is None:
+        loadingContext = LoadingContext(vars(args))
+    else:
+        loadingContext = loadingContext.copy()
+    loadingContext.loader = default_loader(loadingContext.fetcher_constructor,
+                                           enable_dev=args.enable_dev)
+    loadingContext.research_obj = runtimeContext.research_obj
+    loadingContext.disable_js_validation = \
+        args.disable_js_validation or (not args.do_validate)
+    loadingContext.construct_tool_object = getdefault(
+        loadingContext.construct_tool_object, workflow.default_make_tool)
+    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
+    if loadingContext.do_update is None:
+        loadingContext.do_update = not (args.pack or args.print_subgraph)
+
+    return loadingContext
+
+
+def make_template(tool  # type: Process
+                  ):  # type: (...) -> None
+    def my_represent_none(self, data):  # pylint: disable=unused-argument
+        # type: (Any, Any) -> Any
+        """Force clean representation of 'null'."""
+        return self.represent_scalar(u'tag:yaml.org,2002:null', u'null')
+    yaml.RoundTripRepresenter.add_representer(type(None), my_represent_none)
+    yaml.round_trip_dump(
+        generate_input_template(tool), sys.stdout,
+        default_flow_style=False, indent=4, block_seq_indent=2)
+
+
+def choose_target(args,  # type: argparse.Namespace
+                  tool,  # type: Process
+                  loadingContext  # type: LoadingContext
+                  ):  # type: (...) -> Optional[Process]
+
+    if loadingContext.loader is None:
+        raise Exception("loadingContext.loader cannot be None")
+
+    if isinstance(tool, Workflow):
+        url = urllib.parse.urlparse(tool.tool["id"])
+        if url.fragment:
+            extracted = get_subgraph([tool.tool["id"] + "/" + r for r in args.target], tool)
+        else:
+            extracted = get_subgraph([loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
+                                      for r in args.target],
+                                     tool)
+    else:
+        _logger.error("Can only use --target on Workflows")
+        return None
+    if isinstance(loadingContext.loader.idx, CommentedMap):
+        loadingContext.loader.idx[extracted["id"]] = extracted
+        tool = make_tool(extracted["id"],
+                         loadingContext)
+    else:
+        raise Exception("Missing loadingContext.loader.idx!")
+
+    return tool
+
+
+def check_working_directories(runtimeContext  # type: RuntimeContext
+                              ):  # type: (...) -> Optional[int]
+    for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
+        if getattr(runtimeContext, dirprefix) and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX:
+            sl = "/" if getattr(runtimeContext, dirprefix).endswith("/") or dirprefix == "cachedir" \
+                else ""
+            setattr(runtimeContext, dirprefix,
+                    os.path.abspath(getattr(runtimeContext, dirprefix)) + sl)
+            if not os.path.exists(os.path.dirname(getattr(runtimeContext, dirprefix))):
+                try:
+                    os.makedirs(os.path.dirname(getattr(runtimeContext, dirprefix)))
+                except Exception as e:
+                    _logger.error("Failed to create directory: %s", Text(e))
+                    return 1
+    return None
+
+
+def main(argsl=None,  # type: Optional[List[str]]
+         args=None,  # type: Optional[argparse.Namespace]
+         job_order_object=None,  # type: Optional[MutableMapping[Text, Any]]
+         stdin=sys.stdin,  # type: IO[Any]
+         stdout=None,  # type: Optional[Union[TextIO, StreamWriter]]
+         stderr=sys.stderr,  # type: IO[Any]
+         versionfunc=versionstring,  # type: Callable[[], Text]
+         logger_handler=None,  # type: Optional[logging.Handler]
+         custom_schema_callback=None,  # type: Optional[Callable[[], None]]
+         executor=None,  # type: Optional[JobExecutor]
+         loadingContext=None,  # type: Optional[LoadingContext]
+         runtimeContext=None,  # type: Optional[RuntimeContext]
+         input_required=True  # type: bool
+         ):  # type: (...) -> int
+    if not stdout:  # force UTF-8 even if the console is configured differently
+        if (hasattr(sys.stdout, "encoding")
+                and sys.stdout.encoding != 'UTF-8'):  # type: ignore
+            if PY3 and hasattr(sys.stdout, "detach"):
+                stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
+            else:
+                stdout = getwriter('utf-8')(sys.stdout)  # type: ignore
+        else:
+            stdout = cast(TextIO, sys.stdout)  # type: ignore
+
+    _logger.removeHandler(defaultStreamHandler)
+    stderr_handler = logger_handler
+    if stderr_handler is not None:
+        _logger.addHandler(stderr_handler)
+    else:
+        coloredlogs.install(logger=_logger, stream=stderr)
+        stderr_handler = _logger.handlers[-1]
+    workflowobj = None
+    prov_log_handler = None  # type: Optional[logging.StreamHandler]
+    try:
+        if args is None:
+            if argsl is None:
+                argsl = sys.argv[1:]
+            addl = []  # type: List[str]
+            if "CWLTOOL_OPTIONS" in os.environ:
+                addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
+            args = arg_parser().parse_args(addl+argsl)
+            if args.record_container_id:
+                if not args.cidfile_dir:
+                    args.cidfile_dir = os.getcwd()
+                del args.record_container_id
+
+        if runtimeContext is None:
+            runtimeContext = RuntimeContext(vars(args))
+        else:
+            runtimeContext = runtimeContext.copy()
+
+        # If on Windows platform, a default Docker Container is used if not
+        # explicitly provided by user
+        if onWindows() and not runtimeContext.default_container:
+            # This docker image is a minimal alpine image with bash installed
+            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
+            runtimeContext.default_container = windows_default_container_id
+
+        # If caller parsed its own arguments, it may not include every
+        # cwltool option, so fill in defaults to avoid crashing when
+        # dereferencing them in args.
+        for key, val in iteritems(get_default_args()):
+            if not hasattr(args, key):
+                setattr(args, key, val)
+
+        configure_logging(args, stderr_handler, runtimeContext)
+
+        if args.version:
+            print(versionfunc())
+            return 0
+        _logger.info(versionfunc())
+
+        if args.print_supported_versions:
+            print("\n".join(supported_cwl_versions(args.enable_dev)))
+            return 0
+
+        if not args.workflow:
+            if os.path.isfile("CWLFile"):
+                setattr(args, "workflow", "CWLFile")
+            else:
+                _logger.error("CWL document required, no input file was provided")
+                arg_parser().print_help()
+                return 1
+        if args.relax_path_checks:
+            command_line_tool.ACCEPTLIST_RE = command_line_tool.ACCEPTLIST_EN_RELAXED_RE
+
+        if args.ga4gh_tool_registries:
+            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
+        if not args.enable_ga4gh_tool_registry:
+            del ga4gh_tool_registries[:]
+
+        setup_schema(args, custom_schema_callback)
+
+        if args.provenance:
+            if argsl is None:
+                raise Exception("argsl cannot be None")
+            if setup_provenance(args, argsl, runtimeContext) is not None:
+                return 1
+
+        loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)
+
+        uri, tool_file_uri = resolve_tool_uri(
+            args.workflow, resolver=loadingContext.resolver,
+            fetcher_constructor=loadingContext.fetcher_constructor)
+
+        try_again_msg = "" if args.debug else ", try again with --debug for more information"
+
+        try:
+            job_order_object, input_basedir, jobloader = load_job_order(
+                args, stdin, loadingContext.fetcher_constructor,
+                loadingContext.overrides_list, tool_file_uri)
+
+            if args.overrides:
+                loadingContext.overrides_list.extend(load_overrides(
+                    file_uri(os.path.abspath(args.overrides)), tool_file_uri))
+
+            loadingContext, workflowobj, uri = fetch_document(
+                uri, loadingContext)
+
+            if args.print_deps and loadingContext.loader:
+                printdeps(workflowobj, loadingContext.loader, stdout,
+                          args.relative_deps, uri)
+                return 0
+
+            loadingContext, uri \
+                = resolve_and_validate_document(loadingContext, workflowobj, uri,
+                                                preprocess_only=(args.print_pre or args.pack),
+                                                skip_schemas=args.skip_schemas)
+
+            if loadingContext.loader is None:
+                raise Exception("Impossible code path.")
+            processobj, metadata = loadingContext.loader.resolve_ref(uri)
+            processobj = cast(CommentedMap, processobj)
+            if args.pack:
+                stdout.write(print_pack(loadingContext.loader, processobj, uri, metadata))
+                return 0
+
+            if args.provenance and runtimeContext.research_obj:
+                # Can't really be combined with args.pack at same time
+                runtimeContext.research_obj.packed_workflow(
+                    print_pack(loadingContext.loader, processobj, uri, metadata))
+
+            if args.print_pre:
+                stdout.write(json_dumps(processobj, indent=4, sort_keys=True, separators=(',', ': ')))
+                return 0
+
+            tool = make_tool(uri, loadingContext)
+            if args.make_template:
+                make_template(tool)
+                return 0
+
+            if args.validate:
+                print("{} is valid CWL.".format(args.workflow))
+                return 0
+
+            if args.print_rdf:
+                stdout.write(printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer))
+                return 0
+
+            if args.print_dot:
+                printdot(tool, loadingContext.loader.ctx, stdout)
+                return 0
+
+            if args.print_targets:
+                for f in ("outputs", "steps", "inputs"):
+                    if tool.tool[f]:
+                        _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
+                        stdout.write("  "+"\n  ".join([shortname(t["id"]) for t in tool.tool[f]])+"\n")
+                return 0
+
+            if args.target:
+                ctool = choose_target(args, tool, loadingContext)
+                if ctool is None:
+                    return 1
+                else:
+                    tool = ctool
+
+            if args.print_subgraph:
+                if "name" in tool.tool:
+                    del tool.tool["name"]
+                stdout.write(json_dumps(tool.tool, indent=4, sort_keys=True, separators=(',', ': ')))
+                return 0
+
+        except (validate.ValidationException) as exc:
+            _logger.error(u"Tool definition failed validation:\n%s", Text(exc),
+                          exc_info=args.debug)
+            return 1
+        except (RuntimeError, WorkflowException) as exc:
+            _logger.error(u"Tool definition failed initialization:\n%s", Text(exc),
+                          exc_info=args.debug)
+            return 1
+        except Exception as exc:
+            _logger.error(
+                u"I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
+                try_again_msg,
+                Text(exc) if not args.debug else "",
+                exc_info=args.debug)
+            return 1
+
+        if isinstance(tool, int):
+            return tool
+
+        # If on MacOS platform, TMPDIR must be set to be under one of the
+        # shared volumes in Docker for Mac
+        # More info: https://dockstore.org/docs/faq
+        if sys.platform == "darwin":
+            default_mac_path = "/private/tmp/docker_tmp"
+            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
+                runtimeContext.tmp_outdir_prefix = default_mac_path
+            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
+                runtimeContext.tmpdir_prefix = default_mac_path
+
+        if check_working_directories(runtimeContext) is not None:
+            return 1
+
+        if args.cachedir:
+            if args.move_outputs == "move":
+                runtimeContext.move_outputs = "copy"
+            runtimeContext.tmp_outdir_prefix = args.cachedir
+
+        runtimeContext.secret_store = getdefault(runtimeContext.secret_store, SecretStore())
+        runtimeContext.make_fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)
+
+        if not executor:
+            if args.parallel:
+                temp_executor = MultithreadedJobExecutor()
+                runtimeContext.select_resources = temp_executor.select_resources
+                real_executor = temp_executor  # type: JobExecutor
+            else:
+                real_executor = SingleJobExecutor()
+        else:
+            real_executor = executor
+
+        try:
+            runtimeContext.basedir = input_basedir
+
+            if isinstance(tool, ProcessGenerator):
+                tfjob_order = {}  # type: MutableMapping[Text, Any]
+                if loadingContext.jobdefaults:
+                    tfjob_order.update(loadingContext.jobdefaults)
+                if job_order_object:
+                    tfjob_order.update(job_order_object)
+                tfout, tfstatus = real_executor(tool.embedded_tool, tfjob_order, runtimeContext)
+                if tfstatus != "success":
+                    raise WorkflowException("ProcessGenerator failed to generate workflow")
+                tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
+                if not job_order_object:
+                    job_order_object = None
+
+            try:
+                initialized_job_order_object = init_job_order(
+                    job_order_object, args, tool, jobloader, stdout,
+                    print_input_deps=args.print_input_deps,
+                    relative_deps=args.relative_deps,
+                    make_fs_access=runtimeContext.make_fs_access,
+                    input_basedir=input_basedir,
+                    secret_store=runtimeContext.secret_store,
+                    input_required=input_required)
+            except SystemExit as err:
+                return err.code
+
+            del args.workflow
+            del args.job_order
+
+            conf_file = getattr(args, "beta_dependency_resolvers_configuration", None)  # Text
+            use_conda_dependencies = getattr(args, "beta_conda_dependencies", None)  # Text
+
+            if conf_file or use_conda_dependencies:
+                runtimeContext.job_script_provider = DependenciesConfiguration(args)
+            else:
+                runtimeContext.find_default_container = functools.partial(
+                    find_default_container,
+                    default_container=runtimeContext.default_container,
+                    use_biocontainers=args.beta_use_biocontainers)
+
+            (out, status) = real_executor(
+                tool, initialized_job_order_object, runtimeContext,
+                logger=_logger)
+
+            if out is not None:
+                if runtimeContext.research_obj is not None:
+                    runtimeContext.research_obj.create_job(
+                        out, None, True)
+                    def remove_at_id(doc):  # type: (MutableMapping[Text, Any]) -> None
+                        for key in list(doc.keys()):
+                            if key == '@id':
+                                del doc[key]
+                            else:
+                                value = doc[key]
+                                if isinstance(value, MutableMapping):
+                                    remove_at_id(value)
+                                elif isinstance(value, MutableSequence):
+                                    for entry in value:
+                                        if isinstance(entry, MutableMapping):
+                                            remove_at_id(entry)
+                    remove_at_id(out)
+                    visit_class(out, ("File",), functools.partial(
+                        add_sizes, runtimeContext.make_fs_access('')))
+
+                def loc_to_path(obj):  # type: (Dict[Text, Any]) -> None
+                    for field in ("path", "nameext", "nameroot", "dirname"):
+                        if field in obj:
+                            del obj[field]
+                    if obj["location"].startswith("file://"):
+                        obj["path"] = uri_file_path(obj["location"])
+
+                visit_class(out, ("File", "Directory"), loc_to_path)
+
+                # Unsetting the Generation from final output object
+                visit_class(out, ("File", ), MutationManager().unset_generation)
+
+                if isinstance(out, string_types):
+                    stdout.write(out)
+                else:
+                    stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
+                stdout.write("\n")
+                if hasattr(stdout, "flush"):
+                    stdout.flush()
+
+            if status != "success":
+                _logger.warning(u"Final process status is %s", status)
+                return 1
+            _logger.info(u"Final process status is %s", status)
+            return 0
+
+        except (validate.ValidationException) as exc:
+            _logger.error(u"Input object failed validation:\n%s", Text(exc),
+                          exc_info=args.debug)
+            return 1
+        except UnsupportedRequirement as exc:
+            _logger.error(
+                u"Workflow or tool uses unsupported feature:\n%s", Text(exc),
+                exc_info=args.debug)
+            return 33
+        except WorkflowException as exc:
+            _logger.error(
+                u"Workflow error%s:\n%s", try_again_msg, strip_dup_lineno(Text(exc)),
+                exc_info=args.debug)
+            return 1
+        except Exception as exc:  # pylint: disable=broad-except
+            _logger.error(
+                u"Unhandled error%s:\n %s", try_again_msg, Text(exc), exc_info=args.debug)
+            return 1
+
+    finally:
+        if args and runtimeContext and runtimeContext.research_obj \
+                and workflowobj and loadingContext:
+            research_obj = runtimeContext.research_obj
+            if loadingContext.loader is not None:
+                research_obj.generate_snapshot(prov_deps(
+                    workflowobj, loadingContext.loader, uri))
+            else:
+                _logger.warning("Unable to generate provenance snapshot "
+                                " due to missing loadingContext.loader.")
+            if prov_log_handler is not None:
+                # Stop logging so we won't half-log adding ourselves to RO
+                _logger.debug(u"[provenance] Closing provenance log file %s",
+                              prov_log_handler)
+                _logger.removeHandler(prov_log_handler)
+                # Ensure last log lines are written out
+                prov_log_handler.flush()
+                # Underlying WritableBagFile will add the tagfile to the manifest
+                prov_log_handler.stream.close()
+                prov_log_handler.close()
+            research_obj.close(args.provenance)
+
+        _logger.removeHandler(stderr_handler)
+        _logger.addHandler(defaultStreamHandler)
+
+
+def find_default_container(builder,  # type: HasReqsHints
+                           default_container=None,  # type: Optional[Text]
+                           use_biocontainers=None,  # type: Optional[bool]
+                           ):  # type: (...) -> Optional[Text]
+    """Find a container."""
+    if not default_container and use_biocontainers:
+        default_container = get_container_from_software_requirements(
+            use_biocontainers, builder)
+    return default_container
+
+
+def run(*args, **kwargs):
+    # type: (*Any, **Any) -> None
+    """Run cwltool."""
+    signal.signal(signal.SIGTERM, _signal_handler)
+    try:
+        sys.exit(main(*args, **kwargs))
+    finally:
+        _terminate_processes()
+
+
+if __name__ == "__main__":
+    run(sys.argv[1:])
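
A minimal sketch of driving this entry point programmatically, based only on the `main()` signature above: `main()` accepts the same argument list as the `cwltool` command line via `argsl` and returns the process exit code, and anything passed as `stdout` receives the JSON output object. The workflow file `echo.cwl` and its `--message` input below are hypothetical placeholders; substitute a real CWL document and its inputs.

    import io
    from cwltool.main import main

    # Capture the JSON output object instead of writing it to the console.
    output = io.StringIO()

    # "echo.cwl" and "--message hello" are hypothetical; options before the
    # document path (here --debug) are ordinary cwltool options parsed by
    # arg_parser(), while options after it become the tool's job order.
    exit_code = main(
        argsl=["--debug", "echo.cwl", "--message", "hello"],
        stdout=output,
    )

    print("exit code:", exit_code)  # 0 on success, non-zero on failure
    print(output.getvalue())        # JSON description of the output object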