comparison env/lib/python3.7/site-packages/cwltool/docker.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
comparison
equal deleted inserted replaced
1:75ca89e9b81c 2:6af9afd405e9
1 """Enables Docker software containers via the {dx-,u,}docker runtimes."""
2 from __future__ import absolute_import
3
4 from distutils import spawn
5 import datetime
6 import os
7 import re
8 import shutil
9 import sys
10 import tempfile
11 import threading
12 from io import open # pylint: disable=redefined-builtin
13 from typing import Dict, List, MutableMapping, Optional, Set, Tuple
14
15 import requests
16 from typing_extensions import Text # pylint: disable=unused-import
17 # move to a regular typing import when Python 3.3-3.6 is no longer supported
18
19 from .context import RuntimeContext # pylint: disable=unused-import
20 from .docker_id import docker_vm_id
21 from .errors import WorkflowException
22 from .job import ContainerCommandLineJob
23 from .loghandler import _logger
24 from .pathmapper import PathMapper, MapperEnt # pylint: disable=unused-import
25 from .pathmapper import ensure_writable, ensure_non_writable
26 from .secrets import SecretStore # pylint: disable=unused-import
27 from .utils import (DEFAULT_TMP_PREFIX, docker_windows_path_adjust, onWindows,
28 subprocess)
29
30
# Image IDs already confirmed to be available locally; guarded by _IMAGES_LOCK.
_IMAGES = set()  # type: Set[Text]
_IMAGES_LOCK = threading.Lock()
# Cached list of vboxsf mount points inside the docker-machine VM
# (None until first computed by _get_docker_machine_mounts()).
__docker_machine_mounts = None  # type: Optional[List[Text]]
__docker_machine_mounts_lock = threading.Lock()


def _parse_vboxsf_mounts(mount_output):  # type: (Text) -> List[Text]
    """
    Convert the output of ``mount -t vboxsf`` into a list of shared paths.

    The first whitespace-separated field of every non-blank line is taken
    and prefixed with ``/``; blank lines are skipped instead of raising.
    """
    return [u'/' + line.split(None, 1)[0]
            for line in mount_output.splitlines() if line.strip()]


def _get_docker_machine_mounts():  # type: () -> List[Text]
    """
    Return the vboxsf shares mounted inside the docker-machine VM.

    When ``DOCKER_MACHINE_NAME`` is not set in the environment an empty list
    is returned. Otherwise ``docker-machine ssh <name> mount -t vboxsf`` is
    run once and its parsed output is cached in a module-level variable.
    """
    global __docker_machine_mounts
    if __docker_machine_mounts is None:
        with __docker_machine_mounts_lock:
            # Re-check under the lock: another thread may have populated the
            # cache while this one was waiting, so avoid a second ssh call.
            if __docker_machine_mounts is None:
                if 'DOCKER_MACHINE_NAME' not in os.environ:
                    __docker_machine_mounts = []
                else:
                    __docker_machine_mounts = _parse_vboxsf_mounts(
                        subprocess.check_output(
                            ['docker-machine', 'ssh',
                             os.environ['DOCKER_MACHINE_NAME'], 'mount', '-t',
                             'vboxsf'],
                            universal_newlines=True))
    return __docker_machine_mounts
51
def _is_under_mount(path, mount):  # type: (Text, Text) -> bool
    """
    Return True if *path* is *mount* itself or lies below it.

    A bare ``startswith`` test would wrongly accept ``/c/Users2/x`` for the
    mount ``/c/Users``; requiring a ``/`` boundary after the mount avoids it.
    """
    if path == mount:
        return True
    prefix = mount if mount.endswith(u'/') else mount + u'/'
    return path.startswith(prefix)


def _check_docker_machine_path(path):  # type: (Optional[Text]) -> None
    """
    Ensure *path* lies inside a directory shared with the docker-machine VM.

    Does nothing when *path* is None or when no vboxsf mounts are known
    (e.g. ``DOCKER_MACHINE_NAME`` is unset). On Windows the comparison is
    done case-insensitively.

    :raises WorkflowException: if mounts are known and none contains *path*.
    """
    if path is None:
        return
    if onWindows():
        path = path.lower()
    mounts = _get_docker_machine_mounts()
    if not mounts:
        return

    if onWindows():
        candidates = [mount.lower() for mount in mounts]
    else:
        candidates = mounts
    if any(_is_under_mount(path, mount) for mount in candidates):
        return

    name = os.environ.get("DOCKER_MACHINE_NAME", '???')
    raise WorkflowException(
        "Input path {path} is not in the list of host paths mounted "
        "into the Docker virtual machine named {name}. Already mounted "
        "paths: {mounts}.\n"
        "See https://docs.docker.com/toolbox/toolbox_install_windows/"
        "#optional-add-shared-directories for instructions on how to "
        "add this path to your VM.".format(
            path=path, name=name,
            mounts=mounts))
68 name = os.environ.get("DOCKER_MACHINE_NAME", '???')
69 raise WorkflowException(
70 "Input path {path} is not in the list of host paths mounted "
71 "into the Docker virtual machine named {name}. Already mounted "
72 "paths: {mounts}.\n"
73 "See https://docs.docker.com/toolbox/toolbox_install_windows/"
74 "#optional-add-shared-directories for instructions on how to "
75 "add this path to your VM.".format(
76 path=path, name=name,
77 mounts=mounts))
78
79 class DockerCommandLineJob(ContainerCommandLineJob):
80 """Runs a CommandLineJob in a sofware container using the Docker engine."""
81
82 @staticmethod
83 def get_image(docker_requirement, # type: Dict[Text, Text]
84 pull_image, # type: bool
85 force_pull=False, # type: bool
86 tmp_outdir_prefix=DEFAULT_TMP_PREFIX # type: Text
87 ): # type: (...) -> bool
88 """
89 Retrieve the relevant Docker container image.
90
91 Returns True upon success
92 """
93 found = False
94
95 if "dockerImageId" not in docker_requirement \
96 and "dockerPull" in docker_requirement:
97 docker_requirement["dockerImageId"] = docker_requirement["dockerPull"]
98
99 with _IMAGES_LOCK:
100 if docker_requirement["dockerImageId"] in _IMAGES:
101 return True
102
103 for line in subprocess.check_output(
104 ["docker", "images", "--no-trunc", "--all"]).decode('utf-8').splitlines():
105 try:
106 match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
107 split = docker_requirement["dockerImageId"].split(":")
108 if len(split) == 1:
109 split.append("latest")
110 elif len(split) == 2:
111 # if split[1] doesn't match valid tag names, it is a part of repository
112 if not re.match(r'[\w][\w.-]{0,127}', split[1]):
113 split[0] = split[0] + ":" + split[1]
114 split[1] = "latest"
115 elif len(split) == 3:
116 if re.match(r'[\w][\w.-]{0,127}', split[2]):
117 split[0] = split[0] + ":" + split[1]
118 split[1] = split[2]
119 del split[2]
120
121 # check for repository:tag match or image id match
122 if (match
123 and (
124 (split[0] == match.group(1) and split[1] == match.group(2))
125 or docker_requirement["dockerImageId"] == match.group(3))):
126 found = True
127 break
128 except ValueError:
129 pass
130
131 if (force_pull or not found) and pull_image:
132 cmd = [] # type: List[Text]
133 if "dockerPull" in docker_requirement:
134 cmd = ["docker", "pull", str(docker_requirement["dockerPull"])]
135 _logger.info(Text(cmd))
136 subprocess.check_call(cmd, stdout=sys.stderr)
137 found = True
138 elif "dockerFile" in docker_requirement:
139 dockerfile_dir = str(tempfile.mkdtemp(prefix=tmp_outdir_prefix))
140 with open(os.path.join(
141 dockerfile_dir, "Dockerfile"), "wb") as dfile:
142 dfile.write(docker_requirement["dockerFile"].encode('utf-8'))
143 cmd = ["docker", "build", "--tag=%s" %
144 str(docker_requirement["dockerImageId"]), dockerfile_dir]
145 _logger.info(Text(cmd))
146 subprocess.check_call(cmd, stdout=sys.stderr)
147 found = True
148 elif "dockerLoad" in docker_requirement:
149 cmd = ["docker", "load"]
150 _logger.info(Text(cmd))
151 if os.path.exists(docker_requirement["dockerLoad"]):
152 _logger.info(u"Loading docker image from %s", docker_requirement["dockerLoad"])
153 with open(docker_requirement["dockerLoad"], "rb") as dload:
154 loadproc = subprocess.Popen(cmd, stdin=dload, stdout=sys.stderr)
155 else:
156 loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
157 stdout=sys.stderr)
158 assert loadproc.stdin is not None # nosec
159 _logger.info(u"Sending GET request to %s", docker_requirement["dockerLoad"])
160 req = requests.get(docker_requirement["dockerLoad"], stream=True)
161 size = 0
162 for chunk in req.iter_content(1024 * 1024):
163 size += len(chunk)
164 _logger.info("\r%i bytes", size)
165 loadproc.stdin.write(chunk)
166 loadproc.stdin.close()
167 rcode = loadproc.wait()
168 if rcode != 0:
169 raise WorkflowException(
170 "Docker load returned non-zero exit status %i" % (rcode))
171 found = True
172 elif "dockerImport" in docker_requirement:
173 cmd = ["docker", "import", str(docker_requirement["dockerImport"]),
174 str(docker_requirement["dockerImageId"])]
175 _logger.info(Text(cmd))
176 subprocess.check_call(cmd, stdout=sys.stderr)
177 found = True
178
179 if found:
180 with _IMAGES_LOCK:
181 _IMAGES.add(docker_requirement["dockerImageId"])
182
183 return found
184
185 def get_from_requirements(self,
186 r, # type: Dict[Text, Text]
187 pull_image, # type: bool
188 force_pull=False, # type: bool
189 tmp_outdir_prefix=DEFAULT_TMP_PREFIX # type: Text
190 ): # type: (...) -> Optional[Text]
191 if not spawn.find_executable('docker'):
192 raise WorkflowException("docker executable is not available")
193
194
195 if self.get_image(r, pull_image, force_pull, tmp_outdir_prefix):
196 return r["dockerImageId"]
197 raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"])
198
199 @staticmethod
200 def append_volume(runtime, source, target, writable=False):
201 # type: (List[Text], Text, Text, bool) -> None
202 """Add binding arguments to the runtime list."""
203 runtime.append(u"--volume={}:{}:{}".format(
204 docker_windows_path_adjust(source), target,
205 "rw" if writable else "ro"))
206
207 def add_file_or_directory_volume(self,
208 runtime, # type: List[Text]
209 volume, # type: MapperEnt
210 host_outdir_tgt # type: Optional[Text]
211 ): # type: (...) -> None
212 """Append volume a file/dir mapping to the runtime option list."""
213 if not volume.resolved.startswith("_:"):
214 _check_docker_machine_path(docker_windows_path_adjust(
215 volume.resolved))
216 self.append_volume(runtime, volume.resolved, volume.target)
217
218 def add_writable_file_volume(self,
219 runtime, # type: List[Text]
220 volume, # type: MapperEnt
221 host_outdir_tgt, # type: Optional[Text]
222 tmpdir_prefix # type: Text
223 ): # type: (...) -> None
224 """Append a writable file mapping to the runtime option list."""
225 if self.inplace_update:
226 self.append_volume(runtime, volume.resolved, volume.target,
227 writable=True)
228 else:
229 if host_outdir_tgt:
230 # shortcut, just copy to the output directory
231 # which is already going to be mounted
232 if not os.path.exists(os.path.dirname(host_outdir_tgt)):
233 os.makedirs(os.path.dirname(host_outdir_tgt))
234 shutil.copy(volume.resolved, host_outdir_tgt)
235 else:
236 tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
237 tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
238 file_copy = os.path.join(
239 tmpdir, os.path.basename(volume.resolved))
240 shutil.copy(volume.resolved, file_copy)
241 self.append_volume(runtime, file_copy, volume.target,
242 writable=True)
243 ensure_writable(host_outdir_tgt or file_copy)
244
245 def add_writable_directory_volume(self,
246 runtime, # type: List[Text]
247 volume, # type: MapperEnt
248 host_outdir_tgt, # type: Optional[Text]
249 tmpdir_prefix # type: Text
250 ): # type: (...) -> None
251 """Append a writable directory mapping to the runtime option list."""
252 if volume.resolved.startswith("_:"):
253 # Synthetic directory that needs creating first
254 if not host_outdir_tgt:
255 tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
256 new_dir = os.path.join(
257 tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
258 os.path.basename(volume.target))
259 self.append_volume(runtime, new_dir, volume.target,
260 writable=True)
261 elif not os.path.exists(host_outdir_tgt):
262 os.makedirs(host_outdir_tgt)
263 else:
264 if self.inplace_update:
265 self.append_volume(runtime, volume.resolved, volume.target,
266 writable=True)
267 else:
268 if not host_outdir_tgt:
269 tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
270 tmpdir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
271 new_dir = os.path.join(
272 tmpdir, os.path.basename(volume.resolved))
273 shutil.copytree(volume.resolved, new_dir)
274 self.append_volume(
275 runtime, new_dir, volume.target,
276 writable=True)
277 else:
278 shutil.copytree(volume.resolved, host_outdir_tgt)
279 ensure_writable(host_outdir_tgt or new_dir)
280
281 def create_runtime(self,
282 env, # type: MutableMapping[Text, Text]
283 runtimeContext # type: RuntimeContext
284 ): # type: (...) -> Tuple[List[Text], Optional[Text]]
285 any_path_okay = self.builder.get_requirement("DockerRequirement")[1] \
286 or False
287 user_space_docker_cmd = runtimeContext.user_space_docker_cmd
288 if user_space_docker_cmd:
289 if 'udocker' in user_space_docker_cmd and not runtimeContext.debug:
290 runtime = [user_space_docker_cmd, u"--quiet", u"run"]
291 # udocker 1.1.1 will output diagnostic messages to stdout
292 # without this
293 else:
294 runtime = [user_space_docker_cmd, u"run"]
295 else:
296 runtime = [u"docker", u"run", u"-i"]
297 self.append_volume(runtime, os.path.realpath(self.outdir),
298 self.builder.outdir, writable=True)
299 tmpdir = "/tmp" # nosec
300 self.append_volume(runtime, os.path.realpath(self.tmpdir), tmpdir,
301 writable=True)
302 self.add_volumes(self.pathmapper, runtime, any_path_okay=True,
303 secret_store=runtimeContext.secret_store,
304 tmpdir_prefix=runtimeContext.tmpdir_prefix)
305 if self.generatemapper is not None:
306 self.add_volumes(
307 self.generatemapper, runtime, any_path_okay=any_path_okay,
308 secret_store=runtimeContext.secret_store,
309 tmpdir_prefix=runtimeContext.tmpdir_prefix)
310
311 if user_space_docker_cmd:
312 runtime = [x.replace(":ro", "") for x in runtime]
313 runtime = [x.replace(":rw", "") for x in runtime]
314
315 runtime.append(u"--workdir=%s" % (
316 docker_windows_path_adjust(self.builder.outdir)))
317 if not user_space_docker_cmd:
318
319 if not runtimeContext.no_read_only:
320 runtime.append(u"--read-only=true")
321
322 if self.networkaccess:
323 if runtimeContext.custom_net:
324 runtime.append(u"--net={0}".format(runtimeContext.custom_net))
325 else:
326 runtime.append(u"--net=none")
327
328 if self.stdout is not None:
329 runtime.append("--log-driver=none")
330
331 euid, egid = docker_vm_id()
332 if not onWindows():
333 # MS Windows does not have getuid() or geteuid() functions
334 euid, egid = euid or os.geteuid(), egid or os.getgid()
335
336 if runtimeContext.no_match_user is False \
337 and (euid is not None and egid is not None):
338 runtime.append(u"--user=%d:%d" % (euid, egid))
339
340 if runtimeContext.rm_container:
341 runtime.append(u"--rm")
342
343 runtime.append(u"--env=TMPDIR=/tmp")
344
345 # spec currently says "HOME must be set to the designated output
346 # directory." but spec might change to designated temp directory.
347 # runtime.append("--env=HOME=/tmp")
348 runtime.append(u"--env=HOME=%s" % self.builder.outdir)
349
350 # add parameters to docker to write a container ID file
351 if runtimeContext.user_space_docker_cmd is None:
352 if runtimeContext.cidfile_dir:
353 cidfile_dir = runtimeContext.cidfile_dir
354 if not os.path.exists(str(cidfile_dir)):
355 _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
356 "directory doesn't exist, please create it first")
357 exit(2)
358 if not os.path.isdir(cidfile_dir):
359 _logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
360 cidfile_dir + " is not a directory, "
361 "please check it first")
362 exit(2)
363 else:
364 tmp_dir, tmp_prefix = os.path.split(runtimeContext.tmpdir_prefix)
365 cidfile_dir = tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
366
367 cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
368 if runtimeContext.cidfile_prefix is not None:
369 cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name)
370 cidfile_path = os.path.join(cidfile_dir, cidfile_name)
371 runtime.append(u"--cidfile=%s" % cidfile_path)
372 else:
373 cidfile_path = None
374 for key, value in self.environment.items():
375 runtime.append(u"--env=%s=%s" % (key, value))
376
377 if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
378 runtime.append("--memory=%dm" % self.builder.resources["ram"])
379 elif not user_space_docker_cmd:
380 res_req, _ = self.builder.get_requirement("ResourceRequirement")
381 if res_req is not None and ("ramMin" in res_req or "ramMax" is res_req):
382 _logger.warning(
383 u"[job %s] Skipping Docker software container '--memory' limit "
384 "despite presence of ResourceRequirement with ramMin "
385 "and/or ramMax setting. Consider running with "
386 "--strict-memory-limit for increased portability "
387 "assurance.", self.name)
388
389 return runtime, cidfile_path