planemo/lib/python3.7/site-packages/cwltool/main.py @ 0:d30785e31577 (draft)
commit message: "planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author:   guerler
date:     Fri, 31 Jul 2020 00:18:57 -0400
parents:  (none)
children: (none)

#!/usr/bin/env python
"""Entry point for cwltool."""
from __future__ import absolute_import, print_function

import argparse
import copy
import functools
import io
import logging
import os
import signal
import sys
import time
from codecs import StreamWriter, getwriter  # pylint: disable=unused-import
from six.moves import urllib
from typing import (IO, Any, Callable, Dict, Iterable, List, Mapping,
                    MutableMapping, MutableSequence, Optional, TextIO, Tuple,
                    Union, cast)

import pkg_resources  # part of setuptools
from ruamel import yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad import validate
from schema_salad.ref_resolver import Fetcher, Loader, file_uri, uri_file_path
from schema_salad.sourceline import strip_dup_lineno, cmap
from six import string_types, iteritems, PY3
from typing_extensions import Text
# move to a regular typing import when Python 3.3-3.6 is no longer supported

if PY3:
    # collections.abc is the home of these ABCs on Python 3; importing them
    # from the top-level collections module is deprecated as of Python 3.8
    from collections.abc import Iterable, Sequence, MutableSequence
else:
    from collections import Iterable, Sequence, MutableSequence

from . import command_line_tool, workflow
from .argparser import arg_parser, generate_parser, get_default_args
from .builder import HasReqsHints  # pylint: disable=unused-import
from .context import LoadingContext, RuntimeContext, getdefault
from .cwlrdf import printdot, printrdf
from .errors import UnsupportedRequirement, WorkflowException
from .executors import MultithreadedJobExecutor, SingleJobExecutor, JobExecutor
from .load_tool import (FetcherConstructorType,  # pylint: disable=unused-import
                        fetch_document, jobloaderctx, load_overrides,
                        make_tool, resolve_overrides, resolve_tool_uri,
                        resolve_and_validate_document, default_loader)
from .loghandler import _logger, defaultStreamHandler
from .mutation import MutationManager
from .pack import pack
from .pathmapper import adjustDirObjs, normalizeFilesDirs, trim_listing
from .process import (Process, add_sizes,  # pylint: disable=unused-import
                      scandeps, shortname, use_custom_schema,
                      use_standard_schema, CWL_IANA)
from .workflow import Workflow
from .procgenerator import ProcessGenerator
from .provenance import ResearchObject
from .resolver import ga4gh_tool_registries, tool_resolver
from .secrets import SecretStore
from .software_requirements import (DependenciesConfiguration,
                                    get_container_from_software_requirements)
from .stdfsaccess import StdFsAccess
from .update import ALLUPDATES, UPDATES
from .utils import (DEFAULT_TMP_PREFIX, json_dumps, onWindows,
                    processes_to_kill, versionstring, visit_class,
                    windows_default_container_id)
from .subgraph import get_subgraph

import coloredlogs

def _terminate_processes():
    # type: () -> None
    """Kill all spawned processes.

    Processes to be killed must be appended to `utils.processes_to_kill`
    as they are spawned.

    An important caveat: since there's no supported way to kill another
    thread in Python, this function cannot stop other threads from
    continuing to execute while it kills the processes that they've
    spawned. This may occasionally lead to unexpected behaviour.
    """
    # It's possible that another thread will spawn a new task while
    # we're executing, so it's not safe to use a for loop here.
    while processes_to_kill:
        processes_to_kill.popleft().kill()

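# A minimal sketch of the drain pattern above (hypothetical process):
#
#     processes_to_kill.append(subprocess.Popen(["sleep", "60"]))
#     _terminate_processes()  # pops and kills entries until the deque is empty
#
# Draining with `while` + `popleft()` stays correct even if another thread
# appends a new process mid-loop, which iterating with a `for` loop would not.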

def _signal_handler(signum, _):
    # type: (int, Any) -> None
    """Kill all spawned processes and exit.

    Note that it's possible for another thread to spawn a process after
    all processes have been killed, but before Python exits.

    Refer to the docstring for _terminate_processes() for other caveats.
    """
    _terminate_processes()
    sys.exit(signum)


def generate_example_input(inptype,  # type: Any
                           default   # type: Optional[Any]
                          ):  # type: (...) -> Tuple[Any, Text]
    """Convert a single input schema into an example."""
    example = None
    comment = u""
    defaults = {u'null': 'null',
                u'Any': 'null',
                u'boolean': False,
                u'int': 0,
                u'long': 0,
                u'float': 0.1,
                u'double': 0.1,
                u'string': 'a_string',
                u'File': yaml.comments.CommentedMap([
                    ('class', 'File'), ('path', 'a/file/path')]),
                u'Directory': yaml.comments.CommentedMap([
                    ('class', 'Directory'), ('path', 'a/directory/path')])
               }  # type: Dict[Text, Any]
    if isinstance(inptype, MutableSequence):
        optional = False
        if 'null' in inptype:
            inptype.remove('null')
            optional = True
        if len(inptype) == 1:
            example, comment = generate_example_input(inptype[0], default)
            if optional:
                if comment:
                    comment = u"{} (optional)".format(comment)
                else:
                    comment = u"optional"
        else:
            example = yaml.comments.CommentedSeq()
            for index, entry in enumerate(inptype):
                value, e_comment = generate_example_input(entry, default)
                example.append(value)
                example.yaml_add_eol_comment(e_comment, index)
            if optional:
                comment = u"optional"
    elif isinstance(inptype, Mapping) and 'type' in inptype:
        if inptype['type'] == 'array':
            if len(inptype['items']) == 1 and 'type' in inptype['items'][0] \
                    and inptype['items'][0]['type'] == 'enum':
                # array of just an enum then list all the options
                example = inptype['items'][0]['symbols']
                if 'name' in inptype['items'][0]:
                    comment = u'array of type "{}".'.format(inptype['items'][0]['name'])
            else:
                value, comment = generate_example_input(inptype['items'], None)
                comment = u"array of " + comment
                if len(inptype['items']) == 1:
                    example = [value]
                else:
                    example = value
            if default is not None:
                example = default
        elif inptype['type'] == 'enum':
            if default is not None:
                example = default
            elif 'default' in inptype:
                example = inptype['default']
            elif len(inptype['symbols']) == 1:
                example = inptype['symbols'][0]
            else:
                example = '{}_enum_value'.format(inptype.get('name', 'valid'))
            comment = u'enum; valid values: "{}"'.format(
                '", "'.join(inptype['symbols']))
        elif inptype['type'] == 'record':
            example = yaml.comments.CommentedMap()
            if 'name' in inptype:
                comment = u'"{}" record type.'.format(inptype['name'])
            for field in inptype['fields']:
                value, f_comment = generate_example_input(field['type'], None)
                example.insert(0, shortname(field['name']), value, f_comment)
        elif 'default' in inptype:
            example = inptype['default']
            comment = u'default value of type "{}".'.format(inptype['type'])
        else:
            example = defaults.get(inptype['type'], Text(inptype))
            comment = u'type "{}".'.format(inptype['type'])
    else:
        if not default:
            example = defaults.get(Text(inptype), Text(inptype))
            comment = u'type "{}"'.format(inptype)
        else:
            example = default
            comment = u'default value of type "{}".'.format(inptype)
    return example, comment

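# A rough illustration of the mapping above (hypothetical schemas): an
# optional string input
#     generate_example_input(['null', 'string'], None)
# returns ('a_string', 'type "string" (optional)'), while an anonymous enum
#     generate_example_input({'type': 'enum', 'symbols': ['gzip', 'bzip2']}, None)
# returns ('valid_enum_value', 'enum; valid values: "gzip", "bzip2"').
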
def realize_input_schema(input_types,  # type: MutableSequence[Dict[Text, Any]]
                         schema_defs   # type: Dict[Text, Any]
                        ):  # type: (...) -> MutableSequence[Dict[Text, Any]]
    """Replace references to named types with the actual types."""
    for index, entry in enumerate(input_types):
        if isinstance(entry, string_types):
            if '#' in entry:
                _, input_type_name = entry.split('#')
            else:
                input_type_name = entry
            if input_type_name in schema_defs:
                entry = input_types[index] = schema_defs[input_type_name]
        if isinstance(entry, Mapping):
            if isinstance(entry['type'], string_types) and '#' in entry['type']:
                _, input_type_name = entry['type'].split('#')
                if input_type_name in schema_defs:
                    input_types[index]['type'] = realize_input_schema(
                        schema_defs[input_type_name], schema_defs)
            if isinstance(entry['type'], MutableSequence):
                input_types[index]['type'] = realize_input_schema(
                    entry['type'], schema_defs)
            if isinstance(entry['type'], Mapping):
                input_types[index]['type'] = realize_input_schema(
                    [input_types[index]['type']], schema_defs)
            if entry['type'] == 'array':
                items = entry['items'] if \
                    not isinstance(entry['items'], string_types) else [entry['items']]
                input_types[index]['items'] = realize_input_schema(items, schema_defs)
            if entry['type'] == 'record':
                input_types[index]['fields'] = realize_input_schema(
                    entry['fields'], schema_defs)
    return input_types

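# Sketch of the substitution performed above (hypothetical names): given
#     schema_defs = {'paired_end': {'type': 'record', 'fields': [...]}}
# an input whose type is written as 'file.cwl#paired_end' is replaced
# in-place with the full record definition, so generate_example_input()
# can recurse into its fields without chasing references.
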
def generate_input_template(tool):
    # type: (Process) -> Dict[Text, Any]
    """Generate an example input object for the given CWL process."""
    template = yaml.comments.CommentedMap()
    for inp in realize_input_schema(tool.tool["inputs"], tool.schemaDefs):
        name = shortname(inp["id"])
        value, comment = generate_example_input(
            inp['type'], inp.get('default', None))
        template.insert(0, name, value, comment)
    return template

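# Typical use, mirroring what --make-template does via make_template() below:
#
#     template = generate_input_template(tool)
#     yaml.round_trip_dump(template, sys.stdout, default_flow_style=False)
#
# which prints a commented YAML job-order skeleton for the loaded tool.
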
def load_job_order(args,                 # type: argparse.Namespace
                   stdin,                # type: IO[Any]
                   fetcher_constructor,  # type: Optional[Fetcher]
                   overrides_list,       # type: List[Dict[Text, Any]]
                   tool_file_uri         # type: Text
                  ):  # type: (...) -> Tuple[Optional[MutableMapping[Text, Any]], Text, Loader]

    job_order_object = None
    job_order_file = None

    _jobloaderctx = jobloaderctx.copy()
    loader = Loader(_jobloaderctx, fetcher_constructor=fetcher_constructor)  # type: ignore

    if len(args.job_order) == 1 and args.job_order[0][0] != "-":
        job_order_file = args.job_order[0]
    elif len(args.job_order) == 1 and args.job_order[0] == "-":
        job_order_object = yaml.round_trip_load(stdin)
        job_order_object, _ = loader.resolve_all(job_order_object, file_uri(os.getcwd()) + "/")
    else:
        job_order_file = None

    if job_order_object is not None:
        input_basedir = args.basedir if args.basedir else os.getcwd()
    elif job_order_file is not None:
        input_basedir = args.basedir if args.basedir \
            else os.path.abspath(os.path.dirname(job_order_file))
        job_order_object, _ = loader.resolve_ref(job_order_file, checklinks=False)

    if job_order_object is not None and "http://commonwl.org/cwltool#overrides" in job_order_object:
        ov_uri = file_uri(job_order_file or input_basedir)
        overrides_list.extend(
            resolve_overrides(job_order_object, ov_uri, tool_file_uri))
        del job_order_object["http://commonwl.org/cwltool#overrides"]

    if job_order_object is None:
        input_basedir = args.basedir if args.basedir else os.getcwd()

    if job_order_object is not None and not isinstance(job_order_object, MutableMapping):
        _logger.error(
            'CWL input object at %s is not formatted correctly, it should be a '
            'JSON/YAML dictionary, not %s.\n'
            'Raw input object:\n%s', job_order_file or "stdin",
            type(job_order_object), job_order_object)
        sys.exit(1)
    return (job_order_object, input_basedir, loader)

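# The three job-order sources handled above, in order of the branches:
#     cwltool wf.cwl job.yml       -> job order read from the file job.yml
#     cwltool wf.cwl -             -> job order parsed from stdin
#     cwltool wf.cwl --opt value   -> no file; init_job_order() later builds
#                                     the object from the remaining argv
# (hypothetical invocations; wf.cwl and job.yml are placeholder names).
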
def init_job_order(job_order_object,        # type: Optional[MutableMapping[Text, Any]]
                   args,                    # type: argparse.Namespace
                   process,                 # type: Process
                   loader,                  # type: Loader
                   stdout,                  # type: Union[TextIO, StreamWriter]
                   print_input_deps=False,  # type: bool
                   relative_deps=False,     # type: bool
                   make_fs_access=StdFsAccess,  # type: Callable[[Text], StdFsAccess]
                   input_basedir="",        # type: Text
                   secret_store=None,       # type: Optional[SecretStore]
                   input_required=True      # type: bool
                  ):  # type: (...) -> MutableMapping[Text, Any]
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    if job_order_object is None:
        namemap = {}  # type: Dict[Text, Text]
        records = []  # type: List[Text]
        toolparser = generate_parser(
            argparse.ArgumentParser(prog=args.workflow), process, namemap, records, input_required)
        if args.tool_help:
            toolparser.print_help()
            exit(0)
        cmd_line = vars(toolparser.parse_args(args.job_order))
        for record_name in records:
            record = {}
            record_items = {
                k: v for k, v in iteritems(cmd_line)
                if k.startswith(record_name)}
            for key, value in iteritems(record_items):
                record[key[len(record_name) + 1:]] = value
                del cmd_line[key]
            cmd_line[str(record_name)] = record
        if 'job_order' in cmd_line and cmd_line["job_order"]:
            try:
                job_order_object = cast(
                    MutableMapping[Text, Any],
                    loader.resolve_ref(cmd_line["job_order"])[0])
            except Exception as err:
                _logger.error(Text(err), exc_info=args.debug)
                exit(1)
        else:
            job_order_object = {"id": args.workflow}

        del cmd_line["job_order"]

        job_order_object.update({namemap[k]: v for k, v in cmd_line.items()})

        if secret_store and secrets_req:
            secret_store.store(
                [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"Parsed job order from command line: %s",
                          json_dumps(job_order_object, indent=4))

    for inp in process.tool["inputs"]:
        if "default" in inp and (
                not job_order_object or shortname(inp["id"]) not in job_order_object):
            if not job_order_object:
                job_order_object = {}
            job_order_object[shortname(inp["id"])] = inp["default"]

    if job_order_object is None:
        if process.tool["inputs"]:
            if toolparser is not None:
                print(u"\nOptions for {} ".format(args.workflow))
                toolparser.print_help()
            _logger.error("")
            _logger.error("Input object required, use --help for details")
            exit(1)
        else:
            job_order_object = {}

    if print_input_deps:
        basedir = None  # type: Optional[Text]
        uri = job_order_object["id"]
        if uri == args.workflow:
            basedir = os.path.dirname(uri)
            uri = ""
        printdeps(job_order_object, loader, stdout, relative_deps, uri,
                  basedir=basedir, nestdirs=False)
        exit(0)

    def path_to_loc(p):  # type: (Dict[Text, Any]) -> None
        if "location" not in p and "path" in p:
            p["location"] = p["path"]
            del p["path"]

    ns = {}  # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
    ns.update(job_order_object.get("$namespaces", {}))
    ns.update(process.metadata.get("$namespaces", {}))
    ld = Loader(ns)

    def expand_formats(p):  # type: (Dict[Text, Any]) -> None
        if "format" in p:
            p["format"] = ld.expand_url(p["format"], "")

    visit_class(job_order_object, ("File", "Directory"), path_to_loc)
    visit_class(job_order_object, ("File",), functools.partial(add_sizes, make_fs_access(input_basedir)))
    visit_class(job_order_object, ("File",), expand_formats)
    adjustDirObjs(job_order_object, trim_listing)
    normalizeFilesDirs(job_order_object)

    if secret_store and secrets_req:
        secret_store.store(
            [shortname(sc) for sc in secrets_req["secrets"]], job_order_object)

    if "cwl:tool" in job_order_object:
        del job_order_object["cwl:tool"]
    if "id" in job_order_object:
        del job_order_object["id"]
    return job_order_object

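# When no job-order file is given, the generated parser above turns tool
# inputs into CLI flags, so a tool with a File input named "reads" accepts
# (hypothetically):
#     cwltool tool.cwl --reads input.fastq
# and --tool-help prints the auto-generated usage text for those flags.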

def make_relative(base, obj):  # type: (Text, Dict[Text, Any]) -> None
    """Relativize the location URI of a File or Directory object."""
    uri = obj.get("location", obj.get("path"))
    if ":" in uri.split("/")[0] and not uri.startswith("file://"):
        pass
    else:
        if uri.startswith("file://"):
            uri = uri_file_path(uri)
        obj["location"] = os.path.relpath(uri, base)

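# For example, make_relative("/data", {"location": "file:///data/run1/a.txt"})
# rewrites the location to "run1/a.txt", while non-file URIs such as
# "http://..." are deliberately left untouched by the `pass` branch.
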
def printdeps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              stdout,           # type: Union[TextIO, StreamWriter]
              relative_deps,    # type: bool
              uri,              # type: Text
              basedir=None,     # type: Optional[Text]
              nestdirs=True     # type: bool
             ):  # type: (...) -> None
    """Print a JSON representation of the dependencies of the CWL document."""
    deps = find_deps(obj, document_loader, uri, basedir=basedir,
                     nestdirs=nestdirs)
    if relative_deps == "primary":
        base = basedir if basedir else os.path.dirname(uri_file_path(str(uri)))
    elif relative_deps == "cwd":
        base = os.getcwd()
    visit_class(deps, ("File", "Directory"), functools.partial(
        make_relative, base))
    stdout.write(json_dumps(deps, indent=4))

def prov_deps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              uri,              # type: Text
              basedir=None      # type: Optional[Text]
             ):  # type: (...) -> MutableMapping[Text, Any]
    deps = find_deps(obj, document_loader, uri, basedir=basedir)

    def remove_non_cwl(deps):  # type: (MutableMapping[Text, Any]) -> None
        if 'secondaryFiles' in deps:
            sec_files = deps['secondaryFiles']
            for index, entry in enumerate(sec_files):
                if not ('format' in entry and entry['format'] == CWL_IANA):
                    del sec_files[index]
                else:
                    remove_non_cwl(entry)

    remove_non_cwl(deps)
    return deps


def find_deps(obj,              # type: Mapping[Text, Any]
              document_loader,  # type: Loader
              uri,              # type: Text
              basedir=None,     # type: Optional[Text]
              nestdirs=True     # type: bool
             ):  # type: (...) -> Dict[Text, Any]
    """Find the dependencies of the CWL document."""
    deps = {"class": "File", "location": uri, "format": CWL_IANA}  # type: Dict[Text, Any]

    def loadref(base, uri):  # type: (Text, Text) -> Any
        return document_loader.fetch(document_loader.fetcher.urljoin(base, uri))

    sfs = scandeps(
        basedir if basedir else uri, obj, {"$import", "run"},
        {"$include", "$schemas", "location"}, loadref, nestdirs=nestdirs)
    if sfs is not None:
        deps["secondaryFiles"] = sfs

    return deps

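# The returned mapping has roughly this shape (hypothetical workflow), and is
# what `cwltool --print-deps wf.cwl` serializes via printdeps() above:
#     {"class": "File", "location": "file:///.../wf.cwl", "format": CWL_IANA,
#      "secondaryFiles": [{"class": "File", "location": ".../tool.cwl", ...}]}
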
def print_pack(document_loader,  # type: Loader
               processobj,       # type: CommentedMap
               uri,              # type: Text
               metadata          # type: Dict[Text, Any]
              ):  # type: (...) -> Text
    """Return a CWL serialization of the CWL document in JSON."""
    packed = pack(document_loader, processobj, uri, metadata)
    if len(packed["$graph"]) > 1:
        return json_dumps(packed, indent=4)
    return json_dumps(packed["$graph"][0], indent=4)


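# Note the unwrapping above: a packed document containing a single process
# is emitted as that process itself, so `cwltool --pack single-tool.cwl`
# prints the tool rather than a one-element "$graph" wrapper.
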
def supported_cwl_versions(enable_dev):  # type: (bool) -> List[Text]
    # ALLUPDATES and UPDATES are dicts
    if enable_dev:
        versions = list(ALLUPDATES)
    else:
        versions = list(UPDATES)
    versions.sort()
    return versions

def configure_logging(args,            # type: argparse.Namespace
                      stderr_handler,  # type: logging.Handler
                      runtimeContext   # type: RuntimeContext
                     ):  # type: (...) -> None
    # Configure logging
    rdflib_logger = logging.getLogger("rdflib.term")
    rdflib_logger.addHandler(stderr_handler)
    rdflib_logger.setLevel(logging.ERROR)
    if args.quiet:
        # Silence STDERR, not an eventual provenance log file
        stderr_handler.setLevel(logging.WARN)
    if runtimeContext.debug:
        # Increase to debug for both stderr and provenance log file
        _logger.setLevel(logging.DEBUG)
        stderr_handler.setLevel(logging.DEBUG)
        rdflib_logger.setLevel(logging.DEBUG)
    fmtclass = coloredlogs.ColoredFormatter if args.enable_color else logging.Formatter
    formatter = fmtclass("%(levelname)s %(message)s")
    if args.timestamps:
        formatter = fmtclass(
            "[%(asctime)s] %(levelname)s %(message)s",
            "%Y-%m-%d %H:%M:%S")
    stderr_handler.setFormatter(formatter)

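# With the flags above, a hypothetical `cwltool --timestamps wf.cwl job.yml`
# run prefixes each stderr line like "[2020-07-31 00:18:57] INFO ...", and
# --enable-color swaps in coloredlogs.ColoredFormatter for the same format.
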
def setup_schema(args,                   # type: argparse.Namespace
                 custom_schema_callback  # type: Optional[Callable[[], None]]
                ):  # type: (...) -> None
    if custom_schema_callback is not None:
        custom_schema_callback()
    elif args.enable_ext:
        res = pkg_resources.resource_stream(__name__, 'extensions.yml')
        use_custom_schema("v1.0", "http://commonwl.org/cwltool", res.read())
        res.close()
    else:
        use_standard_schema("v1.0")

def setup_provenance(args,           # type: argparse.Namespace
                     argsl,          # type: List[str]
                     runtimeContext  # type: RuntimeContext
                    ):  # type: (...) -> Optional[int]
    if not args.compute_checksum:
        _logger.error("--provenance incompatible with --no-compute-checksum")
        return 1
    ro = ResearchObject(
        getdefault(runtimeContext.make_fs_access, StdFsAccess),
        temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid,
        full_name=args.cwl_full_name)
    runtimeContext.research_obj = ro
    log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
    prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))

    class ProvLogFormatter(logging.Formatter):
        """Enforce ISO8601 with both T and Z."""

        def __init__(self):  # type: () -> None
            super(ProvLogFormatter, self).__init__(
                "[%(asctime)sZ] %(message)s")

        def formatTime(self, record, datefmt=None):
            # type: (logging.LogRecord, Optional[str]) -> str
            record_time = time.gmtime(record.created)
            formatted_time = time.strftime("%Y-%m-%dT%H:%M:%S", record_time)
            with_msecs = "%s,%03d" % (formatted_time, record.msecs)
            return with_msecs
    prov_log_handler.setFormatter(ProvLogFormatter())
    _logger.addHandler(prov_log_handler)
    _logger.debug(u"[provenance] Logging to %s", log_file_io)
    if argsl is not None:
        # Log cwltool command line options to provenance file
        _logger.info("[cwltool] %s %s", sys.argv[0], u" ".join(argsl))
    _logger.debug(u"[cwltool] Arguments: %s", args)
    return None

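# ProvLogFormatter above renders records as "[2020-07-31T04:18:57,000Z] ..."
# (hypothetical timestamp): gmtime() plus the hard-coded trailing "Z" keeps
# the provenance log in UTC ISO 8601 regardless of the local timezone.
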
def setup_loadingContext(loadingContext,  # type: Optional[LoadingContext]
                         runtimeContext,  # type: RuntimeContext
                         args             # type: argparse.Namespace
                        ):  # type: (...) -> LoadingContext
    if loadingContext is None:
        loadingContext = LoadingContext(vars(args))
    else:
        loadingContext = loadingContext.copy()
    loadingContext.loader = default_loader(loadingContext.fetcher_constructor,
                                           enable_dev=args.enable_dev)
    loadingContext.research_obj = runtimeContext.research_obj
    loadingContext.disable_js_validation = \
        args.disable_js_validation or (not args.do_validate)
    loadingContext.construct_tool_object = getdefault(
        loadingContext.construct_tool_object, workflow.default_make_tool)
    loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
    if loadingContext.do_update is None:
        loadingContext.do_update = not (args.pack or args.print_subgraph)

    return loadingContext

def make_template(tool  # type: Process
                 ):  # type: (...) -> None
    def my_represent_none(self, data):  # pylint: disable=unused-argument
        # type: (Any, Any) -> Any
        """Force clean representation of 'null'."""
        return self.represent_scalar(u'tag:yaml.org,2002:null', u'null')
    yaml.RoundTripRepresenter.add_representer(type(None), my_represent_none)
    yaml.round_trip_dump(
        generate_input_template(tool), sys.stdout,
        default_flow_style=False, indent=4, block_seq_indent=2)


def choose_target(args,           # type: argparse.Namespace
                  tool,           # type: Process
                  loadingContext  # type: LoadingContext
                 ):  # type: (...) -> Optional[Process]

    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be None")

    if isinstance(tool, Workflow):
        url = urllib.parse.urlparse(tool.tool["id"])
        if url.fragment:
            extracted = get_subgraph([tool.tool["id"] + "/" + r for r in args.target], tool)
        else:
            extracted = get_subgraph([loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
                                      for r in args.target],
                                     tool)
    else:
        _logger.error("Can only use --target on Workflows")
        return None
    if isinstance(loadingContext.loader.idx, CommentedMap):
        loadingContext.loader.idx[extracted["id"]] = extracted
        tool = make_tool(extracted["id"],
                         loadingContext)
    else:
        raise Exception("Missing loadingContext.loader.idx!")

    return tool

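# For example, a hypothetical `cwltool --target step1 wf.cwl job.yml` run
# joins "step1" onto the workflow id (as "#step1" when the id has no
# fragment), extracts the matching subgraph, and re-registers it in the
# loader index so make_tool() can instantiate just that part.
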
def check_working_directories(runtimeContext  # type: RuntimeContext
                             ):  # type: (...) -> Optional[int]
    for dirprefix in ("tmpdir_prefix", "tmp_outdir_prefix", "cachedir"):
        if getattr(runtimeContext, dirprefix) and getattr(runtimeContext, dirprefix) != DEFAULT_TMP_PREFIX:
            sl = "/" if getattr(runtimeContext, dirprefix).endswith("/") or dirprefix == "cachedir" \
                else ""
            setattr(runtimeContext, dirprefix,
                    os.path.abspath(getattr(runtimeContext, dirprefix)) + sl)
            if not os.path.exists(os.path.dirname(getattr(runtimeContext, dirprefix))):
                try:
                    os.makedirs(os.path.dirname(getattr(runtimeContext, dirprefix)))
                except Exception as e:
                    _logger.error("Failed to create directory: %s", Text(e))
                    return 1
    return None


def main(argsl=None,                   # type: Optional[List[str]]
         args=None,                    # type: Optional[argparse.Namespace]
         job_order_object=None,        # type: Optional[MutableMapping[Text, Any]]
         stdin=sys.stdin,              # type: IO[Any]
         stdout=None,                  # type: Optional[Union[TextIO, StreamWriter]]
         stderr=sys.stderr,            # type: IO[Any]
         versionfunc=versionstring,    # type: Callable[[], Text]
         logger_handler=None,          # type: Optional[logging.Handler]
         custom_schema_callback=None,  # type: Optional[Callable[[], None]]
         executor=None,                # type: Optional[JobExecutor]
         loadingContext=None,          # type: Optional[LoadingContext]
         runtimeContext=None,          # type: Optional[RuntimeContext]
         input_required=True           # type: bool
        ):  # type: (...) -> int
    if not stdout:  # force UTF-8 even if the console is configured differently
        if (hasattr(sys.stdout, "encoding")
                and sys.stdout.encoding != 'UTF-8'):  # type: ignore
            if PY3 and hasattr(sys.stdout, "detach"):
                stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
            else:
                stdout = getwriter('utf-8')(sys.stdout)  # type: ignore
        else:
            stdout = cast(TextIO, sys.stdout)  # type: ignore

    _logger.removeHandler(defaultStreamHandler)
    stderr_handler = logger_handler
    if stderr_handler is not None:
        _logger.addHandler(stderr_handler)
    else:
        coloredlogs.install(logger=_logger, stream=stderr)
        stderr_handler = _logger.handlers[-1]
    workflowobj = None
    prov_log_handler = None  # type: Optional[logging.StreamHandler]
    try:
        if args is None:
            if argsl is None:
                argsl = sys.argv[1:]
            addl = []  # type: List[str]
            if "CWLTOOL_OPTIONS" in os.environ:
                addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
            args = arg_parser().parse_args(addl+argsl)
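            # Note on the parse above: options from CWLTOOL_OPTIONS (e.g. a
            # hypothetical CWLTOOL_OPTIONS="--debug --parallel") are placed
            # before the explicit arguments, so for ordinary store actions
            # the command-line value wins when both supply the same flag.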
            if args.record_container_id:
                if not args.cidfile_dir:
                    args.cidfile_dir = os.getcwd()
                del args.record_container_id

        if runtimeContext is None:
            runtimeContext = RuntimeContext(vars(args))
        else:
            runtimeContext = runtimeContext.copy()

        # If on Windows platform, a default Docker Container is used if not
        # explicitly provided by user
        if onWindows() and not runtimeContext.default_container:
            # This docker image is a minimal alpine image with bash installed
            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
            runtimeContext.default_container = windows_default_container_id

        # If caller parsed its own arguments, it may not include every
        # cwltool option, so fill in defaults to avoid crashing when
        # dereferencing them in args.
        for key, val in iteritems(get_default_args()):
            if not hasattr(args, key):
                setattr(args, key, val)

        configure_logging(args, stderr_handler, runtimeContext)

        if args.version:
            print(versionfunc())
            return 0
        _logger.info(versionfunc())

        if args.print_supported_versions:
            print("\n".join(supported_cwl_versions(args.enable_dev)))
            return 0

        if not args.workflow:
            if os.path.isfile("CWLFile"):
                setattr(args, "workflow", "CWLFile")
            else:
                _logger.error("CWL document required, no input file was provided")
                arg_parser().print_help()
                return 1
        if args.relax_path_checks:
            command_line_tool.ACCEPTLIST_RE = command_line_tool.ACCEPTLIST_EN_RELAXED_RE

        if args.ga4gh_tool_registries:
            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
        if not args.enable_ga4gh_tool_registry:
            del ga4gh_tool_registries[:]

        setup_schema(args, custom_schema_callback)

        if args.provenance:
            if argsl is None:
                raise Exception("argsl cannot be None")
            if setup_provenance(args, argsl, runtimeContext) is not None:
                return 1

        loadingContext = setup_loadingContext(loadingContext, runtimeContext, args)

        uri, tool_file_uri = resolve_tool_uri(
            args.workflow, resolver=loadingContext.resolver,
            fetcher_constructor=loadingContext.fetcher_constructor)

        try_again_msg = "" if args.debug else ", try again with --debug for more information"

        try:
            job_order_object, input_basedir, jobloader = load_job_order(
                args, stdin, loadingContext.fetcher_constructor,
                loadingContext.overrides_list, tool_file_uri)

            if args.overrides:
                loadingContext.overrides_list.extend(load_overrides(
                    file_uri(os.path.abspath(args.overrides)), tool_file_uri))

            loadingContext, workflowobj, uri = fetch_document(
                uri, loadingContext)

            if args.print_deps and loadingContext.loader:
                printdeps(workflowobj, loadingContext.loader, stdout,
                          args.relative_deps, uri)
                return 0

            loadingContext, uri \
                = resolve_and_validate_document(loadingContext, workflowobj, uri,
                                                preprocess_only=(args.print_pre or args.pack),
                                                skip_schemas=args.skip_schemas)

            if loadingContext.loader is None:
                raise Exception("Impossible code path.")
            processobj, metadata = loadingContext.loader.resolve_ref(uri)
            processobj = cast(CommentedMap, processobj)
            if args.pack:
                stdout.write(print_pack(loadingContext.loader, processobj, uri, metadata))
                return 0

            if args.provenance and runtimeContext.research_obj:
                # Can't really be combined with args.pack at same time
                runtimeContext.research_obj.packed_workflow(
                    print_pack(loadingContext.loader, processobj, uri, metadata))

            if args.print_pre:
                stdout.write(json_dumps(processobj, indent=4, sort_keys=True, separators=(',', ': ')))
                return 0

            tool = make_tool(uri, loadingContext)
            if args.make_template:
                make_template(tool)
                return 0

            if args.validate:
                print("{} is valid CWL.".format(args.workflow))
                return 0

            if args.print_rdf:
                stdout.write(printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer))
                return 0

            if args.print_dot:
                printdot(tool, loadingContext.loader.ctx, stdout)
                return 0

            if args.print_targets:
                for f in ("outputs", "steps", "inputs"):
                    if tool.tool[f]:
                        _logger.info("%s%s targets:", f[0].upper(), f[1:-1])
                        stdout.write("  "+"\n  ".join([shortname(t["id"]) for t in tool.tool[f]])+"\n")
                return 0

            if args.target:
                ctool = choose_target(args, tool, loadingContext)
                if ctool is None:
                    return 1
                else:
                    tool = ctool

            if args.print_subgraph:
                if "name" in tool.tool:
                    del tool.tool["name"]
                stdout.write(json_dumps(tool.tool, indent=4, sort_keys=True, separators=(',', ': ')))
                return 0

        except (validate.ValidationException) as exc:
            _logger.error(u"Tool definition failed validation:\n%s", Text(exc),
                          exc_info=args.debug)
            return 1
        except (RuntimeError, WorkflowException) as exc:
            _logger.error(u"Tool definition failed initialization:\n%s", Text(exc),
                          exc_info=args.debug)
            return 1
        except Exception as exc:
            _logger.error(
                u"I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
                try_again_msg,
                Text(exc) if not args.debug else "",
                exc_info=args.debug)
            return 1

        if isinstance(tool, int):
            return tool

        # If on MacOS platform, TMPDIR must be set to be under one of the
        # shared volumes in Docker for Mac
        # More info: https://dockstore.org/docs/faq
        if sys.platform == "darwin":
            default_mac_path = "/private/tmp/docker_tmp"
            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmp_outdir_prefix = default_mac_path
            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
                runtimeContext.tmpdir_prefix = default_mac_path

        if check_working_directories(runtimeContext) is not None:
            return 1

        if args.cachedir:
            if args.move_outputs == "move":
                runtimeContext.move_outputs = "copy"
            runtimeContext.tmp_outdir_prefix = args.cachedir

        runtimeContext.secret_store = getdefault(runtimeContext.secret_store, SecretStore())
        runtimeContext.make_fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)

        if not executor:
            if args.parallel:
                temp_executor = MultithreadedJobExecutor()
                runtimeContext.select_resources = temp_executor.select_resources
                real_executor = temp_executor  # type: JobExecutor
            else:
                real_executor = SingleJobExecutor()
        else:
            real_executor = executor

        try:
            runtimeContext.basedir = input_basedir

            if isinstance(tool, ProcessGenerator):
                tfjob_order = {}  # type: MutableMapping[Text, Any]
                if loadingContext.jobdefaults:
                    tfjob_order.update(loadingContext.jobdefaults)
                if job_order_object:
                    tfjob_order.update(job_order_object)
                tfout, tfstatus = real_executor(tool.embedded_tool, tfjob_order, runtimeContext)
                if tfstatus != "success":
                    raise WorkflowException("ProcessGenerator failed to generate workflow")
                tool, job_order_object = tool.result(tfjob_order, tfout, runtimeContext)
                if not job_order_object:
                    job_order_object = None

            try:
                initialized_job_order_object = init_job_order(
                    job_order_object, args, tool, jobloader, stdout,
                    print_input_deps=args.print_input_deps,
                    relative_deps=args.relative_deps,
                    make_fs_access=runtimeContext.make_fs_access,
                    input_basedir=input_basedir,
                    secret_store=runtimeContext.secret_store,
                    input_required=input_required)
            except SystemExit as err:
                return err.code

            del args.workflow
            del args.job_order

            conf_file = getattr(args, "beta_dependency_resolvers_configuration", None)  # Text
            use_conda_dependencies = getattr(args, "beta_conda_dependencies", None)  # Text

            if conf_file or use_conda_dependencies:
                runtimeContext.job_script_provider = DependenciesConfiguration(args)
            else:
                runtimeContext.find_default_container = functools.partial(
                    find_default_container,
                    default_container=runtimeContext.default_container,
                    use_biocontainers=args.beta_use_biocontainers)

            (out, status) = real_executor(
                tool, initialized_job_order_object, runtimeContext,
                logger=_logger)

            if out is not None:
                if runtimeContext.research_obj is not None:
                    runtimeContext.research_obj.create_job(
                        out, None, True)

                    def remove_at_id(doc):  # type: (MutableMapping[Text, Any]) -> None
                        for key in list(doc.keys()):
                            if key == '@id':
                                del doc[key]
                            else:
                                value = doc[key]
                                if isinstance(value, MutableMapping):
                                    remove_at_id(value)
                                elif isinstance(value, MutableSequence):
                                    for entry in value:
                                        if isinstance(entry, MutableMapping):
                                            remove_at_id(entry)
                    remove_at_id(out)
                    visit_class(out, ("File",), functools.partial(
                        add_sizes, runtimeContext.make_fs_access('')))

                def loc_to_path(obj):  # type: (Dict[Text, Any]) -> None
                    for field in ("path", "nameext", "nameroot", "dirname"):
                        if field in obj:
                            del obj[field]
                    if obj["location"].startswith("file://"):
                        obj["path"] = uri_file_path(obj["location"])

                visit_class(out, ("File", "Directory"), loc_to_path)

                # Unsetting the Generation from final output object
                visit_class(out, ("File", ), MutationManager().unset_generation)

                if isinstance(out, string_types):
                    stdout.write(out)
                else:
                    stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
                stdout.write("\n")
                if hasattr(stdout, "flush"):
                    stdout.flush()

            if status != "success":
                _logger.warning(u"Final process status is %s", status)
                return 1
            _logger.info(u"Final process status is %s", status)
            return 0

        except (validate.ValidationException) as exc:
            _logger.error(u"Input object failed validation:\n%s", Text(exc),
                          exc_info=args.debug)
            return 1
        except UnsupportedRequirement as exc:
            _logger.error(
                u"Workflow or tool uses unsupported feature:\n%s", Text(exc),
                exc_info=args.debug)
            return 33
        except WorkflowException as exc:
            _logger.error(
                u"Workflow error%s:\n%s", try_again_msg, strip_dup_lineno(Text(exc)),
                exc_info=args.debug)
            return 1
        except Exception as exc:  # pylint: disable=broad-except
            _logger.error(
                u"Unhandled error%s:\n %s", try_again_msg, Text(exc), exc_info=args.debug)
            return 1

    finally:
        if args and runtimeContext and runtimeContext.research_obj \
                and workflowobj and loadingContext:
            research_obj = runtimeContext.research_obj
            if loadingContext.loader is not None:
                research_obj.generate_snapshot(prov_deps(
                    workflowobj, loadingContext.loader, uri))
            else:
                _logger.warning("Unable to generate provenance snapshot "
                                "due to missing loadingContext.loader.")
            if prov_log_handler is not None:
                # Stop logging so we won't half-log adding ourself to RO
                _logger.debug(u"[provenance] Closing provenance log file %s",
                              prov_log_handler)
                _logger.removeHandler(prov_log_handler)
                # Ensure last log lines are written out
                prov_log_handler.flush()
                # Underlying WritableBagFile will add the tagfile to the manifest
                prov_log_handler.stream.close()
                prov_log_handler.close()
            research_obj.close(args.provenance)

        _logger.removeHandler(stderr_handler)
        _logger.addHandler(defaultStreamHandler)


def find_default_container(builder,                 # type: HasReqsHints
                           default_container=None,  # type: Optional[Text]
                           use_biocontainers=None,  # type: Optional[bool]
                          ):  # type: (...) -> Optional[Text]
    """Find a container."""
    if not default_container and use_biocontainers:
        default_container = get_container_from_software_requirements(
            use_biocontainers, builder)
    return default_container


def run(*args, **kwargs):
    # type: (*Any, **Any) -> None
    """Run cwltool."""
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        sys.exit(main(*args, **kwargs))
    finally:
        _terminate_processes()


if __name__ == "__main__":
    run(sys.argv[1:])
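
# Usage sketch, assuming an installed `cwltool` console script and
# hypothetical file names:
#     cwltool --make-template my-workflow.cwl > job.yml   # via make_template()
#     cwltool my-workflow.cwl job.yml                     # runs main() via run()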