Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/docker.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
"""Enables Docker software containers via the {dx-,u,}docker runtimes."""
from __future__ import absolute_import

from distutils import spawn
import datetime
import os
import re
import shutil
import sys
import tempfile
import threading
from io import open  # pylint: disable=redefined-builtin
from typing import Dict, List, MutableMapping, Optional, Set, Tuple

import requests
from typing_extensions import Text  # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from .context import RuntimeContext  # pylint: disable=unused-import
from .docker_id import docker_vm_id
from .errors import WorkflowException
from .job import ContainerCommandLineJob
from .loghandler import _logger
from .pathmapper import PathMapper, MapperEnt  # pylint: disable=unused-import
from .pathmapper import ensure_writable, ensure_non_writable
from .secrets import SecretStore  # pylint: disable=unused-import
from .utils import (DEFAULT_TMP_PREFIX, docker_windows_path_adjust, onWindows,
                    subprocess)

# Image identifiers already confirmed to be available, shared by all jobs.
_IMAGES = set()  # type: Set[Text]
_IMAGES_LOCK = threading.Lock()
# Lazily populated cache of the docker-machine mount points (None = not yet
# queried).
__docker_machine_mounts = None  # type: Optional[List[Text]]
__docker_machine_mounts_lock = threading.Lock()


def _get_docker_machine_mounts():  # type: () -> List[Text]
    """
    Return the mount points reported inside the docker-machine VM.

    When the DOCKER_MACHINE_NAME environment variable is unset the result is
    an empty list. Otherwise ``mount -t vboxsf`` is run inside that machine
    through ``docker-machine ssh`` and the first whitespace-separated field
    of every output line, prefixed with "/", is returned.

    The result is computed once and cached in a module-level global.
    """
    global __docker_machine_mounts
    if __docker_machine_mounts is None:
        with __docker_machine_mounts_lock:
            # Re-check under the lock: another thread may have filled the
            # cache while this one was waiting to acquire it.
            if __docker_machine_mounts is None:
                if 'DOCKER_MACHINE_NAME' not in os.environ:
                    __docker_machine_mounts = []
                else:
                    mount_listing = subprocess.check_output(
                        ['docker-machine', 'ssh',
                         os.environ['DOCKER_MACHINE_NAME'],
                         'mount', '-t', 'vboxsf'],
                        universal_newlines=True)
                    # Blank lines have no first field; skip them instead of
                    # failing with an IndexError.
                    __docker_machine_mounts = [
                        u'/' + line.split(None, 1)[0]
                        for line in mount_listing.splitlines()
                        if line.strip()]
    return __docker_machine_mounts


def _check_docker_machine_path(path):  # type: (Optional[Text]) -> None
    """
    Verify that ``path`` lies under one of the docker-machine mount points.

    Does nothing when ``path`` is None or when no mount points are known.
    On Windows the comparison is case-insensitive.

    Raises WorkflowException if mount points are known and none of them is a
    prefix of ``path``.
    """
    if path is None:
        return
    if onWindows():
        path = path.lower()
    mounts = _get_docker_machine_mounts()

    found = False
    for mount in mounts:
        if onWindows():
            mount = mount.lower()
        if path.startswith(mount):
            found = True
            break

    if not found and mounts:
        name = os.environ.get("DOCKER_MACHINE_NAME", '???')
        raise WorkflowException(
            "Input path {path} is not in the list of host paths mounted "
            "into the Docker virtual machine named {name}. Already mounted "
            "paths: {mounts}.\n"
            "See https://docs.docker.com/toolbox/toolbox_install_windows/"
            "#optional-add-shared-directories for instructions on how to "
            "add this path to your VM.".format(
                path=path, name=name,
                mounts=mounts))


def _split_repository_and_tag(image_id):  # type: (Text) -> Tuple[Text, Text]
    """
    Split an image identifier into the (repository, tag) pair to look up.

    The identifier is split on ":". A missing tag defaults to "latest"; a
    trailing component that does not look like a tag is treated as part of
    the repository (for example a registry port).
    """
    parts = image_id.split(":")
    if len(parts) == 1:
        parts.append("latest")
    elif len(parts) == 2:
        # if parts[1] doesn't match valid tag names, it is a part of repository
        if not re.match(r'[\w][\w.-]{0,127}', parts[1]):
            parts[0] = parts[0] + ":" + parts[1]
            parts[1] = "latest"
    elif len(parts) == 3:
        if re.match(r'[\w][\w.-]{0,127}', parts[2]):
            parts[0] = parts[0] + ":" + parts[1]
            parts[1] = parts[2]
            del parts[2]
    return parts[0], parts[1]


class DockerCommandLineJob(ContainerCommandLineJob):
    """Runs a CommandLineJob in a software container using the Docker engine."""

    @staticmethod
    def get_image(docker_requirement,  # type: Dict[Text, Text]
                  pull_image,          # type: bool
                  force_pull=False,    # type: bool
                  tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
                 ):  # type: (...) -> bool
        """
        Retrieve the relevant Docker container image.

        ``docker_requirement`` is updated in place: "dockerImageId" is set
        from "dockerPull" when it is missing. The image is looked up in the
        module-level cache and then in the output of ``docker images``.
        If it is not found (or ``force_pull`` is set) and ``pull_image`` is
        true, it is obtained through the first applicable of dockerPull,
        dockerFile, dockerLoad or dockerImport.

        Returns True upon success.

        Raises WorkflowException when ``docker load`` exits with a non-zero
        status; failures of the other docker commands surface as the
        exception raised by ``subprocess.check_call``.
        """
        found = False

        if "dockerImageId" not in docker_requirement \
                and "dockerPull" in docker_requirement:
            docker_requirement["dockerImageId"] = docker_requirement["dockerPull"]

        with _IMAGES_LOCK:
            if docker_requirement["dockerImageId"] in _IMAGES:
                return True

        # The wanted repository/tag does not depend on the listing, so it is
        # computed once rather than for every output line.
        wanted_repo, wanted_tag = _split_repository_and_tag(
            docker_requirement["dockerImageId"])

        for line in subprocess.check_output(
                ["docker", "images", "--no-trunc", "--all"]
        ).decode('utf-8').splitlines():
            match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
            # check for repository:tag match or image id match
            if (match and
                    ((wanted_repo == match.group(1)
                      and wanted_tag == match.group(2))
                     or docker_requirement["dockerImageId"] == match.group(3))):
                found = True
                break

        if (force_pull or not found) and pull_image:
            cmd = []  # type: List[Text]
            if "dockerPull" in docker_requirement:
                cmd = ["docker", "pull", str(docker_requirement["dockerPull"])]
                _logger.info(Text(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)
                found = True
            elif "dockerFile" in docker_requirement:
                dockerfile_dir = str(tempfile.mkdtemp(prefix=tmp_outdir_prefix))
                with open(os.path.join(
                        dockerfile_dir, "Dockerfile"), "wb") as dfile:
                    dfile.write(docker_requirement["dockerFile"].encode('utf-8'))
                cmd = ["docker", "build", "--tag=%s" %
                       str(docker_requirement["dockerImageId"]), dockerfile_dir]
                _logger.info(Text(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)
                found = True
            elif "dockerLoad" in docker_requirement:
                cmd = ["docker", "load"]
                _logger.info(Text(cmd))
                if os.path.exists(docker_requirement["dockerLoad"]):
                    # Local archive: hand the open file to docker as stdin.
                    _logger.info(u"Loading docker image from %s",
                                 docker_requirement["dockerLoad"])
                    with open(docker_requirement["dockerLoad"], "rb") as dload:
                        loadproc = subprocess.Popen(
                            cmd, stdin=dload, stdout=sys.stderr)
                else:
                    # Not a local path: treat it as a URL and stream the
                    # download into docker's stdin chunk by chunk.
                    loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                                stdout=sys.stderr)
                    assert loadproc.stdin is not None  # nosec
                    _logger.info(u"Sending GET request to %s",
                                 docker_requirement["dockerLoad"])
                    req = requests.get(docker_requirement["dockerLoad"],
                                       stream=True)
                    size = 0
                    for chunk in req.iter_content(1024 * 1024):
                        size += len(chunk)
                        _logger.info("\r%i bytes", size)
                        loadproc.stdin.write(chunk)
                    loadproc.stdin.close()
                rcode = loadproc.wait()
                if rcode != 0:
                    raise WorkflowException(
                        "Docker load returned non-zero exit status %i" % (rcode))
                found = True
            elif "dockerImport" in docker_requirement:
                cmd = ["docker", "import",
                       str(docker_requirement["dockerImport"]),
                       str(docker_requirement["dockerImageId"])]
                _logger.info(Text(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)
                found = True

        if found:
            with _IMAGES_LOCK:
                _IMAGES.add(docker_requirement["dockerImageId"])

        return found

    def get_from_requirements(self,
                              r,                 # type: Dict[Text, Text]
                              pull_image,        # type: bool
                              force_pull=False,  # type: bool
                              tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
                             ):  # type: (...) -> Optional[Text]
        """
        Return the image id for requirement ``r``, fetching it if needed.

        Raises WorkflowException when the ``docker`` executable cannot be
        found or when the image is not available.
        """
        if not spawn.find_executable('docker'):
            raise WorkflowException("docker executable is not available")

        if self.get_image(r, pull_image, force_pull, tmp_outdir_prefix):
            return r["dockerImageId"]
        raise WorkflowException(
            u"Docker image %s not found" % r["dockerImageId"])

    @staticmethod
    def append_volume(runtime, source, target, writable=False):
        # type: (List[Text], Text, Text, bool) -> None
        """Add binding arguments to the runtime list."""
        runtime.append(u"--volume={}:{}:{}".format(
            docker_windows_path_adjust(source), target,
            "rw" if writable else "ro"))

    def add_file_or_directory_volume(self,
                                     runtime,         # type: List[Text]
                                     volume,          # type: MapperEnt
                                     host_outdir_tgt  # type: Optional[Text]
                                    ):  # type: (...) -> None
        """
        Append a read-only file/dir mapping to the runtime option list.

        Entries whose resolved location starts with "_:" are skipped.
        ``host_outdir_tgt`` is not used by this implementation.
        """
        if not volume.resolved.startswith("_:"):
            _check_docker_machine_path(docker_windows_path_adjust(
                volume.resolved))
            self.append_volume(runtime, volume.resolved, volume.target)

    def add_writable_file_volume(self,
                                 runtime,          # type: List[Text]
                                 volume,           # type: MapperEnt
                                 host_outdir_tgt,  # type: Optional[Text]
                                 tmpdir_prefix     # type: Text
                                ):  # type: (...) -> None
        """
        Append a writable file mapping to the runtime option list.

        With in-place updates the original file is mounted read-write.
        Otherwise a copy is made, either at ``host_outdir_tgt`` or in a fresh
        temporary directory that is then mounted, and the copy is made
        writable.
        """
        if self.inplace_update:
            self.append_volume(runtime, volume.resolved, volume.target,
                               writable=True)
        else:
            if host_outdir_tgt:
                # shortcut, just copy to the output directory
                # which is already going to be mounted
                if not os.path.exists(os.path.dirname(host_outdir_tgt)):
                    os.makedirs(os.path.dirname(host_outdir_tgt))
                shutil.copy(volume.resolved, host_outdir_tgt)
            else:
                tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
                tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
                file_copy = os.path.join(
                    tmpdir, os.path.basename(volume.resolved))
                shutil.copy(volume.resolved, file_copy)
                self.append_volume(runtime, file_copy, volume.target,
                                   writable=True)
            ensure_writable(host_outdir_tgt or file_copy)

    def add_writable_directory_volume(self,
                                      runtime,          # type: List[Text]
                                      volume,           # type: MapperEnt
                                      host_outdir_tgt,  # type: Optional[Text]
                                      tmpdir_prefix     # type: Text
                                     ):  # type: (...) -> None
        """
        Append a writable directory mapping to the runtime option list.

        Entries whose resolved location starts with "_:" have no source to
        copy: a path under a new temporary directory is mounted, or
        ``host_outdir_tgt`` is created. Other directories are mounted
        read-write directly (in-place update) or copied first and the copy
        made writable.
        """
        if volume.resolved.startswith("_:"):
            # Synthetic directory that needs creating first
            if not host_outdir_tgt:
                tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
                new_dir = os.path.join(
                    tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
                    os.path.basename(volume.target))
                self.append_volume(runtime, new_dir, volume.target,
                                   writable=True)
            elif not os.path.exists(host_outdir_tgt):
                os.makedirs(host_outdir_tgt)
        else:
            if self.inplace_update:
                self.append_volume(runtime, volume.resolved, volume.target,
                                   writable=True)
            else:
                if not host_outdir_tgt:
                    tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
                    tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
                    new_dir = os.path.join(
                        tmpdir, os.path.basename(volume.resolved))
                    shutil.copytree(volume.resolved, new_dir)
                    self.append_volume(
                        runtime, new_dir, volume.target,
                        writable=True)
                else:
                    shutil.copytree(volume.resolved, host_outdir_tgt)
                ensure_writable(host_outdir_tgt or new_dir)

    def create_runtime(self,
                       env,            # type: MutableMapping[Text, Text]
                       runtimeContext  # type: RuntimeContext
                      ):  # type: (...) -> Tuple[List[Text], Optional[Text]]
        """
        Build the container invocation prefix for this job.

        Returns a tuple of the argument list (the docker or user-space docker
        command followed by its ``run`` options) and the path of the
        container ID file, which is None when a user-space docker command is
        configured. ``env`` is not used by this implementation.

        Exits the process with status 2 when ``runtimeContext.cidfile_dir``
        is set but is missing or is not a directory.
        """
        any_path_okay = self.builder.get_requirement("DockerRequirement")[1] \
            or False
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if user_space_docker_cmd:
            if 'udocker' in user_space_docker_cmd and not runtimeContext.debug:
                runtime = [user_space_docker_cmd, u"--quiet", u"run"]
                # udocker 1.1.1 will output diagnostic messages to stdout
                # without this
            else:
                runtime = [user_space_docker_cmd, u"run"]
        else:
            runtime = [u"docker", u"run", u"-i"]
        self.append_volume(runtime, os.path.realpath(self.outdir),
                           self.builder.outdir, writable=True)
        tmpdir = "/tmp"  # nosec
        self.append_volume(runtime, os.path.realpath(self.tmpdir), tmpdir,
                           writable=True)
        self.add_volumes(self.pathmapper, runtime, any_path_okay=True,
                         secret_store=runtimeContext.secret_store,
                         tmpdir_prefix=runtimeContext.tmpdir_prefix)
        if self.generatemapper is not None:
            self.add_volumes(
                self.generatemapper, runtime, any_path_okay=any_path_okay,
                secret_store=runtimeContext.secret_store,
                tmpdir_prefix=runtimeContext.tmpdir_prefix)

        if user_space_docker_cmd:
            # Strip the access-mode suffixes from the arguments built above
            # when a user-space docker command is in use.
            runtime = [x.replace(":ro", "") for x in runtime]
            runtime = [x.replace(":rw", "") for x in runtime]

        runtime.append(u"--workdir=%s" % (
            docker_windows_path_adjust(self.builder.outdir)))
        if not user_space_docker_cmd:

            if not runtimeContext.no_read_only:
                runtime.append(u"--read-only=true")

            if self.networkaccess:
                if runtimeContext.custom_net:
                    runtime.append(
                        u"--net={0}".format(runtimeContext.custom_net))
            else:
                runtime.append(u"--net=none")

            if self.stdout is not None:
                runtime.append("--log-driver=none")

            euid, egid = docker_vm_id()
            if not onWindows():
                # MS Windows does not have getuid() or geteuid() functions
                euid, egid = euid or os.geteuid(), egid or os.getgid()

            if runtimeContext.no_match_user is False \
                    and (euid is not None and egid is not None):
                runtime.append(u"--user=%d:%d" % (euid, egid))

        if runtimeContext.rm_container:
            runtime.append(u"--rm")

        runtime.append(u"--env=TMPDIR=/tmp")

        # spec currently says "HOME must be set to the designated output
        # directory." but spec might change to designated temp directory.
        # runtime.append("--env=HOME=/tmp")
        runtime.append(u"--env=HOME=%s" % self.builder.outdir)

        # add parameters to docker to write a container ID file
        if runtimeContext.user_space_docker_cmd is None:
            if runtimeContext.cidfile_dir:
                cidfile_dir = runtimeContext.cidfile_dir
                # sys.exit is used instead of the ``exit`` builtin, which is
                # only injected by the ``site`` module and may be absent.
                if not os.path.exists(str(cidfile_dir)):
                    _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
                                  "directory doesn't exist, please create it first")
                    sys.exit(2)
                if not os.path.isdir(cidfile_dir):
                    _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
                                  cidfile_dir + " is not a directory, "
                                  "please check it first")
                    sys.exit(2)
            else:
                tmp_dir, tmp_prefix = os.path.split(runtimeContext.tmpdir_prefix)
                cidfile_dir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)

            cidfile_name = datetime.datetime.now().strftime(
                "%Y%m%d%H%M%S-%f") + ".cid"
            if runtimeContext.cidfile_prefix is not None:
                cidfile_name = str(
                    runtimeContext.cidfile_prefix + "-" + cidfile_name)
            cidfile_path = os.path.join(cidfile_dir, cidfile_name)
            runtime.append(u"--cidfile=%s" % cidfile_path)
        else:
            cidfile_path = None
        for key, value in self.environment.items():
            runtime.append(u"--env=%s=%s" % (key, value))

        if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
            runtime.append("--memory=%dm" % self.builder.resources["ram"])
        elif not user_space_docker_cmd:
            res_req, _ = self.builder.get_requirement("ResourceRequirement")
            # Membership test for both keys; an identity comparison of the
            # key against the mapping would never be true.
            if res_req is not None and (
                    "ramMin" in res_req or "ramMax" in res_req):
                _logger.warning(
                    u"[job %s] Skipping Docker software container '--memory' limit "
                    "despite presence of ResourceRequirement with ramMin "
                    "and/or ramMax setting. Consider running with "
                    "--strict-memory-limit for increased portability "
                    "assurance.", self.name)

        return runtime, cidfile_path