diff env/lib/python3.7/site-packages/cwltool/docker.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author   | shellac                         |
|----------|---------------------------------|
| date     | Sat, 02 May 2020 07:14:21 -0400 |
| parents  |                                 |
| children |                                 |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/docker.py Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,389 @@
+"""Enables Docker software containers via the {dx-,u,}docker runtimes."""
+from __future__ import absolute_import
+
+from distutils import spawn
+import datetime
+import os
+import re
+import shutil
+import sys
+import tempfile
+import threading
+from io import open  # pylint: disable=redefined-builtin
+from typing import Dict, List, MutableMapping, Optional, Set, Tuple
+
+import requests
+from typing_extensions import Text  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .context import RuntimeContext  # pylint: disable=unused-import
+from .docker_id import docker_vm_id
+from .errors import WorkflowException
+from .job import ContainerCommandLineJob
+from .loghandler import _logger
+from .pathmapper import PathMapper, MapperEnt  # pylint: disable=unused-import
+from .pathmapper import ensure_writable, ensure_non_writable
+from .secrets import SecretStore  # pylint: disable=unused-import
+from .utils import (DEFAULT_TMP_PREFIX, docker_windows_path_adjust, onWindows,
+                    subprocess)
+
+
+_IMAGES = set()  # type: Set[Text]
+_IMAGES_LOCK = threading.Lock()
+__docker_machine_mounts = None  # type: Optional[List[Text]]
+__docker_machine_mounts_lock = threading.Lock()
+
+def _get_docker_machine_mounts():  # type: () -> List[Text]
+    global __docker_machine_mounts
+    if __docker_machine_mounts is None:
+        with __docker_machine_mounts_lock:
+            if 'DOCKER_MACHINE_NAME' not in os.environ:
+                __docker_machine_mounts = []
+            else:
+                __docker_machine_mounts = [
+                    u'/' + line.split(None, 1)[0] for line in
+                    subprocess.check_output(
+                        ['docker-machine', 'ssh',
+                         os.environ['DOCKER_MACHINE_NAME'], 'mount', '-t',
+                         'vboxsf'],
+                        universal_newlines=True).splitlines()]
+    return __docker_machine_mounts
+
+def _check_docker_machine_path(path):  # type: (Optional[Text]) -> None
+    if path is None:
+        return
+    if onWindows():
+        path = path.lower()
+    mounts = _get_docker_machine_mounts()
+
+    found = False
+    for mount in mounts:
+        if onWindows():
+            mount = mount.lower()
+        if path.startswith(mount):
+            found = True
+            break
+
+    if not found and mounts:
+        name = os.environ.get("DOCKER_MACHINE_NAME", '???')
+        raise WorkflowException(
+            "Input path {path} is not in the list of host paths mounted "
+            "into the Docker virtual machine named {name}. Already mounted "
+            "paths: {mounts}.\n"
+            "See https://docs.docker.com/toolbox/toolbox_install_windows/"
+            "#optional-add-shared-directories for instructions on how to "
+            "add this path to your VM.".format(
+                path=path, name=name,
+                mounts=mounts))
+
+class DockerCommandLineJob(ContainerCommandLineJob):
+    """Runs a CommandLineJob in a software container using the Docker engine."""
+
+    @staticmethod
+    def get_image(docker_requirement,  # type: Dict[Text, Text]
+                  pull_image,  # type: bool
+                  force_pull=False,  # type: bool
+                  tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
+                 ):  # type: (...) -> bool
+        """
+        Retrieve the relevant Docker container image.
+
+        Returns True upon success.
+        """
+        found = False
+
+        if "dockerImageId" not in docker_requirement \
+                and "dockerPull" in docker_requirement:
+            docker_requirement["dockerImageId"] = docker_requirement["dockerPull"]
+
+        with _IMAGES_LOCK:
+            if docker_requirement["dockerImageId"] in _IMAGES:
+                return True
+
+        for line in subprocess.check_output(
+                ["docker", "images", "--no-trunc", "--all"]).decode('utf-8').splitlines():
+            try:
+                match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
+                split = docker_requirement["dockerImageId"].split(":")
+                if len(split) == 1:
+                    split.append("latest")
+                elif len(split) == 2:
+                    # if split[1] doesn't match valid tag names, it is a part of repository
+                    if not re.match(r'[\w][\w.-]{0,127}', split[1]):
+                        split[0] = split[0] + ":" + split[1]
+                        split[1] = "latest"
+                elif len(split) == 3:
+                    if re.match(r'[\w][\w.-]{0,127}', split[2]):
+                        split[0] = split[0] + ":" + split[1]
+                        split[1] = split[2]
+                        del split[2]
+
+                # check for repository:tag match or image id match
+                if (match
+                        and (
+                            (split[0] == match.group(1) and split[1] == match.group(2))
+                            or docker_requirement["dockerImageId"] == match.group(3))):
+                    found = True
+                    break
+            except ValueError:
+                pass
+
+        if (force_pull or not found) and pull_image:
+            cmd = []  # type: List[Text]
+            if "dockerPull" in docker_requirement:
+                cmd = ["docker", "pull", str(docker_requirement["dockerPull"])]
+                _logger.info(Text(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)
+                found = True
+            elif "dockerFile" in docker_requirement:
+                dockerfile_dir = str(tempfile.mkdtemp(prefix=tmp_outdir_prefix))
+                with open(os.path.join(
+                        dockerfile_dir, "Dockerfile"), "wb") as dfile:
+                    dfile.write(docker_requirement["dockerFile"].encode('utf-8'))
+                cmd = ["docker", "build", "--tag=%s" %
+                       str(docker_requirement["dockerImageId"]), dockerfile_dir]
+                _logger.info(Text(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)
+                found = True
+            elif "dockerLoad" in docker_requirement:
+                cmd = ["docker", "load"]
+                _logger.info(Text(cmd))
+                if os.path.exists(docker_requirement["dockerLoad"]):
+                    _logger.info(u"Loading docker image from %s", docker_requirement["dockerLoad"])
+                    with open(docker_requirement["dockerLoad"], "rb") as dload:
+                        loadproc = subprocess.Popen(cmd, stdin=dload, stdout=sys.stderr)
+                else:
+                    loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+                                                stdout=sys.stderr)
+                    assert loadproc.stdin is not None  # nosec
+                    _logger.info(u"Sending GET request to %s", docker_requirement["dockerLoad"])
+                    req = requests.get(docker_requirement["dockerLoad"], stream=True)
+                    size = 0
+                    for chunk in req.iter_content(1024 * 1024):
+                        size += len(chunk)
+                        _logger.info("\r%i bytes", size)
+                        loadproc.stdin.write(chunk)
+                    loadproc.stdin.close()
+                rcode = loadproc.wait()
+                if rcode != 0:
+                    raise WorkflowException(
+                        "Docker load returned non-zero exit status %i" % (rcode))
+                found = True
+            elif "dockerImport" in docker_requirement:
+                cmd = ["docker", "import", str(docker_requirement["dockerImport"]),
+                       str(docker_requirement["dockerImageId"])]
+                _logger.info(Text(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)
+                found = True
+
+        if found:
+            with _IMAGES_LOCK:
+                _IMAGES.add(docker_requirement["dockerImageId"])
+
+        return found
+
+    def get_from_requirements(self,
+                              r,  # type: Dict[Text, Text]
+                              pull_image,  # type: bool
+                              force_pull=False,  # type: bool
+                              tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
+                             ):  # type: (...) -> Optional[Text]
+        if not spawn.find_executable('docker'):
+            raise WorkflowException("docker executable is not available")
+
+
+        if self.get_image(r, pull_image, force_pull, tmp_outdir_prefix):
+            return r["dockerImageId"]
+        raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"])
+
+    @staticmethod
+    def append_volume(runtime, source, target, writable=False):
+        # type: (List[Text], Text, Text, bool) -> None
+        """Add binding arguments to the runtime list."""
+        runtime.append(u"--volume={}:{}:{}".format(
+            docker_windows_path_adjust(source), target,
+            "rw" if writable else "ro"))
+
+    def add_file_or_directory_volume(self,
+                                     runtime,  # type: List[Text]
+                                     volume,  # type: MapperEnt
+                                     host_outdir_tgt  # type: Optional[Text]
+                                    ):  # type: (...) -> None
+        """Append a file/dir volume mapping to the runtime option list."""
+        if not volume.resolved.startswith("_:"):
+            _check_docker_machine_path(docker_windows_path_adjust(
+                volume.resolved))
+            self.append_volume(runtime, volume.resolved, volume.target)
+
+    def add_writable_file_volume(self,
+                                 runtime,  # type: List[Text]
+                                 volume,  # type: MapperEnt
+                                 host_outdir_tgt,  # type: Optional[Text]
+                                 tmpdir_prefix  # type: Text
+                                ):  # type: (...) -> None
+        """Append a writable file mapping to the runtime option list."""
+        if self.inplace_update:
+            self.append_volume(runtime, volume.resolved, volume.target,
+                               writable=True)
+        else:
+            if host_outdir_tgt:
+                # shortcut, just copy to the output directory
+                # which is already going to be mounted
+                if not os.path.exists(os.path.dirname(host_outdir_tgt)):
+                    os.makedirs(os.path.dirname(host_outdir_tgt))
+                shutil.copy(volume.resolved, host_outdir_tgt)
+            else:
+                tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+                tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+                file_copy = os.path.join(
+                    tmpdir, os.path.basename(volume.resolved))
+                shutil.copy(volume.resolved, file_copy)
+                self.append_volume(runtime, file_copy, volume.target,
+                                   writable=True)
+            ensure_writable(host_outdir_tgt or file_copy)
+
+    def add_writable_directory_volume(self,
+                                      runtime,  # type: List[Text]
+                                      volume,  # type: MapperEnt
+                                      host_outdir_tgt,  # type: Optional[Text]
+                                      tmpdir_prefix  # type: Text
+                                     ):  # type: (...) -> None
+        """Append a writable directory mapping to the runtime option list."""
+        if volume.resolved.startswith("_:"):
+            # Synthetic directory that needs creating first
+            if not host_outdir_tgt:
+                tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+                new_dir = os.path.join(
+                    tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
+                    os.path.basename(volume.target))
+                self.append_volume(runtime, new_dir, volume.target,
+                                   writable=True)
+            elif not os.path.exists(host_outdir_tgt):
+                os.makedirs(host_outdir_tgt)
+        else:
+            if self.inplace_update:
+                self.append_volume(runtime, volume.resolved, volume.target,
+                                   writable=True)
+            else:
+                if not host_outdir_tgt:
+                    tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+                    tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+                    new_dir = os.path.join(
+                        tmpdir, os.path.basename(volume.resolved))
+                    shutil.copytree(volume.resolved, new_dir)
+                    self.append_volume(
+                        runtime, new_dir, volume.target,
+                        writable=True)
+                else:
+                    shutil.copytree(volume.resolved, host_outdir_tgt)
+                ensure_writable(host_outdir_tgt or new_dir)
+
+    def create_runtime(self,
+                       env,  # type: MutableMapping[Text, Text]
+                       runtimeContext  # type: RuntimeContext
+                      ):  # type: (...) -> Tuple[List[Text], Optional[Text]]
+        any_path_okay = self.builder.get_requirement("DockerRequirement")[1] \
+            or False
+        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
+        if user_space_docker_cmd:
+            if 'udocker' in user_space_docker_cmd and not runtimeContext.debug:
+                runtime = [user_space_docker_cmd, u"--quiet", u"run"]
+                # udocker 1.1.1 will output diagnostic messages to stdout
+                # without this
+            else:
+                runtime = [user_space_docker_cmd, u"run"]
+        else:
+            runtime = [u"docker", u"run", u"-i"]
+        self.append_volume(runtime, os.path.realpath(self.outdir),
+                           self.builder.outdir, writable=True)
+        tmpdir = "/tmp"  # nosec
+        self.append_volume(runtime, os.path.realpath(self.tmpdir), tmpdir,
+                           writable=True)
+        self.add_volumes(self.pathmapper, runtime, any_path_okay=True,
+                         secret_store=runtimeContext.secret_store,
+                         tmpdir_prefix=runtimeContext.tmpdir_prefix)
+        if self.generatemapper is not None:
+            self.add_volumes(
+                self.generatemapper, runtime, any_path_okay=any_path_okay,
+                secret_store=runtimeContext.secret_store,
+                tmpdir_prefix=runtimeContext.tmpdir_prefix)
+
+        if user_space_docker_cmd:
+            runtime = [x.replace(":ro", "") for x in runtime]
+            runtime = [x.replace(":rw", "") for x in runtime]
+
+        runtime.append(u"--workdir=%s" % (
+            docker_windows_path_adjust(self.builder.outdir)))
+        if not user_space_docker_cmd:
+
+            if not runtimeContext.no_read_only:
+                runtime.append(u"--read-only=true")
+
+            if self.networkaccess:
+                if runtimeContext.custom_net:
+                    runtime.append(u"--net={0}".format(runtimeContext.custom_net))
+            else:
+                runtime.append(u"--net=none")
+
+            if self.stdout is not None:
+                runtime.append("--log-driver=none")
+
+            euid, egid = docker_vm_id()
+            if not onWindows():
+                # MS Windows does not have getuid() or geteuid() functions
+                euid, egid = euid or os.geteuid(), egid or os.getgid()
+
+            if runtimeContext.no_match_user is False \
+                    and (euid is not None and egid is not None):
+                runtime.append(u"--user=%d:%d" % (euid, egid))
+
+        if runtimeContext.rm_container:
+            runtime.append(u"--rm")
+
+        runtime.append(u"--env=TMPDIR=/tmp")
+
+        # spec currently says "HOME must be set to the designated output
+        # directory." but spec might change to designated temp directory.
+        # runtime.append("--env=HOME=/tmp")
+        runtime.append(u"--env=HOME=%s" % self.builder.outdir)
+
+        # add parameters to docker to write a container ID file
+        if runtimeContext.user_space_docker_cmd is None:
+            if runtimeContext.cidfile_dir:
+                cidfile_dir = runtimeContext.cidfile_dir
+                if not os.path.exists(str(cidfile_dir)):
+                    _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
+                                  "directory doesn't exist, please create it first")
+                    exit(2)
+                if not os.path.isdir(cidfile_dir):
+                    _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
+                                  cidfile_dir + " is not a directory, "
+                                  "please check it first")
+                    exit(2)
+            else:
+                tmp_dir, tmp_prefix = os.path.split(runtimeContext.tmpdir_prefix)
+                cidfile_dir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+
+            cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
+            if runtimeContext.cidfile_prefix is not None:
+                cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name)
+            cidfile_path = os.path.join(cidfile_dir, cidfile_name)
+            runtime.append(u"--cidfile=%s" % cidfile_path)
+        else:
+            cidfile_path = None
+        for key, value in self.environment.items():
+            runtime.append(u"--env=%s=%s" % (key, value))
+
+        if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
+            runtime.append("--memory=%dm" % self.builder.resources["ram"])
+        elif not user_space_docker_cmd:
+            res_req, _ = self.builder.get_requirement("ResourceRequirement")
+            if res_req is not None and ("ramMin" in res_req or "ramMax" in res_req):
+                _logger.warning(
+                    u"[job %s] Skipping Docker software container '--memory' limit "
+                    "despite presence of ResourceRequirement with ramMin "
+                    "and/or ramMax setting. Consider running with "
+                    "--strict-memory-limit for increased portability "
+                    "assurance.", self.name)
+
+        return runtime, cidfile_path
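
For reference, a minimal usage sketch of the two staticmethods defined above. It assumes the cwltool package from this diff is importable and, for get_image(), that a local Docker daemon is reachable; the image name and host paths are illustrative examples, not values taken from the diff itself.

from cwltool.docker import DockerCommandLineJob

# append_volume() only formats a --volume flag; with writable left at its
# default the bind mount is appended read-only ("ro").
runtime = ["docker", "run", "-i"]
DockerCommandLineJob.append_volume(runtime, "/data/inputs", "/var/lib/cwl/inputs")
# runtime == ["docker", "run", "-i", "--volume=/data/inputs:/var/lib/cwl/inputs:ro"]

# get_image() scans `docker images` for a matching repository:tag or image id,
# pulls/builds/loads/imports the image when pull_image is True, and records the
# result in the module-level _IMAGES cache. When only "dockerPull" is given it
# also fills in "dockerImageId".
docker_requirement = {"dockerPull": "ubuntu:18.04"}  # hypothetical image
if DockerCommandLineJob.get_image(docker_requirement, pull_image=True):
    print("image available:", docker_requirement["dockerImageId"])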