comparison env/lib/python3.7/site-packages/cwltool/pathmapper.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 from __future__ import absolute_import
2
3 import collections
4 import logging
5 import os
6 import stat
7 import uuid
8 from functools import partial # pylint: disable=unused-import
9 from tempfile import NamedTemporaryFile
10 from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
11 Optional, Set, Tuple, Union)
12
13 import requests
14 from cachecontrol import CacheControl
15 from cachecontrol.caches import FileCache
16 from schema_salad import validate
17 from schema_salad.ref_resolver import uri_file_path
18 from schema_salad.sourceline import SourceLine
19 from six.moves import urllib
20 from typing_extensions import Text # pylint: disable=unused-import
21 # move to a regular typing import when Python 3.3-3.6 is no longer supported
22
23 from .loghandler import _logger
24 from .stdfsaccess import StdFsAccess, abspath # pylint: disable=unused-import
25 from .utils import Directory # pylint: disable=unused-import
26 from .utils import convert_pathsep_to_unix, visit_class
27
28
# Upper bound on len() of a File object's inline "contents" (64 KiB);
# normalizeFilesDirs raises a ValidationException when it is exceeded.
CONTENT_LIMIT = 64 << 10

# One path-map entry: "resolved" (real local path, or inline contents for
# CreateFile/CreateWritableFile), "target" (path under the staging directory),
# "type" (File, Directory, WritableFile, ...) and the "staged" flag.
MapperEnt = collections.namedtuple("MapperEnt", "resolved target type staged")
32
33
def adjustFiles(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """
    Rewrite the "path" of every File object reachable from `rec`, in place.

    `op` is called with the current "path" value and its result replaces it.
    Mutable mappings and mutable sequences are searched recursively; any
    other value is left alone.
    """
    if isinstance(rec, MutableMapping):
        if rec.get("class") == "File":
            rec["path"] = op(rec["path"])
        for value in rec.values():
            adjustFiles(value, op)
    if isinstance(rec, MutableSequence):
        for item in rec:
            adjustFiles(item, op)
44
45
def adjustFileObjs(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Call `op` on each object of class "File" that visit_class finds in `rec`."""
    wanted_classes = ("File",)
    visit_class(rec, wanted_classes, op)
49
def adjustDirObjs(rec, op):
    # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Call `op` on each object of class "Directory" that visit_class finds in `rec`."""
    wanted_classes = ("Directory",)
    visit_class(rec, wanted_classes, op)
54
def normalizeFilesDirs(job):
    # type: (Optional[Union[List[Dict[Text, Any]], MutableMapping[Text, Any], Directory]]) -> None
    """
    Fill in derived fields on every File and Directory object in `job`, in place.

    For each object visited:
      * a missing "location" becomes an anonymous "_:<uuid>" location;
      * a trailing "/" is stripped from the location path (Directories only);
      * a missing/empty "basename" is derived from the location;
      * Files get "nameroot"/"nameext" from os.path.splitext(basename).

    Raises validate.ValidationException when:
      * an anonymous File has no "contents";
      * an anonymous Directory lacks "listing" or "basename";
      * a non-Directory location ends with "/";
      * a File's "contents" is longer than CONTENT_LIMIT.
    """
    def addLocation(d):  # type: (Dict[Text, Any]) -> None
        if "location" not in d:
            if d["class"] == "File" and ("contents" not in d):
                raise validate.ValidationException("Anonymous file object must have 'contents' and 'basename' fields.")
            if d["class"] == "Directory" and ("listing" not in d or "basename" not in d):
                raise validate.ValidationException(
                    "Anonymous directory object must have 'listing' and 'basename' fields.")
            d["location"] = "_:" + Text(uuid.uuid4())
            if "basename" not in d:
                # Use the generated uuid (the part after "_:") as the name.
                d["basename"] = d["location"][2:]

        parse = urllib.parse.urlparse(d["location"])
        path = parse.path
        # strip trailing slash
        if path.endswith("/"):
            if d["class"] != "Directory":
                raise validate.ValidationException(
                    "location '%s' ends with '/' but is not a Directory" % d["location"])
            path = path.rstrip("/")
            d["location"] = urllib.parse.urlunparse((parse.scheme, parse.netloc, path, parse.params, parse.query, parse.fragment))

        if not d.get("basename"):
            if path.startswith("_:"):
                d["basename"] = Text(path[2:])
            else:
                d["basename"] = Text(os.path.basename(urllib.request.url2pathname(path)))

        if d["class"] == "File":
            nr, ne = os.path.splitext(d["basename"])
            if d.get("nameroot") != nr:
                d["nameroot"] = Text(nr)
            if d.get("nameext") != ne:
                d["nameext"] = Text(ne)

            # NOTE: len() counts characters for str contents, not encoded bytes.
            contents = d.get("contents")
            if contents and len(contents) > CONTENT_LIMIT:
                raise validate.ValidationException("File object contains contents with number of bytes that exceeds CONTENT_LIMIT length (%d)" % CONTENT_LIMIT)

    visit_class(job, ("File", "Directory"), addLocation)
87 d["nameroot"] = Text(nr)
88 if d.get("nameext") != ne:
89 d["nameext"] = Text(ne)
90
91 contents = d.get("contents")
92 if contents and len(contents) > CONTENT_LIMIT:
93 if len(contents) > CONTENT_LIMIT:
94 raise validate.ValidationException("File object contains contents with number of bytes that exceeds CONTENT_LIMIT length (%d)" % CONTENT_LIMIT)
95
96 visit_class(job, ("File", "Directory"), addLocation)
97
98
99 def dedup(listing): # type: (List[Any]) -> List[Any]
100 marksub = set()
101
102 def mark(d): # type: (Dict[Text, Text]) -> None
103 marksub.add(d["location"])
104
105 for l in listing:
106 if l["class"] == "Directory":
107 for e in l.get("listing", []):
108 adjustFileObjs(e, mark)
109 adjustDirObjs(e, mark)
110
111 dd = []
112 markdup = set() # type: Set[Text]
113 for r in listing:
114 if r["location"] not in marksub and r["location"] not in markdup:
115 dd.append(r)
116 markdup.add(r["location"])
117
118 return dd
119
120 def get_listing(fs_access, rec, recursive=True):
121 # type: (StdFsAccess, MutableMapping[Text, Any], bool) -> None
122 if rec.get("class") != "Directory":
123 finddirs = [] # type: List[MutableMapping[Text, Text]]
124 visit_class(rec, ("Directory",), finddirs.append)
125 for f in finddirs:
126 get_listing(fs_access, f, recursive=recursive)
127 return
128 if "listing" in rec:
129 return
130 listing = []
131 loc = rec["location"]
132 for ld in fs_access.listdir(loc):
133 parse = urllib.parse.urlparse(ld)
134 bn = os.path.basename(urllib.request.url2pathname(parse.path))
135 if fs_access.isdir(ld):
136 ent = {u"class": u"Directory",
137 u"location": ld,
138 u"basename": bn}
139 if recursive:
140 get_listing(fs_access, ent, recursive)
141 listing.append(ent)
142 else:
143 listing.append({"class": "File", "location": ld, "basename": bn})
144 rec["listing"] = listing
145
146 def trim_listing(obj): # type: (Dict[Text, Any]) -> None
147 """
148 Remove 'listing' field from Directory objects that are file references.
149
150 It redundant and potentially expensive to pass fully enumerated Directory
151 objects around if not explicitly needed, so delete the 'listing' field when
152 it is safe to do so.
153 """
154 if obj.get("location", "").startswith("file://") and "listing" in obj:
155 del obj["listing"]
156
157 # Download http Files
158 def downloadHttpFile(httpurl):
159 # type: (Text) -> Text
160 cache_session = None
161 if "XDG_CACHE_HOME" in os.environ:
162 directory = os.environ["XDG_CACHE_HOME"]
163 elif "HOME" in os.environ:
164 directory = os.environ["HOME"]
165 else:
166 directory = os.path.expanduser('~')
167
168 cache_session = CacheControl(
169 requests.Session(),
170 cache=FileCache(
171 os.path.join(directory, ".cache", "cwltool")))
172
173 r = cache_session.get(httpurl, stream=True)
174 with NamedTemporaryFile(mode='wb', delete=False) as f:
175 for chunk in r.iter_content(chunk_size=16384):
176 if chunk: # filter out keep-alive new chunks
177 f.write(chunk)
178 r.close()
179 return str(f.name)
180
181 def ensure_writable(path): # type: (Text) -> None
182 if os.path.isdir(path):
183 for root, dirs, files in os.walk(path):
184 for name in files:
185 j = os.path.join(root, name)
186 st = os.stat(j)
187 mode = stat.S_IMODE(st.st_mode)
188 os.chmod(j, mode | stat.S_IWUSR)
189 for name in dirs:
190 j = os.path.join(root, name)
191 st = os.stat(j)
192 mode = stat.S_IMODE(st.st_mode)
193 os.chmod(j, mode | stat.S_IWUSR)
194 else:
195 st = os.stat(path)
196 mode = stat.S_IMODE(st.st_mode)
197 os.chmod(path, mode | stat.S_IWUSR)
198
199 def ensure_non_writable(path): # type: (Text) -> None
200 if os.path.isdir(path):
201 for root, dirs, files in os.walk(path):
202 for name in files:
203 j = os.path.join(root, name)
204 st = os.stat(j)
205 mode = stat.S_IMODE(st.st_mode)
206 os.chmod(j,
207 mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
208 for name in dirs:
209 j = os.path.join(root, name)
210 st = os.stat(j)
211 mode = stat.S_IMODE(st.st_mode)
212 os.chmod(j,
213 mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
214 else:
215 st = os.stat(path)
216 mode = stat.S_IMODE(st.st_mode)
217 os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
218
219 class PathMapper(object):
220 """
221 Mapping of files from relative path provided in the file to a tuple.
222
223 (absolute local path, absolute container path)
224
225 The tao of PathMapper:
226
227 The initializer takes a list of File and Directory objects, a base
228 directory (for resolving relative references) and a staging directory
229 (where the files are mapped to).
230
231 The purpose of the setup method is to determine where each File or
232 Directory should be placed on the target file system (relative to
233 stagedir).
234
235 If separatedirs=True, unrelated files will be isolated in their own
236 directories under stagedir. If separatedirs=False, files and directories
237 will all be placed in stagedir (with the possibility for name
238 collisions...)
239
240 The path map maps the "location" of the input Files and Directory objects
241 to a tuple (resolved, target, type). The "resolved" field is the "real"
242 path on the local file system (after resolving relative paths and
243 traversing symlinks). The "target" is the path on the target file system
244 (under stagedir). The type is the object type (one of File, Directory,
245 CreateFile, WritableFile, CreateWritableFile).
246
247 The latter three (CreateFile, WritableFile, CreateWritableFile) are used by
248 InitialWorkDirRequirement to indicate files that are generated on the fly
249 (CreateFile and CreateWritableFile, in this case "resolved" holds the file
250 contents instead of the path because they file doesn't exist) or copied
251 into the output directory so they can be opened for update ("r+" or "a")
252 (WritableFile and CreateWritableFile).
253
254 """
255
256 def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
257 # type: (List[Any], Text, Text, bool) -> None
258 """Initialize the PathMapper."""
259 self._pathmap = {} # type: Dict[Text, MapperEnt]
260 self.stagedir = stagedir
261 self.separateDirs = separateDirs
262 self.setup(dedup(referenced_files), basedir)
263
264 def visitlisting(self, listing, stagedir, basedir, copy=False, staged=False):
265 # type: (List[Dict[Text, Any]], Text, Text, bool, bool) -> None
266 for ld in listing:
267 self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy), staged=staged)
268
269 def visit(self, obj, stagedir, basedir, copy=False, staged=False):
270 # type: (Dict[Text, Any], Text, Text, bool, bool) -> None
271 tgt = convert_pathsep_to_unix(
272 os.path.join(stagedir, obj["basename"]))
273 if obj["location"] in self._pathmap:
274 return
275 if obj["class"] == "Directory":
276 if obj["location"].startswith("file://"):
277 resolved = uri_file_path(obj["location"])
278 else:
279 resolved = obj["location"]
280 self._pathmap[obj["location"]] = MapperEnt(resolved, tgt, "WritableDirectory" if copy else "Directory", staged)
281 if obj["location"].startswith("file://"):
282 staged = False
283 self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy, staged=staged)
284 elif obj["class"] == "File":
285 path = obj["location"]
286 ab = abspath(path, basedir)
287 if "contents" in obj and obj["location"].startswith("_:"):
288 self._pathmap[obj["location"]] = MapperEnt(
289 obj["contents"], tgt,
290 "CreateWritableFile" if copy else "CreateFile", staged)
291 else:
292 with SourceLine(obj, "location", validate.ValidationException, _logger.isEnabledFor(logging.DEBUG)):
293 deref = ab
294 if urllib.parse.urlsplit(deref).scheme in ['http', 'https']:
295 deref = downloadHttpFile(path)
296 else:
297 # Dereference symbolic links
298 st = os.lstat(deref)
299 while stat.S_ISLNK(st.st_mode):
300 rl = os.readlink(deref)
301 deref = rl if os.path.isabs(rl) else os.path.join(
302 os.path.dirname(deref), rl)
303 st = os.lstat(deref)
304
305 self._pathmap[path] = MapperEnt(
306 deref, tgt, "WritableFile" if copy else "File", staged)
307 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir,
308 copy=copy, staged=staged)
309
310 def setup(self, referenced_files, basedir):
311 # type: (List[Any], Text) -> None
312
313 # Go through each file and set the target to its own directory along
314 # with any secondary files.
315 stagedir = self.stagedir
316 for fob in referenced_files:
317 if self.separateDirs:
318 stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
319 self.visit(fob, stagedir, basedir, copy=fob.get("writable"), staged=True)
320
321 def mapper(self, src): # type: (Text) -> MapperEnt
322 if u"#" in src:
323 i = src.index(u"#")
324 p = self._pathmap[src[:i]]
325 return MapperEnt(p.resolved, p.target + src[i:], p.type, p.staged)
326 return self._pathmap[src]
327
328 def files(self): # type: () -> List[Text]
329 return list(self._pathmap.keys())
330
331 def items(self): # type: () -> List[Tuple[Text, MapperEnt]]
332 return list(self._pathmap.items())
333
334 def reversemap(self,
335 target # type: Text
336 ): # type: (...) -> Optional[Tuple[Text, Text]]
337 for k, v in self._pathmap.items():
338 if v[1] == target:
339 return (k, v[0])
340 return None
341
342 def update(self, key, resolved, target, ctype, stage):
343 # type: (Text, Text, Text, Text, bool) -> MapperEnt
344 m = MapperEnt(resolved, target, ctype, stage)
345 self._pathmap[key] = m
346 return m
347
348 def __contains__(self, key): # type: (Text) -> bool
349 """Test for the presence of the given relative path in this mapper."""
350 return key in self._pathmap