Mercurial repository: shellac / guppy_basecaller
comparison: env/lib/python3.7/site-packages/cwltool/process.py @ 2:6af9afd405e9 (draft)
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author   | shellac
date     | Thu, 14 May 2020 14:56:58 -0400
parents  | 26e78fe6e8c4
children | (none)
comparing revision 1:75ca89e9b81c with revision 2:6af9afd405e9
1 from __future__ import absolute_import | |
2 | |
3 import abc | |
4 import copy | |
5 import errno | |
6 import functools | |
7 import hashlib | |
8 import json | |
9 import logging | |
10 import os | |
11 import shutil | |
12 import stat | |
13 import tempfile | |
14 import textwrap | |
15 import uuid | |
16 | |
17 from io import open | |
18 from typing import (Any, Callable, Dict, Generator, Iterator, List, | |
19 Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple, | |
20 Type, Union, cast) | |
21 | |
22 from pkg_resources import resource_stream | |
23 from rdflib import Graph # pylint: disable=unused-import | |
24 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
25 from six import PY3, iteritems, itervalues, string_types, with_metaclass | |
26 from six.moves import urllib | |
27 from future.utils import raise_from | |
28 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import | |
29 Text) | |
30 from schema_salad import schema, validate | |
31 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path | |
32 from schema_salad.sourceline import SourceLine, strip_dup_lineno | |
33 | |
34 from . import expression | |
35 from .builder import Builder, HasReqsHints | |
36 from .context import LoadingContext # pylint: disable=unused-import | |
37 from .context import RuntimeContext, getdefault | |
38 from .errors import UnsupportedRequirement, WorkflowException | |
39 from .loghandler import _logger | |
40 from .mutation import MutationManager # pylint: disable=unused-import | |
41 from .pathmapper import (PathMapper, adjustDirObjs, ensure_writable, | |
42 get_listing, normalizeFilesDirs, visit_class, | |
43 MapperEnt) | |
44 from .secrets import SecretStore # pylint: disable=unused-import | |
45 from .software_requirements import ( # pylint: disable=unused-import | |
46 DependenciesConfiguration) | |
47 from .stdfsaccess import StdFsAccess | |
48 from .utils import (DEFAULT_TMP_PREFIX, aslist, cmp_like_py2, | |
49 copytree_with_merge, onWindows, random_outdir) | |
50 from .validate_js import validate_js_expressions | |
51 from .update import INTERNAL_VERSION | |
52 | |
53 try: | |
54 from os import scandir # type: ignore | |
55 except ImportError: | |
56 from scandir import scandir # type: ignore | |
57 if TYPE_CHECKING: | |
58 from .provenance import ProvenanceProfile # pylint: disable=unused-import | |
59 | |
60 if PY3: | |
61 from collections.abc import Iterable # only works on python 3.3+ | |
62 else: | |
63 from collections import Iterable # pylint: disable=unused-import | |
64 | |
65 class LogAsDebugFilter(logging.Filter): | |
66 def __init__(self, name, parent): # type: (Text, logging.Logger) -> None | |
67 """Initialize.""" | |
68 name = str(name) | |
69 super(LogAsDebugFilter, self).__init__(name) | |
70 self.parent = parent | |
71 | |
72 def filter(self, record): # type: (logging.LogRecord) -> bool | |
73 return self.parent.isEnabledFor(logging.DEBUG) | |
74 | |
75 | |
76 _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings") | |
77 _logger_validation_warnings.setLevel(_logger.getEffectiveLevel()) | |
78 _logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger)) | |
79 | |
80 supportedProcessRequirements = ["DockerRequirement", | |
81 "SchemaDefRequirement", | |
82 "EnvVarRequirement", | |
83 "ScatterFeatureRequirement", | |
84 "SubworkflowFeatureRequirement", | |
85 "MultipleInputFeatureRequirement", | |
86 "InlineJavascriptRequirement", | |
87 "ShellCommandRequirement", | |
88 "StepInputExpressionRequirement", | |
89 "ResourceRequirement", | |
90 "InitialWorkDirRequirement", | |
91 "ToolTimeLimit", | |
92 "WorkReuse", | |
93 "NetworkAccess", | |
94 "InplaceUpdateRequirement", | |
95 "LoadListingRequirement", | |
96 "http://commonwl.org/cwltool#TimeLimit", | |
97 "http://commonwl.org/cwltool#WorkReuse", | |
98 "http://commonwl.org/cwltool#NetworkAccess", | |
99 "http://commonwl.org/cwltool#LoadListingRequirement", | |
100 "http://commonwl.org/cwltool#InplaceUpdateRequirement"] | |
101 | |
102 cwl_files = ( | |
103 "Workflow.yml", | |
104 "CommandLineTool.yml", | |
105 "CommonWorkflowLanguage.yml", | |
106 "Process.yml", | |
107 "concepts.md", | |
108 "contrib.md", | |
109 "intro.md", | |
110 "invocation.md") | |
111 | |
112 salad_files = ('metaschema.yml', | |
113 'metaschema_base.yml', | |
114 'salad.md', | |
115 'field_name.yml', | |
116 'import_include.md', | |
117 'link_res.yml', | |
118 'ident_res.yml', | |
119 'vocab_res.yml', | |
120 'vocab_res.yml', | |
121 'field_name_schema.yml', | |
122 'field_name_src.yml', | |
123 'field_name_proc.yml', | |
124 'ident_res_schema.yml', | |
125 'ident_res_src.yml', | |
126 'ident_res_proc.yml', | |
127 'link_res_schema.yml', | |
128 'link_res_src.yml', | |
129 'link_res_proc.yml', | |
130 'vocab_res_schema.yml', | |
131 'vocab_res_src.yml', | |
132 'vocab_res_proc.yml') | |
133 | |
134 SCHEMA_CACHE = {} # type: Dict[Text, Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]] | |
135 SCHEMA_FILE = None # type: Optional[Dict[Text, Any]] | |
136 SCHEMA_DIR = None # type: Optional[Dict[Text, Any]] | |
137 SCHEMA_ANY = None # type: Optional[Dict[Text, Any]] | |
138 | |
139 custom_schemas = {} # type: Dict[Text, Tuple[Text, Text]] | |
140 | |
141 def use_standard_schema(version): | |
142 # type: (Text) -> None | |
143 if version in custom_schemas: | |
144 del custom_schemas[version] | |
145 if version in SCHEMA_CACHE: | |
146 del SCHEMA_CACHE[version] | |
147 | |
148 def use_custom_schema(version, name, text): | |
149 # type: (Text, Text, Union[Text, bytes]) -> None | |
150 if isinstance(text, bytes): | |
151 text2 = text.decode() | |
152 else: | |
153 text2 = text | |
154 custom_schemas[version] = (name, text2) | |
155 if version in SCHEMA_CACHE: | |
156 del SCHEMA_CACHE[version] | |
157 | |
158 def get_schema(version): | |
159 # type: (Text) -> Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text,Any], Loader] | |
160 | |
161 if version in SCHEMA_CACHE: | |
162 return SCHEMA_CACHE[version] | |
163 | |
164 cache = {} # type: Dict[Text, Any] | |
165 version = version.split("#")[-1] | |
166 if '.dev' in version: | |
167 version = ".".join(version.split(".")[:-1]) | |
168 for f in cwl_files: | |
169 try: | |
170 res = resource_stream(__name__, 'schemas/%s/%s' % (version, f)) | |
171 cache["https://w3id.org/cwl/" + f] = res.read() | |
172 res.close() | |
173 except IOError: | |
174 pass | |
175 | |
176 for f in salad_files: | |
177 try: | |
178 res = resource_stream( | |
179 __name__, 'schemas/{}/salad/schema_salad/metaschema/{}'.format( | |
180 version, f)) | |
181 cache["https://w3id.org/cwl/salad/schema_salad/metaschema/" | |
182 + f] = res.read() | |
183 res.close() | |
184 except IOError: | |
185 pass | |
186 | |
187 if version in custom_schemas: | |
188 cache[custom_schemas[version][0]] = custom_schemas[version][1] | |
189 SCHEMA_CACHE[version] = schema.load_schema( | |
190 custom_schemas[version][0], cache=cache) | |
191 else: | |
192 SCHEMA_CACHE[version] = schema.load_schema( | |
193 "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache) | |
194 | |
195 return SCHEMA_CACHE[version] | |
196 | |
197 | |
198 def shortname(inputid): | |
199 # type: (Text) -> Text | |
200 d = urllib.parse.urlparse(inputid) | |
201 if d.fragment: | |
202 return d.fragment.split(u"/")[-1] | |
203 return d.path.split(u"/")[-1] | |
204 | |
205 | |
206 def checkRequirements(rec, supported_process_requirements): | |
207 # type: (Any, Iterable[Any]) -> None | |
208 if isinstance(rec, MutableMapping): | |
209 if "requirements" in rec: | |
210 for i, entry in enumerate(rec["requirements"]): | |
211 with SourceLine(rec["requirements"], i, UnsupportedRequirement): | |
212 if entry["class"] not in supported_process_requirements: | |
213 raise UnsupportedRequirement( | |
214 u"Unsupported requirement {}".format(entry["class"])) | |
215 for key in rec: | |
216 checkRequirements(rec[key], supported_process_requirements) | |
217 if isinstance(rec, MutableSequence): | |
218 for entry in rec: | |
219 checkRequirements(entry, supported_process_requirements) | |
220 | |
221 | |
222 def stage_files(pathmapper, # type: PathMapper | |
223 stage_func=None, # type: Optional[Callable[..., Any]] | |
224 ignore_writable=False, # type: bool | |
225 symlink=True, # type: bool | |
226 secret_store=None, # type: Optional[SecretStore] | |
227 fix_conflicts=False # type: bool | |
228 ): # type: (...) -> None | |
229 """Link or copy files to their targets. Create them as needed.""" | |
230 | |
231 targets = {} # type: Dict[Text, MapperEnt] | |
232 for key, entry in pathmapper.items(): | |
233 if not 'File' in entry.type: | |
234 continue | |
235 if entry.target not in targets: | |
236 targets[entry.target] = entry | |
237 elif targets[entry.target].resolved != entry.resolved: | |
238 if fix_conflicts: | |
239 tgt = entry.target | |
240 i = 2 | |
241 tgt = "%s_%s" % (tgt, i) | |
242 while tgt in targets: | |
243 i += 1 | |
244 tgt = "%s_%s" % (tgt, i) | |
245 targets[tgt] = pathmapper.update(key, entry.resolved, tgt, entry.type, entry.staged) | |
246 else: | |
247 raise WorkflowException("File staging conflict, trying to stage both %s and %s to the same target %s" % ( | |
248 targets[entry.target].resolved, entry.resolved, entry.target)) | |
249 | |
250 for key, entry in pathmapper.items(): | |
251 if not entry.staged: | |
252 continue | |
253 if not os.path.exists(os.path.dirname(entry.target)): | |
254 os.makedirs(os.path.dirname(entry.target)) | |
255 if entry.type in ("File", "Directory") and os.path.exists(entry.resolved): | |
256 if symlink: # Use symlink func if allowed | |
257 if onWindows(): | |
258 if entry.type == "File": | |
259 shutil.copy(entry.resolved, entry.target) | |
260 elif entry.type == "Directory": | |
261 if os.path.exists(entry.target) \ | |
262 and os.path.isdir(entry.target): | |
263 shutil.rmtree(entry.target) | |
264 copytree_with_merge(entry.resolved, entry.target) | |
265 else: | |
266 os.symlink(entry.resolved, entry.target) | |
267 elif stage_func is not None: | |
268 stage_func(entry.resolved, entry.target) | |
269 elif entry.type == "Directory" and not os.path.exists(entry.target) \ | |
270 and entry.resolved.startswith("_:"): | |
271 os.makedirs(entry.target) | |
272 elif entry.type == "WritableFile" and not ignore_writable: | |
273 shutil.copy(entry.resolved, entry.target) | |
274 ensure_writable(entry.target) | |
275 elif entry.type == "WritableDirectory" and not ignore_writable: | |
276 if entry.resolved.startswith("_:"): | |
277 os.makedirs(entry.target) | |
278 else: | |
279 shutil.copytree(entry.resolved, entry.target) | |
280 ensure_writable(entry.target) | |
281 elif entry.type == "CreateFile" or entry.type == "CreateWritableFile": | |
282 with open(entry.target, "wb") as new: | |
283 if secret_store is not None: | |
284 new.write( | |
285 secret_store.retrieve(entry.resolved).encode("utf-8")) | |
286 else: | |
287 new.write(entry.resolved.encode("utf-8")) | |
288 if entry.type == "CreateFile": | |
289 os.chmod(entry.target, stat.S_IRUSR) # Read only | |
290 else: # it is a "CreateWritableFile" | |
291 ensure_writable(entry.target) | |
292 pathmapper.update( | |
293 key, entry.target, entry.target, entry.type, entry.staged) | |
294 | |
295 | |
296 def relocateOutputs(outputObj, # type: Union[Dict[Text, Any], List[Dict[Text, Any]]] | |
297 destination_path, # type: Text | |
298 source_directories, # type: Set[Text] | |
299 action, # type: Text | |
300 fs_access, # type: StdFsAccess | |
301 compute_checksum=True, # type: bool | |
302 path_mapper=PathMapper # type: Type[PathMapper] | |
303 ): | |
304 # type: (...) -> Union[Dict[Text, Any], List[Dict[Text, Any]]] | |
305 adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True)) | |
306 | |
307 if action not in ("move", "copy"): | |
308 return outputObj | |
309 | |
310 def _collectDirEntries(obj): | |
311 # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]] | |
312 if isinstance(obj, dict): | |
313 if obj.get("class") in ("File", "Directory"): | |
314 yield obj | |
315 else: | |
316 for sub_obj in obj.values(): | |
317 for dir_entry in _collectDirEntries(sub_obj): | |
318 yield dir_entry | |
319 elif isinstance(obj, MutableSequence): | |
320 for sub_obj in obj: | |
321 for dir_entry in _collectDirEntries(sub_obj): | |
322 yield dir_entry | |
323 | |
324 def _relocate(src, dst): # type: (Text, Text) -> None | |
325 if src == dst: | |
326 return | |
327 | |
328 # If the source is not contained in source_directories we're not allowed to delete it | |
329 src = fs_access.realpath(src) | |
330 src_can_deleted = any(os.path.commonprefix([p, src]) == p for p in source_directories) | |
331 | |
332 _action = "move" if action == "move" and src_can_deleted else "copy" | |
333 | |
334 if _action == "move": | |
335 _logger.debug("Moving %s to %s", src, dst) | |
336 if fs_access.isdir(src) and fs_access.isdir(dst): | |
337 # merge directories | |
338 for dir_entry in scandir(src): | |
339 _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name)) | |
340 else: | |
341 shutil.move(src, dst) | |
342 | |
343 elif _action == "copy": | |
344 _logger.debug("Copying %s to %s", src, dst) | |
345 if fs_access.isdir(src): | |
346 if os.path.isdir(dst): | |
347 shutil.rmtree(dst) | |
348 elif os.path.isfile(dst): | |
349 os.unlink(dst) | |
350 shutil.copytree(src, dst) | |
351 else: | |
352 shutil.copy2(src, dst) | |
353 | |
354 def _realpath(ob): # type: (Dict[Text, Any]) -> None | |
355 if ob["location"].startswith("file:"): | |
356 ob["location"] = file_uri(os.path.realpath(uri_file_path(ob["location"]))) | |
357 if ob["location"].startswith("/"): | |
358 ob["location"] = os.path.realpath(ob["location"]) | |
359 | |
360 outfiles = list(_collectDirEntries(outputObj)) | |
361 visit_class(outfiles, ("File", "Directory"), _realpath) | |
362 pm = path_mapper(outfiles, "", destination_path, separateDirs=False) | |
363 stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True) | |
364 | |
365 def _check_adjust(a_file): # type: (Dict[Text, Text]) -> Dict[Text, Text] | |
366 a_file["location"] = file_uri(pm.mapper(a_file["location"])[1]) | |
367 if "contents" in a_file: | |
368 del a_file["contents"] | |
369 return a_file | |
370 | |
371 visit_class(outputObj, ("File", "Directory"), _check_adjust) | |
372 | |
373 if compute_checksum: | |
374 visit_class(outputObj, ("File",), functools.partial( | |
375 compute_checksums, fs_access)) | |
376 return outputObj | |
377 | |
378 | |
379 def cleanIntermediate(output_dirs): # type: (Iterable[Text]) -> None | |
380 for a in output_dirs: | |
381 if os.path.exists(a): | |
382 _logger.debug(u"Removing intermediate output directory %s", a) | |
383 shutil.rmtree(a, True) | |
384 | |
385 def add_sizes(fsaccess, obj): # type: (StdFsAccess, Dict[Text, Any]) -> None | |
386 if 'location' in obj: | |
387 try: | |
388 if "size" not in obj: | |
389 obj["size"] = fsaccess.size(obj["location"]) | |
390 except OSError: | |
391 pass | |
392 elif 'contents' in obj: | |
393 obj["size"] = len(obj['contents']) | |
394 else: | |
395 return # best effort | |
396 | |
397 def fill_in_defaults(inputs, # type: List[Dict[Text, Text]] | |
398 job, # type: Dict[Text, expression.JSON] | |
399 fsaccess # type: StdFsAccess | |
400 ): # type: (...) -> None | |
401 for e, inp in enumerate(inputs): | |
402 with SourceLine(inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)): | |
403 fieldname = shortname(inp[u"id"]) | |
404 if job.get(fieldname) is not None: | |
405 pass | |
406 elif job.get(fieldname) is None and u"default" in inp: | |
407 job[fieldname] = copy.deepcopy(inp[u"default"]) | |
408 elif job.get(fieldname) is None and u"null" in aslist(inp[u"type"]): | |
409 job[fieldname] = None | |
410 else: | |
411 raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"])) | |
412 | |
413 | |
414 def avroize_type(field_type, name_prefix=""): | |
415 # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Text) -> Any | |
416 """Add missing information to a type so that CWL types are valid.""" | |
417 if isinstance(field_type, MutableSequence): | |
418 for field in field_type: | |
419 avroize_type(field, name_prefix) | |
420 elif isinstance(field_type, MutableMapping): | |
421 if field_type["type"] in ("enum", "record"): | |
422 if "name" not in field_type: | |
423 field_type["name"] = name_prefix + Text(uuid.uuid4()) | |
424 if field_type["type"] == "record": | |
425 avroize_type(field_type["fields"], name_prefix) | |
426 if field_type["type"] == "array": | |
427 avroize_type(field_type["items"], name_prefix) | |
428 if isinstance(field_type["type"], MutableSequence): | |
429 for ctype in field_type["type"]: | |
430 avroize_type(ctype, name_prefix) | |
431 return field_type | |
432 | |
433 def get_overrides(overrides, toolid): # type: (List[Dict[Text, Any]], Text) -> Dict[Text, Any] | |
434 req = {} # type: Dict[Text, Any] | |
435 if not isinstance(overrides, MutableSequence): | |
436 raise validate.ValidationException("Expected overrides to be a list, but was %s" % type(overrides)) | |
437 for ov in overrides: | |
438 if ov["overrideTarget"] == toolid: | |
439 req.update(ov) | |
440 return req | |
441 | |
442 | |
443 _VAR_SPOOL_ERROR = textwrap.dedent( | |
444 """ | |
445 Non-portable reference to /var/spool/cwl detected: '{}'. | |
446 To fix, replace /var/spool/cwl with $(runtime.outdir) or add | |
447 DockerRequirement to the 'requirements' section and declare | |
448 'dockerOutputDirectory: /var/spool/cwl'. | |
449 """) | |
450 | |
451 | |
452 def var_spool_cwl_detector(obj, # type: Union[MutableMapping[Text, Text], List[Dict[Text, Any]], Text] | |
453 item=None, # type: Optional[Any] | |
454 obj_key=None, # type: Optional[Any] | |
455 ): # type: (...)->bool | |
456 """Detect any textual reference to /var/spool/cwl.""" | |
457 r = False | |
458 if isinstance(obj, string_types): | |
459 if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory": | |
460 _logger.warning( | |
461 SourceLine(item=item, key=obj_key, raise_type=Text).makeError( | |
462 _VAR_SPOOL_ERROR.format(obj))) | |
463 r = True | |
464 elif isinstance(obj, MutableMapping): | |
465 for mkey, mvalue in iteritems(obj): | |
466 r = var_spool_cwl_detector(mvalue, obj, mkey) or r | |
467 elif isinstance(obj, MutableSequence): | |
468 for lkey, lvalue in enumerate(obj): | |
469 r = var_spool_cwl_detector(lvalue, obj, lkey) or r | |
470 return r | |
471 | |
472 def eval_resource(builder, resource_req): # type: (Builder, Text) -> Any | |
473 if expression.needs_parsing(resource_req): | |
474 return builder.do_eval(resource_req) | |
475 return resource_req | |
476 | |
477 | |
478 # Threshold where the "too many files" warning kicks in | |
479 FILE_COUNT_WARNING = 5000 | |
480 | |
481 class Process(with_metaclass(abc.ABCMeta, HasReqsHints)): | |
482 def __init__(self, | |
483 toolpath_object, # type: MutableMapping[Text, Any] | |
484 loadingContext # type: LoadingContext | |
485 ): # type: (...) -> None | |
486 """Build a Process object from the provided dictionary.""" | |
487 self.metadata = getdefault(loadingContext.metadata, {}) # type: Dict[Text,Any] | |
488 self.provenance_object = None # type: Optional[ProvenanceProfile] | |
489 self.parent_wf = None # type: Optional[ProvenanceProfile] | |
490 global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY # pylint: disable=global-statement | |
491 if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None: | |
492 get_schema("v1.0") | |
493 SCHEMA_ANY = cast(Dict[Text, Any], | |
494 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"]) | |
495 SCHEMA_FILE = cast(Dict[Text, Any], | |
496 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"]) | |
497 SCHEMA_DIR = cast(Dict[Text, Any], | |
498 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"]) | |
499 | |
500 self.names = schema.make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], | |
501 Loader({})) | |
502 self.tool = toolpath_object | |
503 self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, [])) | |
504 self.requirements.extend(self.tool.get("requirements", [])) | |
505 if "id" not in self.tool: | |
506 self.tool["id"] = "_:" + Text(uuid.uuid4()) | |
507 self.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []), | |
508 self.tool["id"]).get("requirements", [])) | |
509 self.hints = copy.deepcopy(getdefault(loadingContext.hints, [])) | |
510 self.hints.extend(self.tool.get("hints", [])) | |
511 # Versions of requirements and hints which aren't mutated. | |
512 self.original_requirements = copy.deepcopy(self.requirements) | |
513 self.original_hints = copy.deepcopy(self.hints) | |
514 self.doc_loader = loadingContext.loader | |
515 self.doc_schema = loadingContext.avsc_names | |
516 | |
517 self.formatgraph = None # type: Optional[Graph] | |
518 if self.doc_loader is not None: | |
519 self.formatgraph = self.doc_loader.graph | |
520 | |
521 checkRequirements(self.tool, supportedProcessRequirements) | |
522 self.validate_hints(loadingContext.avsc_names, self.tool.get("hints", []), | |
523 strict=getdefault(loadingContext.strict, False)) | |
524 | |
525 self.schemaDefs = {} # type: Dict[Text,Dict[Text, Any]] | |
526 | |
527 sd, _ = self.get_requirement("SchemaDefRequirement") | |
528 | |
529 if sd is not None: | |
530 sdtypes = avroize_type(sd["types"]) | |
531 av = schema.make_valid_avro(sdtypes, {t["name"]: t for t in sdtypes}, set()) | |
532 for i in av: | |
533 self.schemaDefs[i["name"]] = i # type: ignore | |
534 schema.make_avsc_object(schema.convert_to_dict(av), self.names) | |
535 | |
536 # Build record schema from inputs | |
537 self.inputs_record_schema = { | |
538 "name": "input_record_schema", "type": "record", | |
539 "fields": []} # type: Dict[Text, Any] | |
540 self.outputs_record_schema = { | |
541 "name": "outputs_record_schema", "type": "record", | |
542 "fields": []} # type: Dict[Text, Any] | |
543 | |
544 for key in ("inputs", "outputs"): | |
545 for i in self.tool[key]: | |
546 c = copy.deepcopy(i) | |
547 c["name"] = shortname(c["id"]) | |
548 del c["id"] | |
549 | |
550 if "type" not in c: | |
551 raise validate.ValidationException( | |
552 u"Missing 'type' in parameter '{}'".format(c["name"])) | |
553 | |
554 if "default" in c and "null" not in aslist(c["type"]): | |
555 nullable = ["null"] | |
556 nullable.extend(aslist(c["type"])) | |
557 c["type"] = nullable | |
558 else: | |
559 c["type"] = c["type"] | |
560 c["type"] = avroize_type(c["type"], c["name"]) | |
561 if key == "inputs": | |
562 self.inputs_record_schema["fields"].append(c) | |
563 elif key == "outputs": | |
564 self.outputs_record_schema["fields"].append(c) | |
565 | |
566 with SourceLine(toolpath_object, "inputs", validate.ValidationException): | |
567 self.inputs_record_schema = cast( | |
568 Dict[Text, Any], schema.make_valid_avro( | |
569 self.inputs_record_schema, {}, set())) | |
570 schema.make_avsc_object( | |
571 schema.convert_to_dict(self.inputs_record_schema), self.names) | |
572 with SourceLine(toolpath_object, "outputs", validate.ValidationException): | |
573 self.outputs_record_schema = cast( | |
574 Dict[Text, Any], | |
575 schema.make_valid_avro(self.outputs_record_schema, {}, set())) | |
576 schema.make_avsc_object( | |
577 schema.convert_to_dict(self.outputs_record_schema), self.names) | |
578 | |
579 if toolpath_object.get("class") is not None \ | |
580 and not getdefault(loadingContext.disable_js_validation, False): | |
581 if loadingContext.js_hint_options_file is not None: | |
582 try: | |
583 with open(loadingContext.js_hint_options_file) as options_file: | |
584 validate_js_options = json.load(options_file) | |
585 except (OSError, ValueError) as err: | |
586 _logger.error( | |
587 "Failed to read options file %s", | |
588 loadingContext.js_hint_options_file) | |
589 raise | |
590 else: | |
591 validate_js_options = None | |
592 if self.doc_schema is not None: | |
593 validate_js_expressions( | |
594 cast(CommentedMap, toolpath_object), | |
595 self.doc_schema.names[toolpath_object["class"]], | |
596 validate_js_options) | |
597 | |
598 dockerReq, is_req = self.get_requirement("DockerRequirement") | |
599 | |
600 if dockerReq is not None and "dockerOutputDirectory" in dockerReq\ | |
601 and is_req is not None and not is_req: | |
602 _logger.warning(SourceLine( | |
603 item=dockerReq, raise_type=Text).makeError( | |
604 "When 'dockerOutputDirectory' is declared, DockerRequirement " | |
605 "should go in the 'requirements' section, not 'hints'.""")) | |
606 | |
607 if dockerReq is not None and is_req is not None\ | |
608 and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl": | |
609 if is_req: | |
610 # In this specific case, it is legal to have /var/spool/cwl, so skip the check. | |
611 pass | |
612 else: | |
613 # Must be a requirement | |
614 var_spool_cwl_detector(self.tool) | |
615 else: | |
616 var_spool_cwl_detector(self.tool) | |
617 | |
618 def _init_job(self, joborder, runtime_context): | |
619 # type: (Mapping[Text, Text], RuntimeContext) -> Builder | |
620 | |
621 if self.metadata.get("cwlVersion") != INTERNAL_VERSION: | |
622 raise WorkflowException("Process object loaded with version '%s', must update to '%s' in order to execute." % ( | |
623 self.metadata.get("cwlVersion"), INTERNAL_VERSION)) | |
624 | |
625 job = cast(Dict[Text, expression.JSON], copy.deepcopy(joborder)) | |
626 | |
627 make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) | |
628 fs_access = make_fs_access(runtime_context.basedir) | |
629 | |
630 load_listing_req, _ = self.get_requirement( | |
631 "LoadListingRequirement") | |
632 | |
633 if load_listing_req is not None: | |
634 load_listing = load_listing_req.get("loadListing") | |
635 else: | |
636 load_listing = "no_listing" | |
637 | |
638 # Validate job order | |
639 try: | |
640 fill_in_defaults(self.tool[u"inputs"], job, fs_access) | |
641 | |
642 normalizeFilesDirs(job) | |
643 schema = self.names.get_name("input_record_schema", "") | |
644 if schema is None: | |
645 raise WorkflowException("Missing input record schema: " | |
646 "{}".format(self.names)) | |
647 validate.validate_ex(schema, job, strict=False, | |
648 logger=_logger_validation_warnings) | |
649 | |
650 if load_listing and load_listing != "no_listing": | |
651 get_listing(fs_access, job, recursive=(load_listing == "deep_listing")) | |
652 | |
653 visit_class(job, ("File",), functools.partial(add_sizes, fs_access)) | |
654 | |
655 if load_listing == "deep_listing": | |
656 for i, inparm in enumerate(self.tool["inputs"]): | |
657 k = shortname(inparm["id"]) | |
658 if k not in job: | |
659 continue | |
660 v = job[k] | |
661 dircount = [0] | |
662 | |
663 def inc(d): # type: (List[int]) -> None | |
664 d[0] += 1 | |
665 visit_class(v, ("Directory",), lambda x: inc(dircount)) | |
666 if dircount[0] == 0: | |
667 continue | |
668 filecount = [0] | |
669 visit_class(v, ("File",), lambda x: inc(filecount)) | |
670 if filecount[0] > FILE_COUNT_WARNING: | |
671 # Long lines in this message are okay, will be reflowed based on terminal columns. | |
672 _logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError( | |
673 """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use. | |
674 | |
675 If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior: | |
676 | |
677 $namespaces: | |
678 cwltool: "http://commonwl.org/cwltool#" | |
679 hints: | |
680 cwltool:LoadListingRequirement: | |
681 loadListing: shallow_listing | |
682 | |
683 """ % (filecount[0], k)))) | |
684 | |
685 except (validate.ValidationException, WorkflowException) as err: | |
686 raise_from(WorkflowException("Invalid job input record:\n" + Text(err)), err) | |
687 | |
688 files = [] # type: List[Dict[Text, Text]] | |
689 bindings = CommentedSeq() | |
690 tmpdir = u"" | |
691 stagedir = u"" | |
692 | |
693 docker_req, _ = self.get_requirement("DockerRequirement") | |
694 default_docker = None | |
695 | |
696 if docker_req is None and runtime_context.default_container: | |
697 default_docker = runtime_context.default_container | |
698 | |
699 if (docker_req or default_docker) and runtime_context.use_container: | |
700 if docker_req is not None: | |
701 # Check if docker output directory is absolute | |
702 if docker_req.get("dockerOutputDirectory") and \ | |
703 docker_req.get("dockerOutputDirectory").startswith('/'): | |
704 outdir = docker_req.get("dockerOutputDirectory") | |
705 else: | |
706 outdir = docker_req.get("dockerOutputDirectory") or \ | |
707 runtime_context.docker_outdir or random_outdir() | |
708 elif default_docker is not None: | |
709 outdir = runtime_context.docker_outdir or random_outdir() | |
710 tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec | |
711 stagedir = runtime_context.docker_stagedir or "/var/lib/cwl" | |
712 else: | |
713 outdir = fs_access.realpath( | |
714 runtime_context.outdir or tempfile.mkdtemp( | |
715 prefix=getdefault(runtime_context.tmp_outdir_prefix, | |
716 DEFAULT_TMP_PREFIX))) | |
717 if self.tool[u"class"] != 'Workflow': | |
718 tmpdir = fs_access.realpath(runtime_context.tmpdir | |
719 or tempfile.mkdtemp()) | |
720 stagedir = fs_access.realpath(runtime_context.stagedir | |
721 or tempfile.mkdtemp()) | |
722 | |
723 builder = Builder(job, | |
724 files, | |
725 bindings, | |
726 self.schemaDefs, | |
727 self.names, | |
728 self.requirements, | |
729 self.hints, | |
730 {}, | |
731 runtime_context.mutation_manager, | |
732 self.formatgraph, | |
733 make_fs_access, | |
734 fs_access, | |
735 runtime_context.job_script_provider, | |
736 runtime_context.eval_timeout, | |
737 runtime_context.debug, | |
738 runtime_context.js_console, | |
739 runtime_context.force_docker_pull, | |
740 load_listing, | |
741 outdir, | |
742 tmpdir, | |
743 stagedir) | |
744 | |
745 bindings.extend(builder.bind_input( | |
746 self.inputs_record_schema, job, | |
747 discover_secondaryFiles=getdefault(runtime_context.toplevel, False))) | |
748 | |
749 if self.tool.get("baseCommand"): | |
750 for index, command in enumerate(aslist(self.tool["baseCommand"])): | |
751 bindings.append({ | |
752 "position": [-1000000, index], | |
753 "datum": command | |
754 }) | |
755 | |
756 if self.tool.get("arguments"): | |
757 for i, arg in enumerate(self.tool["arguments"]): | |
758 lc = self.tool["arguments"].lc.data[i] | |
759 filename = self.tool["arguments"].lc.filename | |
760 bindings.lc.add_kv_line_col(len(bindings), lc) | |
761 if isinstance(arg, MutableMapping): | |
762 arg = copy.deepcopy(arg) | |
763 if arg.get("position"): | |
764 position = arg.get("position") | |
765 if isinstance(position, str): # no need to test the | |
766 # CWLVersion as the v1.0 | |
767 # schema only allows ints | |
768 position = builder.do_eval(position) | |
769 if position is None: | |
770 position = 0 | |
771 arg["position"] = [position, i] | |
772 else: | |
773 arg["position"] = [0, i] | |
774 bindings.append(arg) | |
775 elif ("$(" in arg) or ("${" in arg): | |
776 cm = CommentedMap(( | |
777 ("position", [0, i]), | |
778 ("valueFrom", arg) | |
779 )) | |
780 cm.lc.add_kv_line_col("valueFrom", lc) | |
781 cm.lc.filename = filename | |
782 bindings.append(cm) | |
783 else: | |
784 cm = CommentedMap(( | |
785 ("position", [0, i]), | |
786 ("datum", arg) | |
787 )) | |
788 cm.lc.add_kv_line_col("datum", lc) | |
789 cm.lc.filename = filename | |
790 bindings.append(cm) | |
791 | |
792 # use python2 like sorting of heterogeneous lists | |
793 # (containing str and int types), | |
794 if PY3: | |
795 key = functools.cmp_to_key(cmp_like_py2) | |
796 else: # PY2 | |
797 key = lambda d: d["position"] | |
798 | |
799 # This awkward construction replaces the contents of | |
800 # "bindings" in place (because Builder expects it to be | |
801 # mutated in place, sigh, I'm sorry) with its contents sorted, | |
802 # supporting different versions of Python and ruamel.yaml with | |
803 # different behaviors/bugs in CommentedSeq. | |
804 bindings_copy = copy.deepcopy(bindings) | |
805 del bindings[:] | |
806 bindings.extend(sorted(bindings_copy, key=key)) | |
807 | |
808 if self.tool[u"class"] != 'Workflow': | |
809 builder.resources = self.evalResources(builder, runtime_context) | |
810 return builder | |
811 | |
812 def evalResources(self, builder, runtimeContext): | |
813 # type: (Builder, RuntimeContext) -> Dict[str, int] | |
814 resourceReq, _ = self.get_requirement("ResourceRequirement") | |
815 if resourceReq is None: | |
816 resourceReq = {} | |
817 cwl_version = self.metadata.get( | |
818 "http://commonwl.org/cwltool#original_cwlVersion", None) | |
819 if cwl_version == "v1.0": | |
820 ram = 1024 | |
821 else: | |
822 ram = 256 | |
823 request = { | |
824 "coresMin": 1, | |
825 "coresMax": 1, | |
826 "ramMin": ram, | |
827 "ramMax": ram, | |
828 "tmpdirMin": 1024, | |
829 "tmpdirMax": 1024, | |
830 "outdirMin": 1024, | |
831 "outdirMax": 1024 | |
832 } # type: Dict[str, int] | |
833 for a in ("cores", "ram", "tmpdir", "outdir"): | |
834 mn = None | |
835 mx = None | |
836 if resourceReq.get(a + "Min"): | |
837 mn = eval_resource(builder, resourceReq[a + "Min"]) | |
838 if resourceReq.get(a + "Max"): | |
839 mx = eval_resource(builder, resourceReq[a + "Max"]) | |
840 if mn is None: | |
841 mn = mx | |
842 elif mx is None: | |
843 mx = mn | |
844 | |
845 if mn is not None: | |
846 request[a + "Min"] = cast(int, mn) | |
847 request[a + "Max"] = cast(int, mx) | |
848 | |
849 if runtimeContext.select_resources is not None: | |
850 return runtimeContext.select_resources(request, runtimeContext) | |
851 return { | |
852 "cores": request["coresMin"], | |
853 "ram": request["ramMin"], | |
854 "tmpdirSize": request["tmpdirMin"], | |
855 "outdirSize": request["outdirMin"], | |
856 } | |
857 | |
858 def validate_hints(self, avsc_names, hints, strict): | |
859 # type: (Any, List[Dict[Text, Any]], bool) -> None | |
860 for i, r in enumerate(hints): | |
861 sl = SourceLine(hints, i, validate.ValidationException) | |
862 with sl: | |
863 if avsc_names.get_name(r["class"], "") is not None and self.doc_loader is not None: | |
864 plain_hint = dict((key, r[key]) for key in r if key not in | |
865 self.doc_loader.identifiers) # strip identifiers | |
866 validate.validate_ex( | |
867 avsc_names.get_name(plain_hint["class"], ""), | |
868 plain_hint, strict=strict) | |
869 elif r["class"] in ("NetworkAccess", "LoadListingRequirement"): | |
870 pass | |
871 else: | |
872 _logger.info(Text(sl.makeError(u"Unknown hint %s" % (r["class"])))) | |
873 | |
874 def visit(self, op): # type: (Callable[[MutableMapping[Text, Any]], None]) -> None | |
875 op(self.tool) | |
876 | |
877 @abc.abstractmethod | |
878 def job(self, | |
879 job_order, # type: Mapping[Text, Text] | |
880 output_callbacks, # type: Callable[[Any, Any], Any] | |
881 runtimeContext # type: RuntimeContext | |
882 ): # type: (...) -> Generator[Any, None, None] | |
883 # FIXME: Declare base type for what Generator yields | |
884 pass | |
885 | |
886 | |
887 _names = set() # type: Set[Text] | |
888 | |
889 | |
890 def uniquename(stem, names=None): # type: (Text, Optional[Set[Text]]) -> Text | |
891 global _names | |
892 if names is None: | |
893 names = _names | |
894 c = 1 | |
895 u = stem | |
896 while u in names: | |
897 c += 1 | |
898 u = u"%s_%s" % (stem, c) | |
899 names.add(u) | |
900 return u | |
901 | |
902 | |
903 def nestdir(base, deps): | |
904 # type: (Text, Dict[Text, Any]) -> Dict[Text, Any] | |
905 dirname = os.path.dirname(base) + "/" | |
906 subid = deps["location"] | |
907 if subid.startswith(dirname): | |
908 s2 = subid[len(dirname):] | |
909 sp = s2.split('/') | |
910 sp.pop() | |
911 while sp: | |
912 nx = sp.pop() | |
913 deps = { | |
914 "class": "Directory", | |
915 "basename": nx, | |
916 "listing": [deps] | |
917 } | |
918 return deps | |
919 | |
920 | |
921 def mergedirs(listing): | |
922 # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]] | |
923 r = [] # type: List[Dict[Text, Any]] | |
924 ents = {} # type: Dict[Text, Any] | |
925 collided = set() # type: Set[Text] | |
926 for e in listing: | |
927 if e["basename"] not in ents: | |
928 ents[e["basename"]] = e | |
929 elif e["class"] == "Directory": | |
930 if e.get("listing"): | |
931 ents[e["basename"]].setdefault("listing", []).extend(e["listing"]) | |
932 if ents[e["basename"]]["location"].startswith("_:"): | |
933 ents[e["basename"]]["location"] = e["location"] | |
934 elif e["location"] != ents[e["basename"]]["location"]: | |
935 # same basename, different location, collision, | |
936 # rename both. | |
937 collided.add(e["basename"]) | |
938 e2 = ents[e["basename"]] | |
939 | |
940 e["basename"] = urllib.parse.quote(e["location"], safe="") | |
941 e2["basename"] = urllib.parse.quote(e2["location"], safe="") | |
942 | |
943 e["nameroot"], e["nameext"] = os.path.splitext(e["basename"]) | |
944 e2["nameroot"], e2["nameext"] = os.path.splitext(e2["basename"]) | |
945 | |
946 ents[e["basename"]] = e | |
947 ents[e2["basename"]] = e2 | |
948 for c in collided: | |
949 del ents[c] | |
950 for e in itervalues(ents): | |
951 if e["class"] == "Directory" and "listing" in e: | |
952 e["listing"] = mergedirs(e["listing"]) | |
953 r.extend(itervalues(ents)) | |
954 return r | |
955 | |
956 | |
957 CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl" | |
958 | |
959 def scandeps(base, # type: Text | |
960 doc, # type: Any | |
961 reffields, # type: Set[Text] | |
962 urlfields, # type: Set[Text] | |
963 loadref, # type: Callable[[Text, Text], Text] | |
964 urljoin=urllib.parse.urljoin, # type: Callable[[Text, Text], Text] | |
965 nestdirs=True # type: bool | |
966 ): # type: (...) -> List[Dict[Text, Text]] | |
967 r = [] # type: List[Dict[Text, Text]] | |
968 if isinstance(doc, MutableMapping): | |
969 if "id" in doc: | |
970 if doc["id"].startswith("file://"): | |
971 df, _ = urllib.parse.urldefrag(doc["id"]) | |
972 if base != df: | |
973 r.append({ | |
974 "class": "File", | |
975 "location": df, | |
976 "format": CWL_IANA | |
977 }) | |
978 base = df | |
979 | |
980 if doc.get("class") in ("File", "Directory") and "location" in urlfields: | |
981 u = doc.get("location", doc.get("path")) | |
982 if u and not u.startswith("_:"): | |
983 deps = {"class": doc["class"], | |
984 "location": urljoin(base, u) | |
985 } # type: Dict[Text, Any] | |
986 if "basename" in doc: | |
987 deps["basename"] = doc["basename"] | |
988 if doc["class"] == "Directory" and "listing" in doc: | |
989 deps["listing"] = doc["listing"] | |
990 if doc["class"] == "File" and "secondaryFiles" in doc: | |
991 deps["secondaryFiles"] = doc["secondaryFiles"] | |
992 if nestdirs: | |
993 deps = nestdir(base, deps) | |
994 r.append(deps) | |
995 else: | |
996 if doc["class"] == "Directory" and "listing" in doc: | |
997 r.extend(scandeps( | |
998 base, doc["listing"], reffields, urlfields, loadref, | |
999 urljoin=urljoin, nestdirs=nestdirs)) | |
1000 elif doc["class"] == "File" and "secondaryFiles" in doc: | |
1001 r.extend(scandeps( | |
1002 base, doc["secondaryFiles"], reffields, urlfields, | |
1003 loadref, urljoin=urljoin, nestdirs=nestdirs)) | |
1004 | |
1005 for k, v in iteritems(doc): | |
1006 if k in reffields: | |
1007 for u in aslist(v): | |
1008 if isinstance(u, MutableMapping): | |
1009 r.extend(scandeps( | |
1010 base, u, reffields, urlfields, loadref, | |
1011 urljoin=urljoin, nestdirs=nestdirs)) | |
1012 else: | |
1013 subid = urljoin(base, u) | |
1014 basedf, _ = urllib.parse.urldefrag(base) | |
1015 subiddf, _ = urllib.parse.urldefrag(subid) | |
1016 if basedf == subiddf: | |
1017 continue | |
1018 sub = loadref(base, u) | |
1019 deps = { | |
1020 "class": "File", | |
1021 "location": subid, | |
1022 "format": CWL_IANA | |
1023 } | |
1024 sf = scandeps( | |
1025 subid, sub, reffields, urlfields, loadref, | |
1026 urljoin=urljoin, nestdirs=nestdirs) | |
1027 if sf: | |
1028 deps["secondaryFiles"] = sf | |
1029 if nestdirs: | |
1030 deps = nestdir(base, deps) | |
1031 r.append(deps) | |
1032 elif k in urlfields and k != "location": | |
1033 for u in aslist(v): | |
1034 deps = { | |
1035 "class": "File", | |
1036 "location": urljoin(base, u) | |
1037 } | |
1038 if nestdirs: | |
1039 deps = nestdir(base, deps) | |
1040 r.append(deps) | |
1041 elif k not in ("listing", "secondaryFiles"): | |
1042 r.extend(scandeps( | |
1043 base, v, reffields, urlfields, loadref, urljoin=urljoin, | |
1044 nestdirs=nestdirs)) | |
1045 elif isinstance(doc, MutableSequence): | |
1046 for d in doc: | |
1047 r.extend(scandeps( | |
1048 base, d, reffields, urlfields, loadref, urljoin=urljoin, | |
1049 nestdirs=nestdirs)) | |
1050 | |
1051 if r: | |
1052 normalizeFilesDirs(r) | |
1053 r = mergedirs(r) | |
1054 | |
1055 return r | |
1056 | |
1057 | |
1058 def compute_checksums(fs_access, fileobj): # type: (StdFsAccess, Dict[Text, Any]) -> None | |
1059 if "checksum" not in fileobj: | |
1060 checksum = hashlib.sha1() # nosec | |
1061 with fs_access.open(fileobj["location"], "rb") as f: | |
1062 contents = f.read(1024 * 1024) | |
1063 while contents != b"": | |
1064 checksum.update(contents) | |
1065 contents = f.read(1024 * 1024) | |
1066 fileobj["checksum"] = "sha1$%s" % checksum.hexdigest() | |
1067 fileobj["size"] = fs_access.size(fileobj["location"]) |