comparison env/lib/python3.7/site-packages/cwltool/command_line_tool.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
"""Implementation of CommandLineTool."""
from __future__ import absolute_import

import copy
import hashlib
import json
import locale
import logging
import os
import re
import shutil
import tempfile
import threading
from functools import cmp_to_key, partial
from typing import (Any, Callable, Dict, Generator, IO, List, Mapping,
                    MutableMapping, MutableSequence, Optional, Set, Union, cast)

import shellescape
from schema_salad import validate
from schema_salad.avro.schema import Schema
from schema_salad.ref_resolver import file_uri, uri_file_path
from schema_salad.sourceline import SourceLine
from six import string_types
from future.utils import raise_from

from six.moves import map, urllib
from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
                               Text, Type)
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from .builder import (Builder, content_limit_respected_read_bytes,  # pylint: disable=unused-import
                      substitute)
from .context import LoadingContext  # pylint: disable=unused-import
from .context import RuntimeContext, getdefault
from .docker import DockerCommandLineJob
from .errors import WorkflowException
from .flatten import flatten
from .job import CommandLineJob, JobBase  # pylint: disable=unused-import
from .loghandler import _logger
from .mutation import MutationManager  # pylint: disable=unused-import
from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs,
                         get_listing, trim_listing, visit_class)
from .process import (Process, UnsupportedRequirement,
                      _logger_validation_warnings, compute_checksums,
                      normalizeFilesDirs, shortname, uniquename)
from .singularity import SingularityCommandLineJob
from .software_requirements import (  # pylint: disable=unused-import
    DependenciesConfiguration)
from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
from .utils import (aslist, convert_pathsep_to_unix,
                    docker_windows_path_adjust, json_dumps, onWindows,
                    random_outdir, windows_default_container_id,
                    shared_file_lock, upgrade_lock)

if TYPE_CHECKING:
    from .provenance import ProvenanceProfile  # pylint: disable=unused-import

ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")  # Accept anything
ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
DEFAULT_CONTAINER_MSG = """
We are on Microsoft Windows and not all components of this CWL description have a
container specified. This means that these steps will be executed in the default container,
which is %s.

Note, this could affect portability if this CWL description relies on non-POSIX features
or commands in this container. For best results add the following to your CWL
description's hints section:

hints:
  DockerRequirement:
    dockerPull: %s
"""


class ExpressionTool(Process):
    class ExpressionJob(object):
        """Job for ExpressionTools."""

        def __init__(self,
                     builder,          # type: Builder
                     script,           # type: Dict[Text, Text]
                     output_callback,  # type: Callable[[Any, Any], Any]
                     requirements,     # type: List[Dict[Text, Text]]
                     hints,            # type: List[Dict[Text, Text]]
                     outdir=None,      # type: Optional[Text]
                     tmpdir=None,      # type: Optional[Text]
                    ):  # type: (...) -> None
            """Initialize this ExpressionJob."""
            self.builder = builder
            self.requirements = requirements
            self.hints = hints
            self.collect_outputs = None  # type: Optional[Callable[[Any], Any]]
            self.output_callback = output_callback
            self.outdir = outdir
            self.tmpdir = tmpdir
            self.script = script
            self.prov_obj = None  # type: Optional[ProvenanceProfile]

        def run(self,
                runtimeContext,   # type: RuntimeContext
                tmpdir_lock=None  # type: Optional[threading.Lock]
               ):  # type: (...) -> None
            try:
                normalizeFilesDirs(self.builder.job)
                ev = self.builder.do_eval(self.script)
                normalizeFilesDirs(ev)
                self.output_callback(ev, "success")
            except Exception as err:
                _logger.warning(u"Failed to evaluate expression:\n%s",
                                Text(err), exc_info=runtimeContext.debug)
                self.output_callback({}, "permanentFail")

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):
        # type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None]
        builder = self._init_job(job_order, runtimeContext)

        job = ExpressionTool.ExpressionJob(
            builder, self.tool["expression"], output_callbacks,
            self.requirements, self.hints)
        job.prov_obj = runtimeContext.prov_obj
        yield job


def remove_path(f):  # type: (Dict[Text, Any]) -> None
    if "path" in f:
        del f["path"]


def revmap_file(builder, outdir, f):
    # type: (Builder, Text, Dict[Text, Any]) -> Union[Dict[Text, Any], None]
    """
    Remap a file from internal path to external path.

    For Docker, this maps from the path inside the container to the path
    outside the container. Recognizes files in the pathmapper or remaps
    internal output directories to the external directory.
    """
    split = urllib.parse.urlsplit(outdir)
    if not split.scheme:
        outdir = file_uri(str(outdir))

    # builder.outdir is the inner (container/compute node) output directory
    # outdir is the outer (host/storage system) output directory

    if "location" in f and "path" not in f:
        if f["location"].startswith("file://"):
            f["path"] = convert_pathsep_to_unix(uri_file_path(f["location"]))
        else:
            return f

    if "path" in f:
        path = f["path"]
        uripath = file_uri(path)
        del f["path"]

        if "basename" not in f:
            f["basename"] = os.path.basename(path)

        if not builder.pathmapper:
            raise ValueError("Do not call revmap_file using a builder that doesn't have a pathmapper.")
        revmap_f = builder.pathmapper.reversemap(path)

        if revmap_f and not builder.pathmapper.mapper(revmap_f[0]).type.startswith("Writable"):
            # a known, read-only input: restore its original (host) location
            f["location"] = revmap_f[1]
        elif uripath == outdir or uripath.startswith(outdir + os.sep) or uripath.startswith(outdir + '/'):
            # already inside the host output directory
            f["location"] = file_uri(path)
        elif path == builder.outdir or path.startswith(builder.outdir + os.sep) or path.startswith(builder.outdir + '/'):
            # relocate from the inner output directory to the host one
            f["location"] = builder.fs_access.join(outdir, path[len(builder.outdir) + 1:])
        elif not os.path.isabs(path):
            # relative paths are resolved against the host output directory
            f["location"] = builder.fs_access.join(outdir, path)
        else:
            raise WorkflowException(u"Output file path %s must be within designated output directory (%s) or an input "
                                    u"file pass through." % (path, builder.outdir))
        return f

    raise WorkflowException(u"Output File object is missing both 'location' "
                            "and 'path' fields: %s" % f)


class CallbackJob(object):
    def __init__(self, job, output_callback, cachebuilder, jobcache):
        # type: (CommandLineTool, Callable[[Any, Any], Any], Builder, Text) -> None
        """Initialize this CallbackJob."""
        self.job = job
        self.output_callback = output_callback
        self.cachebuilder = cachebuilder
        self.outdir = jobcache
        self.prov_obj = None  # type: Optional[ProvenanceProfile]

    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        # A cache hit is reported with exit code 0; compute_checksum must be
        # passed by keyword so it does not land in the positional rcode
        # parameter of collect_output_ports.
        self.output_callback(self.job.collect_output_ports(
            self.job.tool["outputs"],
            self.cachebuilder,
            self.outdir,
            0,
            compute_checksum=getdefault(runtimeContext.compute_checksum, True)),
            "success")


def check_adjust(builder, file_o):
    # type: (Builder, Dict[Text, Any]) -> Dict[Text, Any]
    """
    Map files to assigned path inside a container.

    We need to also explicitly walk over input, as implicit reassignment
    doesn't reach everything in builder.bindings.
    """
    if not builder.pathmapper:
        raise ValueError("Do not call check_adjust using a builder that doesn't have a pathmapper.")
    file_o["path"] = docker_windows_path_adjust(
        builder.pathmapper.mapper(file_o["location"])[1])
    dn, bn = os.path.split(file_o["path"])
    if file_o.get("dirname") != dn:
        file_o["dirname"] = Text(dn)
    if file_o.get("basename") != bn:
        file_o["basename"] = Text(bn)
    if file_o["class"] == "File":
        nr, ne = os.path.splitext(file_o["basename"])
        if file_o.get("nameroot") != nr:
            file_o["nameroot"] = Text(nr)
        if file_o.get("nameext") != ne:
            file_o["nameext"] = Text(ne)
    if not ACCEPTLIST_RE.match(file_o["basename"]):
        raise WorkflowException(
            "Invalid filename: '{}' contains illegal characters".format(
                file_o["basename"]))
    return file_o


def check_valid_locations(fs_access, ob):  # type: (StdFsAccess, Dict[Text, Any]) -> None
    if ob["location"].startswith("_:"):
        return  # literal contents exist only in memory and cannot be checked on disk
    if ob["class"] == "File" and not fs_access.isfile(ob["location"]):
        raise validate.ValidationException("Does not exist or is not a File: '%s'" % ob["location"])
    if ob["class"] == "Directory" and not fs_access.isdir(ob["location"]):
        raise validate.ValidationException("Does not exist or is not a Directory: '%s'" % ob["location"])


OutputPorts = Dict[Text, Union[None, Text, List[Union[Dict[Text, Any], Text]], Dict[Text, Any]]]


class CommandLineTool(Process):
    def __init__(self, toolpath_object, loadingContext):
        # type: (MutableMapping[Text, Any], LoadingContext) -> None
        """Initialize this CommandLineTool."""
        super(CommandLineTool, self).__init__(toolpath_object, loadingContext)
        self.prov_obj = loadingContext.prov_obj

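    # Pick the job-runner class for this tool: a Singularity or Docker job
    # when containers are in use (falling back to a configured default
    # container if the tool names none), otherwise a plain local
    # CommandLineJob.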
    def make_job_runner(self,
                        runtimeContext  # type: RuntimeContext
                       ):  # type: (...) -> Type[JobBase]
        dockerReq, _ = self.get_requirement("DockerRequirement")
        if not dockerReq and runtimeContext.use_container:
            if runtimeContext.find_default_container is not None:
                default_container = runtimeContext.find_default_container(self)
                if default_container is not None:
                    self.requirements.insert(0, {
                        "class": "DockerRequirement",
                        "dockerPull": default_container
                    })
                    dockerReq = self.requirements[0]
                    if default_container == windows_default_container_id \
                            and runtimeContext.use_container and onWindows():
                        _logger.warning(
                            DEFAULT_CONTAINER_MSG, windows_default_container_id,
                            windows_default_container_id)

        if dockerReq is not None and runtimeContext.use_container:
            if runtimeContext.singularity:
                return SingularityCommandLineJob
            return DockerCommandLineJob
        for t in reversed(self.requirements):
            if t["class"] == "DockerRequirement":
                raise UnsupportedRequirement(
                    "--no-container, but this CommandLineTool has "
                    "DockerRequirement under 'requirements'.")
        return CommandLineJob

    def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs):
        # type: (List[Any], Text, RuntimeContext, bool) -> PathMapper
        return PathMapper(reffiles, runtimeContext.basedir, stagedir, separateDirs)

    def updatePathmap(self, outdir, pathmap, fn):
        # type: (Text, PathMapper, Dict[Text, Any]) -> None
        if "location" in fn and fn["location"] in pathmap:
            pathmap.update(fn["location"], pathmap.mapper(fn["location"]).resolved,
                           os.path.join(outdir, fn["basename"]),
                           ("Writable" if fn.get("writable") else "") + fn["class"], False)
        for sf in fn.get("secondaryFiles", []):
            self.updatePathmap(outdir, pathmap, sf)
        for ls in fn.get("listing", []):
            self.updatePathmap(os.path.join(outdir, fn["basename"]), pathmap, ls)

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):
        # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]

        workReuse, _ = self.get_requirement("WorkReuse")
        enableReuse = workReuse.get("enableReuse", True) if workReuse else True

        jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job")))
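        # With a cache directory configured and reuse enabled, build a dummy
        # job using fixed /out, /tmp and /stage paths so the command line and
        # inputs can be hashed into a cache key; a hit on that key lets a
        # previous run's output directory stand in for a fresh execution.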
        if runtimeContext.cachedir and enableReuse:
            cachecontext = runtimeContext.copy()
            cachecontext.outdir = "/out"
            cachecontext.tmpdir = "/tmp"  # nosec
            cachecontext.stagedir = "/stage"
            cachebuilder = self._init_job(job_order, cachecontext)
            cachebuilder.pathmapper = PathMapper(cachebuilder.files,
                                                 runtimeContext.basedir,
                                                 cachebuilder.stagedir,
                                                 separateDirs=False)
            _check_adjust = partial(check_adjust, cachebuilder)
            visit_class([cachebuilder.files, cachebuilder.bindings],
                        ("File", "Directory"), _check_adjust)

            cmdline = flatten(list(map(cachebuilder.generate_arg, cachebuilder.bindings)))
            docker_req, _ = self.get_requirement("DockerRequirement")
            if docker_req is not None and runtimeContext.use_container:
                dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
            elif runtimeContext.default_container is not None and runtimeContext.use_container:
                dockerimg = runtimeContext.default_container
            else:
                dockerimg = None

            if dockerimg is not None:
                cmdline = ["docker", "run", dockerimg] + cmdline
                # not really run using docker, just for hashing purposes
            keydict = {u"cmdline": cmdline}  # type: Dict[Text, Union[Dict[Text, Any], List[Any]]]

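            # Fold everything that can change the result into the cache key:
            # stdin/stdout/stderr redirections, the size and checksum (or
            # modification time) of each input file, and any requirement or
            # hint that affects execution.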
            for shortcut in ["stdin", "stdout", "stderr"]:
                if shortcut in self.tool:
                    keydict[shortcut] = self.tool[shortcut]

            for location, fobj in cachebuilder.pathmapper.items():
                if fobj.type == "File":
                    checksum = next(
                        (e['checksum'] for e in cachebuilder.files
                         if 'location' in e and e['location'] == location
                         and 'checksum' in e
                         and e['checksum'] != 'sha1$hash'), None)
                    fobj_stat = os.stat(fobj.resolved)
                    if checksum is not None:
                        keydict[fobj.resolved] = [fobj_stat.st_size, checksum]
                    else:
                        keydict[fobj.resolved] = [fobj_stat.st_size,
                                                  int(fobj_stat.st_mtime * 1000)]

            interesting = {"DockerRequirement",
                           "EnvVarRequirement",
                           "InitialWorkDirRequirement",
                           "ShellCommandRequirement",
                           "NetworkAccess"}
            for rh in (self.original_requirements, self.original_hints):
                for r in reversed(rh):
                    if r["class"] in interesting and r["class"] not in keydict:
                        keydict[r["class"]] = r

            keydictstr = json_dumps(keydict, separators=(',', ':'),
                                    sort_keys=True)
            cachekey = hashlib.md5(  # nosec
                keydictstr.encode('utf-8')).hexdigest()

            _logger.debug("[job %s] keydictstr is %s -> %s", jobname,
                          keydictstr, cachekey)

            jobcache = os.path.join(runtimeContext.cachedir, cachekey)

            # Create a lockfile to manage cache status.
            jobcachepending = "{}.status".format(jobcache)
            jobcachelock = None
            jobstatus = None

            # Opens the file for read/write, or creates an empty file.
            jobcachelock = open(jobcachepending, "a+")

            # get the shared lock to ensure no other process is trying
            # to write to this cache
            shared_file_lock(jobcachelock)
            jobcachelock.seek(0)
            jobstatus = jobcachelock.read()

            if os.path.isdir(jobcache) and jobstatus == "success":
                if docker_req and runtimeContext.use_container:
                    cachebuilder.outdir = runtimeContext.docker_outdir or random_outdir()
                else:
                    cachebuilder.outdir = jobcache

                _logger.info("[job %s] Using cached output in %s", jobname, jobcache)
                yield CallbackJob(self, output_callbacks, cachebuilder, jobcache)
                # we're done with the cache so release lock
                jobcachelock.close()
                return
            else:
                _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)

                # turn shared lock into an exclusive lock since we'll
                # be writing the cache directory
                upgrade_lock(jobcachelock)

                shutil.rmtree(jobcache, True)
                os.makedirs(jobcache)
                runtimeContext = runtimeContext.copy()
                runtimeContext.outdir = jobcache

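                # Wrap the caller's callback so the final job status is
                # written to the lock file, and the lock released, before
                # the outputs are reported.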
                def update_status_output_callback(
                        output_callbacks,  # type: Callable[[List[Dict[Text, Any]], Text], None]
                        jobcachelock,      # type: IO[Any]
                        outputs,           # type: List[Dict[Text, Any]]
                        processStatus      # type: Text
                ):  # type: (...) -> None
                    # save status to the lockfile then release the lock
                    jobcachelock.seek(0)
                    jobcachelock.truncate()
                    jobcachelock.write(processStatus)
                    jobcachelock.close()
                    output_callbacks(outputs, processStatus)

                output_callbacks = partial(
                    update_status_output_callback, output_callbacks, jobcachelock)

        builder = self._init_job(job_order, runtimeContext)

        reffiles = copy.deepcopy(builder.files)

        j = self.make_job_runner(runtimeContext)(
            builder, builder.job, self.make_path_mapper, self.requirements,
            self.hints, jobname)
        j.prov_obj = self.prov_obj

        j.successCodes = self.tool.get("successCodes", [])
        j.temporaryFailCodes = self.tool.get("temporaryFailCodes", [])
        j.permanentFailCodes = self.tool.get("permanentFailCodes", [])

        debug = _logger.isEnabledFor(logging.DEBUG)

        if debug:
            _logger.debug(u"[job %s] initializing from %s%s",
                          j.name,
                          self.tool.get("id", ""),
                          u" as part of %s" % runtimeContext.part_of
                          if runtimeContext.part_of else "")
            _logger.debug(u"[job %s] %s", j.name, json_dumps(builder.job,
                                                             indent=4))

        builder.pathmapper = self.make_path_mapper(
            reffiles, builder.stagedir, runtimeContext, True)
        builder.requirements = j.requirements

        _check_adjust = partial(check_adjust, builder)

        visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)

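        # Stage the InitialWorkDirRequirement listing: it is either a single
        # expression that yields the whole listing, or a list of entries
        # whose "entry"/"entryname" fields are evaluated individually;
        # string entries become literal files, everything else is staged
        # with its requested name and writability.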
        initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement")
        if initialWorkdir is not None:
            ls = []  # type: List[Dict[Text, Any]]
            if isinstance(initialWorkdir["listing"], string_types):
                ls = builder.do_eval(initialWorkdir["listing"])
            else:
                for t in initialWorkdir["listing"]:
                    if isinstance(t, Mapping) and "entry" in t:
                        entry_exp = builder.do_eval(t["entry"], strip_whitespace=False)
                        for entry in aslist(entry_exp):
                            et = {u"entry": entry}
                            if "entryname" in t:
                                et["entryname"] = builder.do_eval(t["entryname"])
                            else:
                                et["entryname"] = None
                            et["writable"] = t.get("writable", False)
                            if et[u"entry"] is not None:
                                ls.append(et)
                    else:
                        initwd_item = builder.do_eval(t)
                        if not initwd_item:
                            continue
                        if isinstance(initwd_item, MutableSequence):
                            ls.extend(initwd_item)
                        else:
                            ls.append(initwd_item)
            for i, t in enumerate(ls):
                if "entry" in t:
                    if isinstance(t["entry"], string_types):
                        ls[i] = {
                            "class": "File",
                            "basename": t["entryname"],
                            "contents": t["entry"],
                            "writable": t.get("writable")
                        }
                    else:
                        if t.get("entryname") or t.get("writable"):
                            t = copy.deepcopy(t)
                            if t.get("entryname"):
                                t["entry"]["basename"] = t["entryname"]
                            t["entry"]["writable"] = t.get("writable")
                        ls[i] = t["entry"]
            j.generatefiles["listing"] = ls
            for l in ls:
                self.updatePathmap(builder.outdir, builder.pathmapper, l)
            visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)

        if debug:
            _logger.debug(u"[job %s] path mappings is %s", j.name,
                          json_dumps({p: builder.pathmapper.mapper(p)
                                      for p in builder.pathmapper.files()},
                                     indent=4))

        if self.tool.get("stdin"):
            with SourceLine(self.tool, "stdin", validate.ValidationException, debug):
                j.stdin = builder.do_eval(self.tool["stdin"])
                if j.stdin:
                    reffiles.append({"class": "File", "path": j.stdin})

        if self.tool.get("stderr"):
            with SourceLine(self.tool, "stderr", validate.ValidationException, debug):
                j.stderr = builder.do_eval(self.tool["stderr"])
                if j.stderr:
                    if os.path.isabs(j.stderr) or ".." in j.stderr:
                        raise validate.ValidationException(
                            "stderr must be a relative path, got '%s'" % j.stderr)

        if self.tool.get("stdout"):
            with SourceLine(self.tool, "stdout", validate.ValidationException, debug):
                j.stdout = builder.do_eval(self.tool["stdout"])
                if j.stdout:
                    if os.path.isabs(j.stdout) or ".." in j.stdout:
                        raise validate.ValidationException(
                            "stdout must be a relative path, got '%s'" % j.stdout)

        if debug:
            _logger.debug(u"[job %s] command line bindings is %s", j.name,
                          json_dumps(builder.bindings, indent=4))

        dockerReq, _ = self.get_requirement("DockerRequirement")
        if dockerReq is not None and runtimeContext.use_container:
            out_dir, out_prefix = os.path.split(
                runtimeContext.tmp_outdir_prefix)
            j.outdir = runtimeContext.outdir or \
                tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
            tmpdir_dir, tmpdir_prefix = os.path.split(
                runtimeContext.tmpdir_prefix)
            j.tmpdir = runtimeContext.tmpdir or \
                tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir)
            j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix, dir=tmpdir_dir)
        else:
            j.outdir = builder.outdir
            j.tmpdir = builder.tmpdir
            j.stagedir = builder.stagedir

        inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement")
        if inplaceUpdateReq is not None:
            j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
        normalizeFilesDirs(j.generatefiles)

        readers = {}  # type: Dict[Text, Any]
        muts = set()  # type: Set[Text]

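        # With a MutationManager active, record writable staged files as
        # in-place mutations and register everything else as a read-only
        # reader, so that conflicting updates to the same input can be
        # detected across jobs.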
        if builder.mutation_manager is not None:
            def register_mut(f):  # type: (Dict[Text, Any]) -> None
                mm = cast(MutationManager, builder.mutation_manager)
                muts.add(f["location"])
                mm.register_mutation(j.name, f)

            def register_reader(f):  # type: (Dict[Text, Any]) -> None
                mm = cast(MutationManager, builder.mutation_manager)
                if f["location"] not in muts:
                    mm.register_reader(j.name, f)
                    readers[f["location"]] = copy.deepcopy(f)

            for li in j.generatefiles["listing"]:
                li = cast(Dict[Text, Any], li)
                if li.get("writable") and j.inplace_update:
                    adjustFileObjs(li, register_mut)
                    adjustDirObjs(li, register_mut)
                else:
                    adjustFileObjs(li, register_reader)
                    adjustDirObjs(li, register_reader)

            adjustFileObjs(builder.files, register_reader)
            adjustFileObjs(builder.bindings, register_reader)
            adjustDirObjs(builder.files, register_reader)
            adjustDirObjs(builder.bindings, register_reader)

        timelimit, _ = self.get_requirement("ToolTimeLimit")
        if timelimit is not None:
            with SourceLine(timelimit, "timelimit", validate.ValidationException, debug):
                j.timelimit = builder.do_eval(timelimit["timelimit"])
                if not isinstance(j.timelimit, int) or j.timelimit < 0:
                    raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit)

        networkaccess, _ = self.get_requirement("NetworkAccess")
        if networkaccess is not None:
            with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug):
                j.networkaccess = builder.do_eval(networkaccess["networkAccess"])
                if not isinstance(j.networkaccess, bool):
                    raise Exception("networkAccess must be a boolean, got: %s" % j.networkaccess)

        j.environment = {}
        evr, _ = self.get_requirement("EnvVarRequirement")
        if evr is not None:
            for t in evr["envDef"]:
                j.environment[t["envName"]] = builder.do_eval(t["envValue"])

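        # Under ShellCommandRequirement the arguments are joined into a
        # single "/bin/sh -c" string; each argument is shell-quoted unless
        # its binding sets shellQuote: false.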
        shellcmd, _ = self.get_requirement("ShellCommandRequirement")
        if shellcmd is not None:
            cmd = []  # type: List[Text]
            for b in builder.bindings:
                arg = builder.generate_arg(b)
                if b.get("shellQuote", True):
                    arg = [shellescape.quote(a) for a in aslist(arg)]
                cmd.extend(aslist(arg))
            j.command_line = ["/bin/sh", "-c", " ".join(cmd)]
        else:
            j.command_line = flatten(list(map(builder.generate_arg, builder.bindings)))

        j.pathmapper = builder.pathmapper
        j.collect_outputs = partial(
            self.collect_output_ports, self.tool["outputs"], builder,
            compute_checksum=getdefault(runtimeContext.compute_checksum, True),
            jobname=jobname,
            readers=readers)
        j.output_callback = output_callbacks

        yield j

    def collect_output_ports(self,
                             ports,                  # type: Set[Dict[Text, Any]]
                             builder,                # type: Builder
                             outdir,                 # type: Text
                             rcode,                  # type: int
                             compute_checksum=True,  # type: bool
                             jobname="",             # type: Text
                             readers=None            # type: Optional[Dict[Text, Any]]
                            ):  # type: (...) -> OutputPorts
        ret = {}  # type: OutputPorts
        debug = _logger.isEnabledFor(logging.DEBUG)
        cwl_version = self.metadata.get(
            "http://commonwl.org/cwltool#original_cwlVersion", None)
        if cwl_version != "v1.0":
            builder.resources["exitCode"] = rcode
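        # A tool can bypass normal output collection by writing a
        # cwl.output.json file to its output directory; otherwise each
        # declared output port is collected through its outputBinding.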
        try:
            fs_access = builder.make_fs_access(outdir)
            custom_output = fs_access.join(outdir, "cwl.output.json")
            if fs_access.exists(custom_output):
                with fs_access.open(custom_output, "r") as f:
                    ret = json.load(f)
                if debug:
                    _logger.debug(u"Raw output from %s: %s", custom_output,
                                  json_dumps(ret, indent=4))
            else:
                for i, port in enumerate(ports):
                    class ParameterOutputWorkflowException(WorkflowException):
                        def __init__(self, msg, **kwargs):  # type: (Text, **Any) -> None
                            super(ParameterOutputWorkflowException, self).__init__(
                                u"Error collecting output for parameter '%s':\n%s"
                                % (shortname(port["id"]), msg), kwargs)
                    with SourceLine(ports, i, ParameterOutputWorkflowException, debug):
                        fragment = shortname(port["id"])
                        ret[fragment] = self.collect_output(port, builder, outdir, fs_access,
                                                            compute_checksum=compute_checksum)
            if ret:
                revmap = partial(revmap_file, builder, outdir)
                adjustDirObjs(ret, trim_listing)
                visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap))
                visit_class(ret, ("File", "Directory"), remove_path)
                normalizeFilesDirs(ret)
                visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access))

                if compute_checksum:
                    adjustFileObjs(ret, partial(compute_checksums, fs_access))
            expected_schema = cast(Schema, self.names.get_name(
                "outputs_record_schema", ""))
            validate.validate_ex(expected_schema, ret,
                                 strict=False, logger=_logger_validation_warnings)
            if ret is not None and builder.mutation_manager is not None:
                adjustFileObjs(ret, builder.mutation_manager.set_generation)
            return ret if ret is not None else {}
        except validate.ValidationException as e:
            raise_from(WorkflowException(
                "Error validating output record. " + Text(e) + "\n in "
                + json_dumps(ret, indent=4)), e)
        finally:
            if builder.mutation_manager and readers:
                for r in readers.values():
                    builder.mutation_manager.release_reader(jobname, r)

    def collect_output(self,
                       schema,                # type: Dict[Text, Any]
                       builder,               # type: Builder
                       outdir,                # type: Text
                       fs_access,             # type: StdFsAccess
                       compute_checksum=True  # type: bool
                      ):
        # type: (...) -> Optional[Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]]
        r = []  # type: List[Any]
        empty_and_optional = False
        debug = _logger.isEnabledFor(logging.DEBUG)
        if "outputBinding" in schema:
            binding = schema["outputBinding"]
            globpatterns = []  # type: List[Text]

            revmap = partial(revmap_file, builder, outdir)

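            # Evaluate the glob patterns (each may itself be an expression)
            # and match them against the output directory; matches are
            # sorted with locale-aware collation for a stable ordering.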
            if "glob" in binding:
                with SourceLine(binding, "glob", WorkflowException, debug):
                    for gb in aslist(binding["glob"]):
                        gb = builder.do_eval(gb)
                        if gb:
                            globpatterns.extend(aslist(gb))

                    for gb in globpatterns:
                        if gb.startswith(builder.outdir):
                            gb = gb[len(builder.outdir) + 1:]
                        elif gb == ".":
                            gb = outdir
                        elif gb.startswith("/"):
                            raise WorkflowException(
                                "glob patterns must not start with '/'")
                        try:
                            prefix = fs_access.glob(outdir)
                            r.extend([{"location": g,
                                       "path": fs_access.join(builder.outdir,
                                                              g[len(prefix[0])+1:]),
                                       "basename": os.path.basename(g),
                                       "nameroot": os.path.splitext(
                                           os.path.basename(g))[0],
                                       "nameext": os.path.splitext(
                                           os.path.basename(g))[1],
                                       "class": "File" if fs_access.isfile(g)
                                       else "Directory"}
                                      for g in sorted(fs_access.glob(
                                          fs_access.join(outdir, gb)),
                                          key=cmp_to_key(cast(
                                              Callable[[Text, Text],
                                                       int], locale.strcoll)))])
                        except (OSError, IOError) as e:
                            _logger.warning(Text(e))
                        except Exception:
                            _logger.error("Unexpected error from fs_access", exc_info=True)
                            raise

                for files in r:
                    rfile = files.copy()
                    revmap(rfile)
                    if files["class"] == "Directory":
                        ll = schema.get("loadListing") or builder.loadListing
                        if ll and ll != "no_listing":
                            get_listing(fs_access, files, (ll == "deep_listing"))
                    else:
                        if binding.get("loadContents"):
                            with fs_access.open(rfile["location"], "rb") as f:
                                files["contents"] = content_limit_respected_read_bytes(f).decode("utf-8")
                        if compute_checksum:
                            with fs_access.open(rfile["location"], "rb") as f:
                                checksum = hashlib.sha1()  # nosec
                                contents = f.read(1024 * 1024)
                                while contents != b"":
                                    checksum.update(contents)
                                    contents = f.read(1024 * 1024)
                                files["checksum"] = "sha1$%s" % checksum.hexdigest()
                        files["size"] = fs_access.size(rfile["location"])

            optional = False
            single = False
            if isinstance(schema["type"], MutableSequence):
                if "null" in schema["type"]:
                    optional = True
                if "File" in schema["type"] or "Directory" in schema["type"]:
                    single = True
            elif schema["type"] == "File" or schema["type"] == "Directory":
                single = True

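            # outputEval sees the globbed File/Directory objects as "self"
            # and may replace the result wholesale; single File/Directory
            # outputs are then unwrapped from the result list.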
            if "outputEval" in binding:
                with SourceLine(binding, "outputEval", WorkflowException, debug):
                    r = builder.do_eval(binding["outputEval"], context=r)

            if single:
                if not r and not optional:
                    with SourceLine(binding, "glob", WorkflowException, debug):
                        raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns))
                elif not r and optional:
                    pass
                elif isinstance(r, MutableSequence):
                    if len(r) > 1:
                        raise WorkflowException("Multiple matches for output item that is a single file.")
                    else:
                        r = r[0]

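            # Attach secondary files to each primary output: patterns
            # containing $() or ${} are evaluated as expressions against the
            # primary, anything else is applied as a suffix substitution on
            # the primary's basename.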
            if "secondaryFiles" in schema:
                with SourceLine(schema, "secondaryFiles", WorkflowException, debug):
                    for primary in aslist(r):
                        if isinstance(primary, MutableMapping):
                            primary.setdefault("secondaryFiles", [])
                            pathprefix = primary["path"][0:primary["path"].rindex("/")+1]
                            for sf in aslist(schema["secondaryFiles"]):
                                if 'required' in sf:
                                    sf_required = builder.do_eval(sf['required'], context=primary)
                                else:
                                    sf_required = False

                                if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                                    sfpath = builder.do_eval(sf["pattern"], context=primary)
                                else:
                                    sfpath = substitute(primary["basename"], sf["pattern"])

                                for sfitem in aslist(sfpath):
                                    if not sfitem:
                                        continue
                                    if isinstance(sfitem, string_types):
                                        sfitem = {"path": pathprefix+sfitem}
                                    if not fs_access.exists(sfitem['path']) and sf_required:
                                        raise WorkflowException(
                                            "Missing required secondary file '%s'" % (
                                                sfitem["path"]))
                                    if "path" in sfitem and "location" not in sfitem:
                                        revmap(sfitem)
                                    if fs_access.isfile(sfitem["location"]):
                                        sfitem["class"] = "File"
                                        primary["secondaryFiles"].append(sfitem)
                                    elif fs_access.isdir(sfitem["location"]):
                                        sfitem["class"] = "Directory"
                                        primary["secondaryFiles"].append(sfitem)

            if "format" in schema:
                for primary in aslist(r):
                    primary["format"] = builder.do_eval(schema["format"], context=primary)

            # Ensure files point to local references outside of the run environment
            adjustFileObjs(r, revmap)

            if not r and optional:
                # Don't convert zero or empty string to None
                if r in [0, '']:
                    return r
                # For [] or None, return None
                else:
                    return None

        if (not empty_and_optional and isinstance(schema["type"], MutableMapping)
                and schema["type"]["type"] == "record"):
            out = {}
            for f in schema["type"]["fields"]:
                out[shortname(f["name"])] = self.collect_output(  # type: ignore
                    f, builder, outdir, fs_access,
                    compute_checksum=compute_checksum)
            return out
        return r