env/lib/python3.7/site-packages/cwltool/main.py @ 3:758bc20232e8 (draft)
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author:   shellac
date:     Thu, 14 May 2020 16:20:52 -0400
parents:  26e78fe6e8c4
children: (none)

#!/usr/bin/env python
"""Entry point for cwltool."""
from __future__ import absolute_import, print_function

import argparse
import copy
import functools
import io
import logging
import os
import signal
import sys
import time
from codecs import StreamWriter, getwriter  # pylint: disable=unused-import

from six.moves import urllib
from typing import (IO, Any, Callable, Dict, Iterable, List, Mapping,
                    MutableMapping, MutableSequence, Optional, TextIO, Tuple,
                    Union, cast)

import pkg_resources  # part of setuptools
from ruamel import yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad import validate
from schema_salad.ref_resolver import Fetcher, Loader, file_uri, uri_file_path
from schema_salad.sourceline import strip_dup_lineno, cmap
from six import string_types, iteritems, PY3
from typing_extensions import Text
# move to a regular typing import when Python 3.3-3.6 is no longer supported

if PY3:
    from collections.abc import Iterable, Sequence, MutableSequence
else:  # Needed for Py3.8
    from collections import Iterable, Sequence, MutableSequence

from . import command_line_tool, workflow
from .argparser import arg_parser, generate_parser, get_default_args
from .builder import HasReqsHints  # pylint: disable=unused-import
from .context import LoadingContext, RuntimeContext, getdefault
from .cwlrdf import printdot, printrdf
from .errors import UnsupportedRequirement, WorkflowException
from .executors import MultithreadedJobExecutor, SingleJobExecutor, JobExecutor
from .load_tool import (FetcherConstructorType,  # pylint: disable=unused-import
                        fetch_document, jobloaderctx, load_overrides,
                        make_tool, resolve_overrides, resolve_tool_uri,
                        resolve_and_validate_document, default_loader)
from .loghandler import _logger, defaultStreamHandler
from .mutation import MutationManager
from .pack import pack
from .pathmapper import adjustDirObjs, normalizeFilesDirs, trim_listing
from .process import (Process, add_sizes,  # pylint: disable=unused-import
                      scandeps, shortname, use_custom_schema,
                      use_standard_schema, CWL_IANA)
from .workflow import Workflow
from .procgenerator import ProcessGenerator
from .provenance import ResearchObject
from .resolver import ga4gh_tool_registries, tool_resolver
from .secrets import SecretStore
from .software_requirements import (DependenciesConfiguration,
                                    get_container_from_software_requirements)
from .stdfsaccess import StdFsAccess
from .update import ALLUPDATES, UPDATES
from .utils import (DEFAULT_TMP_PREFIX, json_dumps, onWindows,
                    processes_to_kill, versionstring, visit_class,
                    windows_default_container_id)
from .subgraph import get_subgraph

import coloredlogs


def _terminate_processes():
    # type: () -> None
    """Kill all spawned processes.

    Processes to be killed must be appended to `utils.processes_to_kill`
    as they are spawned.

    An important caveat: since there's no supported way to kill another
    thread in Python, this function cannot stop other threads from
    continuing to execute while it kills the processes that they've
    spawned. This may occasionally lead to unexpected behaviour.
    """
    # It's possible that another thread will spawn a new task while
    # we're executing, so it's not safe to use a for loop here.
    while processes_to_kill:
        processes_to_kill.popleft().kill()


def _signal_handler(signum, _):
    # type: (int, Any) -> None
    """Kill all spawned processes and exit.

    Note that it's possible for another thread to spawn a process after
    all processes have been killed, but before Python exits.

    Refer to the docstring for _terminate_processes() for other caveats.
    """
    _terminate_processes()
    sys.exit(signum)


def generate_example_input(inptype,  # type: Any
                           default   # type: Optional[Any]
                          ):  # type: (...) -> Tuple[Any, Text]
    """Convert a single input schema into an example."""
    example = None
    comment = u""
    defaults = {u'null': 'null',
                u'Any': 'null',
                u'boolean': False,
                u'int': 0,
                u'long': 0,
                u'float': 0.1,
                u'double': 0.1,
                u'string': 'a_string',
                u'File': yaml.comments.CommentedMap([
                    ('class', 'File'), ('path', 'a/file/path')]),
                u'Directory': yaml.comments.CommentedMap([
                    ('class', 'Directory'), ('path', 'a/directory/path')])
               }  # type: Dict[Text, Any]
    if isinstance(inptype, MutableSequence):
        optional = False
        if 'null' in inptype:
            inptype.remove('null')
            optional = True
        if len(inptype) == 1:
            example, comment = generate_example_input(inptype[0], default)
            if optional:
                if comment:
                    comment = u"{} (optional)".format(comment)
                else:
                    comment = u"optional"
        else:
            example = yaml.comments.CommentedSeq()
            for index, entry in enumerate(inptype):
                value, e_comment = generate_example_input(entry, default)
                example.append(value)
                example.yaml_add_eol_comment(e_comment, index)
            if optional:
                comment = u"optional"
    elif isinstance(inptype, Mapping) and 'type' in inptype:
        if inptype['type'] == 'array':
            if len(inptype['items']) == 1 and 'type' in inptype['items'][0] \
                    and inptype['items'][0]['type'] == 'enum':
                # array of just an enum then list all the options
                example = inptype['items'][0]['symbols']
                if 'name' in inptype['items'][0]:
                    comment = u'array of type "{}".'.format(
                        inptype['items'][0]['name'])
            else:
                value, comment = generate_example_input(inptype['items'], None)
                comment = u"array of " + comment
                if len(inptype['items']) == 1:
                    example = [value]
                else:
                    example = value
            if default is not None:
                example = default
        elif inptype['type'] == 'enum':
            if default is not None:
                example = default
            elif 'default' in inptype:
                example = inptype['default']
            elif len(inptype['symbols']) == 1:
                example = inptype['symbols'][0]
            else:
                example = '{}_enum_value'.format(inptype.get('name', 'valid'))
            comment = u'enum; valid values: "{}"'.format(
                '", "'.join(inptype['symbols']))
        elif inptype['type'] == 'record':
            example = yaml.comments.CommentedMap()
            if 'name' in inptype:
                comment = u'"{}" record type.'.format(inptype['name'])
            for field in inptype['fields']:
                value, f_comment = generate_example_input(field['type'], None)
                example.insert(0, shortname(field['name']), value, f_comment)
        elif 'default' in inptype:
            example = inptype['default']
            comment = u'default value of type "{}".'.format(inptype['type'])
        else:
            example = defaults.get(inptype['type'], Text(inptype))
            comment = u'type "{}".'.format(inptype['type'])
    else:
        if not default:
            example = defaults.get(Text(inptype), Text(inptype))
            comment = u'type "{}"'.format(inptype)
        else:
            example = default
            comment = u'default value of type "{}".'.format(inptype)
    return example, comment


def realize_input_schema(input_types,  # type: MutableSequence[Dict[Text, Any]]
                         schema_defs   # type: Dict[Text, Any]
                        ):  # type: (...) -> MutableSequence[Dict[Text, Any]]
    """Replace references to named types with the actual types."""
    for index, entry in enumerate(input_types):
        if isinstance(entry, string_types):
            if '#' in entry:
                _, input_type_name = entry.split('#')
            else:
                input_type_name = entry
            if input_type_name in schema_defs:
                entry = input_types[index] = schema_defs[input_type_name]
        if isinstance(entry, Mapping):
            if isinstance(entry['type'], string_types) and '#' in entry['type']:
                _, input_type_name = entry['type'].split('#')
                if input_type_name in schema_defs:
                    input_types[index]['type'] = realize_input_schema(
                        schema_defs[input_type_name], schema_defs)
            if isinstance(entry['type'], MutableSequence):
                input_types[index]['type'] = realize_input_schema(
                    entry['type'], schema_defs)
            if isinstance(entry['type'], Mapping):
                input_types[index]['type'] = realize_input_schema(
                    [input_types[index]['type']], schema_defs)
            if entry['type'] == 'array':
                items = entry['items'] if \
                    not isinstance(entry['items'], string_types) else [entry['items']]
                input_types[index]['items'] = realize_input_schema(
                    items, schema_defs)
            if entry['type'] == 'record':
                input_types[index]['fields'] = realize_input_schema(
                    entry['fields'], schema_defs)
    return input_types


def generate_input_template(tool):
    # type: (Process) -> Dict[Text, Any]
    """Generate an example input object for the given CWL process."""
    template = yaml.comments.CommentedMap()
    for inp in realize_input_schema(tool.tool["inputs"], tool.schemaDefs):
        name = shortname(inp["id"])
        value, comment = generate_example_input(
            inp['type'], inp.get('default', None))
        template.insert(0, name, value, comment)
    return template
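
# Editorial usage sketch: generate_input_template() backs the
# --make-template option handled in main() below.  For a hypothetical
# tool "echo.cwl" with a single required string input "message", the
# rendered template would look roughly like:
#
#     $ cwltool --make-template echo.cwl
#     message: a_string  # type "string"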


def load_job_order(args,                 # type: argparse.Namespace
                   stdin,                # type: IO[Any]
                   fetcher_constructor,  # type: Optional[Fetcher]
                   overrides_list,       # type: List[Dict[Text, Any]]
                   tool_file_uri         # type: Text
                  ):  # type: (...) -> Tuple[Optional[MutableMapping[Text, Any]], Text, Loader]
    job_order_object = None
    job_order_file = None

    _jobloaderctx = jobloaderctx.copy()
    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)  # type: ignore

    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
        job_order_file = args.job_order[0]
    elif len(args.job_order) == 1 and args.job_order[0] == "-":
        job_order_object = yaml.round_trip_load(stdin)
        job_order_object, _ = loader.resolve_all(
            job_order_object, file_uri(os.getcwd()) + "/")
    else:
        job_order_file = None

    if job_order_object is not None:
        input_basedir = args.basedir if args.basedir else os.getcwd()
    elif job_order_file is not None:
        input_basedir = args.basedir if args.basedir \
            else os.path.abspath(os.path.dirname(job_order_file))
        job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)

    if job_order_object is not None and "http://commonwl.org/cwltool#overrides" in job_order_object:
        ov_uri = file_uri(job_order_file or input_basedir)
        overrides_list.extend(
            resolve_overrides(job_order_object, ov_uri, tool_file_uri))
        del job_order_object["http://commonwl.org/cwltool#overrides"]

    if job_order_object is None:
        input_basedir = args.basedir if args.basedir else os.getcwd()

    if job_order_object is not None and not isinstance(job_order_object, MutableMapping):
        _logger.error(
            'CWL input object at %s is not formatted correctly, it should be a '
            'JSON/YAML dictionary, not %s.\n'
            'Raw input object:\n%s',
            job_order_file or "stdin", type(job_order_object), job_order_object)
        sys.exit(1)
    return (job_order_object, input_basedir, loader)
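
# Editorial usage sketch: a job order can come from a file argument or
# from stdin ("-"), per the branches above.  With a hypothetical job
# file job.yml containing "message: Hello":
#
#     $ cwltool echo.cwl job.yml          # job_order_file = "job.yml"
#     $ cat job.yml | cwltool echo.cwl -  # parsed from stdin via round_trip_load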


def init_job_order(job_order_object,        # type: Optional[MutableMapping[Text, Any]]
                   args,                    # type: argparse.Namespace
                   process,                 # type: Process
                   loader,                  # type: Loader
                   stdout,                  # type: Union[TextIO, StreamWriter]
                   print_input_deps=False,  # type: bool
                   relative_deps=False,     # type: bool
                   make_fs_access=StdFsAccess,  # type: Callable[[Text], StdFsAccess]
                   input_basedir="",        # type: Text
                   secret_store=None,       # type: Optional[SecretStore]
                   input_required=True      # type: bool
                  ):  # type: (...) -> MutableMapping[Text, Any]
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    if job_order_object is None:
        namemap = {}  # type: Dict[Text, Text]
        records = []  # type: List[Text]
        toolparser = generate_parser(
            argparse.ArgumentParser(prog=args.workflow), process, namemap,
            records, input_required)
        if args.tool_help:
            toolparser.print_help()
            exit(0)
        cmd_line = vars(toolparser.parse_args(args.job_order))
        for record_name in records:
            record = {}
            record_items = {
                k: v for k, v in iteritems(cmd_line)
                if k.startswith(record_name)}
            for key, value in iteritems(record_items):
                record[key[len(record_name) + 1:]] = value
                del cmd_line[key]
            cmd_line[str(record_name)] = record
        if 'job_order' in cmd_line and cmd_line["job_order"]:
            try:
                job_order_object = cast(
                    MutableMapping[Text, Any],
                    loader.resolve_ref(cmd_line["job_order"])[0])
            except Exception as err:
                _logger.error(Text(err), exc_info=args.debug)
                exit(1)
        else:
            job_order_object = {"id": args.workflow}

        del cmd_line["job_order"]

        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})

        if secret_store and secrets_req:
            secret_store.store(
                [shortname(sc) for sc in secrets_req["secrets"]],
                job_order_object)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"Parsed job order from command line: %s",
                          json_dumps(job_order_object, indent=4))

    for inp in process.tool["inputs"]:
        if "default" in inp and (
                not job_order_object
                or shortname(inp["id"]) not in job_order_object):
            if not job_order_object:
                job_order_object = {}
            job_order_object[shortname(inp["id"])] = inp["default"]

    if job_order_object is None:
        if process.tool["inputs"]:
            if toolparser is not None:
                print(u"\nOptions for {} ".format(args.workflow))
                toolparser.print_help()
            _logger.error("")
            _logger.error("Input object required, use --help for details")
            exit(1)
        else:
            job_order_object = {}

    if print_input_deps:
        basedir = None  # type: Optional[Text]
        uri = job_order_object["id"]
        if uri == args.workflow:
            basedir = os.path.dirname(uri)
            uri = ""
        printdeps(job_order_object, loader, stdout, relative_deps, uri,
                  basedir=basedir, nestdirs=False)
        exit(0)

    def path_to_loc(p):  # type: (Dict[Text, Any]) -> None
        if "location" not in p and "path" in p:
            p["location"] = p["path"]
            del p["path"]

    ns = {}  # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
    ns.update(job_order_object.get("$namespaces", {}))
    ns.update(process.metadata.get("$namespaces", {}))
    ld = Loader(ns)

    def expand_formats(p):  # type: (Dict[Text, Any]) -> None
        if "format" in p:
            p["format"] = ld.expand_url(p["format"], "")

    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
    visit_class(job_order_object, ("File",), functools.partial(
        add_sizes, make_fs_access(input_basedir)))
    visit_class(job_order_object, ("File",), expand_formats)
    adjustDirObjs(job_order_object, trim_listing)
    normalizeFilesDirs(job_order_object)

    if secret_store and secrets_req:
        secret_store.store(
            [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)

    if "cwl:tool" in job_order_object:
        del job_order_object["cwl:tool"]
    if "id" in job_order_object:
        del job_order_object["id"]
    return job_order_object


def make_relative(base, obj):  # type: (Text, Dict[Text, Any]) -> None
    """Relativize the location URI of a File or Directory object."""
    uri = obj.get("location", obj.get("path"))
    if ":" in uri.split("/")[0] and not uri.startswith("file://"):
        pass
    else:
        if uri.startswith("file://"):
            uri = uri_file_path(uri)
            obj["location"] = os.path.relpath(uri, base)
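
# Editorial behaviour sketch (illustrative values): make_relative()
# rewrites file:// locations relative to a base directory and leaves
# other URI schemes alone.
#
#     obj = {"class": "File", "location": "file:///data/run1/reads.fq"}
#     make_relative("/data", obj)
#     # obj["location"] == "run1/reads.fq"
#
#     obj = {"class": "File", "location": "http://example.com/reads.fq"}
#     make_relative("/data", obj)  # unchanged: non-file URI scheme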


def printdeps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              stdout,           # type: Union[TextIO, StreamWriter]
              relative_deps,    # type: bool
              uri,              # type: Text
              basedir=None,     # type: Optional[Text]
              nestdirs=True     # type: bool
             ):  # type: (...) -> None
    """Print a JSON representation of the dependencies of the CWL document."""
    deps = find_deps(obj, document_loader, uri, basedir=basedir,
                     nestdirs=nestdirs)
    if relative_deps == "primary":
        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
    elif relative_deps == "cwd":
        base = os.getcwd()
    visit_class(deps, ("File", "Directory"), functools.partial(
        make_relative, base))
    stdout.write(json_dumps(deps, indent=4))


def prov_deps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              uri,              # type: Text
              basedir=None      # type: Optional[Text]
             ):  # type: (...) -> MutableMapping[Text, Any]
    deps = find_deps(obj, document_loader, uri, basedir=basedir)

    def remove_non_cwl(deps):  # type: (MutableMapping[Text, Any]) -> None
        if 'secondaryFiles' in deps:
            sec_files = deps['secondaryFiles']
            for index, entry in enumerate(sec_files):
                if not ('format' in entry and entry['format'] == CWL_IANA):
                    del sec_files[index]
                else:
                    remove_non_cwl(entry)

    remove_non_cwl(deps)
    return deps


def find_deps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              uri,              # type: Text
              basedir=None,     # type: Optional[Text]
              nestdirs=True     # type: bool
             ):  # type: (...) -> Dict[Text, Any]
    """Find the dependencies of the CWL document."""
    deps = {"class": "File",
            "location": uri,
            "format": CWL_IANA}  # type: Dict[Text, Any]

    def loadref(base, uri):
        # type: (Text, Text) -> Any
        return document_loader.fetch(
            document_loader.fetcher.urljoin(base, uri))

    sfs = scandeps(
        basedir if basedir else uri, obj, {"$import", "run"},
        {"$include", "$schemas", "location"}, loadref, nestdirs=nestdirs)
    if sfs is not None:
        deps["secondaryFiles"] = sfs

    return deps


def print_pack(document_loader,  # type: Loader
               processobj,       # type: CommentedMap
               uri,              # type: Text
               metadata          # type: Dict[Text, Any]
              ):  # type: (...) -> Text
    """Return a CWL serialization of the CWL document in JSON."""
    packed = pack(document_loader, processobj, uri, metadata)
    if len(packed["$graph"]) > 1:
        return json_dumps(packed, indent=4)
    return json_dumps(packed["$graph"][0], indent=4)


def supported_cwl_versions(enable_dev):
    # type: (bool) -> List[Text]
    # ALLUPDATES and UPDATES are dicts
    if enable_dev:
        versions = list(ALLUPDATES)
    else:
        versions = list(UPDATES)
    versions.sort()
    return versions


def configure_logging(args,            # type: argparse.Namespace
                      stderr_handler,  # type: logging.Handler
                      runtimeContext   # type: RuntimeContext
                     ):  # type: (...) -> None
    # Configure logging
    rdflib_logger = logging.getLogger("rdflib.term")
    rdflib_logger.addHandler(stderr_handler)
    rdflib_logger.setLevel(logging.ERROR)
    if args.quiet:
        # Silence STDERR, not an eventual provenance log file
        stderr_handler.setLevel(logging.WARN)
    if runtimeContext.debug:
        # Increase to debug for both stderr and provenance log file
        _logger.setLevel(logging.DEBUG)
        stderr_handler.setLevel(logging.DEBUG)
        rdflib_logger.setLevel(logging.DEBUG)
    fmtclass = coloredlogs.ColoredFormatter if args.enable_color \
        else logging.Formatter
    formatter = fmtclass("%(levelname)s %(message)s")
    if args.timestamps:
        formatter = fmtclass(
            "[%(asctime)s] %(levelname)s %(message)s",
            "%Y-%m-%d %H:%M:%S")
    stderr_handler.setFormatter(formatter)
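
# Editorial sketch: with --timestamps, stderr log lines produced by the
# formatter above look roughly like
#
#     [2020-05-14 16:20:52] INFO Final process status is success
#
# and --enable-color swaps logging.Formatter for
# coloredlogs.ColoredFormatter with the same format string.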


def setup_schema(args,                   # type: argparse.Namespace
                 custom_schema_callback  # type: Optional[Callable[[], None]]
                ):  # type: (...) -> None
    if custom_schema_callback is not None:
        custom_schema_callback()
    elif args.enable_ext:
        res = pkg_resources.resource_stream(__name__, 'extensions.yml')
        use_custom_schema("v1.0", "http://commonwl.org/cwltool", res.read())
        res.close()
    else:
        use_standard_schema("v1.0")


def setup_provenance(args,           # type: argparse.Namespace
                     argsl,          # type: List[str]
                     runtimeContext  # type: RuntimeContext
                    ):  # type: (...) -> Optional[int]
    if not args.compute_checksum:
        _logger.error("--provenance incompatible with --no-compute-checksum")
        return 1
    ro = ResearchObject(
        getdefault(runtimeContext.make_fs_access, StdFsAccess),
        temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid,
        full_name=args.cwl_full_name)
    runtimeContext.research_obj = ro
    log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))

    class ProvLogFormatter(logging.Formatter):
        """Enforce ISO8601 with both T and Z."""

        def __init__(self):  # type: () -> None
            super(ProvLogFormatter, self).__init__(
                "[%(asctime)sZ] %(message)s")

        def formatTime(self, record, datefmt=None):
            # type: (logging.LogRecord, Optional[str]) -> str
            record_time = time.gmtime(record.created)
            formatted_time = time.strftime("%Y-%m-%dT%H:%M:%S", record_time)
            with_msecs = "%s,%03d" % (formatted_time, record.msecs)
            return with_msecs

    prov_log_handler.setFormatter(ProvLogFormatter())
    _logger.addHandler(prov_log_handler)
    _logger.debug(u"[provenance] Logging to %s", log_file_io)
    if argsl is not None:
        # Log cwltool command line options to provenance file
        _logger.info("[cwltool] %s %s", sys.argv[0], u" ".join(argsl))
    _logger.debug(u"[cwltool] Arguments: %s", args)
    return None


def setup_loadingContext(loadingContext,  # type: Optional[LoadingContext]
                         runtimeContext,  # type: RuntimeContext
                         args             # type: argparse.Namespace
                        ):  # type: (...) -> LoadingContext
    if loadingContext is None:
        loadingContext = LoadingContext(vars(args))
    else:
        loadingContext = loadingContext.copy()
    loadingContext.loader = default_loader(loadingContext.fetcher_constructor,
                                           enable_dev=args.enable_dev)
    loadingContext.research_obj = runtimeContext.research_obj
    loadingContext.disable_js_validation = \
        args.disable_js_validation or (not args.do_validate)
    loadingContext.construct_tool_object = getdefault(
        loadingContext.construct_tool_object, workflow.default_make_tool)
    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
    if loadingContext.do_update is None:
        loadingContext.do_update = not (args.pack or args.print_subgraph)

    return loadingContext


def make_template(tool  # type: Process
                 ):  # type: (...) -> None
    def my_represent_none(self, data):  # pylint: disable=unused-argument
        # type: (Any, Any) -> Any
        """Force clean representation of 'null'."""
        return self.represent_scalar(u'tag:yaml.org,2002:null', u'null')
    yaml.RoundTripRepresenter.add_representer(type(None), my_represent_none)
    yaml.round_trip_dump(
        generate_input_template(tool), sys.stdout,
        default_flow_style=False, indent=4, block_seq_indent=2)
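
# Editorial note: my_represent_none makes any Python None in the template
# render as an explicit "null" rather than ruamel.yaml's default empty
# scalar, e.g. a None value dumps as "some_input: null" instead of
# "some_input:" (the name "some_input" is hypothetical).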


def choose_target(args,           # type: argparse.Namespace
                  tool,           # type: Process
                  loadingContext  # type: LoadingContext
                 ):  # type: (...) -> Optional[Process]
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if isinstance(tool, Workflow):
        url = urllib.parse.urlparse(tool.tool["id"])
        if url.fragment:
            extracted = get_subgraph(
                [tool.tool["id"] + "/" + r for r in args.target], tool)
        else:
            extracted = get_subgraph(
                [loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
                 for r in args.target], tool)
    else:
        _logger.error("Can only use --target on Workflows")
        return None
    if isinstance(loadingContext.loader.idx, CommentedMap):
        loadingContext.loader.idx[extracted["id"]] = extracted
        tool = make_tool(extracted["id"], loadingContext)
    else:
        raise Exception("Missing loadingContext.loader.idx!")

    return tool


def check_working_directories(runtimeContext  # type: RuntimeContext
                             ):  # type: (...) -> Optional[int]
    for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
        if getattr(runtimeContext, dirprefix) \
                and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX:
            sl = "/" if getattr(runtimeContext, dirprefix).endswith("/") \
                or dirprefix == "cachedir" else ""
            setattr(runtimeContext, dirprefix,
                    os.path.abspath(getattr(runtimeContext, dirprefix)) + sl)
            if not os.path.exists(
                    os.path.dirname(getattr(runtimeContext, dirprefix))):
                try:
                    os.makedirs(os.path.dirname(
                        getattr(runtimeContext, dirprefix)))
                except Exception as e:
                    _logger.error("Failed to create directory: %s", Text(e))
                    return 1
    return None
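
# Editorial behaviour sketch: check_working_directories() absolutizes
# tmpdir_prefix, tmp_outdir_prefix and cachedir and pre-creates each
# prefix's parent directory.  E.g. (illustrative) a relative
# --tmpdir-prefix=scratch/tmp run from /home/user becomes
# /home/user/scratch/tmp, and /home/user/scratch is created if missing.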


def main(argsl=None,                   # type: Optional[List[str]]
         args=None,                    # type: Optional[argparse.Namespace]
         job_order_object=None,        # type: Optional[MutableMapping[Text, Any]]
         stdin=sys.stdin,              # type: IO[Any]
         stdout=None,                  # type: Optional[Union[TextIO, StreamWriter]]
         stderr=sys.stderr,            # type: IO[Any]
         versionfunc=versionstring,    # type: Callable[[], Text]
         logger_handler=None,          # type: Optional[logging.Handler]
         custom_schema_callback=None,  # type: Optional[Callable[[], None]]
         executor=None,                # type: Optional[JobExecutor]
         loadingContext=None,          # type: Optional[LoadingContext]
         runtimeContext=None,          # type: Optional[RuntimeContext]
         input_required=True           # type: bool
        ):  # type: (...) -> int
    if not stdout:  # force UTF-8 even if the console is configured differently
        if (hasattr(sys.stdout, "encoding")
                and sys.stdout.encoding != 'UTF-8'):  # type: ignore
            if PY3 and hasattr(sys.stdout, "detach"):
                stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
            else:
                stdout = getwriter('utf-8')(sys.stdout)  # type: ignore
        else:
            stdout = cast(TextIO, sys.stdout)  # type: ignore

    _logger.removeHandler(defaultStreamHandler)
    stderr_handler = logger_handler
    if stderr_handler is not None:
        _logger.addHandler(stderr_handler)
    else:
        coloredlogs.install(logger=_logger, stream=stderr)
        stderr_handler = _logger.handlers[-1]
    workflowobj = None
    prov_log_handler = None  # type: Optional[logging.StreamHandler]
    try:
        if args is None:
            if argsl is None:
                argsl = sys.argv[1:]
            addl = []  # type: List[str]
            if "CWLTOOL_OPTIONS" in os.environ:
                addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
            args = arg_parser().parse_args(addl+argsl)
            if args.record_container_id:
                if not args.cidfile_dir:
                    args.cidfile_dir = os.getcwd()
                del args.record_container_id

        if runtimeContext is None:
            runtimeContext = RuntimeContext(vars(args))
        else:
            runtimeContext = runtimeContext.copy()

        # If on Windows platform, a default Docker Container is used if not
        # explicitly provided by user
        if onWindows() and not runtimeContext.default_container:
            # This docker image is a minimal alpine image with bash installed
            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
            runtimeContext.default_container = windows_default_container_id

        # If caller parsed its own arguments, it may not include every
        # cwltool option, so fill in defaults to avoid crashing when
        # dereferencing them in args.
        for key, val in iteritems(get_default_args()):
            if not hasattr(args, key):
                setattr(args, key, val)

        configure_logging(args, stderr_handler, runtimeContext)

        if args.version:
            print(versionfunc())
            return 0
        _logger.info(versionfunc())

        if args.print_supported_versions:
            print("\n".join(supported_cwl_versions(args.enable_dev)))
            return 0

        if not args.workflow:
            if os.path.isfile("CWLFile"):
                setattr(args, "workflow", "CWLFile")
            else:
                _logger.error("CWL document required, no input file was provided")
                arg_parser().print_help()
                return 1
        if args.relax_path_checks:
            command_line_tool.ACCEPTLIST_RE = \
                command_line_tool.ACCEPTLIST_EN_RELAXED_RE

        if args.ga4gh_tool_registries:
            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
        if not args.enable_ga4gh_tool_registry:
            del ga4gh_tool_registries[:]

        setup_schema(args, custom_schema_callback)

        if args.provenance:
            if argsl is None:
                raise Exception("argsl cannot be None")
            if setup_provenance(args, argsl, runtimeContext) is not None:
                return 1

        loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)

        uri, tool_file_uri = resolve_tool_uri(
            args.workflow, resolver=loadingContext.resolver,
            fetcher_constructor=loadingContext.fetcher_constructor)

        try_again_msg = "" if args.debug \
            else ", try again with --debug for more information"

        try:
            job_order_object, input_basedir, jobloader = load_job_order(
                args, stdin, loadingContext.fetcher_constructor,
                loadingContext.overrides_list, tool_file_uri)

            if args.overrides:
                loadingContext.overrides_list.extend(load_overrides(
                    file_uri(os.path.abspath(args.overrides)), tool_file_uri))

            loadingContext, workflowobj, uri = fetch_document(
                uri, loadingContext)

            if args.print_deps and loadingContext.loader:
                printdeps(workflowobj, loadingContext.loader, stdout,
                          args.relative_deps, uri)
                return 0

            loadingContext, uri \
                = resolve_and_validate_document(loadingContext, workflowobj, uri,
                                                preprocess_only=(args.print_pre or args.pack),
                                                skip_schemas=args.skip_schemas)

            if loadingContext.loader is None:
                raise Exception("Impossible code path.")
            processobj, metadata = loadingContext.loader.resolve_ref(uri)
            processobj = cast(CommentedMap, processobj)
            if args.pack:
                stdout.write(print_pack(
                    loadingContext.loader, processobj, uri, metadata))
                return 0

            if args.provenance and runtimeContext.research_obj:
                # Can't really be combined with args.pack at same time
                runtimeContext.research_obj.packed_workflow(
                    print_pack(loadingContext.loader, processobj, uri, metadata))

            if args.print_pre:
                stdout.write(json_dumps(processobj, indent=4, sort_keys=True,
                                        separators=(',', ': ')))
                return 0

            tool = make_tool(uri, loadingContext)
            if args.make_template:
                make_template(tool)
                return 0

            if args.validate:
                print("{} is valid CWL.".format(args.workflow))
                return 0

            if args.print_rdf:
                stdout.write(printrdf(tool, loadingContext.loader.ctx,
                                      args.rdf_serializer))
                return 0

            if args.print_dot:
                printdot(tool, loadingContext.loader.ctx, stdout)
                return 0

            if args.print_targets:
                for f in ("outputs", "steps", "inputs"):
                    if tool.tool[f]:
                        _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
                        stdout.write("  " + "\n  ".join(
                            [shortname(t["id"]) for t in tool.tool[f]]) + "\n")
                return 0

            if args.target:
                ctool = choose_target(args, tool, loadingContext)
                if ctool is None:
                    return 1
                else:
                    tool = ctool

            if args.print_subgraph:
                if "name" in tool.tool:
                    del tool.tool["name"]
                stdout.write(json_dumps(tool.tool, indent=4, sort_keys=True,
                                        separators=(',', ': ')))
                return 0

        except (validate.ValidationException) as exc:
            _logger.error(u"Tool definition failed validation:\n%s",
                          Text(exc), exc_info=args.debug)
            return 1
        except (RuntimeError, WorkflowException) as exc:
            _logger.error(u"Tool definition failed initialization:\n%s",
                          Text(exc), exc_info=args.debug)
            return 1
        except Exception as exc:
            _logger.error(
                u"I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
                try_again_msg,
                Text(exc) if not args.debug else "",
                exc_info=args.debug)
            return 1

        if isinstance(tool, int):
            return tool

        # If on MacOS platform, TMPDIR must be set to be under one of the
        # shared volumes in Docker for Mac
        # More info: https://dockstore.org/docs/faq
        if sys.platform == "darwin":
            default_mac_path = "/private/tmp/docker_tmp"
            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmp_outdir_prefix = default_mac_path
            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmpdir_prefix = default_mac_path

        if check_working_directories(runtimeContext) is not None:
            return 1

        if args.cachedir:
            if args.move_outputs == "move":
                runtimeContext.move_outputs = "copy"
            runtimeContext.tmp_outdir_prefix = args.cachedir

        runtimeContext.secret_store = getdefault(
            runtimeContext.secret_store, SecretStore())
        runtimeContext.make_fs_access = getdefault(
            runtimeContext.make_fs_access, StdFsAccess)

        if not executor:
            if args.parallel:
                temp_executor = MultithreadedJobExecutor()
                runtimeContext.select_resources = temp_executor.select_resources
                real_executor = temp_executor  # type: JobExecutor
            else:
                real_executor = SingleJobExecutor()
        else:
            real_executor = executor

        try:
            runtimeContext.basedir = input_basedir

            if isinstance(tool, ProcessGenerator):
                tfjob_order = {}  # type: MutableMapping[Text, Any]
                if loadingContext.jobdefaults:
                    tfjob_order.update(loadingContext.jobdefaults)
                if job_order_object:
                    tfjob_order.update(job_order_object)
                tfout, tfstatus = real_executor(
                    tool.embedded_tool, tfjob_order, runtimeContext)
                if tfstatus != "success":
                    raise WorkflowException(
                        "ProcessGenerator failed to generate workflow")
                tool, job_order_object = tool.result(
                    tfjob_order, tfout, runtimeContext)
                if not job_order_object:
                    job_order_object = None

            try:
                initialized_job_order_object = init_job_order(
                    job_order_object, args, tool, jobloader, stdout,
                    print_input_deps=args.print_input_deps,
                    relative_deps=args.relative_deps,
                    make_fs_access=runtimeContext.make_fs_access,
                    input_basedir=input_basedir,
                    secret_store=runtimeContext.secret_store,
                    input_required=input_required)
            except SystemExit as err:
                return err.code

            del args.workflow
            del args.job_order

            conf_file = getattr(args,
                                "beta_dependency_resolvers_configuration",
                                None)  # Text
            use_conda_dependencies = getattr(args, "beta_conda_dependencies",
                                             None)  # Text

            if conf_file or use_conda_dependencies:
                runtimeContext.job_script_provider = DependenciesConfiguration(args)
            else:
                runtimeContext.find_default_container = functools.partial(
                    find_default_container,
                    default_container=runtimeContext.default_container,
                    use_biocontainers=args.beta_use_biocontainers)

            (out, status) = real_executor(
                tool, initialized_job_order_object, runtimeContext,
                logger=_logger)

            if out is not None:
                if runtimeContext.research_obj is not None:
                    runtimeContext.research_obj.create_job(
                        out, None, True)

                def remove_at_id(doc):
                    # type: (MutableMapping[Text, Any]) -> None
                    for key in list(doc.keys()):
                        if key == '@id':
                            del doc[key]
                        else:
                            value = doc[key]
                            if isinstance(value, MutableMapping):
                                remove_at_id(value)
                            elif isinstance(value, MutableSequence):
                                for entry in value:
                                    if isinstance(entry, MutableMapping):
                                        remove_at_id(entry)
                remove_at_id(out)
                visit_class(out, ("File",), functools.partial(
                    add_sizes, runtimeContext.make_fs_access('')))

                def loc_to_path(obj):  # type: (Dict[Text, Any]) -> None
                    for field in ("path", "nameext", "nameroot", "dirname"):
                        if field in obj:
                            del obj[field]
                    if obj["location"].startswith("file://"):
                        obj["path"] = uri_file_path(obj["location"])

                visit_class(out, ("File", "Directory"), loc_to_path)

                # Unsetting the Generation from final output object
                visit_class(out, ("File", ), MutationManager().unset_generation)

                if isinstance(out, string_types):
                    stdout.write(out)
                else:
                    stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
                stdout.write("\n")
                if hasattr(stdout, "flush"):
                    stdout.flush()

            if status != "success":
                _logger.warning(u"Final process status is %s", status)
                return 1
            _logger.info(u"Final process status is %s", status)
            return 0

        except (validate.ValidationException) as exc:
            _logger.error(u"Input object failed validation:\n%s",
                          Text(exc), exc_info=args.debug)
            return 1
        except UnsupportedRequirement as exc:
            _logger.error(
                u"Workflow or tool uses unsupported feature:\n%s", Text(exc),
                exc_info=args.debug)
            return 33
        except WorkflowException as exc:
            _logger.error(
                u"Workflow error%s:\n%s", try_again_msg,
                strip_dup_lineno(Text(exc)), exc_info=args.debug)
            return 1
        except Exception as exc:  # pylint: disable=broad-except
            _logger.error(
                u"Unhandled error%s:\n  %s", try_again_msg, Text(exc),
                exc_info=args.debug)
            return 1

    finally:
        if args and runtimeContext and runtimeContext.research_obj \
                and workflowobj and loadingContext:
            research_obj = runtimeContext.research_obj
            if loadingContext.loader is not None:
                research_obj.generate_snapshot(prov_deps(
                    workflowobj, loadingContext.loader, uri))
            else:
                _logger.warning("Unable to generate provenance snapshot "
                                "due to missing loadingContext.loader.")
            if prov_log_handler is not None:
                # Stop logging so we won't half-log adding ourselves to RO
                _logger.debug(u"[provenance] Closing provenance log file %s",
                              prov_log_handler)
                _logger.removeHandler(prov_log_handler)
                # Ensure last log lines are written out
                prov_log_handler.flush()
                # Underlying WritableBagFile will add the tagfile to the manifest
                prov_log_handler.stream.close()
                prov_log_handler.close()
            research_obj.close(args.provenance)

        _logger.removeHandler(stderr_handler)
        _logger.addHandler(defaultStreamHandler)


def find_default_container(builder,                 # type: HasReqsHints
                           default_container=None,  # type: Optional[Text]
                           use_biocontainers=None,  # type: Optional[bool]
                          ):  # type: (...) -> Optional[Text]
    """Find a container."""
    if not default_container and use_biocontainers:
        default_container = get_container_from_software_requirements(
            use_biocontainers, builder)
    return default_container


def run(*args, **kwargs):
    # type: (*Any, **Any) -> None
    """Run cwltool."""
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        sys.exit(main(*args, **kwargs))
    finally:
        _terminate_processes()


if __name__ == "__main__":
    run(sys.argv[1:])
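
# Editorial usage sketch: main() is also callable programmatically, as its
# signature suggests.  "echo.cwl" and its --message option are hypothetical;
# any CLI flags can be passed via argsl.
#
#     from cwltool.main import main
#     from cwltool.context import RuntimeContext
#
#     rc = RuntimeContext({"outdir": "/tmp/cwl_out"})
#     exit_code = main(argsl=["echo.cwl", "--message", "hi"],
#                      runtimeContext=rc)
#     assert exit_code == 0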