comparison env/lib/python3.7/site-packages/cwltool/main.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 #!/usr/bin/env python
2 """Entry point for cwltool."""
3 from __future__ import absolute_import, print_function
4
5 import argparse
6 import copy
7 import functools
8 import io
9 import logging
10 import os
11 import signal
12 import sys
13 import time
14 from codecs import StreamWriter, getwriter # pylint: disable=unused-import
15 from six.moves import urllib
16 from typing import (IO, Any, Callable, Dict, Iterable, List, Mapping,
17 MutableMapping, MutableSequence, Optional, TextIO, Tuple,
18 Union, cast)
19
20 import pkg_resources # part of setuptools
21 from ruamel import yaml
22 from ruamel.yaml.comments import CommentedMap, CommentedSeq
23 from schema_salad import validate
24 from schema_salad.ref_resolver import Fetcher, Loader, file_uri, uri_file_path
25 from schema_salad.sourceline import strip_dup_lineno, cmap
26 from six import string_types, iteritems, PY3
27 from typing_extensions import Text
28 # move to a regular typing import when Python 3.3-3.6 is no longer supported
29
30 if PY3:
31 from collections.abc import Iterable, Sequence, MutableSequence
32 else: # Needed for Py3.8
33 from collections import Iterable, Sequence, MutableSequence
34
35 from . import command_line_tool, workflow
36 from .argparser import arg_parser, generate_parser, get_default_args
37 from .builder import HasReqsHints # pylint: disable=unused-import
38 from .context import LoadingContext, RuntimeContext, getdefault
39 from .cwlrdf import printdot, printrdf
40 from .errors import UnsupportedRequirement, WorkflowException
41 from .executors import MultithreadedJobExecutor, SingleJobExecutor, JobExecutor
42 from .load_tool import (FetcherConstructorType, # pylint: disable=unused-import
43 fetch_document, jobloaderctx, load_overrides,
44 make_tool, resolve_overrides, resolve_tool_uri,
45 resolve_and_validate_document, default_loader)
46 from .loghandler import _logger, defaultStreamHandler
47 from .mutation import MutationManager
48 from .pack import pack
49 from .pathmapper import adjustDirObjs, normalizeFilesDirs, trim_listing
50 from .process import (Process, add_sizes, # pylint: disable=unused-import
51 scandeps, shortname, use_custom_schema,
52 use_standard_schema, CWL_IANA)
53 from .workflow import Workflow
54 from .procgenerator import ProcessGenerator
55 from .provenance import ResearchObject
56 from .resolver import ga4gh_tool_registries, tool_resolver
57 from .secrets import SecretStore
58 from .software_requirements import (DependenciesConfiguration,
59 get_container_from_software_requirements)
60 from .stdfsaccess import StdFsAccess
61 from .update import ALLUPDATES, UPDATES
62 from .utils import (DEFAULT_TMP_PREFIX, json_dumps, onWindows,
63 processes_to_kill, versionstring, visit_class,
64 windows_default_container_id)
65 from .subgraph import get_subgraph
66
67 import coloredlogs
68
def _terminate_processes():
    # type: () -> None
    """Kill every process recorded in `utils.processes_to_kill`.

    Spawned processes must register themselves by appending to
    `utils.processes_to_kill` as they are created.

    Caveat: Python offers no supported way to kill another thread, so
    this only terminates child processes; the threads that spawned them
    keep running, which can occasionally produce surprising behaviour.
    """
    # Another thread may enqueue new processes while we drain the deque,
    # so loop on emptiness instead of iterating over a snapshot.
    while processes_to_kill:
        processes_to_kill.popleft().kill()
85
86
def _signal_handler(signum, _):
    # type: (int, Any) -> None
    """Terminate all spawned processes, then exit with the signal number.

    A race remains: another thread could spawn a fresh process after the
    purge but before the interpreter exits.  See _terminate_processes()
    for further caveats.
    """
    _terminate_processes()
    sys.exit(signum)
98
99
def generate_example_input(inptype,  # type: Any
                           default   # type: Optional[Any]
                          ):  # type: (...) -> Tuple[Any, Text]
    """Convert a single input schema into an example.

    Returns an (example, comment) pair: `example` is a placeholder value
    for a job-order template and `comment` is a short note about the
    type, rendered by the caller as an end-of-line YAML comment.
    """
    example = None
    comment = u""
    # Canned placeholder values for the CWL primitive types.
    defaults = {u'null': 'null',
                u'Any': 'null',
                u'boolean': False,
                u'int': 0,
                u'long': 0,
                u'float': 0.1,
                u'double': 0.1,
                u'string': 'a_string',
                u'File': yaml.comments.CommentedMap([
                    ('class', 'File'), ('path', 'a/file/path')]),
                u'Directory': yaml.comments.CommentedMap([
                    ('class', 'Directory'), ('path', 'a/directory/path')])
               }  # type: Dict[Text, Any]
    if isinstance(inptype, MutableSequence):
        # A list of types is a CWL union; a 'null' member just marks the
        # input as optional.
        optional = False
        if 'null' in inptype:
            inptype.remove('null')  # NOTE(review): mutates the caller's schema list in place
            optional = True
        if len(inptype) == 1:
            # Union collapsed to a single type: recurse on it.
            example, comment = generate_example_input(inptype[0], default)
            if optional:
                if comment:
                    comment = u"{} (optional)".format(comment)
                else:
                    comment = u"optional"
        else:
            # Genuine union: emit one example per alternative, each with
            # its own end-of-line comment.
            example = yaml.comments.CommentedSeq()
            for index, entry in enumerate(inptype):
                value, e_comment = generate_example_input(entry, default)
                example.append(value)
                example.yaml_add_eol_comment(e_comment, index)
            if optional:
                comment = u"optional"
    elif isinstance(inptype, Mapping) and 'type' in inptype:
        if inptype['type'] == 'array':
            if len(inptype['items']) == 1 and 'type' in inptype['items'][0] \
                    and inptype['items'][0]['type'] == 'enum':
                # array of just an enum then list all the options
                example = inptype['items'][0]['symbols']
                if 'name' in inptype['items'][0]:
                    comment = u'array of type "{}".'.format(inptype['items'][0]['name'])
            else:
                value, comment = generate_example_input(inptype['items'], None)
                comment = u"array of " + comment
                if len(inptype['items']) == 1:
                    example = [value]
                else:
                    example = value
            # A user-supplied default overrides the synthesized example.
            if default is not None:
                example = default
        elif inptype['type'] == 'enum':
            # Prefer the caller's default, then the schema default, then
            # a lone symbol; otherwise synthesize a placeholder name.
            if default is not None:
                example = default
            elif 'default' in inptype:
                example = inptype['default']
            elif len(inptype['symbols']) == 1:
                example = inptype['symbols'][0]
            else:
                example = '{}_enum_value'.format(inptype.get('name', 'valid'))
            comment = u'enum; valid values: "{}"'.format(
                '", "'.join(inptype['symbols']))
        elif inptype['type'] == 'record':
            example = yaml.comments.CommentedMap()
            if 'name' in inptype:
                comment = u'"{}" record type.'.format(inptype['name'])
            for field in inptype['fields']:
                value, f_comment = generate_example_input(field['type'], None)
                # insert(0, ...) prepends, so fields appear in reverse
                # schema order in the rendered template.
                example.insert(0, shortname(field['name']), value, f_comment)
        elif 'default' in inptype:
            example = inptype['default']
            comment = u'default value of type "{}".'.format(inptype['type'])
        else:
            example = defaults.get(inptype['type'], Text(inptype))
            comment = u'type "{}".'.format(inptype['type'])
    else:
        # Plain type name (a string): use a canned example or echo the type.
        if not default:
            example = defaults.get(Text(inptype), Text(inptype))
            comment = u'type "{}"'.format(inptype)
        else:
            example = default
            comment = u'default value of type "{}".'.format(inptype)
    return example, comment
188
def realize_input_schema(input_types,  # type: MutableSequence[Dict[Text, Any]]
                         schema_defs   # type: Dict[Text, Any]
                        ):  # type: (...) -> MutableSequence[Dict[Text, Any]]
    """Replace references to named types with the actual types.

    Rewrites `input_types` in place (and returns it), substituting each
    "file#TypeName" / "TypeName" reference with the matching definition
    from `schema_defs`, recursing through unions, arrays, and records.
    """
    for index, entry in enumerate(input_types):
        if isinstance(entry, string_types):
            # Bare reference: strip the document part of "doc#TypeName".
            if '#' in entry:
                _, input_type_name = entry.split('#')
            else:
                input_type_name = entry
            if input_type_name in schema_defs:
                # Substitute in place; `entry` is rebound so the Mapping
                # handling below also applies to the resolved definition.
                entry = input_types[index] = schema_defs[input_type_name]
        if isinstance(entry, Mapping):
            if isinstance(entry['type'], string_types) and '#' in entry['type']:
                _, input_type_name = entry['type'].split('#')
                if input_type_name in schema_defs:
                    # NOTE(review): realize_input_schema returns a sequence,
                    # which is stored as the 'type' here — presumably the
                    # downstream consumers accept that; confirm.
                    input_types[index]['type'] = realize_input_schema(
                        schema_defs[input_type_name], schema_defs)
            if isinstance(entry['type'], MutableSequence):
                # Union type: resolve each alternative.
                input_types[index]['type'] = realize_input_schema(
                    entry['type'], schema_defs)
            if isinstance(entry['type'], Mapping):
                # Nested anonymous type: wrap in a list to recurse.
                input_types[index]['type'] = realize_input_schema(
                    [input_types[index]['type']], schema_defs)
            if entry['type'] == 'array':
                # `items` may be a single type name or a list of types.
                items = entry['items'] if \
                    not isinstance(entry['items'], string_types) else [entry['items']]
                input_types[index]['items'] = realize_input_schema(items, schema_defs)
            if entry['type'] == 'record':
                input_types[index]['fields'] = realize_input_schema(
                    entry['fields'], schema_defs)
    return input_types
221
def generate_input_template(tool):
    # type: (Process) -> Dict[Text, Any]
    """Generate an example input object for the given CWL process."""
    template = yaml.comments.CommentedMap()
    resolved_inputs = realize_input_schema(tool.tool["inputs"], tool.schemaDefs)
    for entry in resolved_inputs:
        key = shortname(entry["id"])
        example_value, eol_comment = generate_example_input(
            entry['type'], entry.get('default', None))
        # Prepend each entry, mirroring the original traversal order.
        template.insert(0, key, example_value, eol_comment)
    return template
232
def load_job_order(args,                 # type: argparse.Namespace
                   stdin,                # type: IO[Any]
                   fetcher_constructor,  # type: Optional[Fetcher]
                   overrides_list,       # type: List[Dict[Text, Any]]
                   tool_file_uri         # type: Text
                  ):  # type: (...) -> Tuple[Optional[MutableMapping[Text, Any]], Text, Loader]
    """Load the job order (input object) named on the command line.

    The input object may come from a file named in ``args.job_order`` or
    from ``stdin`` (when the single argument is ``-``).  Any
    ``cwltool:overrides`` found inside it are moved to ``overrides_list``.

    Returns a (job_order_object, input_basedir, loader) tuple;
    ``job_order_object`` is None when the inputs are expected to come
    from tool-specific command-line options instead.  Exits the process
    when the input object is not a mapping.
    """
    job_order_object = None
    job_order_file = None

    _jobloaderctx = jobloaderctx.copy()
    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)  # type: ignore

    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
        job_order_file = args.job_order[0]
    elif len(args.job_order) == 1 and args.job_order[0] == "-":
        # "-" means: read a YAML/JSON input object from stdin.
        job_order_object = yaml.round_trip_load(stdin)
        job_order_object, _ = loader.resolve_all(job_order_object, file_uri(os.getcwd()) + "/")
    else:
        job_order_file = None

    if job_order_object is not None:
        input_basedir = args.basedir if args.basedir else os.getcwd()
    elif job_order_file is not None:
        # Paths inside the input object are relative to its directory
        # unless --basedir overrides that.
        input_basedir = args.basedir if args.basedir \
            else os.path.abspath(os.path.dirname(job_order_file))
        job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)

    if job_order_object is not None and "http://commonwl.org/cwltool#overrides" in job_order_object:
        # Extract embedded overrides and hand them to the caller.
        ov_uri = file_uri(job_order_file or input_basedir)
        overrides_list.extend(
            resolve_overrides(job_order_object, ov_uri, tool_file_uri))
        del job_order_object["http://commonwl.org/cwltool#overrides"]

    if job_order_object is None:
        input_basedir = args.basedir if args.basedir else os.getcwd()

    if job_order_object is not None and not isinstance(job_order_object, MutableMapping):
        # Fixed typo in the user-facing message: "dictionay" -> "dictionary".
        _logger.error(
            'CWL input object at %s is not formatted correctly, it should be a '
            'JSON/YAML dictionary, not %s.\n'
            'Raw input object:\n%s', job_order_file or "stdin",
            type(job_order_object), job_order_object)
        sys.exit(1)
    return (job_order_object, input_basedir, loader)
278
def init_job_order(job_order_object,        # type: Optional[MutableMapping[Text, Any]]
                   args,                    # type: argparse.Namespace
                   process,                 # type: Process
                   loader,                  # type: Loader
                   stdout,                  # type: Union[TextIO, StreamWriter]
                   print_input_deps=False,  # type: bool
                   relative_deps=False,     # type: bool
                   make_fs_access=StdFsAccess,  # type: Callable[[Text], StdFsAccess]
                   input_basedir="",        # type: Text
                   secret_store=None,       # type: Optional[SecretStore]
                   input_required=True      # type: bool
                  ):  # type: (...) -> MutableMapping[Text, Any]
    """Build the final job order (input object) for `process`.

    When `job_order_object` is None the inputs are parsed from
    `args.job_order` via a parser generated from the tool's input
    schema.  Schema defaults are filled in, File/Directory entries are
    normalized, formats are expanded, and secret inputs are moved into
    `secret_store`.

    May call exit() for --tool-help, --print-input-deps, or on invalid
    or missing inputs.
    """
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    if job_order_object is None:
        # No input document supplied: parse tool-specific options from
        # the remaining command-line arguments.
        namemap = {}  # type: Dict[Text, Text]
        records = []  # type: List[Text]
        toolparser = generate_parser(
            argparse.ArgumentParser(prog=args.workflow), process, namemap, records, input_required)
        if args.tool_help:
            toolparser.print_help()
            exit(0)
        cmd_line = vars(toolparser.parse_args(args.job_order))
        # Reassemble record-typed inputs from their flattened
        # "<record>_<field>" command-line options.
        for record_name in records:
            record = {}
            record_items = {
                k: v for k, v in iteritems(cmd_line)
                if k.startswith(record_name)}
            for key, value in iteritems(record_items):
                record[key[len(record_name) + 1:]] = value
                del cmd_line[key]
            cmd_line[str(record_name)] = record
        if 'job_order' in cmd_line and cmd_line["job_order"]:
            # A positional input document was given alongside options.
            try:
                job_order_object = cast(
                    MutableMapping[Text, Any],
                    loader.resolve_ref(cmd_line["job_order"])[0])
            except Exception as err:
                _logger.error(Text(err), exc_info=args.debug)
                exit(1)
        else:
            job_order_object = {"id": args.workflow}

        del cmd_line["job_order"]

        # Map argparse destination names back to the tool's input names.
        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})

        if secret_store and secrets_req:
            # Replace secret values with opaque placeholders.
            secret_store.store(
                [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"Parsed job order from command line: %s",
                          json_dumps(job_order_object, indent=4))

    # Fill in schema-declared defaults for inputs the user did not give.
    for inp in process.tool["inputs"]:
        if "default" in inp and (
                not job_order_object or shortname(inp["id"]) not in job_order_object):
            if not job_order_object:
                job_order_object = {}
            job_order_object[shortname(inp["id"])] = inp["default"]

    if job_order_object is None:
        # Inputs are required but none were provided in any form.
        if process.tool["inputs"]:
            if toolparser is not None:
                print(u"\nOptions for {} ".format(args.workflow))
                toolparser.print_help()
            _logger.error("")
            _logger.error("Input object required, use --help for details")
            exit(1)
        else:
            job_order_object = {}

    if print_input_deps:
        basedir = None  # type: Optional[Text]
        uri = job_order_object["id"]
        if uri == args.workflow:
            basedir = os.path.dirname(uri)
            uri = ""
        printdeps(job_order_object, loader, stdout, relative_deps, uri,
                  basedir=basedir, nestdirs=False)
        exit(0)

    def path_to_loc(p):  # type: (Dict[Text, Any]) -> None
        # Normalize legacy "path" keys to "location".
        if "location" not in p and "path" in p:
            p["location"] = p["path"]
            del p["path"]

    # Namespaces from the job order and the process, used to expand
    # prefixed format identifiers into full URIs.
    ns = {}  # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
    ns.update(job_order_object.get("$namespaces", {}))
    ns.update(process.metadata.get("$namespaces", {}))
    ld = Loader(ns)

    def expand_formats(p):  # type: (Dict[Text, Any]) -> None
        if "format" in p:
            p["format"] = ld.expand_url(p["format"], "")

    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
    visit_class(job_order_object, ("File",), functools.partial(add_sizes, make_fs_access(input_basedir)))
    visit_class(job_order_object, ("File",), expand_formats)
    adjustDirObjs(job_order_object, trim_listing)
    normalizeFilesDirs(job_order_object)

    if secret_store and secrets_req:
        # Store secrets again in case defaults or files introduced new ones.
        secret_store.store(
            [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)

    if "cwl:tool" in job_order_object:
        del job_order_object["cwl:tool"]
    if "id" in job_order_object:
        del job_order_object["id"]
    return job_order_object
390
391
def make_relative(base, obj):  # type: (Text, Dict[Text, Any]) -> None
    """Relativize the location URI of a File or Directory object."""
    uri = obj.get("location", obj.get("path"))
    # A scheme other than file:// (e.g. http://) marks a remote resource,
    # which must be left untouched.
    first_segment = uri.split("/")[0]
    if ":" in first_segment and not uri.startswith("file://"):
        return
    if uri.startswith("file://"):
        uri = uri_file_path(uri)
    obj["location"] = os.path.relpath(uri, base)
401
def printdeps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              stdout,           # type: Union[TextIO, StreamWriter]
              relative_deps,    # type: bool
              uri,              # type: Text
              basedir=None,     # type: Optional[Text]
              nestdirs=True     # type: bool
             ):  # type: (...) -> None
    """Print a JSON representation of the dependencies of the CWL document."""
    deps = find_deps(obj, document_loader, uri, basedir=basedir,
                     nestdirs=nestdirs)
    # NOTE(review): presumably the argument parser restricts
    # relative_deps to {"primary", "cwd"}; for any other value `base`
    # would be unbound at the visit_class call below — confirm.
    if relative_deps == "primary":
        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
    elif relative_deps == "cwd":
        base = os.getcwd()
    visit_class(deps, ("File", "Directory"), functools.partial(
        make_relative, base))
    stdout.write(json_dumps(deps, indent=4))
420
def prov_deps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              uri,              # type: Text
              basedir=None      # type: Optional[Text]
             ):  # type: (...) -> MutableMapping[Text, Any]
    """Return the dependency tree restricted to CWL-format files.

    Walks the structure produced by find_deps() and prunes every
    secondary file that is not itself a CWL document (identified by the
    CWL IANA media type), recursing into the entries that remain.
    """
    deps = find_deps(obj, document_loader, uri, basedir=basedir)

    def remove_non_cwl(deps):  # type: (MutableMapping[Text, Any]) -> None
        if 'secondaryFiles' in deps:
            sec_files = deps['secondaryFiles']
            # Filter into a new list instead of deleting while iterating:
            # the original `del sec_files[index]` inside enumerate()
            # shifted the remaining items left, skipping the entry that
            # followed every deletion.
            kept = [entry for entry in sec_files
                    if 'format' in entry and entry['format'] == CWL_IANA]
            deps['secondaryFiles'] = kept
            for entry in kept:
                remove_non_cwl(entry)

    remove_non_cwl(deps)
    return deps
439
440
def find_deps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              uri,              # type: Text
              basedir=None,     # type: Optional[Text]
              nestdirs=True     # type: bool
             ):  # type: (...) -> Dict[Text, Any]
    """Find the dependencies of the CWL document."""
    dependencies = {
        "class": "File",
        "location": uri,
        "format": CWL_IANA,
    }  # type: Dict[Text, Any]

    def loadref(base, uri):  # type: (Text, Text) -> Any
        # Fetch referenced documents relative to their including document.
        return document_loader.fetch(document_loader.fetcher.urljoin(base, uri))

    scan_root = basedir if basedir else uri
    secondary = scandeps(
        scan_root, obj, {"$import", "run"},
        {"$include", "$schemas", "location"}, loadref, nestdirs=nestdirs)
    if secondary is not None:
        dependencies["secondaryFiles"] = secondary

    return dependencies
460
def print_pack(document_loader,  # type: Loader
               processobj,       # type: CommentedMap
               uri,              # type: Text
               metadata          # type: Dict[Text, Any]
              ):  # type: (...) -> Text
    """Return a CWL serialization of the CWL document in JSON."""
    packed = pack(document_loader, processobj, uri, metadata)
    graph = packed["$graph"]
    # A single-entry graph is unwrapped to the bare process object.
    target = packed if len(graph) > 1 else graph[0]
    return json_dumps(target, indent=4)
471
472
def supported_cwl_versions(enable_dev):  # type: (bool) -> List[Text]
    """Return the sorted list of CWL spec versions this runner accepts."""
    # ALLUPDATES and UPDATES are dicts; ALLUPDATES additionally contains
    # the development versions.
    source = ALLUPDATES if enable_dev else UPDATES
    return sorted(source)
481
def configure_logging(args,           # type: argparse.Namespace
                      stderr_handler, # type: logging.Handler
                      runtimeContext  # type: RuntimeContext
                     ):  # type: (...) -> None
    """Set up log levels and formatting for cwltool and rdflib."""
    rdflib_logger = logging.getLogger("rdflib.term")
    rdflib_logger.addHandler(stderr_handler)
    rdflib_logger.setLevel(logging.ERROR)
    if args.quiet:
        # Quiet only silences stderr; a provenance log file, if any,
        # keeps its own level.
        stderr_handler.setLevel(logging.WARN)
    if runtimeContext.debug:
        # Raise everything to debug: stderr and the provenance log alike.
        for log_target in (_logger, stderr_handler, rdflib_logger):
            log_target.setLevel(logging.DEBUG)
    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
    if args.timestamps:
        formatter = fmtclass(
            "[%(asctime)s] %(levelname)s %(message)s",
            "%Y-%m-%d %H:%M:%S")
    else:
        formatter = fmtclass("%(levelname)s %(message)s")
    stderr_handler.setFormatter(formatter)
505
def setup_schema(args,                  # type: argparse.Namespace
                 custom_schema_callback # type: Optional[Callable[[], None]]
                ):  # type: (...) -> None
    """Activate a custom, extended, or standard CWL schema."""
    if custom_schema_callback is not None:
        # Caller takes full responsibility for schema registration.
        custom_schema_callback()
        return
    if args.enable_ext:
        # Layer the cwltool extensions on top of the v1.0 schema.
        res = pkg_resources.resource_stream(__name__, 'extensions.yml')
        use_custom_schema("v1.0", "http://commonwl.org/cwltool", res.read())
        res.close()
    else:
        use_standard_schema("v1.0")
517
def setup_provenance(args,           # type: argparse.Namespace
                     argsl,          # type: List[str]
                     runtimeContext  # type: RuntimeContext
                    ):  # type: (...) -> Optional[int]
    """Create a ResearchObject and attach a provenance log handler.

    Returns 1 on a fatal option conflict, None on success.  As a side
    effect, sets ``runtimeContext.research_obj`` and starts mirroring
    cwltool log output into the research object's activity log.
    """
    if not args.compute_checksum:
        # Provenance capture relies on file checksums being computed.
        _logger.error("--provenance incompatible with --no-compute-checksum")
        return 1
    ro = ResearchObject(
        getdefault(runtimeContext.make_fs_access, StdFsAccess),
        temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid,
        full_name=args.cwl_full_name)
    runtimeContext.research_obj = ro
    log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))

    class ProvLogFormatter(logging.Formatter):
        """Enforce ISO8601 with both T and Z."""

        def __init__(self):  # type: () -> None
            super(ProvLogFormatter, self).__init__(
                "[%(asctime)sZ] %(message)s")

        def formatTime(self, record, datefmt=None):
            # type: (logging.LogRecord, Optional[str]) -> str
            # Format in UTC so the hard-coded trailing "Z" is truthful.
            record_time = time.gmtime(record.created)
            formatted_time = time.strftime("%Y-%m-%dT%H:%M:%S", record_time)
            with_msecs = "%s,%03d" % (formatted_time, record.msecs)
            return with_msecs
    prov_log_handler.setFormatter(ProvLogFormatter())
    _logger.addHandler(prov_log_handler)
    _logger.debug(u"[provenance] Logging to %s", log_file_io)
    if argsl is not None:
        # Log cwltool command line options to provenance file
        _logger.info("[cwltool] %s %s", sys.argv[0], u" ".join(argsl))
    _logger.debug(u"[cwltool] Arguments: %s", args)
    return None
554
def setup_loadingContext(loadingContext,  # type: Optional[LoadingContext]
                         runtimeContext,  # type: RuntimeContext
                         args             # type: argparse.Namespace
                        ):  # type: (...) -> LoadingContext
    """Prepare a LoadingContext from the CLI args and runtime context."""
    # Start from a fresh context built from the args, or a copy of the
    # caller's so the original is never mutated.
    loadingContext = (LoadingContext(vars(args)) if loadingContext is None
                      else loadingContext.copy())
    loadingContext.loader = default_loader(
        loadingContext.fetcher_constructor, enable_dev=args.enable_dev)
    loadingContext.research_obj = runtimeContext.research_obj
    loadingContext.disable_js_validation = (
        args.disable_js_validation or not args.do_validate)
    loadingContext.construct_tool_object = getdefault(
        loadingContext.construct_tool_object, workflow.default_make_tool)
    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
    if loadingContext.do_update is None:
        # Packing and subgraph printing must see the document unmodified.
        loadingContext.do_update = not (args.pack or args.print_subgraph)

    return loadingContext
575
def make_template(tool  # type: Process
                 ):  # type: (...) -> None
    """Dump a YAML input template for `tool` to standard output."""
    def render_none(representer, _data):  # pylint: disable=unused-argument
        # type: (Any, Any) -> Any
        """Force clean representation of 'null'."""
        return representer.represent_scalar(u'tag:yaml.org,2002:null', u'null')
    yaml.RoundTripRepresenter.add_representer(type(None), render_none)
    yaml.round_trip_dump(
        generate_input_template(tool), sys.stdout,
        default_flow_style=False, indent=4, block_seq_indent=2)
586
587
def choose_target(args,           # type: argparse.Namespace
                  tool,           # type: Process
                  loadingContext  # type: LoadingContext
                 ):  # type: (...) -> Optional[Process]
    """Reduce the workflow to the subgraph needed for ``--target``.

    Returns the rebuilt tool, or None (after logging an error) when
    `tool` is not a Workflow.
    """
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if isinstance(tool, Workflow):
        url = urllib.parse.urlparse(tool.tool["id"])
        if url.fragment:
            # The id already carries a fragment: extend it with each
            # requested step/output name.
            extracted = get_subgraph([tool.tool["id"] + "/" + r for r in args.target], tool)
        else:
            # No fragment: join each target onto the id as "#<name>".
            extracted = get_subgraph([loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
                                      for r in args.target],
                                     tool)
    else:
        _logger.error("Can only use --target on Workflows")
        return None
    if isinstance(loadingContext.loader.idx, CommentedMap):
        # Register the extracted subgraph so make_tool can resolve it.
        loadingContext.loader.idx[extracted["id"]] = extracted
        tool = make_tool(extracted["id"],
                         loadingContext)
    else:
        raise Exception("Missing loadingContext.loader.idx!")

    return tool
615
def check_working_directories(runtimeContext  # type: RuntimeContext
                             ):  # type: (...) -> Optional[int]
    """Make scratch directory settings absolute and ensure their parents exist.

    Returns 1 (after logging) when a parent directory cannot be created,
    None on success.
    """
    for attr in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
        value = getattr(runtimeContext, attr)
        if not value or value == DEFAULT_TMP_PREFIX:
            continue
        # cachedir names a directory proper while the others are path
        # prefixes; keep a trailing slash where required or present.
        suffix = "/" if value.endswith("/") or attr == "cachedir" else ""
        setattr(runtimeContext, attr, os.path.abspath(value) + suffix)
        parent = os.path.dirname(getattr(runtimeContext, attr))
        if not os.path.exists(parent):
            try:
                os.makedirs(parent)
            except Exception as e:
                _logger.error("Failed to create directory: %s", Text(e))
                return 1
    return None
631
632
633 def main(argsl=None, # type: Optional[List[str]]
634 args=None, # type: Optional[argparse.Namespace]
635 job_order_object=None, # type: Optional[MutableMapping[Text, Any]]
636 stdin=sys.stdin, # type: IO[Any]
637 stdout=None, # type: Optional[Union[TextIO, StreamWriter]]
638 stderr=sys.stderr, # type: IO[Any]
639 versionfunc=versionstring, # type: Callable[[], Text]
640 logger_handler=None, # type: Optional[logging.Handler]
641 custom_schema_callback=None, # type: Optional[Callable[[], None]]
642 executor=None, # type: Optional[JobExecutor]
643 loadingContext=None, # type: Optional[LoadingContext]
644 runtimeContext=None, # type: Optional[RuntimeContext]
645 input_required=True # type: bool
646 ): # type: (...) -> int
647 if not stdout: # force UTF-8 even if the console is configured differently
648 if (hasattr(sys.stdout, "encoding")
649 and sys.stdout.encoding != 'UTF-8'): # type: ignore
650 if PY3 and hasattr(sys.stdout, "detach"):
651 stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
652 else:
653 stdout = getwriter('utf-8')(sys.stdout) # type: ignore
654 else:
655 stdout = cast(TextIO, sys.stdout) # type: ignore
656
657 _logger.removeHandler(defaultStreamHandler)
658 stderr_handler = logger_handler
659 if stderr_handler is not None:
660 _logger.addHandler(stderr_handler)
661 else:
662 coloredlogs.install(logger=_logger, stream=stderr)
663 stderr_handler = _logger.handlers[-1]
664 workflowobj = None
665 prov_log_handler = None # type: Optional[logging.StreamHandler]
666 try:
667 if args is None:
668 if argsl is None:
669 argsl = sys.argv[1:]
670 addl = [] # type: List[str]
671 if "CWLTOOL_OPTIONS" in os.environ:
672 addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
673 args = arg_parser().parse_args(addl+argsl)
674 if args.record_container_id:
675 if not args.cidfile_dir:
676 args.cidfile_dir = os.getcwd()
677 del args.record_container_id
678
679 if runtimeContext is None:
680 runtimeContext = RuntimeContext(vars(args))
681 else:
682 runtimeContext = runtimeContext.copy()
683
684 # If on Windows platform, a default Docker Container is used if not
685 # explicitely provided by user
686 if onWindows() and not runtimeContext.default_container:
687 # This docker image is a minimal alpine image with bash installed
688 # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
689 runtimeContext.default_container = windows_default_container_id
690
691 # If caller parsed its own arguments, it may not include every
692 # cwltool option, so fill in defaults to avoid crashing when
693 # dereferencing them in args.
694 for key, val in iteritems(get_default_args()):
695 if not hasattr(args, key):
696 setattr(args, key, val)
697
698 configure_logging(args, stderr_handler, runtimeContext)
699
700 if args.version:
701 print(versionfunc())
702 return 0
703 _logger.info(versionfunc())
704
705 if args.print_supported_versions:
706 print("\n".join(supported_cwl_versions(args.enable_dev)))
707 return 0
708
709 if not args.workflow:
710 if os.path.isfile("CWLFile"):
711 setattr(args, "workflow", "CWLFile")
712 else:
713 _logger.error("CWL document required, no input file was provided")
714 arg_parser().print_help()
715 return 1
716 if args.relax_path_checks:
717 command_line_tool.ACCEPTLIST_RE = command_line_tool.ACCEPTLIST_EN_RELAXED_RE
718
719 if args.ga4gh_tool_registries:
720 ga4gh_tool_registries[:] = args.ga4gh_tool_registries
721 if not args.enable_ga4gh_tool_registry:
722 del ga4gh_tool_registries[:]
723
724 setup_schema(args, custom_schema_callback)
725
726 if args.provenance:
727 if argsl is None:
728 raise Exception("argsl cannot be None")
729 if setup_provenance(args, argsl, runtimeContext) is not None:
730 return 1
731
732 loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)
733
734 uri, tool_file_uri = resolve_tool_uri(
735 args.workflow, resolver=loadingContext.resolver,
736 fetcher_constructor=loadingContext.fetcher_constructor)
737
738 try_again_msg = "" if args.debug else ", try again with --debug for more information"
739
740 try:
741 job_order_object, input_basedir, jobloader = load_job_order(
742 args, stdin, loadingContext.fetcher_constructor,
743 loadingContext.overrides_list, tool_file_uri)
744
745 if args.overrides:
746 loadingContext.overrides_list.extend(load_overrides(
747 file_uri(os.path.abspath(args.overrides)), tool_file_uri))
748
749 loadingContext, workflowobj, uri = fetch_document(
750 uri, loadingContext)
751
752 if args.print_deps and loadingContext.loader:
753 printdeps(workflowobj, loadingContext.loader, stdout,
754 args.relative_deps, uri)
755 return 0
756
757 loadingContext, uri \
758 = resolve_and_validate_document(loadingContext, workflowobj, uri,
759 preprocess_only=(args.print_pre or args.pack),
760 skip_schemas=args.skip_schemas)
761
762 if loadingContext.loader is None:
763 raise Exception("Impossible code path.")
764 processobj, metadata = loadingContext.loader.resolve_ref(uri)
765 processobj = cast(CommentedMap, processobj)
766 if args.pack:
767 stdout.write(print_pack(loadingContext.loader, processobj, uri, metadata))
768 return 0
769
770 if args.provenance and runtimeContext.research_obj:
771 # Can't really be combined with args.pack at same time
772 runtimeContext.research_obj.packed_workflow(
773 print_pack(loadingContext.loader, processobj, uri, metadata))
774
775 if args.print_pre:
776 stdout.write(json_dumps(processobj, indent=4, sort_keys=True, separators=(',', ': ')))
777 return 0
778
779 tool = make_tool(uri, loadingContext)
780 if args.make_template:
781 make_template(tool)
782 return 0
783
784 if args.validate:
785 print("{} is valid CWL.".format(args.workflow))
786 return 0
787
788 if args.print_rdf:
789 stdout.write(printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer))
790 return 0
791
792 if args.print_dot:
793 printdot(tool, loadingContext.loader.ctx, stdout)
794 return 0
795
796 if args.print_targets:
797 for f in ("outputs", "steps", "inputs"):
798 if tool.tool[f]:
799 _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
800 stdout.write(" "+"\n ".join([shortname(t["id"]) for t in tool.tool[f]])+"\n")
801 return 0
802
803 if args.target:
804 ctool = choose_target(args, tool, loadingContext)
805 if ctool is None:
806 return 1
807 else:
808 tool = ctool
809
810 if args.print_subgraph:
811 if "name" in tool.tool:
812 del tool.tool["name"]
813 stdout.write(json_dumps(tool.tool, indent=4, sort_keys=True, separators=(',', ': ')))
814 return 0
815
816 except (validate.ValidationException) as exc:
817 _logger.error(u"Tool definition failed validation:\n%s", Text(exc),
818 exc_info=args.debug)
819 return 1
820 except (RuntimeError, WorkflowException) as exc:
821 _logger.error(u"Tool definition failed initialization:\n%s", Text(exc),
822 exc_info=args.debug)
823 return 1
824 except Exception as exc:
825 _logger.error(
826 u"I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
827 try_again_msg,
828 Text(exc) if not args.debug else "",
829 exc_info=args.debug)
830 return 1
831
832 if isinstance(tool, int):
833 return tool
834
835 # If on MacOS platform, TMPDIR must be set to be under one of the
836 # shared volumes in Docker for Mac
837 # More info: https://dockstore.org/docs/faq
838 if sys.platform == "darwin":
839 default_mac_path = "/private/tmp/docker_tmp"
840 if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
841 runtimeContext.tmp_outdir_prefix = default_mac_path
842 if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
843 runtimeContext.tmpdir_prefix = default_mac_path
844
845 if check_working_directories(runtimeContext) is not None:
846 return 1
847
848 if args.cachedir:
849 if args.move_outputs == "move":
850 runtimeContext.move_outputs = "copy"
851 runtimeContext.tmp_outdir_prefix = args.cachedir
852
853 runtimeContext.secret_store = getdefault(runtimeContext.secret_store, SecretStore())
854 runtimeContext.make_fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)
855
856 if not executor:
857 if args.parallel:
858 temp_executor = MultithreadedJobExecutor()
859 runtimeContext.select_resources = temp_executor.select_resources
860 real_executor = temp_executor # type: JobExecutor
861 else:
862 real_executor = SingleJobExecutor()
863 else:
864 real_executor = executor
865
866 try:
867 runtimeContext.basedir = input_basedir
868
869 if isinstance(tool, ProcessGenerator):
870 tfjob_order = {} # type: MutableMapping[Text, Any]
871 if loadingContext.jobdefaults:
872 tfjob_order.update(loadingContext.jobdefaults)
873 if job_order_object:
874 tfjob_order.update(job_order_object)
875 tfout, tfstatus = real_executor(tool.embedded_tool, tfjob_order, runtimeContext)
876 if tfstatus != "success":
877 raise WorkflowException("ProcessGenerator failed to generate workflow")
878 tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
879 if not job_order_object:
880 job_order_object = None
881
882 try:
883 initialized_job_order_object = init_job_order(
884 job_order_object, args, tool, jobloader, stdout,
885 print_input_deps=args.print_input_deps,
886 relative_deps=args.relative_deps,
887 make_fs_access=runtimeContext.make_fs_access,
888 input_basedir=input_basedir,
889 secret_store=runtimeContext.secret_store,
890 input_required=input_required)
891 except SystemExit as err:
892 return err.code
893
894 del args.workflow
895 del args.job_order
896
897 conf_file = getattr(args, "beta_dependency_resolvers_configuration", None) # Text
898 use_conda_dependencies = getattr(args, "beta_conda_dependencies", None) # Text
899
900 if conf_file or use_conda_dependencies:
901 runtimeContext.job_script_provider = DependenciesConfiguration(args)
902 else:
903 runtimeContext.find_default_container = functools.partial(
904 find_default_container,
905 default_container=runtimeContext.default_container,
906 use_biocontainers=args.beta_use_biocontainers)
907
908 (out, status) = real_executor(
909 tool, initialized_job_order_object, runtimeContext,
910 logger=_logger)
911
912 if out is not None:
913 if runtimeContext.research_obj is not None:
914 runtimeContext.research_obj.create_job(
915 out, None, True)
def remove_at_id(doc):  # type: (MutableMapping[Text, Any]) -> None
    """Recursively delete every '@id' key from *doc*, in place."""
    for key in list(doc):  # snapshot keys: we mutate doc while walking it
        if key == '@id':
            del doc[key]
            continue
        child = doc[key]
        if isinstance(child, MutableMapping):
            remove_at_id(child)
        elif isinstance(child, MutableSequence):
            for item in child:
                if isinstance(item, MutableMapping):
                    remove_at_id(item)
928 remove_at_id(out)
929 visit_class(out, ("File",), functools.partial(
930 add_sizes, runtimeContext.make_fs_access('')))
931
def loc_to_path(obj):  # type: (Dict[Text, Any]) -> None
    """Strip derived filesystem fields; re-derive 'path' for file:// locations.

    Mutates *obj* in place: any stale "path"/"nameext"/"nameroot"/"dirname"
    entries are dropped, and "path" is recomputed from "location" only when
    the location is a local file URI.
    """
    for derived in ["path", "nameext", "nameroot", "dirname"]:
        obj.pop(derived, None)
    location = obj["location"]
    if location.startswith("file://"):
        obj["path"] = uri_file_path(location)
938
939 visit_class(out, ("File", "Directory"), loc_to_path)
940
941 # Unsetting the Generation from final output object
942 visit_class(out, ("File", ), MutationManager().unset_generation)
943
944 if isinstance(out, string_types):
945 stdout.write(out)
946 else:
947 stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
948 stdout.write("\n")
949 if hasattr(stdout, "flush"):
950 stdout.flush()
951
952 if status != "success":
953 _logger.warning(u"Final process status is %s", status)
954 return 1
955 _logger.info(u"Final process status is %s", status)
956 return 0
957
958 except (validate.ValidationException) as exc:
959 _logger.error(u"Input object failed validation:\n%s", Text(exc),
960 exc_info=args.debug)
961 return 1
962 except UnsupportedRequirement as exc:
963 _logger.error(
964 u"Workflow or tool uses unsupported feature:\n%s", Text(exc),
965 exc_info=args.debug)
966 return 33
967 except WorkflowException as exc:
968 _logger.error(
969 u"Workflow error%s:\n%s", try_again_msg, strip_dup_lineno(Text(exc)),
970 exc_info=args.debug)
971 return 1
972 except Exception as exc: # pylint: disable=broad-except
973 _logger.error(
974 u"Unhandled error%s:\n %s", try_again_msg, Text(exc), exc_info=args.debug)
975 return 1
976
977 finally:
978 if args and runtimeContext and runtimeContext.research_obj \
979 and workflowobj and loadingContext:
980 research_obj = runtimeContext.research_obj
981 if loadingContext.loader is not None:
982 research_obj.generate_snapshot(prov_deps(
983 workflowobj, loadingContext.loader, uri))
984 else:
985 _logger.warning("Unable to generate provenance snapshot "
986 " due to missing loadingContext.loader.")
987 if prov_log_handler is not None:
988 # Stop logging so we won't half-log adding ourself to RO
989 _logger.debug(u"[provenance] Closing provenance log file %s",
990 prov_log_handler)
991 _logger.removeHandler(prov_log_handler)
992 # Ensure last log lines are written out
993 prov_log_handler.flush()
994 # Underlying WritableBagFile will add the tagfile to the manifest
995 prov_log_handler.stream.close()
996 prov_log_handler.close()
997 research_obj.close(args.provenance)
998
999 _logger.removeHandler(stderr_handler)
1000 _logger.addHandler(defaultStreamHandler)
1001
1002
def find_default_container(builder,  # type: HasReqsHints
                           default_container=None,  # type: Optional[Text]
                           use_biocontainers=None,  # type: Optional[bool]
                           ):  # type: (...) -> Optional[Text]
    """Return the configured default container, or resolve one via biocontainers.

    An explicitly supplied default container always wins; the biocontainers
    lookup is consulted only when no default is given and the feature is on.
    """
    if default_container or not use_biocontainers:
        return default_container
    return get_container_from_software_requirements(use_biocontainers, builder)
1012
1013
def run(*args, **kwargs):
    # type: (*Any, **Any) -> None
    """Run cwltool."""
    # Install a SIGTERM handler first so an external termination request is
    # routed through _signal_handler (presumably raising, so the finally
    # block below still runs — confirm against _signal_handler's definition).
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        # main() returns an exit code; sys.exit() raises SystemExit with it.
        sys.exit(main(*args, **kwargs))
    finally:
        # Always reap child job processes, whether main() returned normally,
        # raised, or SystemExit is propagating.
        _terminate_processes()
1022
1023
if __name__ == "__main__":
    # Script entry point: forward the command-line arguments (sans program
    # name) to run(), which exits the process with main()'s return code.
    run(sys.argv[1:])