diff env/lib/python3.7/site-packages/cwltool/main.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
| --- | --- |
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children | |

```diff
--- a/env/lib/python3.7/site-packages/cwltool/main.py Thu May 14 16:47:39 2020 -0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1025 +0,0 @@
-#!/usr/bin/env python
-"""Entry point for cwltool."""
-from __future__ import absolute_import, print_function
-
-import argparse
-import copy
-import functools
-import io
-import logging
-import os
-import signal
-import sys
-import time
-from codecs import StreamWriter, getwriter  # pylint: disable=unused-import
-from six.moves import urllib
-from typing import (IO, Any, Callable, Dict, Iterable, List, Mapping,
-                    MutableMapping, MutableSequence, Optional, TextIO, Tuple,
-                    Union, cast)
-
-import pkg_resources  # part of setuptools
-from ruamel import yaml
-from ruamel.yaml.comments import CommentedMap, CommentedSeq
-from schema_salad import validate
-from schema_salad.ref_resolver import Fetcher, Loader, file_uri, uri_file_path
-from schema_salad.sourceline import strip_dup_lineno, cmap
-from six import string_types, iteritems, PY3
-from typing_extensions import Text
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-if PY3:
-    from collections.abc import Iterable, Sequence, MutableSequence
-else:  # Needed for Py3.8
-    from collections import Iterable, Sequence, MutableSequence
-
-from . import command_line_tool, workflow
-from .argparser import arg_parser, generate_parser, get_default_args
-from .builder import HasReqsHints  # pylint: disable=unused-import
-from .context import LoadingContext, RuntimeContext, getdefault
-from .cwlrdf import printdot, printrdf
-from .errors import UnsupportedRequirement, WorkflowException
-from .executors import MultithreadedJobExecutor, SingleJobExecutor, JobExecutor
-from .load_tool import (FetcherConstructorType,  # pylint: disable=unused-import
-                        fetch_document, jobloaderctx, load_overrides,
-                        make_tool, resolve_overrides, resolve_tool_uri,
-                        resolve_and_validate_document, default_loader)
-from .loghandler import _logger, defaultStreamHandler
-from .mutation import MutationManager
-from .pack import pack
-from .pathmapper import adjustDirObjs, normalizeFilesDirs, trim_listing
-from .process import (Process, add_sizes,  # pylint: disable=unused-import
-                      scandeps, shortname, use_custom_schema,
-                      use_standard_schema, CWL_IANA)
-from .workflow import Workflow
-from .procgenerator import ProcessGenerator
-from .provenance import ResearchObject
-from .resolver import ga4gh_tool_registries, tool_resolver
-from .secrets import SecretStore
-from .software_requirements import (DependenciesConfiguration,
-                                    get_container_from_software_requirements)
-from .stdfsaccess import StdFsAccess
-from .update import ALLUPDATES, UPDATES
-from .utils import (DEFAULT_TMP_PREFIX, json_dumps, onWindows,
-                    processes_to_kill, versionstring, visit_class,
-                    windows_default_container_id)
-from .subgraph import get_subgraph
-
-import coloredlogs
-
-def _terminate_processes():
-    # type: () -> None
-    """Kill all spawned processes.
-
-    Processes to be killed must be appended to `utils.processes_to_kill`
-    as they are spawned.
-
-    An important caveat: since there's no supported way to kill another
-    thread in Python, this function cannot stop other threads from
-    continuing to execute while it kills the processes that they've
-    spawned. This may occasionally lead to unexpected behaviour.
-    """
-    # It's possible that another thread will spawn a new task while
-    # we're executing, so it's not safe to use a for loop here.
-    while processes_to_kill:
-        processes_to_kill.popleft().kill()
-
-
-def _signal_handler(signum, _):
-    # type: (int, Any) -> None
-    """Kill all spawned processes and exit.
-
-    Note that it's possible for another thread to spawn a process after
-    all processes have been killed, but before Python exits.
-
-    Refer to the docstring for _terminate_processes() for other caveats.
-    """
-    _terminate_processes()
-    sys.exit(signum)
-
-
-def generate_example_input(inptype,  # type: Any
-                           default  # type: Optional[Any]
-                           ):  # type: (...) -> Tuple[Any, Text]
-    """Convert a single input schema into an example."""
-    example = None
-    comment = u""
-    defaults = {u'null': 'null',
-                u'Any': 'null',
-                u'boolean': False,
-                u'int': 0,
-                u'long': 0,
-                u'float': 0.1,
-                u'double': 0.1,
-                u'string': 'a_string',
-                u'File': yaml.comments.CommentedMap([
-                    ('class', 'File'), ('path', 'a/file/path')]),
-                u'Directory': yaml.comments.CommentedMap([
-                    ('class', 'Directory'), ('path', 'a/directory/path')])
-                }  # type: Dict[Text, Any]
-    if isinstance(inptype, MutableSequence):
-        optional = False
-        if 'null' in inptype:
-            inptype.remove('null')
-            optional = True
-        if len(inptype) == 1:
-            example, comment = generate_example_input(inptype[0], default)
-            if optional:
-                if comment:
-                    comment = u"{} (optional)".format(comment)
-                else:
-                    comment = u"optional"
-        else:
-            example = yaml.comments.CommentedSeq()
-            for index, entry in enumerate(inptype):
-                value, e_comment = generate_example_input(entry, default)
-                example.append(value)
-                example.yaml_add_eol_comment(e_comment, index)
-            if optional:
-                comment = u"optional"
-    elif isinstance(inptype, Mapping) and 'type' in inptype:
-        if inptype['type'] == 'array':
-            if len(inptype['items']) == 1 and 'type' in inptype['items'][0] \
-                    and inptype['items'][0]['type'] == 'enum':
-                # array of just an enum then list all the options
-                example = inptype['items'][0]['symbols']
-                if 'name' in inptype['items'][0]:
-                    comment = u'array of type "{}".'.format(inptype['items'][0]['name'])
-            else:
-                value, comment = generate_example_input(inptype['items'], None)
-                comment = u"array of " + comment
-                if len(inptype['items']) == 1:
-                    example = [value]
-                else:
-                    example = value
-            if default is not None:
-                example = default
-        elif inptype['type'] == 'enum':
-            if default is not None:
-                example = default
-            elif 'default' in inptype:
-                example = inptype['default']
-            elif len(inptype['symbols']) == 1:
-                example = inptype['symbols'][0]
-            else:
-                example = '{}_enum_value'.format(inptype.get('name', 'valid'))
-            comment = u'enum; valid values: "{}"'.format(
-                '", "'.join(inptype['symbols']))
-        elif inptype['type'] == 'record':
-            example = yaml.comments.CommentedMap()
-            if 'name' in inptype:
-                comment = u'"{}" record type.'.format(inptype['name'])
-            for field in inptype['fields']:
-                value, f_comment = generate_example_input(field['type'], None)
-                example.insert(0, shortname(field['name']), value, f_comment)
-        elif 'default' in inptype:
-            example = inptype['default']
-            comment = u'default value of type "{}".'.format(inptype['type'])
-        else:
-            example = defaults.get(inptype['type'], Text(inptype))
-            comment = u'type "{}".'.format(inptype['type'])
-    else:
-        if not default:
-            example = defaults.get(Text(inptype), Text(inptype))
-            comment = u'type "{}"'.format(inptype)
-        else:
-            example = default
-            comment = u'default value of type "{}".'.format(inptype)
-    return example, comment
```
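The module opens with the process-cleanup machinery: `run()` (at the bottom of the file) registers `_signal_handler` for SIGTERM, and `_terminate_processes()` drains `utils.processes_to_kill` with a `while` loop precisely because another thread may append to the deque during iteration. A minimal stand-alone sketch of the same pattern, using only the standard library on a Unix-like system (the module-level deque here is a hypothetical stand-in for `cwltool.utils.processes_to_kill`):

```python
import signal
import subprocess
import sys
from collections import deque

processes_to_kill = deque()  # spawned children are appended here


def terminate_processes():
    # Drain with a while-loop: popleft() removes each child before kill(),
    # so a concurrent append from another thread never breaks the traversal.
    while processes_to_kill:
        processes_to_kill.popleft().kill()


def signal_handler(signum, _frame):
    terminate_processes()
    sys.exit(signum)


if __name__ == "__main__":
    signal.signal(signal.SIGTERM, signal_handler)
    processes_to_kill.append(subprocess.Popen(["sleep", "60"]))
    terminate_processes()  # children are reaped before we exit
```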
```diff
-
-def realize_input_schema(input_types,  # type: MutableSequence[Dict[Text, Any]]
-                         schema_defs  # type: Dict[Text, Any]
-                         ):  # type: (...) -> MutableSequence[Dict[Text, Any]]
-    """Replace references to named typed with the actual types."""
-    for index, entry in enumerate(input_types):
-        if isinstance(entry, string_types):
-            if '#' in entry:
-                _, input_type_name = entry.split('#')
-            else:
-                input_type_name = entry
-            if input_type_name in schema_defs:
-                entry = input_types[index] = schema_defs[input_type_name]
-        if isinstance(entry, Mapping):
-            if isinstance(entry['type'], string_types) and '#' in entry['type']:
-                _, input_type_name = entry['type'].split('#')
-                if input_type_name in schema_defs:
-                    input_types[index]['type'] = realize_input_schema(
-                        schema_defs[input_type_name], schema_defs)
-            if isinstance(entry['type'], MutableSequence):
-                input_types[index]['type'] = realize_input_schema(
-                    entry['type'], schema_defs)
-            if isinstance(entry['type'], Mapping):
-                input_types[index]['type'] = realize_input_schema(
-                    [input_types[index]['type']], schema_defs)
-            if entry['type'] == 'array':
-                items = entry['items'] if \
-                    not isinstance(entry['items'], string_types) else [entry['items']]
-                input_types[index]['items'] = realize_input_schema(items, schema_defs)
-            if entry['type'] == 'record':
-                input_types[index]['fields'] = realize_input_schema(
-                    entry['fields'], schema_defs)
-    return input_types
-
-def generate_input_template(tool):
-    # type: (Process) -> Dict[Text, Any]
-    """Generate an example input object for the given CWL process."""
-    template = yaml.comments.CommentedMap()
-    for inp in realize_input_schema(tool.tool["inputs"], tool.schemaDefs):
-        name = shortname(inp["id"])
-        value, comment = generate_example_input(
-            inp['type'], inp.get('default', None))
-        template.insert(0, name, value, comment)
-    return template
```
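`generate_input_template()` drives `generate_example_input()` over each realized input schema to build the `--make-template` output. As a sketch of what that dispatch yields for one union type, assuming a cwltool of this vintage is importable (the schema below is made up):

```python
from cwltool.main import generate_example_input

# An optional enum: the 'null' arm is stripped and reported as "(optional)".
example, comment = generate_example_input(
    ["null", {"type": "enum", "name": "format", "symbols": ["sam", "bam"]}],
    None)
print(example)  # format_enum_value
print(comment)  # enum; valid values: "sam", "bam" (optional)
```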
```diff
-
-def load_job_order(args,  # type: argparse.Namespace
-                   stdin,  # type: IO[Any]
-                   fetcher_constructor,  # type: Optional[Fetcher]
-                   overrides_list,  # type: List[Dict[Text, Any]]
-                   tool_file_uri  # type: Text
-                   ):  # type: (...) -> Tuple[Optional[MutableMapping[Text, Any]], Text, Loader]
-
-    job_order_object = None
-    job_order_file = None
-
-    _jobloaderctx = jobloaderctx.copy()
-    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)  # type: ignore
-
-    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
-        job_order_file = args.job_order[0]
-    elif len(args.job_order) == 1 and args.job_order[0] == "-":
-        job_order_object = yaml.round_trip_load(stdin)
-        job_order_object, _ = loader.resolve_all(job_order_object, file_uri(os.getcwd()) + "/")
-    else:
-        job_order_file = None
-
-    if job_order_object is not None:
-        input_basedir = args.basedir if args.basedir else os.getcwd()
-    elif job_order_file is not None:
-        input_basedir = args.basedir if args.basedir \
-            else os.path.abspath(os.path.dirname(job_order_file))
-        job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)
-
-    if job_order_object is not None and "http://commonwl.org/cwltool#overrides" in job_order_object:
-        ov_uri = file_uri(job_order_file or input_basedir)
-        overrides_list.extend(
-            resolve_overrides(job_order_object, ov_uri, tool_file_uri))
-        del job_order_object["http://commonwl.org/cwltool#overrides"]
-
-    if job_order_object is None:
-        input_basedir = args.basedir if args.basedir else os.getcwd()
-
-    if job_order_object is not None and not isinstance(job_order_object, MutableMapping):
-        _logger.error(
-            'CWL input object at %s is not formatted correctly, it should be a '
-            'JSON/YAML dictionay, not %s.\n'
-            'Raw input object:\n%s', job_order_file or "stdin",
-            type(job_order_object), job_order_object)
-        sys.exit(1)
-    return (job_order_object, input_basedir, loader)
```
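`load_job_order()` accepts the input object either as a file path in `args.job_order` or, when the argument is `-`, as YAML/JSON on stdin. A sketch of building a job order in Python and feeding it through that second path (names and paths are placeholders):

```python
import json
import sys

# File inputs use the same class/path shape as the defaults table above.
job_order = {
    "message": "Hello, CWL",
    "reads": {"class": "File", "path": "input/reads.fastq"},
}
# `cwltool tool.cwl -` round-trip-loads exactly this from stdin;
# JSON is a subset of YAML, so json.dump is enough here.
json.dump(job_order, sys.stdout, indent=4)
```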
```diff
-
-def init_job_order(job_order_object,  # type: Optional[MutableMapping[Text, Any]]
-                   args,  # type: argparse.Namespace
-                   process,  # type: Process
-                   loader,  # type: Loader
-                   stdout,  # type: Union[TextIO, StreamWriter]
-                   print_input_deps=False,  # type: bool
-                   relative_deps=False,  # type: bool
-                   make_fs_access=StdFsAccess,  # type: Callable[[Text], StdFsAccess]
-                   input_basedir="",  # type: Text
-                   secret_store=None,  # type: Optional[SecretStore]
-                   input_required=True  # type: bool
-                   ):  # type: (...) -> MutableMapping[Text, Any]
-    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
-    if job_order_object is None:
-        namemap = {}  # type: Dict[Text, Text]
-        records = []  # type: List[Text]
-        toolparser = generate_parser(
-            argparse.ArgumentParser(prog=args.workflow), process, namemap, records, input_required)
-        if args.tool_help:
-            toolparser.print_help()
-            exit(0)
-        cmd_line = vars(toolparser.parse_args(args.job_order))
-        for record_name in records:
-            record = {}
-            record_items = {
-                k: v for k, v in iteritems(cmd_line)
-                if k.startswith(record_name)}
-            for key, value in iteritems(record_items):
-                record[key[len(record_name) + 1:]] = value
-                del cmd_line[key]
-            cmd_line[str(record_name)] = record
-        if 'job_order' in cmd_line and cmd_line["job_order"]:
-            try:
-                job_order_object = cast(
-                    MutableMapping[Text, Any],
-                    loader.resolve_ref(cmd_line["job_order"])[0])
-            except Exception as err:
-                _logger.error(Text(err), exc_info=args.debug)
-                exit(1)
-        else:
-            job_order_object = {"id": args.workflow}
-
-        del cmd_line["job_order"]
-
-        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})
-
-        if secret_store and secrets_req:
-            secret_store.store(
-                [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)
-
-        if _logger.isEnabledFor(logging.DEBUG):
-            _logger.debug(u"Parsed job order from command line: %s",
-                          json_dumps(job_order_object, indent=4))
-
-    for inp in process.tool["inputs"]:
-        if "default" in inp and (
-                not job_order_object or shortname(inp["id"]) not in job_order_object):
-            if not job_order_object:
-                job_order_object = {}
-            job_order_object[shortname(inp["id"])] = inp["default"]
-
-    if job_order_object is None:
-        if process.tool["inputs"]:
-            if toolparser is not None:
-                print(u"\nOptions for {} ".format(args.workflow))
-                toolparser.print_help()
-            _logger.error("")
-            _logger.error("Input object required, use --help for details")
-            exit(1)
-        else:
-            job_order_object = {}
-
-    if print_input_deps:
-        basedir = None  # type: Optional[Text]
-        uri = job_order_object["id"]
-        if uri == args.workflow:
-            basedir = os.path.dirname(uri)
-            uri = ""
-        printdeps(job_order_object, loader, stdout, relative_deps, uri,
-                  basedir=basedir, nestdirs=False)
-        exit(0)
-
-    def path_to_loc(p):  # type: (Dict[Text, Any]) -> None
-        if "location" not in p and "path" in p:
-            p["location"] = p["path"]
-            del p["path"]
-
-    ns = {}  # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
-    ns.update(job_order_object.get("$namespaces", {}))
-    ns.update(process.metadata.get("$namespaces", {}))
-    ld = Loader(ns)
-
-    def expand_formats(p):  # type: (Dict[Text, Any]) -> None
-        if "format" in p:
-            p["format"] = ld.expand_url(p["format"], "")
-
-    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
-    visit_class(job_order_object, ("File",), functools.partial(add_sizes, make_fs_access(input_basedir)))
-    visit_class(job_order_object, ("File",), expand_formats)
-    adjustDirObjs(job_order_object, trim_listing)
-    normalizeFilesDirs(job_order_object)
-
-    if secret_store and secrets_req:
-        secret_store.store(
-            [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)
-
-    if "cwl:tool" in job_order_object:
-        del job_order_object["cwl:tool"]
-    if "id" in job_order_object:
-        del job_order_object["id"]
-    return job_order_object
```
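Before handing the job order to the executor, `init_job_order()` normalises every `File` and `Directory` via `visit_class`; the inner `path_to_loc` helper rewrites `path` into `location`. A self-contained mimic of that walk, where `visit_files` is a hypothetical stand-in for `cwltool.utils.visit_class`:

```python
def path_to_loc(p):
    # Same rewrite as the inner helper above: prefer "location" over "path".
    if "location" not in p and "path" in p:
        p["location"] = p.pop("path")


def visit_files(obj, op):
    # Recursively apply op to every mapping whose "class" is File/Directory.
    if isinstance(obj, dict):
        if obj.get("class") in ("File", "Directory"):
            op(obj)
        for value in obj.values():
            visit_files(value, op)
    elif isinstance(obj, list):
        for entry in obj:
            visit_files(entry, op)


job_order = {"reads": {"class": "File", "path": "a/reads.fastq"}}
visit_files(job_order, path_to_loc)
assert job_order["reads"] == {"class": "File", "location": "a/reads.fastq"}
```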
```diff
-
-
-def make_relative(base, obj):  # type: (Text, Dict[Text, Any]) -> None
-    """Relativize the location URI of a File or Directory object."""
-    uri = obj.get("location", obj.get("path"))
-    if ":" in uri.split("/")[0] and not uri.startswith("file://"):
-        pass
-    else:
-        if uri.startswith("file://"):
-            uri = uri_file_path(uri)
-        obj["location"] = os.path.relpath(uri, base)
-
-def printdeps(obj,  # type: Mapping[Text, Any]
-              document_loader,  # type: Loader
-              stdout,  # type: Union[TextIO, StreamWriter]
-              relative_deps,  # type: bool
-              uri,  # type: Text
-              basedir=None,  # type: Optional[Text]
-              nestdirs=True  # type: bool
-              ):  # type: (...) -> None
-    """Print a JSON representation of the dependencies of the CWL document."""
-    deps = find_deps(obj, document_loader, uri, basedir=basedir,
-                     nestdirs=nestdirs)
-    if relative_deps == "primary":
-        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
-    elif relative_deps == "cwd":
-        base = os.getcwd()
-    visit_class(deps, ("File", "Directory"), functools.partial(
-        make_relative, base))
-    stdout.write(json_dumps(deps, indent=4))
-
-def prov_deps(obj,  # type: Mapping[Text, Any]
-              document_loader,  # type: Loader
-              uri,  # type: Text
-              basedir=None  # type: Optional[Text]
-              ):  # type: (...) -> MutableMapping[Text, Any]
-    deps = find_deps(obj, document_loader, uri, basedir=basedir)
-
-    def remove_non_cwl(deps):  # type: (MutableMapping[Text, Any]) -> None
-        if 'secondaryFiles' in deps:
-            sec_files = deps['secondaryFiles']
-            for index, entry in enumerate(sec_files):
-                if not ('format' in entry and entry['format'] == CWL_IANA):
-                    del sec_files[index]
-                else:
-                    remove_non_cwl(entry)
-
-    remove_non_cwl(deps)
-    return deps
-
-
-def find_deps(obj,  # type: Mapping[Text, Any]
-              document_loader,  # type: Loader
-              uri,  # type: Text
-              basedir=None,  # type: Optional[Text]
-              nestdirs=True  # type: bool
-              ):  # type: (...) -> Dict[Text, Any]
-    """Find the dependencies of the CWL document."""
-    deps = {"class": "File", "location": uri, "format": CWL_IANA}  # type: Dict[Text, Any]
-
-    def loadref(base, uri):  # type: (Text, Text) -> Any
-        return document_loader.fetch(document_loader.fetcher.urljoin(base, uri))
-
-    sfs = scandeps(
-        basedir if basedir else uri, obj, {"$import", "run"},
-        {"$include", "$schemas", "location"}, loadref, nestdirs=nestdirs)
-    if sfs is not None:
-        deps["secondaryFiles"] = sfs
-
-    return deps
-
-def print_pack(document_loader,  # type: Loader
-               processobj,  # type: CommentedMap
-               uri,  # type: Text
-               metadata  # type: Dict[Text, Any]
-               ):  # type: (...) -> Text
-    """Return a CWL serialization of the CWL document in JSON."""
-    packed = pack(document_loader, processobj, uri, metadata)
-    if len(packed["$graph"]) > 1:
-        return json_dumps(packed, indent=4)
-    return json_dumps(packed["$graph"][0], indent=4)
-
-
-def supported_cwl_versions(enable_dev):  # type: (bool) -> List[Text]
-    # ALLUPDATES and UPDATES are dicts
-    if enable_dev:
-        versions = list(ALLUPDATES)
-    else:
-        versions = list(UPDATES)
-    versions.sort()
-    return versions
```
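`make_relative()` is the visitor that `printdeps()` applies when `--relative-deps` is set: URIs with a non-file scheme pass through untouched, while `file://` URIs are relativised against the chosen base. The heart of it is a single `os.path.relpath` call, sketched here with made-up paths:

```python
import os

base = "/home/user/workflows"
uri = "file:///home/user/workflows/tools/samtools.cwl"
# schema_salad's uri_file_path() does this scheme-stripping in the real code.
path = uri[len("file://"):]
print(os.path.relpath(path, base))  # tools/samtools.cwl
```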
```diff
-
-def configure_logging(args,  # type: argparse.Namespace
-                      stderr_handler,  # type: logging.Handler
-                      runtimeContext  # type: RuntimeContext
-                      ):  # type: (...) -> None
-    # Configure logging
-    rdflib_logger = logging.getLogger("rdflib.term")
-    rdflib_logger.addHandler(stderr_handler)
-    rdflib_logger.setLevel(logging.ERROR)
-    if args.quiet:
-        # Silence STDERR, not an eventual provenance log file
-        stderr_handler.setLevel(logging.WARN)
-    if runtimeContext.debug:
-        # Increase to debug for both stderr and provenance log file
-        _logger.setLevel(logging.DEBUG)
-        stderr_handler.setLevel(logging.DEBUG)
-        rdflib_logger.setLevel(logging.DEBUG)
-    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
-    formatter = fmtclass("%(levelname)s %(message)s")
-    if args.timestamps:
-        formatter = fmtclass(
-            "[%(asctime)s] %(levelname)s %(message)s",
-            "%Y-%m-%d %H:%M:%S")
-    stderr_handler.setFormatter(formatter)
-
-def setup_schema(args,  # type: argparse.Namespace
-                 custom_schema_callback  # type: Optional[Callable[[], None]]
-                 ):  # type: (...) -> None
-    if custom_schema_callback is not None:
-        custom_schema_callback()
-    elif args.enable_ext:
-        res = pkg_resources.resource_stream(__name__, 'extensions.yml')
-        use_custom_schema("v1.0", "http://commonwl.org/cwltool", res.read())
-        res.close()
-    else:
-        use_standard_schema("v1.0")
-
-def setup_provenance(args,  # type: argparse.Namespace
-                     argsl,  # type: List[str]
-                     runtimeContext  # type: RuntimeContext
-                     ):  # type: (...) -> Optional[int]
-    if not args.compute_checksum:
-        _logger.error("--provenance incompatible with --no-compute-checksum")
-        return 1
-    ro = ResearchObject(
-        getdefault(runtimeContext.make_fs_access, StdFsAccess),
-        temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid,
-        full_name=args.cwl_full_name)
-    runtimeContext.research_obj = ro
-    log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
-    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
-
-    class ProvLogFormatter(logging.Formatter):
-        """Enforce ISO8601 with both T and Z."""
-
-        def __init__(self):  # type: () -> None
-            super(ProvLogFormatter, self).__init__(
-                "[%(asctime)sZ] %(message)s")
-
-        def formatTime(self, record, datefmt=None):
-            # type: (logging.LogRecord, Optional[str]) -> str
-            record_time = time.gmtime(record.created)
-            formatted_time = time.strftime("%Y-%m-%dT%H:%M:%S", record_time)
-            with_msecs = "%s,%03d" % (formatted_time, record.msecs)
-            return with_msecs
-    prov_log_handler.setFormatter(ProvLogFormatter())
-    _logger.addHandler(prov_log_handler)
-    _logger.debug(u"[provenance] Logging to %s", log_file_io)
-    if argsl is not None:
-        # Log cwltool command line options to provenance file
-        _logger.info("[cwltool] %s %s", sys.argv[0], u" ".join(argsl))
-    _logger.debug(u"[cwltool] Arguments: %s", args)
-    return None
```
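The `ProvLogFormatter` defined inside `setup_provenance()` forces UTC ISO 8601 timestamps with a literal `T` and trailing `Z`. The same trick in a stand-alone stdlib demo, logging to stderr rather than the research object's log file:

```python
import logging
import time


class ISOFormatter(logging.Formatter):
    def __init__(self):
        super().__init__("[%(asctime)sZ] %(message)s")

    def formatTime(self, record, datefmt=None):
        # gmtime + strftime pins the time to UTC with a literal "T";
        # the trailing "Z" comes from the format string above.
        formatted = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created))
        return "%s,%03d" % (formatted, record.msecs)


handler = logging.StreamHandler()
handler.setFormatter(ISOFormatter())
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.warning("hello")  # e.g. [2020-06-01T12:59:25,123Z] hello
```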
```diff
-
-def setup_loadingContext(loadingContext,  # type: Optional[LoadingContext]
-                         runtimeContext,  # type: RuntimeContext
-                         args  # type: argparse.Namespace
-                         ):  # type: (...) -> LoadingContext
-    if loadingContext is None:
-        loadingContext = LoadingContext(vars(args))
-    else:
-        loadingContext = loadingContext.copy()
-    loadingContext.loader = default_loader(loadingContext.fetcher_constructor,
-                                           enable_dev=args.enable_dev)
-    loadingContext.research_obj = runtimeContext.research_obj
-    loadingContext.disable_js_validation = \
-        args.disable_js_validation or (not args.do_validate)
-    loadingContext.construct_tool_object = getdefault(
-        loadingContext.construct_tool_object, workflow.default_make_tool)
-    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
-    if loadingContext.do_update is None:
-        loadingContext.do_update = not (args.pack or args.print_subgraph)
-
-    return loadingContext
-
-def make_template(tool  # type: Process
-                  ):  # type: (...) -> None
-    def my_represent_none(self, data):  # pylint: disable=unused-argument
-        # type: (Any, Any) -> Any
-        """Force clean representation of 'null'."""
-        return self.represent_scalar(u'tag:yaml.org,2002:null', u'null')
-    yaml.RoundTripRepresenter.add_representer(type(None), my_represent_none)
-    yaml.round_trip_dump(
-        generate_input_template(tool), sys.stdout,
-        default_flow_style=False, indent=4, block_seq_indent=2)
-
-
-def choose_target(args,  # type: argparse.Namespace
-                  tool,  # type: Process
-                  loadingContext  # type: LoadingContext
-                  ):  # type: (...) -> Optional[Process]
-
-    if loadingContext.loader is None:
-        raise Exception("loadingContext.loader cannot be None")
-
-    if isinstance(tool, Workflow):
-        url = urllib.parse.urlparse(tool.tool["id"])
-        if url.fragment:
-            extracted = get_subgraph([tool.tool["id"] + "/" + r for r in args.target], tool)
-        else:
-            extracted = get_subgraph([loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
-                                      for r in args.target],
-                                     tool)
-    else:
-        _logger.error("Can only use --target on Workflows")
-        return None
-    if isinstance(loadingContext.loader.idx, CommentedMap):
-        loadingContext.loader.idx[extracted["id"]] = extracted
-        tool = make_tool(extracted["id"],
-                         loadingContext)
-    else:
-        raise Exception("Missing loadingContext.loader.idx!")
-
-    return tool
```
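`choose_target()` builds the subgraph IDs for `--target` differently depending on whether the workflow's own ID already carries a URI fragment. A stdlib-only sketch of that branch; the real code joins through `loadingContext.loader.fetcher.urljoin`, and the IDs here are invented:

```python
from urllib.parse import urlparse

tool_id = "file:///work/packed.cwl#main"
target = "step_one"
if urlparse(tool_id).fragment:
    # Already inside a fragment: extend the fragment path.
    subgraph_id = tool_id + "/" + target
else:
    # Otherwise the target names a new fragment on the document.
    subgraph_id = tool_id + "#" + target
print(subgraph_id)  # file:///work/packed.cwl#main/step_one
```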
```diff
-
-def check_working_directories(runtimeContext  # type: RuntimeContext
-                              ):  # type: (...) -> Optional[int]
-    for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
-        if getattr(runtimeContext, dirprefix) and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX:
-            sl = "/" if getattr(runtimeContext, dirprefix).endswith("/") or dirprefix == "cachedir" \
-                else ""
-            setattr(runtimeContext, dirprefix,
-                    os.path.abspath(getattr(runtimeContext, dirprefix)) + sl)
-            if not os.path.exists(os.path.dirname(getattr(runtimeContext, dirprefix))):
-                try:
-                    os.makedirs(os.path.dirname(getattr(runtimeContext, dirprefix)))
-                except Exception as e:
-                    _logger.error("Failed to create directory: %s", Text(e))
-                    return 1
-    return None
-
-
-def main(argsl=None,  # type: Optional[List[str]]
-         args=None,  # type: Optional[argparse.Namespace]
-         job_order_object=None,  # type: Optional[MutableMapping[Text, Any]]
-         stdin=sys.stdin,  # type: IO[Any]
-         stdout=None,  # type: Optional[Union[TextIO, StreamWriter]]
-         stderr=sys.stderr,  # type: IO[Any]
-         versionfunc=versionstring,  # type: Callable[[], Text]
-         logger_handler=None,  # type: Optional[logging.Handler]
-         custom_schema_callback=None,  # type: Optional[Callable[[], None]]
-         executor=None,  # type: Optional[JobExecutor]
-         loadingContext=None,  # type: Optional[LoadingContext]
-         runtimeContext=None,  # type: Optional[RuntimeContext]
-         input_required=True  # type: bool
-         ):  # type: (...) -> int
-    if not stdout:  # force UTF-8 even if the console is configured differently
-        if (hasattr(sys.stdout, "encoding")
-                and sys.stdout.encoding != 'UTF-8'):  # type: ignore
-            if PY3 and hasattr(sys.stdout, "detach"):
-                stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
-            else:
-                stdout = getwriter('utf-8')(sys.stdout)  # type: ignore
-        else:
-            stdout = cast(TextIO, sys.stdout)  # type: ignore
-
-    _logger.removeHandler(defaultStreamHandler)
-    stderr_handler = logger_handler
-    if stderr_handler is not None:
-        _logger.addHandler(stderr_handler)
-    else:
-        coloredlogs.install(logger=_logger, stream=stderr)
-        stderr_handler = _logger.handlers[-1]
-    workflowobj = None
-    prov_log_handler = None  # type: Optional[logging.StreamHandler]
-    try:
-        if args is None:
-            if argsl is None:
-                argsl = sys.argv[1:]
-            addl = []  # type: List[str]
-            if "CWLTOOL_OPTIONS" in os.environ:
-                addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
-            args = arg_parser().parse_args(addl+argsl)
-            if args.record_container_id:
-                if not args.cidfile_dir:
-                    args.cidfile_dir = os.getcwd()
-                del args.record_container_id
-
-        if runtimeContext is None:
-            runtimeContext = RuntimeContext(vars(args))
-        else:
-            runtimeContext = runtimeContext.copy()
-
-        # If on Windows platform, a default Docker Container is used if not
-        # explicitely provided by user
-        if onWindows() and not runtimeContext.default_container:
-            # This docker image is a minimal alpine image with bash installed
-            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
-            runtimeContext.default_container = windows_default_container_id
-
-        # If caller parsed its own arguments, it may not include every
-        # cwltool option, so fill in defaults to avoid crashing when
-        # dereferencing them in args.
-        for key, val in iteritems(get_default_args()):
-            if not hasattr(args, key):
-                setattr(args, key, val)
-
-        configure_logging(args, stderr_handler, runtimeContext)
-
-        if args.version:
-            print(versionfunc())
-            return 0
-        _logger.info(versionfunc())
-
-        if args.print_supported_versions:
-            print("\n".join(supported_cwl_versions(args.enable_dev)))
-            return 0
-
-        if not args.workflow:
-            if os.path.isfile("CWLFile"):
-                setattr(args, "workflow", "CWLFile")
-            else:
-                _logger.error("CWL document required, no input file was provided")
-                arg_parser().print_help()
-                return 1
-        if args.relax_path_checks:
-            command_line_tool.ACCEPTLIST_RE = command_line_tool.ACCEPTLIST_EN_RELAXED_RE
-
-        if args.ga4gh_tool_registries:
-            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
-        if not args.enable_ga4gh_tool_registry:
-            del ga4gh_tool_registries[:]
-
-        setup_schema(args, custom_schema_callback)
-
-        if args.provenance:
-            if argsl is None:
-                raise Exception("argsl cannot be None")
-            if setup_provenance(args, argsl, runtimeContext) is not None:
-                return 1
-
-        loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)
-
-        uri, tool_file_uri = resolve_tool_uri(
-            args.workflow, resolver=loadingContext.resolver,
-            fetcher_constructor=loadingContext.fetcher_constructor)
-
-        try_again_msg = "" if args.debug else ", try again with --debug for more information"
-
-        try:
-            job_order_object, input_basedir, jobloader = load_job_order(
-                args, stdin, loadingContext.fetcher_constructor,
-                loadingContext.overrides_list, tool_file_uri)
-
-            if args.overrides:
-                loadingContext.overrides_list.extend(load_overrides(
-                    file_uri(os.path.abspath(args.overrides)), tool_file_uri))
-
-            loadingContext, workflowobj, uri = fetch_document(
-                uri, loadingContext)
-
-            if args.print_deps and loadingContext.loader:
-                printdeps(workflowobj, loadingContext.loader, stdout,
-                          args.relative_deps, uri)
-                return 0
-
-            loadingContext, uri \
-                = resolve_and_validate_document(loadingContext, workflowobj, uri,
-                                                preprocess_only=(args.print_pre or args.pack),
-                                                skip_schemas=args.skip_schemas)
-
-            if loadingContext.loader is None:
-                raise Exception("Impossible code path.")
-            processobj, metadata = loadingContext.loader.resolve_ref(uri)
-            processobj = cast(CommentedMap, processobj)
-            if args.pack:
-                stdout.write(print_pack(loadingContext.loader, processobj, uri, metadata))
-                return 0
-
-            if args.provenance and runtimeContext.research_obj:
-                # Can't really be combined with args.pack at same time
-                runtimeContext.research_obj.packed_workflow(
-                    print_pack(loadingContext.loader, processobj, uri, metadata))
-
-            if args.print_pre:
-                stdout.write(json_dumps(processobj, indent=4, sort_keys=True, separators=(',', ': ')))
-                return 0
-
-            tool = make_tool(uri, loadingContext)
-            if args.make_template:
-                make_template(tool)
-                return 0
-
-            if args.validate:
-                print("{} is valid CWL.".format(args.workflow))
-                return 0
-
-            if args.print_rdf:
-                stdout.write(printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer))
-                return 0
-
-            if args.print_dot:
-                printdot(tool, loadingContext.loader.ctx, stdout)
-                return 0
-
-            if args.print_targets:
-                for f in ("outputs", "steps", "inputs"):
-                    if tool.tool[f]:
-                        _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
-                        stdout.write("  "+"\n  ".join([shortname(t["id"]) for t in tool.tool[f]])+"\n")
-                return 0
-
-            if args.target:
-                ctool = choose_target(args, tool, loadingContext)
-                if ctool is None:
-                    return 1
-                else:
-                    tool = ctool
-
-            if args.print_subgraph:
-                if "name" in tool.tool:
-                    del tool.tool["name"]
-                stdout.write(json_dumps(tool.tool, indent=4, sort_keys=True, separators=(',', ': ')))
-                return 0
-
-        except (validate.ValidationException) as exc:
-            _logger.error(u"Tool definition failed validation:\n%s", Text(exc),
-                          exc_info=args.debug)
-            return 1
-        except (RuntimeError, WorkflowException) as exc:
-            _logger.error(u"Tool definition failed initialization:\n%s", Text(exc),
-                          exc_info=args.debug)
-            return 1
-        except Exception as exc:
-            _logger.error(
-                u"I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
-                try_again_msg,
-                Text(exc) if not args.debug else "",
-                exc_info=args.debug)
-            return 1
-
-        if isinstance(tool, int):
-            return tool
-
-        # If on MacOS platform, TMPDIR must be set to be under one of the
-        # shared volumes in Docker for Mac
-        # More info: https://dockstore.org/docs/faq
-        if sys.platform == "darwin":
-            default_mac_path = "/private/tmp/docker_tmp"
-            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
-                runtimeContext.tmp_outdir_prefix = default_mac_path
-            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
-                runtimeContext.tmpdir_prefix = default_mac_path
-
-        if check_working_directories(runtimeContext) is not None:
-            return 1
-
-        if args.cachedir:
-            if args.move_outputs == "move":
-                runtimeContext.move_outputs = "copy"
-            runtimeContext.tmp_outdir_prefix = args.cachedir
-
-        runtimeContext.secret_store = getdefault(runtimeContext.secret_store, SecretStore())
-        runtimeContext.make_fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)
-
-        if not executor:
-            if args.parallel:
-                temp_executor = MultithreadedJobExecutor()
-                runtimeContext.select_resources = temp_executor.select_resources
-                real_executor = temp_executor  # type: JobExecutor
-            else:
-                real_executor = SingleJobExecutor()
-        else:
-            real_executor = executor
-
-        try:
-            runtimeContext.basedir = input_basedir
-
-            if isinstance(tool, ProcessGenerator):
-                tfjob_order = {}  # type: MutableMapping[Text, Any]
-                if loadingContext.jobdefaults:
-                    tfjob_order.update(loadingContext.jobdefaults)
-                if job_order_object:
-                    tfjob_order.update(job_order_object)
-                tfout, tfstatus = real_executor(tool.embedded_tool, tfjob_order, runtimeContext)
-                if tfstatus != "success":
-                    raise WorkflowException("ProcessGenerator failed to generate workflow")
-                tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
-                if not job_order_object:
-                    job_order_object = None
-
-            try:
-                initialized_job_order_object = init_job_order(
-                    job_order_object, args, tool, jobloader, stdout,
-                    print_input_deps=args.print_input_deps,
-                    relative_deps=args.relative_deps,
-                    make_fs_access=runtimeContext.make_fs_access,
-                    input_basedir=input_basedir,
-                    secret_store=runtimeContext.secret_store,
-                    input_required=input_required)
-            except SystemExit as err:
-                return err.code
-
-            del args.workflow
-            del args.job_order
-
-            conf_file = getattr(args, "beta_dependency_resolvers_configuration", None)  # Text
-            use_conda_dependencies = getattr(args, "beta_conda_dependencies", None)  # Text
-
-            if conf_file or use_conda_dependencies:
-                runtimeContext.job_script_provider = DependenciesConfiguration(args)
-            else:
-                runtimeContext.find_default_container = functools.partial(
-                    find_default_container,
-                    default_container=runtimeContext.default_container,
-                    use_biocontainers=args.beta_use_biocontainers)
-
-            (out, status) = real_executor(
-                tool, initialized_job_order_object, runtimeContext,
-                logger=_logger)
-
-            if out is not None:
-                if runtimeContext.research_obj is not None:
-                    runtimeContext.research_obj.create_job(
-                        out, None, True)
-
-                    def remove_at_id(doc):  # type: (MutableMapping[Text, Any]) -> None
-                        for key in list(doc.keys()):
-                            if key == '@id':
-                                del doc[key]
-                            else:
-                                value = doc[key]
-                                if isinstance(value, MutableMapping):
-                                    remove_at_id(value)
-                                elif isinstance(value, MutableSequence):
-                                    for entry in value:
-                                        if isinstance(entry, MutableMapping):
-                                            remove_at_id(entry)
-                    remove_at_id(out)
-                    visit_class(out, ("File",), functools.partial(
-                        add_sizes, runtimeContext.make_fs_access('')))
-
-                def loc_to_path(obj):  # type: (Dict[Text, Any]) -> None
-                    for field in ("path", "nameext", "nameroot", "dirname"):
-                        if field in obj:
-                            del obj[field]
-                    if obj["location"].startswith("file://"):
-                        obj["path"] = uri_file_path(obj["location"])
-
-                visit_class(out, ("File", "Directory"), loc_to_path)
-
-                # Unsetting the Generation from final output object
-                visit_class(out, ("File", ), MutationManager().unset_generation)
-
-                if isinstance(out, string_types):
-                    stdout.write(out)
-                else:
-                    stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
-                stdout.write("\n")
-                if hasattr(stdout, "flush"):
-                    stdout.flush()
-
-            if status != "success":
-                _logger.warning(u"Final process status is %s", status)
-                return 1
-            _logger.info(u"Final process status is %s", status)
-            return 0
-
-        except (validate.ValidationException) as exc:
-            _logger.error(u"Input object failed validation:\n%s", Text(exc),
-                          exc_info=args.debug)
-            return 1
-        except UnsupportedRequirement as exc:
-            _logger.error(
-                u"Workflow or tool uses unsupported feature:\n%s", Text(exc),
-                exc_info=args.debug)
-            return 33
-        except WorkflowException as exc:
-            _logger.error(
-                u"Workflow error%s:\n%s", try_again_msg, strip_dup_lineno(Text(exc)),
-                exc_info=args.debug)
-            return 1
-        except Exception as exc:  # pylint: disable=broad-except
-            _logger.error(
-                u"Unhandled error%s:\n  %s", try_again_msg, Text(exc), exc_info=args.debug)
-            return 1
-
-    finally:
-        if args and runtimeContext and runtimeContext.research_obj \
-                and workflowobj and loadingContext:
-            research_obj = runtimeContext.research_obj
-            if loadingContext.loader is not None:
-                research_obj.generate_snapshot(prov_deps(
-                    workflowobj, loadingContext.loader, uri))
-            else:
-                _logger.warning("Unable to generate provenance snapshot "
-                                " due to missing loadingContext.loader.")
-            if prov_log_handler is not None:
-                # Stop logging so we won't half-log adding ourself to RO
-                _logger.debug(u"[provenance] Closing provenance log file %s",
-                              prov_log_handler)
-                _logger.removeHandler(prov_log_handler)
-                # Ensure last log lines are written out
-                prov_log_handler.flush()
-                # Underlying WritableBagFile will add the tagfile to the manifest
-                prov_log_handler.stream.close()
-                prov_log_handler.close()
-            research_obj.close(args.provenance)
-
-        _logger.removeHandler(stderr_handler)
-        _logger.addHandler(defaultStreamHandler)
-
-
-def find_default_container(builder,  # type: HasReqsHints
-                           default_container=None,  # type: Optional[Text]
-                           use_biocontainers=None,  # type: Optional[bool]
-                           ):  # type: (...) -> Optional[Text]
-    """Find a container."""
-    if not default_container and use_biocontainers:
-        default_container = get_container_from_software_requirements(
-            use_biocontainers, builder)
-    return default_container
-
-
-def run(*args, **kwargs):
-    # type: (*Any, **Any) -> None
-    """Run cwltool."""
-    signal.signal(signal.SIGTERM, _signal_handler)
-    try:
-        sys.exit(main(*args, **kwargs))
-    finally:
-        _terminate_processes()
-
-
-if __name__ == "__main__":
-    run(sys.argv[1:])
```
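Because `main()` takes an argv-style `argsl` list and returns the process exit code, the entry point can be embedded instead of shelled out to. A sketch, assuming a cwltool of this vintage is installed (the workflow path is a placeholder):

```python
import sys

from cwltool.main import main

# main() mirrors the CLI: pass the options you would give cwltool and
# get its exit code back (0 on success).
exit_code = main(argsl=["--validate", "my-workflow.cwl"])
sys.exit(exit_code)
```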