diff env/lib/python3.9/site-packages/cwltool/job.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author    shellac
date      Mon, 22 Mar 2021 18:12:50 +0000
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/job.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,1038 @@
+import datetime
+import functools
+import itertools
+import logging
+import os
+import re
+import shutil
+import subprocess  # nosec
+import sys
+import tempfile
+import threading
+import time
+import uuid
+from abc import ABCMeta, abstractmethod
+from io import IOBase
+from threading import Timer
+from typing import (
+    IO,
+    Callable,
+    Iterable,
+    List,
+    Match,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    TextIO,
+    Tuple,
+    Union,
+    cast,
+)
+
+import psutil
+import shellescape
+from prov.model import PROV
+from schema_salad.sourceline import SourceLine
+from schema_salad.utils import json_dump, json_dumps
+from typing_extensions import TYPE_CHECKING
+
+from .builder import Builder, HasReqsHints
+from .context import RuntimeContext
+from .errors import UnsupportedRequirement, WorkflowException
+from .loghandler import _logger
+from .pathmapper import MapperEnt, PathMapper
+from .process import stage_files
+from .secrets import SecretStore
+from .utils import (
+    CWLObjectType,
+    CWLOutputType,
+    DirectoryType,
+    OutputCallbackType,
+    bytes2str_in_dicts,
+    copytree_with_merge,
+    create_tmp_dir,
+    ensure_non_writable,
+    ensure_writable,
+    onWindows,
+    processes_to_kill,
+)
+
+if TYPE_CHECKING:
+    from .provenance_profile import ProvenanceProfile  # pylint: disable=unused-import
+
+needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
+
+FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
+
+SHELL_COMMAND_TEMPLATE = """#!/bin/bash
+python "run_job.py" "job.json"
+"""
+
+PYTHON_RUN_SCRIPT = """
+import json
+import os
+import sys
+if os.name == 'posix':
+    try:
+        import subprocess32 as subprocess  # type: ignore
+    except Exception:
+        import subprocess
+else:
+    import subprocess  # type: ignore
+
+with open(sys.argv[1], "r") as f:
+    popen_description = json.load(f)
+    commands = popen_description["commands"]
+    cwd = popen_description["cwd"]
+    env = popen_description["env"]
+    env["PATH"] = os.environ.get("PATH")
+    stdin_path = popen_description["stdin_path"]
+    stdout_path = popen_description["stdout_path"]
+    stderr_path = popen_description["stderr_path"]
+    if stdin_path is not None:
+        stdin = open(stdin_path, "rb")
+    else:
+        stdin = subprocess.PIPE
+    if stdout_path is not None:
+        stdout = open(stdout_path, "wb")
+    else:
+        stdout = sys.stderr
+    if stderr_path is not None:
+        stderr = open(stderr_path, "wb")
+    else:
+        stderr = sys.stderr
+    if os.name == 'nt':
+        close_fds = False
+        for key, value in env.items():
+            env[key] = str(value)
+    else:
+        close_fds = True
+    sp = subprocess.Popen(commands,
+                          shell=False,
+                          close_fds=close_fds,
+                          stdin=stdin,
+                          stdout=stdout,
+                          stderr=stderr,
+                          env=env,
+                          cwd=cwd)
+    if sp.stdin:
+        sp.stdin.close()
+    rcode = sp.wait()
+    if stdin is not subprocess.PIPE:
+        stdin.close()
+    if stdout is not sys.stderr:
+        stdout.close()
+    if stderr is not sys.stderr:
+        stderr.close()
+    sys.exit(rcode)
+"""
+
+
+def relink_initialworkdir(
+    pathmapper: PathMapper,
+    host_outdir: str,
+    container_outdir: str,
+    inplace_update: bool = False,
+) -> None:
+    for _, vol in pathmapper.items():
+        if not vol.staged:
+            continue
+
+        if vol.type in ("File", "Directory") or (
+            inplace_update and vol.type in ("WritableFile", "WritableDirectory")
+        ):
+            if not vol.target.startswith(container_outdir):
+                # this is an input file written outside of the working
+                # directory, so therefore ineligible for being an output file.
+                # Thus, none of our business
+                continue
+            host_outdir_tgt = os.path.join(
+                host_outdir, vol.target[len(container_outdir) + 1 :]
+            )
+            if os.path.islink(host_outdir_tgt) or os.path.isfile(host_outdir_tgt):
+                try:
+                    os.remove(host_outdir_tgt)
+                except PermissionError:
+                    pass
+            elif os.path.isdir(host_outdir_tgt) and not vol.resolved.startswith("_:"):
+                shutil.rmtree(host_outdir_tgt)
+            if onWindows():
+                # If this becomes a big issue for someone then we could
+                # refactor the code to process output from a running container
+                # and avoid all the extra IO below
+                if vol.type in ("File", "WritableFile"):
+                    shutil.copy(vol.resolved, host_outdir_tgt)
+                elif vol.type in ("Directory", "WritableDirectory"):
+                    copytree_with_merge(vol.resolved, host_outdir_tgt)
+            elif not vol.resolved.startswith("_:"):
+                try:
+                    os.symlink(vol.resolved, host_outdir_tgt)
+                except FileExistsError:
+                    pass
+
+
+def neverquote(string: str, pos: int = 0, endpos: int = 0) -> Optional[Match[str]]:
+    return None
+
+
+CollectOutputsType = Union[Callable[[str, int], CWLObjectType], functools.partial]
+
+
+class JobBase(HasReqsHints, metaclass=ABCMeta):
+    def __init__(
+        self,
+        builder: Builder,
+        joborder: CWLObjectType,
+        make_path_mapper: Callable[..., PathMapper],
+        requirements: List[CWLObjectType],
+        hints: List[CWLObjectType],
+        name: str,
+    ) -> None:
+        """Initialize the job object."""
+        super().__init__()
+        self.builder = builder
+        self.joborder = joborder
+        self.stdin = None  # type: Optional[str]
+        self.stderr = None  # type: Optional[str]
+        self.stdout = None  # type: Optional[str]
+        self.successCodes = []  # type: Iterable[int]
+        self.temporaryFailCodes = []  # type: Iterable[int]
+        self.permanentFailCodes = []  # type: Iterable[int]
+        self.requirements = requirements
+        self.hints = hints
+        self.name = name
+        self.command_line = []  # type: List[str]
+        self.pathmapper = PathMapper([], "", "")
+        self.make_path_mapper = make_path_mapper
+        self.generatemapper = None  # type: Optional[PathMapper]
+
+        # set in CommandLineTool.job(i)
+        self.collect_outputs = cast(CollectOutputsType, None)
+        self.output_callback = None  # type: Optional[OutputCallbackType]
+        self.outdir = ""
+        self.tmpdir = ""
+
+        self.environment = {}  # type: MutableMapping[str, str]
+        self.generatefiles = {
+            "class": "Directory",
+            "listing": [],
+            "basename": "",
+        }  # type: DirectoryType
+        self.stagedir = None  # type: Optional[str]
+        self.inplace_update = False
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.parent_wf = None  # type: Optional[ProvenanceProfile]
+        self.timelimit = None  # type: Optional[int]
+        self.networkaccess = False  # type: bool
+        self.mpi_procs = None  # type: Optional[int]
+
+    def __repr__(self):  # type: () -> str
+        """Represent this Job object."""
+        return "CommandLineJob(%s)" % self.name
+
+    @abstractmethod
+    def run(
+        self,
+        runtimeContext: RuntimeContext,
+        tmpdir_lock: Optional[threading.Lock] = None,
+    ) -> None:
+        pass
+
+    def _setup(self, runtimeContext: RuntimeContext) -> None:
+        if not os.path.exists(self.outdir):
+            os.makedirs(self.outdir)
+
+        for knownfile in self.pathmapper.files():
+            p = self.pathmapper.mapper(knownfile)
+            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
+                raise WorkflowException(
+                    "Input file %s (at %s) not found or is not a regular "
+                    "file."
+                    % (knownfile, self.pathmapper.mapper(knownfile)[0])
+                )
+
+        if "listing" in self.generatefiles:
+            runtimeContext = runtimeContext.copy()
+            runtimeContext.outdir = self.outdir
+            self.generatemapper = self.make_path_mapper(
+                self.generatefiles["listing"],
+                self.builder.outdir,
+                runtimeContext,
+                False,
+            )
+            if _logger.isEnabledFor(logging.DEBUG):
+                _logger.debug(
+                    "[job %s] initial work dir %s",
+                    self.name,
+                    json_dumps(
+                        {
+                            p: self.generatemapper.mapper(p)
+                            for p in self.generatemapper.files()
+                        },
+                        indent=4,
+                    ),
+                )
+
+    def _execute(
+        self,
+        runtime: List[str],
+        env: MutableMapping[str, str],
+        runtimeContext: RuntimeContext,
+        monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
+    ) -> None:
+
+        scr = self.get_requirement("ShellCommandRequirement")[0]
+
+        shouldquote = needs_shell_quoting_re.search
+        if scr is not None:
+            shouldquote = neverquote
+
+        # If mpi_procs is set (not None and > 0), prepend the
+        # appropriate MPI job launch command and flags before the
+        # execution.
+        if self.mpi_procs:
+            menv = runtimeContext.mpi_config
+            mpi_runtime = [
+                menv.runner,
+                menv.nproc_flag,
+                str(self.mpi_procs),
+            ] + menv.extra_flags
+            runtime = mpi_runtime + runtime
+            menv.pass_through_env_vars(env)
+            menv.set_env_vars(env)
+
+        _logger.info(
+            "[job %s] %s$ %s%s%s%s",
+            self.name,
+            self.outdir,
+            " \\\n    ".join(
+                [
+                    shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg)
+                    for arg in (runtime + self.command_line)
+                ]
+            ),
+            " < %s" % self.stdin if self.stdin else "",
+            " > %s" % os.path.join(self.outdir, self.stdout) if self.stdout else "",
+            " 2> %s" % os.path.join(self.outdir, self.stderr) if self.stderr else "",
+        )
+        if self.joborder is not None and runtimeContext.research_obj is not None:
+            job_order = self.joborder
+            if (
+                runtimeContext.process_run_id is not None
+                and runtimeContext.prov_obj is not None
+                and isinstance(job_order, (list, dict))
+            ):
+                runtimeContext.prov_obj.used_artefacts(
+                    job_order, runtimeContext.process_run_id, str(self.name)
+                )
+            else:
+                _logger.warning(
+                    "research_obj set but one of process_run_id "
+                    "or prov_obj is missing from runtimeContext: "
+                    "{}".format(runtimeContext)
+                )
+        outputs = {}  # type: CWLObjectType
+        try:
+            stdin_path = None
+            if self.stdin is not None:
+                rmap = self.pathmapper.reversemap(self.stdin)
+                if rmap is None:
+                    raise WorkflowException(f"{self.stdin} missing from pathmapper")
+                else:
+                    stdin_path = rmap[1]
+
+            stderr_path = None
+            if self.stderr is not None:
+                abserr = os.path.join(self.outdir, self.stderr)
+                dnerr = os.path.dirname(abserr)
+                if dnerr and not os.path.exists(dnerr):
+                    os.makedirs(dnerr)
+                stderr_path = abserr
+
+            stdout_path = None
+            if self.stdout is not None:
+                absout = os.path.join(self.outdir, self.stdout)
+                dnout = os.path.dirname(absout)
+                if dnout and not os.path.exists(dnout):
+                    os.makedirs(dnout)
+                stdout_path = absout
+
+            commands = [str(x) for x in runtime + self.command_line]
+            if runtimeContext.secret_store is not None:
+                commands = cast(
+                    List[str],
+                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, commands)),
+                )
+                env = cast(
+                    MutableMapping[str, str],
+                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, env)),
+                )
+
+            job_script_contents = None  # type: Optional[str]
+            builder = getattr(self, "builder", None)  # type: Builder
+            if builder is not None:
+                job_script_contents = builder.build_job_script(commands)
+            rcode = _job_popen(
+                commands,
+                stdin_path=stdin_path,
+                stdout_path=stdout_path,
+                stderr_path=stderr_path,
+                env=env,
+                cwd=self.outdir,
+                make_job_dir=lambda: runtimeContext.create_outdir(),
+                job_script_contents=job_script_contents,
+                timelimit=self.timelimit,
+                name=self.name,
+                monitor_function=monitor_function,
+                default_stdout=runtimeContext.default_stdout,
+                default_stderr=runtimeContext.default_stderr,
+            )
+
+            if rcode in self.successCodes:
+                processStatus = "success"
+            elif rcode in self.temporaryFailCodes:
+                processStatus = "temporaryFail"
+            elif rcode in self.permanentFailCodes:
+                processStatus = "permanentFail"
+            elif rcode == 0:
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+
+            if "listing" in self.generatefiles:
+                if self.generatemapper:
+                    relink_initialworkdir(
+                        self.generatemapper,
+                        self.outdir,
+                        self.builder.outdir,
+                        inplace_update=self.inplace_update,
+                    )
+                else:
+                    raise ValueError(
+                        "'listing' in self.generatefiles but no "
+                        "generatemapper was setup."
+                    )
+
+            outputs = self.collect_outputs(self.outdir, rcode)
+            outputs = bytes2str_in_dicts(outputs)  # type: ignore
+        except OSError as e:
+            if e.errno == 2:
+                if runtime:
+                    _logger.error("'%s' not found: %s", runtime[0], str(e))
+                else:
+                    _logger.error("'%s' not found: %s", self.command_line[0], str(e))
+            else:
+                _logger.exception("Exception while running job")
+            processStatus = "permanentFail"
+        except WorkflowException as err:
+            _logger.error("[job %s] Job error:\n%s", self.name, str(err))
+            processStatus = "permanentFail"
+        except Exception:
+            _logger.exception("Exception while running job")
+            processStatus = "permanentFail"
+        if (
+            runtimeContext.research_obj is not None
+            and self.prov_obj is not None
+            and runtimeContext.process_run_id is not None
+        ):
+            # creating entities for the outputs produced by each step (in the provenance document)
+            self.prov_obj.record_process_end(
+                str(self.name),
+                runtimeContext.process_run_id,
+                outputs,
+                datetime.datetime.now(),
+            )
+        if processStatus != "success":
+            _logger.warning("[job %s] completed %s", self.name, processStatus)
+        else:
+            _logger.info("[job %s] completed %s", self.name, processStatus)
+
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(
+                "[job %s] outputs %s", self.name, json_dumps(outputs, indent=4)
+            )
+
+        if self.generatemapper is not None and runtimeContext.secret_store is not None:
+            # Delete any runtime-generated files containing secrets.
+            for _, p in self.generatemapper.items():
+                if p.type == "CreateFile":
+                    if runtimeContext.secret_store.has_secret(p.resolved):
+                        host_outdir = self.outdir
+                        container_outdir = self.builder.outdir
+                        host_outdir_tgt = p.target
+                        if p.target.startswith(container_outdir + "/"):
+                            host_outdir_tgt = os.path.join(
+                                host_outdir, p.target[len(container_outdir) + 1 :]
+                            )
+                        os.remove(host_outdir_tgt)
+
+        if runtimeContext.workflow_eval_lock is None:
+            raise WorkflowException(
+                "runtimeContext.workflow_eval_lock must not be None"
+            )
+
+        if self.output_callback:
+            with runtimeContext.workflow_eval_lock:
+                self.output_callback(outputs, processStatus)
+
+        if self.stagedir is not None and os.path.exists(self.stagedir):
+            _logger.debug(
+                "[job %s] Removing input staging directory %s",
+                self.name,
+                self.stagedir,
+            )
+            shutil.rmtree(self.stagedir, True)
+
+        if runtimeContext.rm_tmpdir:
+            _logger.debug(
+                "[job %s] Removing temporary directory %s", self.name, self.tmpdir
+            )
+            shutil.rmtree(self.tmpdir, True)
+
+    def process_monitor(self, sproc):  # type: (subprocess.Popen[str]) -> None
+        monitor = psutil.Process(sproc.pid)
+        # Value must be list rather than integer to utilise pass-by-reference in Python
+        memory_usage = [None]  # type: MutableSequence[Optional[int]]
+
+        def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
+            children = monitor.children()
+            rss = monitor.memory_info().rss
+            while len(children):
+                rss += sum([process.memory_info().rss for process in children])
+                children = list(
+                    itertools.chain(*[process.children() for process in children])
+                )
+            if memory_usage[0] is None or rss > memory_usage[0]:
+                memory_usage[0] = rss
+
+        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
+        mem_tm.daemon = True
+        mem_tm.start()
+        sproc.wait()
+        mem_tm.cancel()
+        if memory_usage[0] is not None:
+            _logger.info(
+                "[job %s] Max memory used: %iMiB",
+                self.name,
+                round(memory_usage[0] / (2 ** 20)),
+            )
+        else:
+            _logger.debug(
+                "Could not collect memory usage, job ended before monitoring began."
+            )
+
+
+class CommandLineJob(JobBase):
+    def run(
+        self,
+        runtimeContext: RuntimeContext,
+        tmpdir_lock: Optional[threading.Lock] = None,
+    ) -> None:
+
+        if tmpdir_lock:
+            with tmpdir_lock:
+                if not os.path.exists(self.tmpdir):
+                    os.makedirs(self.tmpdir)
+        else:
+            if not os.path.exists(self.tmpdir):
+                os.makedirs(self.tmpdir)
+
+        self._setup(runtimeContext)
+
+        env = self.environment
+        vars_to_preserve = runtimeContext.preserve_environment
+        if runtimeContext.preserve_entire_environment is not False:
+            vars_to_preserve = os.environ
+        if vars_to_preserve:
+            for key, value in os.environ.items():
+                if key in vars_to_preserve and key not in env:
+                    # On Windows, subprocess env can't handle unicode.
+                    env[key] = str(value) if onWindows() else value
+        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
+        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
+        if "PATH" not in env:
+            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
+        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
+            env["SYSTEMROOT"] = (
+                str(os.environ["SYSTEMROOT"])
+                if onWindows()
+                else os.environ["SYSTEMROOT"]
+            )
+
+        stage_files(
+            self.pathmapper,
+            ignore_writable=True,
+            symlink=True,
+            secret_store=runtimeContext.secret_store,
+        )
+        if self.generatemapper is not None:
+            stage_files(
+                self.generatemapper,
+                ignore_writable=self.inplace_update,
+                symlink=True,
+                secret_store=runtimeContext.secret_store,
+            )
+            relink_initialworkdir(
+                self.generatemapper,
+                self.outdir,
+                self.builder.outdir,
+                inplace_update=self.inplace_update,
+            )
+
+        monitor_function = functools.partial(self.process_monitor)
+
+        self._execute([], env, runtimeContext, monitor_function)
+
+
+CONTROL_CODE_RE = r"\x1b\[[0-9;]*[a-zA-Z]"
+
+
+class ContainerCommandLineJob(JobBase, metaclass=ABCMeta):
+    """Commandline job using containers."""
+
+    @abstractmethod
+    def get_from_requirements(
+        self,
+        r: CWLObjectType,
+        pull_image: bool,
+        force_pull: bool,
+        tmp_outdir_prefix: str,
+    ) -> Optional[str]:
+        pass
+
+    @abstractmethod
+    def create_runtime(
+        self,
+        env: MutableMapping[str, str],
+        runtime_context: RuntimeContext,
+    ) -> Tuple[List[str], Optional[str]]:
+        """Return the list of commands to run the selected container engine."""
+
+    @staticmethod
+    @abstractmethod
+    def append_volume(
+        runtime: List[str], source: str, target: str, writable: bool = False
+    ) -> None:
+        """Add binding arguments to the runtime list."""
+
+    @abstractmethod
+    def add_file_or_directory_volume(
+        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
+    ) -> None:
+        """Append a file/dir volume mapping to the runtime option list."""
+
+    @abstractmethod
+    def add_writable_file_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        tmpdir_prefix: str,
+    ) -> None:
+        """Append a writable file mapping to the runtime option list."""
+
+    @abstractmethod
+    def add_writable_directory_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        tmpdir_prefix: str,
+    ) -> None:
+        """Append a writable directory mapping to the runtime option list."""
+
+    def create_file_and_add_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        secret_store: Optional[SecretStore],
+        tmpdir_prefix: str,
+    ) -> str:
+        """Create the file and add a mapping."""
+        if not host_outdir_tgt:
+            new_file = os.path.join(
+                create_tmp_dir(tmpdir_prefix),
+                os.path.basename(volume.target),
+            )
+        writable = True if volume.type == "CreateWritableFile" else False
+        contents = volume.resolved
+        if secret_store:
+            contents = cast(str, secret_store.retrieve(volume.resolved))
+        dirname = os.path.dirname(host_outdir_tgt or new_file)
+        if not os.path.exists(dirname):
+            os.makedirs(dirname)
+        with open(host_outdir_tgt or new_file, "wb") as file_literal:
+            file_literal.write(contents.encode("utf-8"))
+        if not host_outdir_tgt:
+            self.append_volume(runtime, new_file, volume.target, writable=writable)
+        if writable:
+            ensure_writable(host_outdir_tgt or new_file)
+        else:
+            ensure_non_writable(host_outdir_tgt or new_file)
+        return host_outdir_tgt or new_file
+
+    def add_volumes(
+        self,
+        pathmapper: PathMapper,
+        runtime: List[str],
+        tmpdir_prefix: str,
+        secret_store: Optional[SecretStore] = None,
+        any_path_okay: bool = False,
+    ) -> None:
+        """Append volume mappings to the runtime option list."""
+        container_outdir = self.builder.outdir
+        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
+            host_outdir_tgt = None  # type: Optional[str]
+            if vol.target.startswith(container_outdir + "/"):
+                host_outdir_tgt = os.path.join(
+                    self.outdir, vol.target[len(container_outdir) + 1 :]
+                )
+            if not host_outdir_tgt and not any_path_okay:
+                raise WorkflowException(
+                    "No mandatory DockerRequirement, yet path is outside "
+                    "the designated output directory, also known as "
+                    "$(runtime.outdir): {}".format(vol)
+                )
+            if vol.type in ("File", "Directory"):
+                self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
+            elif vol.type == "WritableFile":
+                self.add_writable_file_volume(
+                    runtime, vol, host_outdir_tgt, tmpdir_prefix
+                )
+            elif vol.type == "WritableDirectory":
+                self.add_writable_directory_volume(
+                    runtime, vol, host_outdir_tgt, tmpdir_prefix
+                )
+            elif vol.type in ["CreateFile", "CreateWritableFile"]:
+                new_path = self.create_file_and_add_volume(
+                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix
+                )
+                pathmapper.update(key, new_path, vol.target, vol.type, vol.staged)
+
+    def run(
+        self,
+        runtimeContext: RuntimeContext,
+        tmpdir_lock: Optional[threading.Lock] = None,
+    ) -> None:
+        if tmpdir_lock:
+            with tmpdir_lock:
+                if not os.path.exists(self.tmpdir):
+                    os.makedirs(self.tmpdir)
+        else:
+            if not os.path.exists(self.tmpdir):
+                os.makedirs(self.tmpdir)
+
+        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+        self.prov_obj = runtimeContext.prov_obj
+        img_id = None
+        env = cast(MutableMapping[str, str], os.environ)
+        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
+        if docker_req is not None and user_space_docker_cmd:
+            # For user-space docker implementations, a local image name or ID
+            # takes precedence over a network pull
+            if "dockerImageId" in docker_req:
+                img_id = str(docker_req["dockerImageId"])
+            elif "dockerPull" in docker_req:
+                img_id = str(docker_req["dockerPull"])
+                cmd = [user_space_docker_cmd, "pull", img_id]
+                _logger.info(str(cmd))
+                try:
+                    subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
+                except OSError:
+                    raise WorkflowException(
+                        SourceLine(docker_req).makeError(
+                            "Either Docker container {} is not available with "
+                            "user space docker implementation {} or {} is missing "
+                            "or broken.".format(
+                                img_id, user_space_docker_cmd, user_space_docker_cmd
+                            )
+                        )
+                    )
+            else:
+                raise WorkflowException(
+                    SourceLine(docker_req).makeError(
+                        "Docker image must be specified as 'dockerImageId' or "
+                        "'dockerPull' when using user space implementations of "
+                        "Docker"
+                    )
+                )
+        else:
+            try:
+                if docker_req is not None and runtimeContext.use_container:
+                    img_id = str(
+                        self.get_from_requirements(
+                            docker_req,
+                            runtimeContext.pull_image,
+                            runtimeContext.force_docker_pull,
+                            runtimeContext.tmp_outdir_prefix,
+                        )
+                    )
+                if img_id is None:
+                    if self.builder.find_default_container:
+                        default_container = self.builder.find_default_container()
+                        if default_container:
+                            img_id = str(default_container)
+
+                if (
+                    docker_req is not None
+                    and img_id is None
+                    and runtimeContext.use_container
+                ):
+                    raise Exception("Docker image not available")
+
+                if (
+                    self.prov_obj is not None
+                    and img_id is not None
+                    and runtimeContext.process_run_id is not None
+                ):
+                    container_agent = self.prov_obj.document.agent(
+                        uuid.uuid4().urn,
+                        {
+                            "prov:type": PROV["SoftwareAgent"],
+                            "cwlprov:image": img_id,
+                            "prov:label": "Container execution of image %s" % img_id,
+                        },
+                    )
+                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
+                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
+                    #                  {"prov:label": "Container image %s" % img_id} )
+                    # The image is the plan for this activity-agent association
+                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
+                    self.prov_obj.document.wasAssociatedWith(
+                        runtimeContext.process_run_id, container_agent
+                    )
+            except Exception as err:
+                container = "Singularity" if runtimeContext.singularity else "Docker"
+                _logger.debug("%s error", container, exc_info=True)
+                if docker_is_req:
+                    raise UnsupportedRequirement(
+                        "{} is required to run this tool: {}".format(
+                            container, str(err)
+                        )
+                    ) from err
+                else:
+                    raise WorkflowException(
+                        "{0} is not available for this tool, try "
+                        "--no-container to disable {0}, or install "
+                        "a user space Docker replacement like uDocker with "
+                        "--user-space-docker-cmd.: {1}".format(container, err)
+                    )
+
+        self._setup(runtimeContext)
+        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
+        runtime.append(str(img_id))
+        monitor_function = None
+        if cidfile:
+            monitor_function = functools.partial(
+                self.docker_monitor,
+                cidfile,
+                runtimeContext.tmpdir_prefix,
+                not bool(runtimeContext.cidfile_dir),
+            )
+        elif runtimeContext.user_space_docker_cmd:
+            monitor_function = functools.partial(self.process_monitor)
+        self._execute(runtime, env, runtimeContext, monitor_function)
+
+    def docker_monitor(
+        self,
+        cidfile: str,
+        tmpdir_prefix: str,
+        cleanup_cidfile: bool,
+        process,  # type: subprocess.Popen[str]
+    ) -> None:
+        """Record memory usage of the running Docker container."""
+        # Todo: consider switching to `docker create` / `docker start`
+        # instead of `docker run` as `docker create` outputs the container ID
+        # to stdout, but the container is frozen, thus allowing us to start the
+        # monitoring process without dealing with the cidfile or too-fast
+        # container execution
+        cid = None  # type: Optional[str]
+        while cid is None:
+            time.sleep(1)
+            if process.returncode is not None:
+                if cleanup_cidfile:
+                    try:
+                        os.remove(cidfile)
+                    except OSError as exc:
+                        _logger.warning(
+                            "Ignored error cleaning up Docker cidfile: %s", exc
+                        )
+                return
+            try:
+                with open(cidfile) as cidhandle:
+                    cid = cidhandle.readline().strip()
+            except OSError:
+                cid = None
+        max_mem = psutil.virtual_memory().total
+        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
+        stats_file_name = stats_file.name
+        try:
+            with open(stats_file_name, mode="w") as stats_file_handle:
+                stats_proc = subprocess.Popen(  # nosec
+                    ["docker", "stats", "--no-trunc", "--format", "{{.MemPerc}}", cid],
+                    stdout=stats_file_handle,
+                    stderr=subprocess.DEVNULL,
+                )
+                process.wait()
+                stats_proc.kill()
+        except OSError as exc:
+            _logger.warning("Ignored error with docker stats: %s", exc)
+            return
+        max_mem_percent = 0  # type: float
+        mem_percent = 0  # type: float
+        with open(stats_file_name, mode="r") as stats:
+            while True:
+                line = stats.readline()
+                if not line:
+                    break
+                try:
+                    mem_percent = float(
+                        re.sub(CONTROL_CODE_RE, "", line).replace("%", "")
+                    )
+                    if mem_percent > max_mem_percent:
+                        max_mem_percent = mem_percent
+                except ValueError:
+                    break
+        _logger.info(
+            "[job %s] Max memory used: %iMiB",
+            self.name,
+            int((max_mem_percent / 100 * max_mem) / (2 ** 20)),
+        )
+        if cleanup_cidfile:
+            os.remove(cidfile)
+
+
+def _job_popen(
+    commands: List[str],
+    stdin_path: Optional[str],
+    stdout_path: Optional[str],
+    stderr_path: Optional[str],
+    env: MutableMapping[str, str],
+    cwd: str,
+    make_job_dir: Callable[[], str],
+    job_script_contents: Optional[str] = None,
+    timelimit: Optional[int] = None,
+    name: Optional[str] = None,
+    monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
+    default_stdout=None,  # type: Optional[Union[IO[bytes], TextIO]]
+    default_stderr=None,  # type: Optional[Union[IO[bytes], TextIO]]
+) -> int:
+
+    if job_script_contents is None and not FORCE_SHELLED_POPEN:
+
+        stdin = subprocess.PIPE  # type: Union[IO[bytes], int]
+        if stdin_path is not None:
+            stdin = open(stdin_path, "rb")
+
+        stdout = (
+            default_stdout if default_stdout is not None else sys.stderr
+        )  # type: Union[IO[bytes], TextIO]
+        if stdout_path is not None:
+            stdout = open(stdout_path, "wb")
+
+        stderr = (
+            default_stderr if default_stderr is not None else sys.stderr
+        )  # type: Union[IO[bytes], TextIO]
+        if stderr_path is not None:
+            stderr = open(stderr_path, "wb")
+
+        sproc = subprocess.Popen(
+            commands,
+            shell=False,  # nosec
+            close_fds=not onWindows(),
+            stdin=stdin,
+            stdout=stdout,
+            stderr=stderr,
+            env=env,
+            cwd=cwd,
+            universal_newlines=True,
+        )
+        processes_to_kill.append(sproc)
+
+        if sproc.stdin is not None:
+            sproc.stdin.close()
+
+        tm = None
+        if timelimit is not None and timelimit > 0:
+
+            def terminate():  # type: () -> None
+                try:
+                    _logger.warning(
+                        "[job %s] exceeded time limit of %d seconds and will be terminated",
+                        name,
+                        timelimit,
+                    )
+                    sproc.terminate()
+                except OSError:
+                    pass
+
+            tm = Timer(timelimit, terminate)
+            tm.daemon = True
+            tm.start()
+        if monitor_function:
+            monitor_function(sproc)
+        rcode = sproc.wait()
+
+        if tm is not None:
+            tm.cancel()
+
+        if isinstance(stdin, IOBase) and hasattr(stdin, "close"):
+            stdin.close()
+
+        if stdout is not sys.stderr and hasattr(stdout, "close"):
+            stdout.close()
+
+        if stderr is not sys.stderr and hasattr(stderr, "close"):
+            stderr.close()
+
+        return rcode
+    else:
+        if job_script_contents is None:
+            job_script_contents = SHELL_COMMAND_TEMPLATE
+
+        env_copy = {}
+        key = None  # type: Optional[str]
+        for key in env:
+            env_copy[key] = env[key]
+
+        job_description = {
+            "commands": commands,
+            "cwd": cwd,
+            "env": env_copy,
+            "stdout_path": stdout_path,
+            "stderr_path": stderr_path,
+            "stdin_path": stdin_path,
+        }
+
+        job_dir = make_job_dir()
+        try:
+            with open(
+                os.path.join(job_dir, "job.json"), mode="w", encoding="utf-8"
+            ) as job_file:
+                json_dump(job_description, job_file, ensure_ascii=False)
+            job_script = os.path.join(job_dir, "run_job.bash")
+            with open(job_script, "wb") as _:
+                _.write(job_script_contents.encode("utf-8"))
+            job_run = os.path.join(job_dir, "run_job.py")
+            with open(job_run, "wb") as _:
+                _.write(PYTHON_RUN_SCRIPT.encode("utf-8"))
+            sproc = subprocess.Popen(  # nosec
+                ["bash", job_script.encode("utf-8")],
+                shell=False,  # nosec
+                cwd=job_dir,
+                # The nested script will output the paths to the correct files if they need
+                # to be captured. Else just write everything to stderr (same as above).
+                stdout=sys.stderr,
+                stderr=sys.stderr,
+                stdin=subprocess.PIPE,
+                universal_newlines=True,
+            )
+            processes_to_kill.append(sproc)
+            if sproc.stdin is not None:
+                sproc.stdin.close()
+
+            rcode = sproc.wait()
+
+            return rcode
+        finally:
+            shutil.rmtree(job_dir)
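
A note on the two execution paths in _job_popen above: when no job script is involved (and CWLTOOL_FORCE_SHELL_POPEN is not "1"), the command is launched directly with subprocess.Popen; otherwise the job is serialized to job.json and executed indirectly through run_job.bash, which runs PYTHON_RUN_SCRIPT (saved as run_job.py) to re-read the description and launch the real command. The snippet below is a minimal standalone sketch of that job.json contract, not cwltool's API; demo_dir, out.txt, and the echo command are illustrative only, and a POSIX system is assumed.

import json
import os
import subprocess
import tempfile

# Write a job description the way _job_popen's shelled path does.
demo_dir = tempfile.mkdtemp(prefix="jobdemo_")
job_description = {
    "commands": ["echo", "hello"],        # argv of the real tool
    "cwd": demo_dir,                      # working directory for the tool
    "env": {"PATH": os.environ["PATH"]},  # PYTHON_RUN_SCRIPT re-adds PATH anyway
    "stdin_path": None,                   # None -> PIPE, per PYTHON_RUN_SCRIPT
    "stdout_path": os.path.join(demo_dir, "out.txt"),
    "stderr_path": None,                  # None -> inherit stderr
}
with open(os.path.join(demo_dir, "job.json"), "w") as fh:
    json.dump(job_description, fh)

# Stand-in for PYTHON_RUN_SCRIPT: read the description back and run it.
with open(os.path.join(demo_dir, "job.json")) as fh:
    desc = json.load(fh)
with open(desc["stdout_path"], "wb") as stdout:
    rcode = subprocess.call(
        desc["commands"], cwd=desc["cwd"], env=desc["env"], stdout=stdout
    )
print("exit code:", rcode)  # out.txt now contains "hello"

The indirection exists so that builder.build_job_script() can substitute an arbitrary launcher (for example, a cluster submit script) for direct process creation; note that the job directory, including job.json, is removed in the finally block once the job exits.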
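On process_monitor: it sums RSS across the whole process tree, walking each generation of children with itertools.chain. One caveat worth noting is that threading.Timer fires its callback exactly once, so the tree is sampled a single time, one second after start (or never, for sub-second jobs, which is what the "job ended before monitoring began" branch reports), rather than being polled for the life of the process. Below is a hedged sketch of the same idea with the timer re-armed after each sample; peak_tree_rss and its argv argument are illustrative names, not cwltool code.

import subprocess
from threading import Timer

import psutil

def peak_tree_rss(argv):
    # Launch the command and poll its process tree's resident set size.
    sproc = subprocess.Popen(argv)
    root = psutil.Process(sproc.pid)
    peak = [0]  # one-element list so the timer callback can mutate it

    def sample():
        try:
            procs = [root] + root.children(recursive=True)
            peak[0] = max(peak[0], sum(p.memory_info().rss for p in procs))
        except psutil.NoSuchProcess:
            return  # process tree already gone; stop re-arming
        timer = Timer(1, sample)  # threading.Timer fires once, so re-arm
        timer.daemon = True
        timer.start()

    sample()
    sproc.wait()
    return peak[0]

print(peak_tree_rss(["sleep", "3"]) // (2 ** 20), "MiB")

As in the original, a one-element list lets the callback publish its result across threads without a nonlocal declaration.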