diff env/lib/python3.7/site-packages/cwltool/docker.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/docker.py	Thu May 14 14:56:58 2020 -0400
@@ -0,0 +1,389 @@
+"""Enables Docker software containers via the {dx-,u,}docker runtimes."""
+from __future__ import absolute_import
+
+from distutils import spawn
+import datetime
+import os
+import re
+import shutil
+import sys
+import tempfile
+import threading
+from io import open  # pylint: disable=redefined-builtin
+from typing import Dict, List, MutableMapping, Optional, Set, Tuple
+
+import requests
+from typing_extensions import Text  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .context import RuntimeContext  # pylint: disable=unused-import
+from .docker_id import docker_vm_id
+from .errors import WorkflowException
+from .job import ContainerCommandLineJob
+from .loghandler import _logger
+from .pathmapper import PathMapper, MapperEnt  # pylint: disable=unused-import
+from .pathmapper import ensure_writable, ensure_non_writable
+from .secrets import SecretStore  # pylint: disable=unused-import
+from .utils import (DEFAULT_TMP_PREFIX, docker_windows_path_adjust, onWindows,
+                    subprocess)
+
+
+# Image identifiers that get_image() has already confirmed to be available;
+# every read and write goes through _IMAGES_LOCK.
+_IMAGES = set()  # type: Set[Text]
+_IMAGES_LOCK = threading.Lock()
+# Cache filled by _get_docker_machine_mounts(); None until first computed.
+__docker_machine_mounts = None  # type: Optional[List[Text]]
+__docker_machine_mounts_lock = threading.Lock()
+
+def _get_docker_machine_mounts():  # type: () -> List[Text]
+    global __docker_machine_mounts
+    if __docker_machine_mounts is None:
+        with __docker_machine_mounts_lock:
+            if 'DOCKER_MACHINE_NAME' not in os.environ:
+                __docker_machine_mounts = []
+            else:
+                __docker_machine_mounts = [
+                    u'/' + line.split(None, 1)[0] for line in
+                    subprocess.check_output(
+                        ['docker-machine', 'ssh',
+                         os.environ['DOCKER_MACHINE_NAME'], 'mount', '-t',
+                         'vboxsf'],
+                        universal_newlines=True).splitlines()]
+    return __docker_machine_mounts
+
+def _check_docker_machine_path(path):  # type: (Optional[Text]) -> None
+    if path is None:
+        return
+    if onWindows():
+        path = path.lower()
+    mounts = _get_docker_machine_mounts()
+
+    found = False
+    for mount in mounts:
+        if onWindows():
+            mount = mount.lower()
+        if path.startswith(mount):
+            found = True
+            break
+
+    if not found and mounts:
+        name = os.environ.get("DOCKER_MACHINE_NAME", '???')
+        raise WorkflowException(
+            "Input path {path} is not in the list of host paths mounted "
+            "into the Docker virtual machine named {name}. Already mounted "
+            "paths: {mounts}.\n"
+            "See https://docs.docker.com/toolbox/toolbox_install_windows/"
+            "#optional-add-shared-directories for instructions on how to "
+            "add this path to your VM.".format(
+                path=path, name=name,
+                mounts=mounts))
+
+class DockerCommandLineJob(ContainerCommandLineJob):
+    """Runs a CommandLineJob in a software container using the Docker engine."""
+
+    @staticmethod
+    def get_image(docker_requirement,     # type: Dict[Text, Text]
+                  pull_image,             # type: bool
+                  force_pull=False,       # type: bool
+                  tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
+                 ):  # type: (...) -> bool
+        """
+        Retrieve the relevant Docker container image.
+
+        Returns True upon success
+        """
+        found = False
+
+        if "dockerImageId" not in docker_requirement \
+                and "dockerPull" in docker_requirement:
+            docker_requirement["dockerImageId"] = docker_requirement["dockerPull"]
+
+        with _IMAGES_LOCK:
+            if docker_requirement["dockerImageId"] in _IMAGES:
+                return True
+
+        for line in subprocess.check_output(
+                ["docker", "images", "--no-trunc", "--all"]).decode('utf-8').splitlines():
+            try:
+                match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
+                split = docker_requirement["dockerImageId"].split(":")
+                if len(split) == 1:
+                    split.append("latest")
+                elif len(split) == 2:
+                    #  if split[1] doesn't  match valid tag names, it is a part of repository
+                    if not re.match(r'[\w][\w.-]{0,127}', split[1]):
+                        split[0] = split[0] + ":" + split[1]
+                        split[1] = "latest"
+                elif len(split) == 3:
+                    if re.match(r'[\w][\w.-]{0,127}', split[2]):
+                        split[0] = split[0] + ":" + split[1]
+                        split[1] = split[2]
+                        del split[2]
+
+                # check for repository:tag match or image id match
+                if (match
+                        and (
+                            (split[0] == match.group(1) and split[1] == match.group(2))
+                            or docker_requirement["dockerImageId"] == match.group(3))):
+                    found = True
+                    break
+            except ValueError:
+                pass
+
+        if (force_pull or not found) and pull_image:
+            cmd = []  # type: List[Text]
+            if "dockerPull" in docker_requirement:
+                cmd = ["docker", "pull", str(docker_requirement["dockerPull"])]
+                _logger.info(Text(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)
+                found = True
+            elif "dockerFile" in docker_requirement:
+                dockerfile_dir = str(tempfile.mkdtemp(prefix=tmp_outdir_prefix))
+                with open(os.path.join(
+                        dockerfile_dir, "Dockerfile"), "wb") as dfile:
+                    dfile.write(docker_requirement["dockerFile"].encode('utf-8'))
+                cmd = ["docker", "build", "--tag=%s" %
+                       str(docker_requirement["dockerImageId"]), dockerfile_dir]
+                _logger.info(Text(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)
+                found = True
+            elif "dockerLoad" in docker_requirement:
+                cmd = ["docker", "load"]
+                _logger.info(Text(cmd))
+                if os.path.exists(docker_requirement["dockerLoad"]):
+                    _logger.info(u"Loading docker image from %s", docker_requirement["dockerLoad"])
+                    with open(docker_requirement["dockerLoad"], "rb") as dload:
+                        loadproc = subprocess.Popen(cmd, stdin=dload, stdout=sys.stderr)
+                else:
+                    loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+                                                stdout=sys.stderr)
+                    assert loadproc.stdin is not None  # nosec
+                    _logger.info(u"Sending GET request to %s", docker_requirement["dockerLoad"])
+                    req = requests.get(docker_requirement["dockerLoad"], stream=True)
+                    size = 0
+                    for chunk in req.iter_content(1024 * 1024):
+                        size += len(chunk)
+                        _logger.info("\r%i bytes", size)
+                        loadproc.stdin.write(chunk)
+                    loadproc.stdin.close()
+                rcode = loadproc.wait()
+                if rcode != 0:
+                    raise WorkflowException(
+                        "Docker load returned non-zero exit status %i" % (rcode))
+                found = True
+            elif "dockerImport" in docker_requirement:
+                cmd = ["docker", "import", str(docker_requirement["dockerImport"]),
+                       str(docker_requirement["dockerImageId"])]
+                _logger.info(Text(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)
+                found = True
+
+        if found:
+            with _IMAGES_LOCK:
+                _IMAGES.add(docker_requirement["dockerImageId"])
+
+        return found
+
+    def get_from_requirements(self,
+                              r,                      # type: Dict[Text, Text]
+                              pull_image,             # type: bool
+                              force_pull=False,       # type: bool
+                              tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
+                             ):  # type: (...) -> Optional[Text]
+        if not spawn.find_executable('docker'):
+            raise WorkflowException("docker executable is not available")
+
+
+        if self.get_image(r, pull_image, force_pull, tmp_outdir_prefix):
+            return r["dockerImageId"]
+        raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"])
+
+    @staticmethod
+    def append_volume(runtime, source, target, writable=False):
+        # type: (List[Text], Text, Text, bool) -> None
+        """Add binding arguments to the runtime list."""
+        runtime.append(u"--volume={}:{}:{}".format(
+            docker_windows_path_adjust(source), target,
+            "rw" if writable else "ro"))
+
+    def add_file_or_directory_volume(self,
+                                     runtime,         # type: List[Text]
+                                     volume,          # type: MapperEnt
+                                     host_outdir_tgt  # type: Optional[Text]
+                                     ):  # type: (...) -> None
+        """Append volume a file/dir mapping to the runtime option list."""
+        if not volume.resolved.startswith("_:"):
+            _check_docker_machine_path(docker_windows_path_adjust(
+                volume.resolved))
+            self.append_volume(runtime, volume.resolved, volume.target)
+
+    def add_writable_file_volume(self,
+                                 runtime,          # type: List[Text]
+                                 volume,           # type: MapperEnt
+                                 host_outdir_tgt,  # type: Optional[Text]
+                                 tmpdir_prefix     # type: Text
+                                ):  # type: (...) -> None
+        """Append a writable file mapping to the runtime option list."""
+        if self.inplace_update:
+            self.append_volume(runtime, volume.resolved, volume.target,
+                               writable=True)
+        else:
+            if host_outdir_tgt:
+                # shortcut, just copy to the output directory
+                # which is already going to be mounted
+                if not os.path.exists(os.path.dirname(host_outdir_tgt)):
+                    os.makedirs(os.path.dirname(host_outdir_tgt))
+                shutil.copy(volume.resolved, host_outdir_tgt)
+            else:
+                tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+                tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+                file_copy = os.path.join(
+                    tmpdir, os.path.basename(volume.resolved))
+                shutil.copy(volume.resolved, file_copy)
+                self.append_volume(runtime, file_copy, volume.target,
+                                   writable=True)
+            ensure_writable(host_outdir_tgt or file_copy)
+
+    def add_writable_directory_volume(self,
+                                      runtime,          # type: List[Text]
+                                      volume,           # type: MapperEnt
+                                      host_outdir_tgt,  # type: Optional[Text]
+                                      tmpdir_prefix     # type: Text
+                                     ):  # type: (...) -> None
+        """Append a writable directory mapping to the runtime option list."""
+        if volume.resolved.startswith("_:"):
+            # Synthetic directory that needs creating first
+            if not host_outdir_tgt:
+                tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+                new_dir = os.path.join(
+                    tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
+                    os.path.basename(volume.target))
+                self.append_volume(runtime, new_dir, volume.target,
+                                   writable=True)
+            elif not os.path.exists(host_outdir_tgt):
+                os.makedirs(host_outdir_tgt)
+        else:
+            if self.inplace_update:
+                self.append_volume(runtime, volume.resolved, volume.target,
+                                   writable=True)
+            else:
+                if not host_outdir_tgt:
+                    tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+                    tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+                    new_dir = os.path.join(
+                        tmpdir, os.path.basename(volume.resolved))
+                    shutil.copytree(volume.resolved, new_dir)
+                    self.append_volume(
+                        runtime, new_dir, volume.target,
+                        writable=True)
+                else:
+                    shutil.copytree(volume.resolved, host_outdir_tgt)
+                ensure_writable(host_outdir_tgt or new_dir)
+
+    def create_runtime(self,
+                       env,            # type: MutableMapping[Text, Text]
+                       runtimeContext  # type: RuntimeContext
+                      ):  # type: (...) -> Tuple[List[Text], Optional[Text]]
+        any_path_okay = self.builder.get_requirement("DockerRequirement")[1] \
+            or False
+        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
+        if user_space_docker_cmd:
+            if 'udocker' in user_space_docker_cmd and not runtimeContext.debug:
+                runtime = [user_space_docker_cmd, u"--quiet", u"run"]
+                # udocker 1.1.1 will output diagnostic messages to stdout
+                # without this
+            else:
+                runtime = [user_space_docker_cmd, u"run"]
+        else:
+            runtime = [u"docker", u"run", u"-i"]
+        self.append_volume(runtime, os.path.realpath(self.outdir),
+                           self.builder.outdir, writable=True)
+        tmpdir = "/tmp"  # nosec
+        self.append_volume(runtime, os.path.realpath(self.tmpdir), tmpdir,
+                           writable=True)
+        self.add_volumes(self.pathmapper, runtime, any_path_okay=True,
+                         secret_store=runtimeContext.secret_store,
+                         tmpdir_prefix=runtimeContext.tmpdir_prefix)
+        if self.generatemapper is not None:
+            self.add_volumes(
+                self.generatemapper, runtime, any_path_okay=any_path_okay,
+                secret_store=runtimeContext.secret_store,
+                tmpdir_prefix=runtimeContext.tmpdir_prefix)
+
+        if user_space_docker_cmd:
+            runtime = [x.replace(":ro", "") for x in runtime]
+            runtime = [x.replace(":rw", "") for x in runtime]
+
+        runtime.append(u"--workdir=%s" % (
+            docker_windows_path_adjust(self.builder.outdir)))
+        if not user_space_docker_cmd:
+
+            if not runtimeContext.no_read_only:
+                runtime.append(u"--read-only=true")
+
+            if self.networkaccess:
+                if runtimeContext.custom_net:
+                    runtime.append(u"--net={0}".format(runtimeContext.custom_net))
+            else:
+                runtime.append(u"--net=none")
+
+            if self.stdout is not None:
+                runtime.append("--log-driver=none")
+
+            euid, egid = docker_vm_id()
+            if not onWindows():
+                # MS Windows does not have getuid() or geteuid() functions
+                euid, egid = euid or os.geteuid(), egid or os.getgid()
+
+            if runtimeContext.no_match_user is False \
+                    and (euid is not None and egid is not None):
+                runtime.append(u"--user=%d:%d" % (euid, egid))
+
+        if runtimeContext.rm_container:
+            runtime.append(u"--rm")
+
+        runtime.append(u"--env=TMPDIR=/tmp")
+
+        # spec currently says "HOME must be set to the designated output
+        # directory." but spec might change to designated temp directory.
+        # runtime.append("--env=HOME=/tmp")
+        runtime.append(u"--env=HOME=%s" % self.builder.outdir)
+
+        # add parameters to docker to write a container ID file
+        if runtimeContext.user_space_docker_cmd is None:
+            if runtimeContext.cidfile_dir:
+                cidfile_dir = runtimeContext.cidfile_dir
+                if not os.path.exists(str(cidfile_dir)):
+                    _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
+                                  "directory doesn't exist, please create it first")
+                    exit(2)
+                if not os.path.isdir(cidfile_dir):
+                    _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
+                                  cidfile_dir + " is not a directory, "
+                                  "please check it first")
+                    exit(2)
+            else:
+                tmp_dir, tmp_prefix = os.path.split(runtimeContext.tmpdir_prefix)
+                cidfile_dir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
+
+            cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
+            if runtimeContext.cidfile_prefix is not None:
+                cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name)
+            cidfile_path = os.path.join(cidfile_dir, cidfile_name)
+            runtime.append(u"--cidfile=%s" % cidfile_path)
+        else:
+            cidfile_path = None
+        for key, value in self.environment.items():
+            runtime.append(u"--env=%s=%s" % (key, value))
+
+        if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
+            runtime.append("--memory=%dm" % self.builder.resources["ram"])
+        elif not user_space_docker_cmd:
+            res_req, _ = self.builder.get_requirement("ResourceRequirement")
+            if res_req is not None and ("ramMin" in res_req or "ramMax" is res_req):
+                _logger.warning(
+                    u"[job %s] Skipping Docker software container '--memory' limit "
+                    "despite presence of ResourceRequirement with ramMin "
+                    "and/or ramMax setting. Consider running with "
+                    "--strict-memory-limit for increased portability "
+                    "assurance.", self.name)
+
+        return runtime, cidfile_path