comparison env/lib/python3.7/site-packages/cwltool/process.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
1 from __future__ import absolute_import
2
3 import abc
4 import copy
5 import errno
6 import functools
7 import hashlib
8 import json
9 import logging
10 import os
11 import shutil
12 import stat
13 import tempfile
14 import textwrap
15 import uuid
16
17 from io import open
18 from typing import (Any, Callable, Dict, Generator, Iterator, List,
19 Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple,
20 Type, Union, cast)
21
22 from pkg_resources import resource_stream
23 from rdflib import Graph # pylint: disable=unused-import
24 from ruamel.yaml.comments import CommentedMap, CommentedSeq
25 from six import PY3, iteritems, itervalues, string_types, with_metaclass
26 from six.moves import urllib
27 from future.utils import raise_from
28 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import
29 Text)
30 from schema_salad import schema, validate
31 from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
32 from schema_salad.sourceline import SourceLine, strip_dup_lineno
33
34 from . import expression
35 from .builder import Builder, HasReqsHints
36 from .context import LoadingContext # pylint: disable=unused-import
37 from .context import RuntimeContext, getdefault
38 from .errors import UnsupportedRequirement, WorkflowException
39 from .loghandler import _logger
40 from .mutation import MutationManager # pylint: disable=unused-import
41 from .pathmapper import (PathMapper, adjustDirObjs, ensure_writable,
42 get_listing, normalizeFilesDirs, visit_class,
43 MapperEnt)
44 from .secrets import SecretStore # pylint: disable=unused-import
45 from .software_requirements import ( # pylint: disable=unused-import
46 DependenciesConfiguration)
47 from .stdfsaccess import StdFsAccess
48 from .utils import (DEFAULT_TMP_PREFIX, aslist, cmp_like_py2,
49 copytree_with_merge, onWindows, random_outdir)
50 from .validate_js import validate_js_expressions
51 from .update import INTERNAL_VERSION
52
53 try:
54 from os import scandir # type: ignore
55 except ImportError:
56 from scandir import scandir # type: ignore
57 if TYPE_CHECKING:
58 from .provenance import ProvenanceProfile # pylint: disable=unused-import
59
60 if PY3:
61 from collections.abc import Iterable # only works on python 3.3+
62 else:
63 from collections import Iterable # pylint: disable=unused-import
64
65 class LogAsDebugFilter(logging.Filter):
66 def __init__(self, name, parent): # type: (Text, logging.Logger) -> None
67 """Initialize."""
68 name = str(name)
69 super(LogAsDebugFilter, self).__init__(name)
70 self.parent = parent
71
72 def filter(self, record): # type: (logging.LogRecord) -> bool
73 return self.parent.isEnabledFor(logging.DEBUG)
74
75
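# schema-salad validation warnings are funneled through this child logger;
# LogAsDebugFilter drops them unless the main cwltool logger is at DEBUG.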
76 _logger_validation_warnings = logging.getLogger("cwltool.validation_warnings")
77 _logger_validation_warnings.setLevel(_logger.getEffectiveLevel())
78 _logger_validation_warnings.addFilter(LogAsDebugFilter("cwltool.validation_warnings", _logger))
79
80 supportedProcessRequirements = ["DockerRequirement",
81 "SchemaDefRequirement",
82 "EnvVarRequirement",
83 "ScatterFeatureRequirement",
84 "SubworkflowFeatureRequirement",
85 "MultipleInputFeatureRequirement",
86 "InlineJavascriptRequirement",
87 "ShellCommandRequirement",
88 "StepInputExpressionRequirement",
89 "ResourceRequirement",
90 "InitialWorkDirRequirement",
91 "ToolTimeLimit",
92 "WorkReuse",
93 "NetworkAccess",
94 "InplaceUpdateRequirement",
95 "LoadListingRequirement",
96 "http://commonwl.org/cwltool#TimeLimit",
97 "http://commonwl.org/cwltool#WorkReuse",
98 "http://commonwl.org/cwltool#NetworkAccess",
99 "http://commonwl.org/cwltool#LoadListingRequirement",
100 "http://commonwl.org/cwltool#InplaceUpdateRequirement"]
101
102 cwl_files = (
103 "Workflow.yml",
104 "CommandLineTool.yml",
105 "CommonWorkflowLanguage.yml",
106 "Process.yml",
107 "concepts.md",
108 "contrib.md",
109 "intro.md",
110 "invocation.md")
111
112 salad_files = ('metaschema.yml',
113 'metaschema_base.yml',
114 'salad.md',
115 'field_name.yml',
116 'import_include.md',
117 'link_res.yml',
118 'ident_res.yml',
119 'vocab_res.yml',
121 'field_name_schema.yml',
122 'field_name_src.yml',
123 'field_name_proc.yml',
124 'ident_res_schema.yml',
125 'ident_res_src.yml',
126 'ident_res_proc.yml',
127 'link_res_schema.yml',
128 'link_res_src.yml',
129 'link_res_proc.yml',
130 'vocab_res_schema.yml',
131 'vocab_res_src.yml',
132 'vocab_res_proc.yml')
133
134 SCHEMA_CACHE = {} # type: Dict[Text, Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text, Any], Loader]]
135 SCHEMA_FILE = None # type: Optional[Dict[Text, Any]]
136 SCHEMA_DIR = None # type: Optional[Dict[Text, Any]]
137 SCHEMA_ANY = None # type: Optional[Dict[Text, Any]]
138
139 custom_schemas = {} # type: Dict[Text, Tuple[Text, Text]]
140
141 def use_standard_schema(version):
142 # type: (Text) -> None
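    """Revert to the standard schema for this version, dropping any custom schema and cached copy."""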
143 if version in custom_schemas:
144 del custom_schemas[version]
145 if version in SCHEMA_CACHE:
146 del SCHEMA_CACHE[version]
147
148 def use_custom_schema(version, name, text):
149 # type: (Text, Text, Union[Text, bytes]) -> None
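    """Register a custom schema for this version and invalidate any cached copy."""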
150 if isinstance(text, bytes):
151 text2 = text.decode()
152 else:
153 text2 = text
154 custom_schemas[version] = (name, text2)
155 if version in SCHEMA_CACHE:
156 del SCHEMA_CACHE[version]
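
# A hypothetical usage sketch: make get_schema("v1.0") resolve against a local
# schema document instead of the packaged one:
#
#     with open("my_schema.yml") as f:
#         use_custom_schema("v1.0", "file:///tmp/my_schema.yml", f.read())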
157
158 def get_schema(version):
159 # type: (Text) -> Tuple[Loader, Union[schema.Names, schema.SchemaParseException], Dict[Text,Any], Loader]
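    """Load the document loader, names, metadata, and metaschema loader for a CWL version, caching the result."""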
160
161 if version in SCHEMA_CACHE:
162 return SCHEMA_CACHE[version]
163
164 cache = {} # type: Dict[Text, Any]
165 version = version.split("#")[-1]
166 if '.dev' in version:
167 version = ".".join(version.split(".")[:-1])
168 for f in cwl_files:
169 try:
170 res = resource_stream(__name__, 'schemas/%s/%s' % (version, f))
171 cache["https://w3id.org/cwl/" + f] = res.read()
172 res.close()
173 except IOError:
174 pass
175
176 for f in salad_files:
177 try:
178 res = resource_stream(
179 __name__, 'schemas/{}/salad/schema_salad/metaschema/{}'.format(
180 version, f))
181 cache["https://w3id.org/cwl/salad/schema_salad/metaschema/"
182 + f] = res.read()
183 res.close()
184 except IOError:
185 pass
186
187 if version in custom_schemas:
188 cache[custom_schemas[version][0]] = custom_schemas[version][1]
189 SCHEMA_CACHE[version] = schema.load_schema(
190 custom_schemas[version][0], cache=cache)
191 else:
192 SCHEMA_CACHE[version] = schema.load_schema(
193 "https://w3id.org/cwl/CommonWorkflowLanguage.yml", cache=cache)
194
195 return SCHEMA_CACHE[version]
196
197
198 def shortname(inputid):
199 # type: (Text) -> Text
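    """Return the last path component of an identifier; e.g. "tool.cwl#main/infile" becomes "infile"."""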
200 d = urllib.parse.urlparse(inputid)
201 if d.fragment:
202 return d.fragment.split(u"/")[-1]
203 return d.path.split(u"/")[-1]
204
205
206 def checkRequirements(rec, supported_process_requirements):
207 # type: (Any, Iterable[Any]) -> None
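    """Recursively raise UnsupportedRequirement for any requirement not in the supported list."""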
208 if isinstance(rec, MutableMapping):
209 if "requirements" in rec:
210 for i, entry in enumerate(rec["requirements"]):
211 with SourceLine(rec["requirements"], i, UnsupportedRequirement):
212 if entry["class"] not in supported_process_requirements:
213 raise UnsupportedRequirement(
214 u"Unsupported requirement {}".format(entry["class"]))
215 for key in rec:
216 checkRequirements(rec[key], supported_process_requirements)
217 if isinstance(rec, MutableSequence):
218 for entry in rec:
219 checkRequirements(entry, supported_process_requirements)
220
221
222 def stage_files(pathmapper, # type: PathMapper
223 stage_func=None, # type: Optional[Callable[..., Any]]
224 ignore_writable=False, # type: bool
225 symlink=True, # type: bool
226 secret_store=None, # type: Optional[SecretStore]
227 fix_conflicts=False # type: bool
228 ): # type: (...) -> None
229 """Link or copy files to their targets. Create them as needed."""
230
231 targets = {} # type: Dict[Text, MapperEnt]
232 for key, entry in pathmapper.items():
233 if 'File' not in entry.type:
234 continue
235 if entry.target not in targets:
236 targets[entry.target] = entry
237 elif targets[entry.target].resolved != entry.resolved:
238 if fix_conflicts:
239 i = 2
240 tgt = "%s_%s" % (entry.target, i)
241 while tgt in targets:
242 i += 1
243 tgt = "%s_%s" % (entry.target, i)
245 targets[tgt] = pathmapper.update(key, entry.resolved, tgt, entry.type, entry.staged)
246 else:
247 raise WorkflowException("File staging conflict, trying to stage both %s and %s to the same target %s" % (
248 targets[entry.target].resolved, entry.resolved, entry.target))
249
250 for key, entry in pathmapper.items():
251 if not entry.staged:
252 continue
253 if not os.path.exists(os.path.dirname(entry.target)):
254 os.makedirs(os.path.dirname(entry.target))
255 if entry.type in ("File", "Directory") and os.path.exists(entry.resolved):
256 if symlink: # Use symlink func if allowed
257 if onWindows():
258 if entry.type == "File":
259 shutil.copy(entry.resolved, entry.target)
260 elif entry.type == "Directory":
261 if os.path.exists(entry.target) \
262 and os.path.isdir(entry.target):
263 shutil.rmtree(entry.target)
264 copytree_with_merge(entry.resolved, entry.target)
265 else:
266 os.symlink(entry.resolved, entry.target)
267 elif stage_func is not None:
268 stage_func(entry.resolved, entry.target)
269 elif entry.type == "Directory" and not os.path.exists(entry.target) \
270 and entry.resolved.startswith("_:"):
271 os.makedirs(entry.target)
272 elif entry.type == "WritableFile" and not ignore_writable:
273 shutil.copy(entry.resolved, entry.target)
274 ensure_writable(entry.target)
275 elif entry.type == "WritableDirectory" and not ignore_writable:
276 if entry.resolved.startswith("_:"):
277 os.makedirs(entry.target)
278 else:
279 shutil.copytree(entry.resolved, entry.target)
280 ensure_writable(entry.target)
281 elif entry.type == "CreateFile" or entry.type == "CreateWritableFile":
282 with open(entry.target, "wb") as new:
283 if secret_store is not None:
284 new.write(
285 secret_store.retrieve(entry.resolved).encode("utf-8"))
286 else:
287 new.write(entry.resolved.encode("utf-8"))
288 if entry.type == "CreateFile":
289 os.chmod(entry.target, stat.S_IRUSR) # Read only
290 else: # it is a "CreateWritableFile"
291 ensure_writable(entry.target)
292 pathmapper.update(
293 key, entry.target, entry.target, entry.type, entry.staged)
294
295
296 def relocateOutputs(outputObj, # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
297 destination_path, # type: Text
298 source_directories, # type: Set[Text]
299 action, # type: Text
300 fs_access, # type: StdFsAccess
301 compute_checksum=True, # type: bool
302 path_mapper=PathMapper # type: Type[PathMapper]
303 ):
304 # type: (...) -> Union[Dict[Text, Any], List[Dict[Text, Any]]]
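    """Move or copy output files and directories to destination_path and rewrite their locations."""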
305 adjustDirObjs(outputObj, functools.partial(get_listing, fs_access, recursive=True))
306
307 if action not in ("move", "copy"):
308 return outputObj
309
310 def _collectDirEntries(obj):
311 # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> Iterator[Dict[Text, Any]]
312 if isinstance(obj, dict):
313 if obj.get("class") in ("File", "Directory"):
314 yield obj
315 else:
316 for sub_obj in obj.values():
317 for dir_entry in _collectDirEntries(sub_obj):
318 yield dir_entry
319 elif isinstance(obj, MutableSequence):
320 for sub_obj in obj:
321 for dir_entry in _collectDirEntries(sub_obj):
322 yield dir_entry
323
324 def _relocate(src, dst): # type: (Text, Text) -> None
325 if src == dst:
326 return
327
328 # If the source is not contained in source_directories we're not allowed to delete it
329 src = fs_access.realpath(src)
330 src_can_be_deleted = any(os.path.commonprefix([p, src]) == p for p in source_directories)
331
332 _action = "move" if action == "move" and src_can_be_deleted else "copy"
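        # Note on the check above: os.path.commonprefix compares characters,
        # not path components, so "/tmp/out" would also match "/tmp/out2";
        # tolerable here since the source directories are typically unique
        # mkdtemp paths.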
333
334 if _action == "move":
335 _logger.debug("Moving %s to %s", src, dst)
336 if fs_access.isdir(src) and fs_access.isdir(dst):
337 # merge directories
338 for dir_entry in scandir(src):
339 _relocate(dir_entry.path, fs_access.join(dst, dir_entry.name))
340 else:
341 shutil.move(src, dst)
342
343 elif _action == "copy":
344 _logger.debug("Copying %s to %s", src, dst)
345 if fs_access.isdir(src):
346 if os.path.isdir(dst):
347 shutil.rmtree(dst)
348 elif os.path.isfile(dst):
349 os.unlink(dst)
350 shutil.copytree(src, dst)
351 else:
352 shutil.copy2(src, dst)
353
354 def _realpath(ob): # type: (Dict[Text, Any]) -> None
355 if ob["location"].startswith("file:"):
356 ob["location"] = file_uri(os.path.realpath(uri_file_path(ob["location"])))
357 if ob["location"].startswith("/"):
358 ob["location"] = os.path.realpath(ob["location"])
359
360 outfiles = list(_collectDirEntries(outputObj))
361 visit_class(outfiles, ("File", "Directory"), _realpath)
362 pm = path_mapper(outfiles, "", destination_path, separateDirs=False)
363 stage_files(pm, stage_func=_relocate, symlink=False, fix_conflicts=True)
364
365 def _check_adjust(a_file): # type: (Dict[Text, Text]) -> Dict[Text, Text]
366 a_file["location"] = file_uri(pm.mapper(a_file["location"])[1])
367 if "contents" in a_file:
368 del a_file["contents"]
369 return a_file
370
371 visit_class(outputObj, ("File", "Directory"), _check_adjust)
372
373 if compute_checksum:
374 visit_class(outputObj, ("File",), functools.partial(
375 compute_checksums, fs_access))
376 return outputObj
377
378
379 def cleanIntermediate(output_dirs): # type: (Iterable[Text]) -> None
380 for a in output_dirs:
381 if os.path.exists(a):
382 _logger.debug(u"Removing intermediate output directory %s", a)
383 shutil.rmtree(a, True)
384
385 def add_sizes(fsaccess, obj): # type: (StdFsAccess, Dict[Text, Any]) -> None
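    """Ensure a File object carries a "size" field, taken from the filesystem or its literal contents."""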
386 if 'location' in obj:
387 try:
388 if "size" not in obj:
389 obj["size"] = fsaccess.size(obj["location"])
390 except OSError:
391 pass
392 elif 'contents' in obj:
393 obj["size"] = len(obj['contents'])
394 else:
395 return # best effort
396
397 def fill_in_defaults(inputs, # type: List[Dict[Text, Text]]
398 job, # type: Dict[Text, expression.JSON]
399 fsaccess # type: StdFsAccess
400 ): # type: (...) -> None
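    """Fill in default values for inputs missing from the job order; nullable inputs fall back to None."""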
401 for e, inp in enumerate(inputs):
402 with SourceLine(inputs, e, WorkflowException, _logger.isEnabledFor(logging.DEBUG)):
403 fieldname = shortname(inp[u"id"])
404 if job.get(fieldname) is not None:
405 pass
406 elif u"default" in inp:
407 job[fieldname] = copy.deepcopy(inp[u"default"])
408 elif u"null" in aslist(inp[u"type"]):
409 job[fieldname] = None
410 else:
411 raise WorkflowException("Missing required input parameter '%s'" % shortname(inp["id"]))
412
413
414 def avroize_type(field_type, name_prefix=""):
415 # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Text) -> Any
416 """Add missing information to a type so that CWL types are valid."""
417 if isinstance(field_type, MutableSequence):
418 for field in field_type:
419 avroize_type(field, name_prefix)
420 elif isinstance(field_type, MutableMapping):
421 if field_type["type"] in ("enum", "record"):
422 if "name" not in field_type:
423 field_type["name"] = name_prefix + Text(uuid.uuid4())
424 if field_type["type"] == "record":
425 avroize_type(field_type["fields"], name_prefix)
426 if field_type["type"] == "array":
427 avroize_type(field_type["items"], name_prefix)
428 if isinstance(field_type["type"], MutableSequence):
429 for ctype in field_type["type"]:
430 avroize_type(ctype, name_prefix)
431 return field_type
432
433 def get_overrides(overrides, toolid): # type: (List[Dict[Text, Any]], Text) -> Dict[Text, Any]
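    """Merge and return the override entries whose overrideTarget matches toolid."""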
434 req = {} # type: Dict[Text, Any]
435 if not isinstance(overrides, MutableSequence):
436 raise validate.ValidationException("Expected overrides to be a list, but was %s" % type(overrides))
437 for ov in overrides:
438 if ov["overrideTarget"] == toolid:
439 req.update(ov)
440 return req
441
442
443 _VAR_SPOOL_ERROR = textwrap.dedent(
444 """
445 Non-portable reference to /var/spool/cwl detected: '{}'.
446 To fix, replace /var/spool/cwl with $(runtime.outdir) or add
447 DockerRequirement to the 'requirements' section and declare
448 'dockerOutputDirectory: /var/spool/cwl'.
449 """)
450
451
452 def var_spool_cwl_detector(obj, # type: Union[MutableMapping[Text, Text], List[Dict[Text, Any]], Text]
453 item=None, # type: Optional[Any]
454 obj_key=None, # type: Optional[Any]
455 ): # type: (...)->bool
456 """Detect any textual reference to /var/spool/cwl."""
457 r = False
458 if isinstance(obj, string_types):
459 if "var/spool/cwl" in obj and obj_key != "dockerOutputDirectory":
460 _logger.warning(
461 SourceLine(item=item, key=obj_key, raise_type=Text).makeError(
462 _VAR_SPOOL_ERROR.format(obj)))
463 r = True
464 elif isinstance(obj, MutableMapping):
465 for mkey, mvalue in iteritems(obj):
466 r = var_spool_cwl_detector(mvalue, obj, mkey) or r
467 elif isinstance(obj, MutableSequence):
468 for lkey, lvalue in enumerate(obj):
469 r = var_spool_cwl_detector(lvalue, obj, lkey) or r
470 return r
471
472 def eval_resource(builder, resource_req): # type: (Builder, Text) -> Any
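    """Evaluate resource_req if it is an expression or parameter reference; otherwise return it unchanged."""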
473 if expression.needs_parsing(resource_req):
474 return builder.do_eval(resource_req)
475 return resource_req
476
477
478 # Threshold where the "too many files" warning kicks in
479 FILE_COUNT_WARNING = 5000
480
481 class Process(with_metaclass(abc.ABCMeta, HasReqsHints)):
482 def __init__(self,
483 toolpath_object, # type: MutableMapping[Text, Any]
484 loadingContext # type: LoadingContext
485 ): # type: (...) -> None
486 """Build a Process object from the provided dictionary."""
487 self.metadata = getdefault(loadingContext.metadata, {}) # type: Dict[Text,Any]
488 self.provenance_object = None # type: Optional[ProvenanceProfile]
489 self.parent_wf = None # type: Optional[ProvenanceProfile]
490 global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY # pylint: disable=global-statement
491 if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
492 get_schema("v1.0")
493 SCHEMA_ANY = cast(Dict[Text, Any],
494 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/salad#Any"])
495 SCHEMA_FILE = cast(Dict[Text, Any],
496 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#File"])
497 SCHEMA_DIR = cast(Dict[Text, Any],
498 SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"])
499
500 self.names = schema.make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY],
501 Loader({}))
502 self.tool = toolpath_object
503 self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
504 self.requirements.extend(self.tool.get("requirements", []))
505 if "id" not in self.tool:
506 self.tool["id"] = "_:" + Text(uuid.uuid4())
507 self.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []),
508 self.tool["id"]).get("requirements", []))
509 self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
510 self.hints.extend(self.tool.get("hints", []))
511 # Versions of requirements and hints which aren't mutated.
512 self.original_requirements = copy.deepcopy(self.requirements)
513 self.original_hints = copy.deepcopy(self.hints)
514 self.doc_loader = loadingContext.loader
515 self.doc_schema = loadingContext.avsc_names
516
517 self.formatgraph = None # type: Optional[Graph]
518 if self.doc_loader is not None:
519 self.formatgraph = self.doc_loader.graph
520
521 checkRequirements(self.tool, supportedProcessRequirements)
522 self.validate_hints(loadingContext.avsc_names, self.tool.get("hints", []),
523 strict=getdefault(loadingContext.strict, False))
524
525 self.schemaDefs = {} # type: Dict[Text,Dict[Text, Any]]
526
527 sd, _ = self.get_requirement("SchemaDefRequirement")
528
529 if sd is not None:
530 sdtypes = avroize_type(sd["types"])
531 av = schema.make_valid_avro(sdtypes, {t["name"]: t for t in sdtypes}, set())
532 for i in av:
533 self.schemaDefs[i["name"]] = i # type: ignore
534 schema.make_avsc_object(schema.convert_to_dict(av), self.names)
535
536 # Build record schema from inputs
537 self.inputs_record_schema = {
538 "name": "input_record_schema", "type": "record",
539 "fields": []} # type: Dict[Text, Any]
540 self.outputs_record_schema = {
541 "name": "outputs_record_schema", "type": "record",
542 "fields": []} # type: Dict[Text, Any]
543
544 for key in ("inputs", "outputs"):
545 for i in self.tool[key]:
546 c = copy.deepcopy(i)
547 c["name"] = shortname(c["id"])
548 del c["id"]
549
550 if "type" not in c:
551 raise validate.ValidationException(
552 u"Missing 'type' in parameter '{}'".format(c["name"]))
553
554 if "default" in c and "null" not in aslist(c["type"]):
555 nullable = ["null"]
556 nullable.extend(aslist(c["type"]))
557 c["type"] = nullable
560 c["type"] = avroize_type(c["type"], c["name"])
561 if key == "inputs":
562 self.inputs_record_schema["fields"].append(c)
563 elif key == "outputs":
564 self.outputs_record_schema["fields"].append(c)
565
566 with SourceLine(toolpath_object, "inputs", validate.ValidationException):
567 self.inputs_record_schema = cast(
568 Dict[Text, Any], schema.make_valid_avro(
569 self.inputs_record_schema, {}, set()))
570 schema.make_avsc_object(
571 schema.convert_to_dict(self.inputs_record_schema), self.names)
572 with SourceLine(toolpath_object, "outputs", validate.ValidationException):
573 self.outputs_record_schema = cast(
574 Dict[Text, Any],
575 schema.make_valid_avro(self.outputs_record_schema, {}, set()))
576 schema.make_avsc_object(
577 schema.convert_to_dict(self.outputs_record_schema), self.names)
578
579 if toolpath_object.get("class") is not None \
580 and not getdefault(loadingContext.disable_js_validation, False):
581 if loadingContext.js_hint_options_file is not None:
582 try:
583 with open(loadingContext.js_hint_options_file) as options_file:
584 validate_js_options = json.load(options_file)
585 except (OSError, ValueError):
586 _logger.error(
587 "Failed to read options file %s",
588 loadingContext.js_hint_options_file)
589 raise
590 else:
591 validate_js_options = None
592 if self.doc_schema is not None:
593 validate_js_expressions(
594 cast(CommentedMap, toolpath_object),
595 self.doc_schema.names[toolpath_object["class"]],
596 validate_js_options)
597
598 dockerReq, is_req = self.get_requirement("DockerRequirement")
599
600 if dockerReq is not None and "dockerOutputDirectory" in dockerReq\
601 and is_req is not None and not is_req:
602 _logger.warning(SourceLine(
603 item=dockerReq, raise_type=Text).makeError(
604 "When 'dockerOutputDirectory' is declared, DockerRequirement "
605 "should go in the 'requirements' section, not 'hints'."""))
606
607 if dockerReq is not None and is_req is not None\
608 and dockerReq.get("dockerOutputDirectory") == "/var/spool/cwl":
609 if is_req:
610 # In this specific case, it is legal to have /var/spool/cwl, so skip the check.
611 pass
612 else:
613 # DockerRequirement was provided as a hint, so the non-portable path check still applies.
614 var_spool_cwl_detector(self.tool)
615 else:
616 var_spool_cwl_detector(self.tool)
617
618 def _init_job(self, joborder, runtime_context):
619 # type: (Mapping[Text, Text], RuntimeContext) -> Builder
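        """Validate and normalize the job order, then return a Builder with the initial command line bindings."""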
620
621 if self.metadata.get("cwlVersion") != INTERNAL_VERSION:
622 raise WorkflowException("Process object loaded with version '%s'; it must be updated to '%s' before it can be executed." % (
623 self.metadata.get("cwlVersion"), INTERNAL_VERSION))
624
625 job = cast(Dict[Text, expression.JSON], copy.deepcopy(joborder))
626
627 make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
628 fs_access = make_fs_access(runtime_context.basedir)
629
630 load_listing_req, _ = self.get_requirement(
631 "LoadListingRequirement")
632
633 if load_listing_req is not None:
634 load_listing = load_listing_req.get("loadListing")
635 else:
636 load_listing = "no_listing"
637
638 # Validate job order
639 try:
640 fill_in_defaults(self.tool[u"inputs"], job, fs_access)
641
642 normalizeFilesDirs(job)
643 schema = self.names.get_name("input_record_schema", "")
644 if schema is None:
645 raise WorkflowException("Missing input record schema: "
646 "{}".format(self.names))
647 validate.validate_ex(schema, job, strict=False,
648 logger=_logger_validation_warnings)
649
650 if load_listing and load_listing != "no_listing":
651 get_listing(fs_access, job, recursive=(load_listing == "deep_listing"))
652
653 visit_class(job, ("File",), functools.partial(add_sizes, fs_access))
654
655 if load_listing == "deep_listing":
656 for i, inparm in enumerate(self.tool["inputs"]):
657 k = shortname(inparm["id"])
658 if k not in job:
659 continue
660 v = job[k]
661 dircount = [0]
662
663 def inc(d): # type: (List[int]) -> None
664 d[0] += 1
665 visit_class(v, ("Directory",), lambda x: inc(dircount))
666 if dircount[0] == 0:
667 continue
668 filecount = [0]
669 visit_class(v, ("File",), lambda x: inc(filecount))
670 if filecount[0] > FILE_COUNT_WARNING:
671 # Long lines in this message are okay, will be reflowed based on terminal columns.
672 _logger.warning(strip_dup_lineno(SourceLine(self.tool["inputs"], i, Text).makeError(
673 """Recursive directory listing has resulted in a large number of File objects (%s) passed to the input parameter '%s'. This may negatively affect workflow performance and memory use.
674
675 If this is a problem, use the hint 'cwltool:LoadListingRequirement' with "shallow_listing" or "no_listing" to change the directory listing behavior:
676
677 $namespaces:
678 cwltool: "http://commonwl.org/cwltool#"
679 hints:
680 cwltool:LoadListingRequirement:
681 loadListing: shallow_listing
682
683 """ % (filecount[0], k))))
684
685 except (validate.ValidationException, WorkflowException) as err:
686 raise_from(WorkflowException("Invalid job input record:\n" + Text(err)), err)
687
688 files = [] # type: List[Dict[Text, Text]]
689 bindings = CommentedSeq()
690 tmpdir = u""
691 stagedir = u""
692
693 docker_req, _ = self.get_requirement("DockerRequirement")
694 default_docker = None
695
696 if docker_req is None and runtime_context.default_container:
697 default_docker = runtime_context.default_container
698
699 if (docker_req or default_docker) and runtime_context.use_container:
700 if docker_req is not None:
701 # Check if docker output directory is absolute
702 if docker_req.get("dockerOutputDirectory") and \
703 docker_req.get("dockerOutputDirectory").startswith('/'):
704 outdir = docker_req.get("dockerOutputDirectory")
705 else:
706 outdir = docker_req.get("dockerOutputDirectory") or \
707 runtime_context.docker_outdir or random_outdir()
708 elif default_docker is not None:
709 outdir = runtime_context.docker_outdir or random_outdir()
710 tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec
711 stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
712 else:
713 outdir = fs_access.realpath(
714 runtime_context.outdir or tempfile.mkdtemp(
715 prefix=getdefault(runtime_context.tmp_outdir_prefix,
716 DEFAULT_TMP_PREFIX)))
717 if self.tool[u"class"] != 'Workflow':
718 tmpdir = fs_access.realpath(runtime_context.tmpdir
719 or tempfile.mkdtemp())
720 stagedir = fs_access.realpath(runtime_context.stagedir
721 or tempfile.mkdtemp())
722
723 builder = Builder(job,
724 files,
725 bindings,
726 self.schemaDefs,
727 self.names,
728 self.requirements,
729 self.hints,
730 {},
731 runtime_context.mutation_manager,
732 self.formatgraph,
733 make_fs_access,
734 fs_access,
735 runtime_context.job_script_provider,
736 runtime_context.eval_timeout,
737 runtime_context.debug,
738 runtime_context.js_console,
739 runtime_context.force_docker_pull,
740 load_listing,
741 outdir,
742 tmpdir,
743 stagedir)
744
745 bindings.extend(builder.bind_input(
746 self.inputs_record_schema, job,
747 discover_secondaryFiles=getdefault(runtime_context.toplevel, False)))
748
749 if self.tool.get("baseCommand"):
750 for index, command in enumerate(aslist(self.tool["baseCommand"])):
751 bindings.append({
752 "position": [-1000000, index],
753 "datum": command
754 })
755
756 if self.tool.get("arguments"):
757 for i, arg in enumerate(self.tool["arguments"]):
758 lc = self.tool["arguments"].lc.data[i]
759 filename = self.tool["arguments"].lc.filename
760 bindings.lc.add_kv_line_col(len(bindings), lc)
761 if isinstance(arg, MutableMapping):
762 arg = copy.deepcopy(arg)
763 if arg.get("position"):
764 position = arg.get("position")
765 if isinstance(position, str):
766 # No need to check the CWL version: the v1.0 schema only allows
767 # ints here, so a string must be an expression to evaluate.
768 position = builder.do_eval(position)
769 if position is None:
770 position = 0
771 arg["position"] = [position, i]
772 else:
773 arg["position"] = [0, i]
774 bindings.append(arg)
775 elif ("$(" in arg) or ("${" in arg):
776 cm = CommentedMap((
777 ("position", [0, i]),
778 ("valueFrom", arg)
779 ))
780 cm.lc.add_kv_line_col("valueFrom", lc)
781 cm.lc.filename = filename
782 bindings.append(cm)
783 else:
784 cm = CommentedMap((
785 ("position", [0, i]),
786 ("datum", arg)
787 ))
788 cm.lc.add_kv_line_col("datum", lc)
789 cm.lc.filename = filename
790 bindings.append(cm)
791
792 # use python2 like sorting of heterogeneous lists
793 # (containing str and int types),
794 if PY3:
795 key = functools.cmp_to_key(cmp_like_py2)
796 else: # PY2
797 key = lambda d: d["position"]
798
799 # This awkward construction replaces the contents of
800 # "bindings" in place (because Builder expects it to be
801 # mutated in place, sigh, I'm sorry) with its contents sorted,
802 # supporting different versions of Python and ruamel.yaml with
803 # different behaviors/bugs in CommentedSeq.
804 bindings_copy = copy.deepcopy(bindings)
805 del bindings[:]
806 bindings.extend(sorted(bindings_copy, key=key))
807
808 if self.tool[u"class"] != 'Workflow':
809 builder.resources = self.evalResources(builder, runtime_context)
810 return builder
811
812 def evalResources(self, builder, runtimeContext):
813 # type: (Builder, RuntimeContext) -> Dict[str, int]
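        """Resolve ResourceRequirement into concrete cores/ram/tmpdir/outdir values; the RAM default depends on the original CWL version."""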
814 resourceReq, _ = self.get_requirement("ResourceRequirement")
815 if resourceReq is None:
816 resourceReq = {}
817 cwl_version = self.metadata.get(
818 "http://commonwl.org/cwltool#original_cwlVersion", None)
819 if cwl_version == "v1.0":
820 ram = 1024
821 else:
822 ram = 256
823 request = {
824 "coresMin": 1,
825 "coresMax": 1,
826 "ramMin": ram,
827 "ramMax": ram,
828 "tmpdirMin": 1024,
829 "tmpdirMax": 1024,
830 "outdirMin": 1024,
831 "outdirMax": 1024
832 } # type: Dict[str, int]
833 for a in ("cores", "ram", "tmpdir", "outdir"):
834 mn = None
835 mx = None
836 if resourceReq.get(a + "Min"):
837 mn = eval_resource(builder, resourceReq[a + "Min"])
838 if resourceReq.get(a + "Max"):
839 mx = eval_resource(builder, resourceReq[a + "Max"])
840 if mn is None:
841 mn = mx
842 elif mx is None:
843 mx = mn
844
845 if mn is not None:
846 request[a + "Min"] = cast(int, mn)
847 request[a + "Max"] = cast(int, mx)
848
849 if runtimeContext.select_resources is not None:
850 return runtimeContext.select_resources(request, runtimeContext)
851 return {
852 "cores": request["coresMin"],
853 "ram": request["ramMin"],
854 "tmpdirSize": request["tmpdirMin"],
855 "outdirSize": request["outdirMin"],
856 }
857
858 def validate_hints(self, avsc_names, hints, strict):
859 # type: (Any, List[Dict[Text, Any]], bool) -> None
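        """Validate each hint against the schema; unknown hints are logged at INFO rather than rejected."""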
860 for i, r in enumerate(hints):
861 sl = SourceLine(hints, i, validate.ValidationException)
862 with sl:
863 if avsc_names.get_name(r["class"], "") is not None and self.doc_loader is not None:
864 plain_hint = dict((key, r[key]) for key in r if key not in
865 self.doc_loader.identifiers) # strip identifiers
866 validate.validate_ex(
867 avsc_names.get_name(plain_hint["class"], ""),
868 plain_hint, strict=strict)
869 elif r["class"] in ("NetworkAccess", "LoadListingRequirement"):
870 pass
871 else:
872 _logger.info(Text(sl.makeError(u"Unknown hint %s" % (r["class"]))))
873
874 def visit(self, op): # type: (Callable[[MutableMapping[Text, Any]], None]) -> None
875 op(self.tool)
876
877 @abc.abstractmethod
878 def job(self,
879 job_order, # type: Mapping[Text, Text]
880 output_callbacks, # type: Callable[[Any, Any], Any]
881 runtimeContext # type: RuntimeContext
882 ): # type: (...) -> Generator[Any, None, None]
883 # FIXME: Declare base type for what Generator yields
884 pass
885
886
887 _names = set() # type: Set[Text]
888
889
890 def uniquename(stem, names=None): # type: (Text, Optional[Set[Text]]) -> Text
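    """Return stem made unique against names by appending "_<n>": "step", "step_2", "step_3", ..."""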
891 global _names
892 if names is None:
893 names = _names
894 c = 1
895 u = stem
896 while u in names:
897 c += 1
898 u = u"%s_%s" % (stem, c)
899 names.add(u)
900 return u
901
902
903 def nestdir(base, deps):
904 # type: (Text, Dict[Text, Any]) -> Dict[Text, Any]
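    """Wrap deps in nested Directory objects, one per path level between base's directory and the dependency."""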
905 dirname = os.path.dirname(base) + "/"
906 subid = deps["location"]
907 if subid.startswith(dirname):
908 s2 = subid[len(dirname):]
909 sp = s2.split('/')
910 sp.pop()
911 while sp:
912 nx = sp.pop()
913 deps = {
914 "class": "Directory",
915 "basename": nx,
916 "listing": [deps]
917 }
918 return deps
919
920
921 def mergedirs(listing):
922 # type: (List[Dict[Text, Any]]) -> List[Dict[Text, Any]]
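    """Merge listing entries sharing a basename; real collisions (same name, different location) are renamed."""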
923 r = [] # type: List[Dict[Text, Any]]
924 ents = {} # type: Dict[Text, Any]
925 collided = set() # type: Set[Text]
926 for e in listing:
927 if e["basename"] not in ents:
928 ents[e["basename"]] = e
929 elif e["class"] == "Directory":
930 if e.get("listing"):
931 ents[e["basename"]].setdefault("listing", []).extend(e["listing"])
932 if ents[e["basename"]]["location"].startswith("_:"):
933 ents[e["basename"]]["location"] = e["location"]
934 elif e["location"] != ents[e["basename"]]["location"]:
935 # same basename, different location, collision,
936 # rename both.
937 collided.add(e["basename"])
938 e2 = ents[e["basename"]]
939
940 e["basename"] = urllib.parse.quote(e["location"], safe="")
941 e2["basename"] = urllib.parse.quote(e2["location"], safe="")
942
943 e["nameroot"], e["nameext"] = os.path.splitext(e["basename"])
944 e2["nameroot"], e2["nameext"] = os.path.splitext(e2["basename"])
945
946 ents[e["basename"]] = e
947 ents[e2["basename"]] = e2
948 for c in collided:
949 del ents[c]
950 for e in itervalues(ents):
951 if e["class"] == "Directory" and "listing" in e:
952 e["listing"] = mergedirs(e["listing"])
953 r.extend(itervalues(ents))
954 return r
955
956
957 CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
958
959 def scandeps(base, # type: Text
960 doc, # type: Any
961 reffields, # type: Set[Text]
962 urlfields, # type: Set[Text]
963 loadref, # type: Callable[[Text, Text], Text]
964 urljoin=urllib.parse.urljoin, # type: Callable[[Text, Text], Text]
965 nestdirs=True # type: bool
966 ): # type: (...) -> List[Dict[Text, Text]]
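    """Collect File and Directory dependencies of a CWL document, recursing into referenced documents via loadref."""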
967 r = [] # type: List[Dict[Text, Text]]
968 if isinstance(doc, MutableMapping):
969 if "id" in doc:
970 if doc["id"].startswith("file://"):
971 df, _ = urllib.parse.urldefrag(doc["id"])
972 if base != df:
973 r.append({
974 "class": "File",
975 "location": df,
976 "format": CWL_IANA
977 })
978 base = df
979
980 if doc.get("class") in ("File", "Directory") and "location" in urlfields:
981 u = doc.get("location", doc.get("path"))
982 if u and not u.startswith("_:"):
983 deps = {"class": doc["class"],
984 "location": urljoin(base, u)
985 } # type: Dict[Text, Any]
986 if "basename" in doc:
987 deps["basename"] = doc["basename"]
988 if doc["class"] == "Directory" and "listing" in doc:
989 deps["listing"] = doc["listing"]
990 if doc["class"] == "File" and "secondaryFiles" in doc:
991 deps["secondaryFiles"] = doc["secondaryFiles"]
992 if nestdirs:
993 deps = nestdir(base, deps)
994 r.append(deps)
995 else:
996 if doc["class"] == "Directory" and "listing" in doc:
997 r.extend(scandeps(
998 base, doc["listing"], reffields, urlfields, loadref,
999 urljoin=urljoin, nestdirs=nestdirs))
1000 elif doc["class"] == "File" and "secondaryFiles" in doc:
1001 r.extend(scandeps(
1002 base, doc["secondaryFiles"], reffields, urlfields,
1003 loadref, urljoin=urljoin, nestdirs=nestdirs))
1004
1005 for k, v in iteritems(doc):
1006 if k in reffields:
1007 for u in aslist(v):
1008 if isinstance(u, MutableMapping):
1009 r.extend(scandeps(
1010 base, u, reffields, urlfields, loadref,
1011 urljoin=urljoin, nestdirs=nestdirs))
1012 else:
1013 subid = urljoin(base, u)
1014 basedf, _ = urllib.parse.urldefrag(base)
1015 subiddf, _ = urllib.parse.urldefrag(subid)
1016 if basedf == subiddf:
1017 continue
1018 sub = loadref(base, u)
1019 deps = {
1020 "class": "File",
1021 "location": subid,
1022 "format": CWL_IANA
1023 }
1024 sf = scandeps(
1025 subid, sub, reffields, urlfields, loadref,
1026 urljoin=urljoin, nestdirs=nestdirs)
1027 if sf:
1028 deps["secondaryFiles"] = sf
1029 if nestdirs:
1030 deps = nestdir(base, deps)
1031 r.append(deps)
1032 elif k in urlfields and k != "location":
1033 for u in aslist(v):
1034 deps = {
1035 "class": "File",
1036 "location": urljoin(base, u)
1037 }
1038 if nestdirs:
1039 deps = nestdir(base, deps)
1040 r.append(deps)
1041 elif k not in ("listing", "secondaryFiles"):
1042 r.extend(scandeps(
1043 base, v, reffields, urlfields, loadref, urljoin=urljoin,
1044 nestdirs=nestdirs))
1045 elif isinstance(doc, MutableSequence):
1046 for d in doc:
1047 r.extend(scandeps(
1048 base, d, reffields, urlfields, loadref, urljoin=urljoin,
1049 nestdirs=nestdirs))
1050
1051 if r:
1052 normalizeFilesDirs(r)
1053 r = mergedirs(r)
1054
1055 return r
1056
1057
1058 def compute_checksums(fs_access, fileobj): # type: (StdFsAccess, Dict[Text, Any]) -> None
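    """Add a sha1 checksum and size to a File object, reading the file in 1 MiB chunks; existing checksums are kept."""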
1059 if "checksum" not in fileobj:
1060 checksum = hashlib.sha1() # nosec
1061 with fs_access.open(fileobj["location"], "rb") as f:
1062 contents = f.read(1024 * 1024)
1063 while contents != b"":
1064 checksum.update(contents)
1065 contents = f.read(1024 * 1024)
1066 fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
1067 fileobj["size"] = fs_access.size(fileobj["location"])