Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/job.py @ 4:79f47841a781 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:47:39 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
from __future__ import absolute_import import datetime import functools import itertools import logging import threading import os import re import shutil import stat import sys import tempfile import time import uuid from abc import ABCMeta, abstractmethod from io import IOBase, open # pylint: disable=redefined-builtin from threading import Timer from typing import (IO, Any, AnyStr, Callable, Dict, Iterable, List, Tuple, MutableMapping, MutableSequence, Optional, Union, cast) import psutil import shellescape from prov.model import PROV from schema_salad.sourceline import SourceLine from six import PY2, with_metaclass from future.utils import raise_from from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import Text) from .builder import Builder, HasReqsHints # pylint: disable=unused-import from .context import RuntimeContext # pylint: disable=unused-import from .context import getdefault from .errors import WorkflowException from .expression import JSON from .loghandler import _logger from .pathmapper import (MapperEnt, PathMapper, # pylint: disable=unused-import ensure_writable, ensure_non_writable) from .process import UnsupportedRequirement, stage_files from .secrets import SecretStore # pylint: disable=unused-import from .utils import (DEFAULT_TMP_PREFIX, Directory, bytes2str_in_dicts, copytree_with_merge, json_dump, json_dumps, onWindows, processes_to_kill, subprocess) if TYPE_CHECKING: from .provenance import ProvenanceProfile # pylint: disable=unused-import needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""") FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1" SHELL_COMMAND_TEMPLATE = u"""#!/bin/bash python "run_job.py" "job.json" """ PYTHON_RUN_SCRIPT = u""" import json import os import sys if os.name == 'posix': try: import subprocess32 as subprocess # type: ignore except Exception: import subprocess else: import subprocess # type: ignore with open(sys.argv[1], "r") as f: popen_description = json.load(f) commands = popen_description["commands"] cwd = popen_description["cwd"] env = popen_description["env"] env["PATH"] = os.environ.get("PATH") stdin_path = popen_description["stdin_path"] stdout_path = popen_description["stdout_path"] stderr_path = popen_description["stderr_path"] if stdin_path is not None: stdin = open(stdin_path, "rb") else: stdin = subprocess.PIPE if stdout_path is not None: stdout = open(stdout_path, "wb") else: stdout = sys.stderr if stderr_path is not None: stderr = open(stderr_path, "wb") else: stderr = sys.stderr if os.name == 'nt': close_fds = False for key, value in env.items(): env[key] = str(value) else: close_fds = True sp = subprocess.Popen(commands, shell=False, close_fds=close_fds, stdin=stdin, stdout=stdout, stderr=stderr, env=env, cwd=cwd) if sp.stdin: sp.stdin.close() rcode = sp.wait() if stdin is not subprocess.PIPE: stdin.close() if stdout is not sys.stderr: stdout.close() if stderr is not sys.stderr: stderr.close() sys.exit(rcode) """ def deref_links(outputs): # type: (Any) -> None if isinstance(outputs, MutableMapping): if outputs.get("class") == "File": st = os.lstat(outputs["path"]) if stat.S_ISLNK(st.st_mode): outputs["basename"] = os.path.basename(outputs["path"]) outputs["path"] = os.readlink(outputs["path"]) else: for v in outputs.values(): deref_links(v) if isinstance(outputs, MutableSequence): for output in outputs: deref_links(output) def relink_initialworkdir(pathmapper, # type: PathMapper host_outdir, # type: Text container_outdir, # type: Text inplace_update=False # type: bool ): # type: (...) -> None for _, vol in pathmapper.items(): if not vol.staged: continue if (vol.type in ("File", "Directory") or ( inplace_update and vol.type in ("WritableFile", "WritableDirectory"))): if not vol.target.startswith(container_outdir): # this is an input file written outside of the working # directory, so therefor ineligable for being an output file. # Thus, none of our business continue host_outdir_tgt = os.path.join( host_outdir, vol.target[len(container_outdir) + 1:]) if os.path.islink(host_outdir_tgt) \ or os.path.isfile(host_outdir_tgt): os.remove(host_outdir_tgt) elif os.path.isdir(host_outdir_tgt) \ and not vol.resolved.startswith("_:"): shutil.rmtree(host_outdir_tgt) if onWindows(): # If this becomes a big issue for someone then we could # refactor the code to process output from a running container # and avoid all the extra IO below if vol.type in ("File", "WritableFile"): shutil.copy(vol.resolved, host_outdir_tgt) elif vol.type in ("Directory", "WritableDirectory"): copytree_with_merge(vol.resolved, host_outdir_tgt) elif not vol.resolved.startswith("_:"): os.symlink(vol.resolved, host_outdir_tgt) class JobBase(with_metaclass(ABCMeta, HasReqsHints)): def __init__(self, builder, # type: Builder joborder, # type: JSON make_path_mapper, # type: Callable[..., PathMapper] requirements, # type: List[Dict[Text, Text]] hints, # type: List[Dict[Text, Text]] name, # type: Text ): # type: (...) -> None """Initialize the job object.""" self.builder = builder self.joborder = joborder self.stdin = None # type: Optional[Text] self.stderr = None # type: Optional[Text] self.stdout = None # type: Optional[Text] self.successCodes = [] # type: Iterable[int] self.temporaryFailCodes = [] # type: Iterable[int] self.permanentFailCodes = [] # type: Iterable[int] self.requirements = requirements self.hints = hints self.name = name self.command_line = [] # type: List[Text] self.pathmapper = PathMapper([], u"", u"") self.make_path_mapper = make_path_mapper self.generatemapper = None # type: Optional[PathMapper] # set in CommandLineTool.job(i) self.collect_outputs = cast(Callable[[Text, int], MutableMapping[Text, Any]], None) # type: Union[Callable[[Text, int], MutableMapping[Text, Any]], functools.partial[MutableMapping[Text, Any]]] self.output_callback = cast(Callable[[Any, Any], Any], None) self.outdir = u"" self.tmpdir = u"" self.environment = {} # type: MutableMapping[Text, Text] self.generatefiles = {"class": "Directory", "listing": [], "basename": ""} # type: Directory self.stagedir = None # type: Optional[Text] self.inplace_update = False self.prov_obj = None # type: Optional[ProvenanceProfile] self.parent_wf = None # type: Optional[ProvenanceProfile] self.timelimit = None # type: Optional[int] self.networkaccess = False # type: bool def __repr__(self): # type: () -> str """Represent this Job object.""" return "CommandLineJob(%s)" % self.name @abstractmethod def run(self, runtimeContext, # type: RuntimeContext tmpdir_lock=None # type: Optional[threading.Lock] ): # type: (...) -> None pass def _setup(self, runtimeContext): # type: (RuntimeContext) -> None if not os.path.exists(self.outdir): os.makedirs(self.outdir) for knownfile in self.pathmapper.files(): p = self.pathmapper.mapper(knownfile) if p.type == "File" and not os.path.isfile(p[0]) and p.staged: raise WorkflowException( u"Input file %s (at %s) not found or is not a regular " "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])) if 'listing' in self.generatefiles: runtimeContext = runtimeContext.copy() runtimeContext.outdir = self.outdir self.generatemapper = self.make_path_mapper( cast(List[Any], self.generatefiles["listing"]), self.builder.outdir, runtimeContext, False) if _logger.isEnabledFor(logging.DEBUG): _logger.debug( u"[job %s] initial work dir %s", self.name, json_dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4)) def _execute(self, runtime, # type: List[Text] env, # type: MutableMapping[Text, Text] runtimeContext, # type: RuntimeContext monitor_function=None, # type: Optional[Callable[[subprocess.Popen], None]] ): # type: (...) -> None scr, _ = self.get_requirement("ShellCommandRequirement") shouldquote = needs_shell_quoting_re.search # type: Callable[[Any], Any] if scr is not None: shouldquote = lambda x: False _logger.info(u"[job %s] %s$ %s%s%s%s", self.name, self.outdir, " \\\n ".join([shellescape.quote(Text(arg)) if shouldquote(Text(arg)) else Text(arg) for arg in (runtime + self.command_line)]), u' < %s' % self.stdin if self.stdin else '', u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '', u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '') if self.joborder is not None and runtimeContext.research_obj is not None: job_order = self.joborder if runtimeContext.process_run_id is not None \ and runtimeContext.prov_obj is not None and isinstance(job_order, (list, dict)): runtimeContext.prov_obj.used_artefacts( job_order, runtimeContext.process_run_id, str(self.name)) else: _logger.warning("research_obj set but one of process_run_id " "or prov_obj is missing from runtimeContext: " "{}". format(runtimeContext)) outputs = {} # type: MutableMapping[Text,Any] try: stdin_path = None if self.stdin is not None: rmap = self.pathmapper.reversemap(self.stdin) if rmap is None: raise WorkflowException( "{} missing from pathmapper".format(self.stdin)) else: stdin_path = rmap[1] stderr_path = None if self.stderr is not None: abserr = os.path.join(self.outdir, self.stderr) dnerr = os.path.dirname(abserr) if dnerr and not os.path.exists(dnerr): os.makedirs(dnerr) stderr_path = abserr stdout_path = None if self.stdout is not None: absout = os.path.join(self.outdir, self.stdout) dnout = os.path.dirname(absout) if dnout and not os.path.exists(dnout): os.makedirs(dnout) stdout_path = absout commands = [Text(x) for x in runtime + self.command_line] if runtimeContext.secret_store is not None: commands = runtimeContext.secret_store.retrieve(commands) env = runtimeContext.secret_store.retrieve(env) job_script_contents = None # type: Optional[Text] builder = getattr(self, "builder", None) # type: Builder if builder is not None: job_script_contents = builder.build_job_script(commands) rcode = _job_popen( commands, stdin_path=stdin_path, stdout_path=stdout_path, stderr_path=stderr_path, env=env, cwd=self.outdir, job_dir=tempfile.mkdtemp(prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)), job_script_contents=job_script_contents, timelimit=self.timelimit, name=self.name, monitor_function=monitor_function ) if rcode in self.successCodes: processStatus = "success" elif rcode in self.temporaryFailCodes: processStatus = "temporaryFail" elif rcode in self.permanentFailCodes: processStatus = "permanentFail" elif rcode == 0: processStatus = "success" else: processStatus = "permanentFail" if 'listing' in self.generatefiles: if self.generatemapper: relink_initialworkdir( self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update) else: raise ValueError("'lsiting' in self.generatefiles but no " "generatemapper was setup.") outputs = self.collect_outputs(self.outdir, rcode) outputs = bytes2str_in_dicts(outputs) # type: ignore except OSError as e: if e.errno == 2: if runtime: _logger.error(u"'%s' not found: %s", runtime[0], Text(e)) else: _logger.error(u"'%s' not found: %s", self.command_line[0], Text(e)) else: _logger.exception(u"Exception while running job") processStatus = "permanentFail" except WorkflowException as err: _logger.error(u"[job %s] Job error:\n%s", self.name, Text(err)) processStatus = "permanentFail" except Exception as e: _logger.exception(u"Exception while running job") processStatus = "permanentFail" if runtimeContext.research_obj is not None \ and self.prov_obj is not None \ and runtimeContext.process_run_id is not None: # creating entities for the outputs produced by each step (in the provenance document) self.prov_obj.record_process_end(str(self.name), runtimeContext.process_run_id, outputs, datetime.datetime.now()) if processStatus != "success": _logger.warning(u"[job %s] completed %s", self.name, processStatus) else: _logger.info(u"[job %s] completed %s", self.name, processStatus) if _logger.isEnabledFor(logging.DEBUG): _logger.debug(u"[job %s] %s", self.name, json_dumps(outputs, indent=4)) if self.generatemapper is not None and runtimeContext.secret_store is not None: # Delete any runtime-generated files containing secrets. for _, p in self.generatemapper.items(): if p.type == "CreateFile": if runtimeContext.secret_store.has_secret(p.resolved): host_outdir = self.outdir container_outdir = self.builder.outdir host_outdir_tgt = p.target if p.target.startswith(container_outdir + "/"): host_outdir_tgt = os.path.join( host_outdir, p.target[len(container_outdir) + 1:]) os.remove(host_outdir_tgt) if runtimeContext.workflow_eval_lock is None: raise WorkflowException("runtimeContext.workflow_eval_lock must not be None") with runtimeContext.workflow_eval_lock: self.output_callback(outputs, processStatus) if self.stagedir is not None and os.path.exists(self.stagedir): _logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir) shutil.rmtree(self.stagedir, True) if runtimeContext.rm_tmpdir: _logger.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir) shutil.rmtree(self.tmpdir, True) def process_monitor(self, sproc): # type: (subprocess.Popen) -> None monitor = psutil.Process(sproc.pid) memory_usage = [None] # Value must be list rather than integer to utilise pass-by-reference in python def get_tree_mem_usage(memory_usage): # type: (List[int]) -> None children = monitor.children() rss = monitor.memory_info().rss while len(children): rss += sum([process.memory_info().rss for process in children]) children = list(itertools.chain(*[process.children() for process in children])) if memory_usage[0] is None or rss > memory_usage[0]: memory_usage[0] = rss mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,)) mem_tm.daemon = True mem_tm.start() sproc.wait() mem_tm.cancel() if memory_usage[0] is not None: _logger.info(u"[job %s] Max memory used: %iMiB", self.name, round(memory_usage[0] / (2 ** 20))) else: _logger.debug(u"Could not collect memory usage, job ended before monitoring began.") class CommandLineJob(JobBase): def run(self, runtimeContext, # type: RuntimeContext tmpdir_lock=None # type: Optional[threading.Lock] ): # type: (...) -> None if tmpdir_lock: with tmpdir_lock: if not os.path.exists(self.tmpdir): os.makedirs(self.tmpdir) else: if not os.path.exists(self.tmpdir): os.makedirs(self.tmpdir) self._setup(runtimeContext) env = self.environment vars_to_preserve = runtimeContext.preserve_environment if runtimeContext.preserve_entire_environment is not False: vars_to_preserve = os.environ if vars_to_preserve: for key, value in os.environ.items(): if key in vars_to_preserve and key not in env: # On Windows, subprocess env can't handle unicode. env[key] = str(value) if onWindows() else value env["HOME"] = str(self.outdir) if onWindows() else self.outdir env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir if "PATH" not in env: env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"] if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ: env["SYSTEMROOT"] = str(os.environ["SYSTEMROOT"]) if onWindows() \ else os.environ["SYSTEMROOT"] stage_files(self.pathmapper, ignore_writable=True, symlink=True, secret_store=runtimeContext.secret_store) if self.generatemapper is not None: stage_files(self.generatemapper, ignore_writable=self.inplace_update, symlink=True, secret_store=runtimeContext.secret_store) relink_initialworkdir( self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update) monitor_function = functools.partial(self.process_monitor) self._execute([], env, runtimeContext, monitor_function) CONTROL_CODE_RE = r'\x1b\[[0-9;]*[a-zA-Z]' class ContainerCommandLineJob(with_metaclass(ABCMeta, JobBase)): """Commandline job using containers.""" @abstractmethod def get_from_requirements(self, r, # type: Dict[Text, Text] pull_image, # type: bool force_pull=False, # type: bool tmp_outdir_prefix=DEFAULT_TMP_PREFIX # type: Text ): # type: (...) -> Optional[Text] pass @abstractmethod def create_runtime(self, env, # type: MutableMapping[Text, Text] runtime_context # type: RuntimeContext ): # type: (...) -> Tuple[List[Text], Optional[Text]] """Return the list of commands to run the selected container engine.""" pass @staticmethod @abstractmethod def append_volume(runtime, source, target, writable=False): # type: (List[Text], Text, Text, bool) -> None """Add binding arguments to the runtime list.""" pass @abstractmethod def add_file_or_directory_volume(self, runtime, # type: List[Text] volume, # type: MapperEnt host_outdir_tgt # type: Optional[Text] ): # type: (...) -> None """Append volume a file/dir mapping to the runtime option list.""" pass @abstractmethod def add_writable_file_volume(self, runtime, # type: List[Text] volume, # type: MapperEnt host_outdir_tgt, # type: Optional[Text] tmpdir_prefix # type: Text ): # type: (...) -> None """Append a writable file mapping to the runtime option list.""" pass @abstractmethod def add_writable_directory_volume(self, runtime, # type: List[Text] volume, # type: MapperEnt host_outdir_tgt, # type: Optional[Text] tmpdir_prefix # type: Text ): # type: (...) -> None """Append a writable directory mapping to the runtime option list.""" pass def create_file_and_add_volume(self, runtime, # type: List[Text] volume, # type: MapperEnt host_outdir_tgt, # type: Optional[Text] secret_store, # type: Optional[SecretStore] tmpdir_prefix # type: Text ): # type: (...) -> Text """Create the file and add a mapping.""" if not host_outdir_tgt: tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix) new_file = os.path.join( tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir), os.path.basename(volume.target)) writable = True if volume.type == "CreateWritableFile" else False if secret_store: contents = secret_store.retrieve(volume.resolved) else: contents = volume.resolved dirname = os.path.dirname(host_outdir_tgt or new_file) if not os.path.exists(dirname): os.makedirs(dirname) with open(host_outdir_tgt or new_file, "wb") as file_literal: file_literal.write(contents.encode("utf-8")) if not host_outdir_tgt: self.append_volume(runtime, new_file, volume.target, writable=writable) if writable: ensure_writable(host_outdir_tgt or new_file) else: ensure_non_writable(host_outdir_tgt or new_file) return host_outdir_tgt or new_file def add_volumes(self, pathmapper, # type: PathMapper runtime, # type: List[Text] tmpdir_prefix, # type: Text secret_store=None, # type: Optional[SecretStore] any_path_okay=False # type: bool ): # type: (...) -> None """Append volume mappings to the runtime option list.""" container_outdir = self.builder.outdir for key, vol in (itm for itm in pathmapper.items() if itm[1].staged): host_outdir_tgt = None # type: Optional[Text] if vol.target.startswith(container_outdir + "/"): host_outdir_tgt = os.path.join( self.outdir, vol.target[len(container_outdir) + 1:]) if not host_outdir_tgt and not any_path_okay: raise WorkflowException( "No mandatory DockerRequirement, yet path is outside " "the designated output directory, also know as " "$(runtime.outdir): {}".format(vol)) if vol.type in ("File", "Directory"): self.add_file_or_directory_volume( runtime, vol, host_outdir_tgt) elif vol.type == "WritableFile": self.add_writable_file_volume( runtime, vol, host_outdir_tgt, tmpdir_prefix) elif vol.type == "WritableDirectory": self.add_writable_directory_volume( runtime, vol, host_outdir_tgt, tmpdir_prefix) elif vol.type in ["CreateFile", "CreateWritableFile"]: new_path = self.create_file_and_add_volume( runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix) pathmapper.update( key, new_path, vol.target, vol.type, vol.staged) def run(self, runtimeContext, # type: RuntimeContext tmpdir_lock=None # type: Optional[threading.Lock] ): # type: (...) -> None if tmpdir_lock: with tmpdir_lock: if not os.path.exists(self.tmpdir): os.makedirs(self.tmpdir) else: if not os.path.exists(self.tmpdir): os.makedirs(self.tmpdir) (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") self.prov_obj = runtimeContext.prov_obj img_id = None env = cast(MutableMapping[Text, Text], os.environ) user_space_docker_cmd = runtimeContext.user_space_docker_cmd if docker_req is not None and user_space_docker_cmd: # For user-space docker implementations, a local image name or ID # takes precedence over a network pull if 'dockerImageId' in docker_req: img_id = str(docker_req["dockerImageId"]) elif 'dockerPull' in docker_req: img_id = str(docker_req["dockerPull"]) cmd = [user_space_docker_cmd, "pull", img_id] _logger.info(Text(cmd)) try: subprocess.check_call(cmd, stdout=sys.stderr) except OSError: raise WorkflowException(SourceLine(docker_req).makeError( "Either Docker container {} is not available with " "user space docker implementation {} or {} is missing " "or broken.".format(img_id, user_space_docker_cmd, user_space_docker_cmd))) else: raise WorkflowException(SourceLine(docker_req).makeError( "Docker image must be specified as 'dockerImageId' or " "'dockerPull' when using user space implementations of " "Docker")) else: try: if docker_req is not None and runtimeContext.use_container: img_id = str( self.get_from_requirements( docker_req, runtimeContext.pull_image, getdefault(runtimeContext.force_docker_pull, False), getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))) if img_id is None: if self.builder.find_default_container: default_container = self.builder.find_default_container() if default_container: img_id = str(default_container) if docker_req is not None and img_id is None and runtimeContext.use_container: raise Exception("Docker image not available") if self.prov_obj is not None and img_id is not None \ and runtimeContext.process_run_id is not None: container_agent = self.prov_obj.document.agent( uuid.uuid4().urn, {"prov:type": PROV["SoftwareAgent"], "cwlprov:image": img_id, "prov:label": "Container execution of image %s" % img_id}) # FIXME: img_id is not a sha256 id, it might just be "debian:8" # img_entity = document.entity("nih:sha-256;%s" % img_id, # {"prov:label": "Container image %s" % img_id} ) # The image is the plan for this activity-agent association # document.wasAssociatedWith(process_run_ID, container_agent, img_entity) self.prov_obj.document.wasAssociatedWith( runtimeContext.process_run_id, container_agent) except Exception as err: container = "Singularity" if runtimeContext.singularity else "Docker" _logger.debug(u"%s error", container, exc_info=True) if docker_is_req: raise_from(UnsupportedRequirement( "%s is required to run this tool: %s" % (container, Text(err))), err) else: raise WorkflowException( "{0} is not available for this tool, try " "--no-container to disable {0}, or install " "a user space Docker replacement like uDocker with " "--user-space-docker-cmd.: {1}".format(container, err)) self._setup(runtimeContext) (runtime, cidfile) = self.create_runtime(env, runtimeContext) runtime.append(Text(img_id)) monitor_function = None if cidfile: monitor_function = functools.partial( self.docker_monitor, cidfile, runtimeContext.tmpdir_prefix, not bool(runtimeContext.cidfile_dir)) elif runtimeContext.user_space_docker_cmd: monitor_function = functools.partial(self.process_monitor) self._execute(runtime, env, runtimeContext, monitor_function) def docker_monitor(self, cidfile, tmpdir_prefix, cleanup_cidfile, process): # type: (Text, Text, bool, subprocess.Popen) -> None """Record memory usage of the running Docker container.""" # Todo: consider switching to `docker create` / `docker start` # instead of `docker run` as `docker create` outputs the container ID # to stdout, but the container is frozen, thus allowing us to start the # monitoring process without dealing with the cidfile or too-fast # container execution cid = None while cid is None: time.sleep(1) if process.returncode is not None: if cleanup_cidfile: try: os.remove(cidfile) except OSError as exc: _logger.warn("Ignored error cleaning up Docker cidfile: %s", exc) return try: with open(cidfile) as cidhandle: cid = cidhandle.readline().strip() except (OSError, IOError): cid = None max_mem = psutil.virtual_memory().total tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix) stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir) try: with open(stats_file.name, mode="w") as stats_file_handle: stats_proc = subprocess.Popen( ['docker', 'stats', '--no-trunc', '--format', '{{.MemPerc}}', cid], stdout=stats_file_handle, stderr=subprocess.DEVNULL) process.wait() stats_proc.kill() except OSError as exc: _logger.warn("Ignored error with docker stats: %s", exc) return max_mem_percent = 0 with open(stats_file.name, mode="r") as stats: for line in stats: try: mem_percent = float(re.sub( CONTROL_CODE_RE, '', line).replace('%', '')) if mem_percent > max_mem_percent: max_mem_percent = mem_percent except ValueError: break _logger.info(u"[job %s] Max memory used: %iMiB", self.name, int((max_mem_percent / 100 * max_mem) / (2 ** 20))) if cleanup_cidfile: os.remove(cidfile) def _job_popen(commands, # type: List[Text] stdin_path, # type: Optional[Text] stdout_path, # type: Optional[Text] stderr_path, # type: Optional[Text] env, # type: MutableMapping[AnyStr, AnyStr] cwd, # type: Text job_dir, # type: Text job_script_contents=None, # type: Optional[Text] timelimit=None, # type: Optional[int] name=None, # type: Optional[Text] monitor_function=None # type: Optional[Callable[[subprocess.Popen], None]] ): # type: (...) -> int if job_script_contents is None and not FORCE_SHELLED_POPEN: stdin = subprocess.PIPE # type: Union[IO[Any], int] if stdin_path is not None: stdin = open(stdin_path, "rb") stdout = sys.stderr # type: IO[Any] if stdout_path is not None: stdout = open(stdout_path, "wb") stderr = sys.stderr # type: IO[Any] if stderr_path is not None: stderr = open(stderr_path, "wb") sproc = subprocess.Popen(commands, shell=False, close_fds=not onWindows(), stdin=stdin, stdout=stdout, stderr=stderr, env=env, cwd=cwd) processes_to_kill.append(sproc) if sproc.stdin is not None: sproc.stdin.close() tm = None if timelimit is not None and timelimit > 0: def terminate(): # type: () -> None try: _logger.warning( u"[job %s] exceeded time limit of %d seconds and will" "be terminated", name, timelimit) sproc.terminate() except OSError: pass tm = Timer(timelimit, terminate) tm.daemon = True tm.start() if monitor_function: monitor_function(sproc) rcode = sproc.wait() if tm is not None: tm.cancel() if isinstance(stdin, IOBase): stdin.close() if stdout is not sys.stderr: stdout.close() if stderr is not sys.stderr: stderr.close() return rcode else: if job_script_contents is None: job_script_contents = SHELL_COMMAND_TEMPLATE env_copy = {} key = None # type: Any for key in env: env_copy[key] = env[key] job_description = { u"commands": commands, u"cwd": cwd, u"env": env_copy, u"stdout_path": stdout_path, u"stderr_path": stderr_path, u"stdin_path": stdin_path} if PY2: with open(os.path.join(job_dir, "job.json"), mode="wb") as job_file: json_dump(job_description, job_file, ensure_ascii=False) else: with open(os.path.join(job_dir, "job.json"), mode="w", encoding='utf-8') as job_file: json_dump(job_description, job_file, ensure_ascii=False) try: job_script = os.path.join(job_dir, "run_job.bash") with open(job_script, "wb") as _: _.write(job_script_contents.encode('utf-8')) job_run = os.path.join(job_dir, "run_job.py") with open(job_run, "wb") as _: _.write(PYTHON_RUN_SCRIPT.encode('utf-8')) sproc = subprocess.Popen( ["bash", job_script.encode("utf-8")], shell=False, cwd=job_dir, # The nested script will output the paths to the correct files if they need # to be captured. Else just write everything to stderr (same as above). stdout=sys.stderr, stderr=sys.stderr, stdin=subprocess.PIPE, ) processes_to_kill.append(sproc) if sproc.stdin is not None: sproc.stdin.close() rcode = sproc.wait() return rcode finally: shutil.rmtree(job_dir)