Mercurial repository: shellac > guppy_basecaller
File: env/lib/python3.7/site-packages/cwltool/pathmapper.py
Changeset: 5:9b1c78e6ba9c (draft, default, tip), compared against parent 4:79f47841a781
Commit message: "planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
Author: shellac
Date: Mon, 01 Jun 2020 08:59:25 -0400
from __future__ import absolute_import

import collections
import logging
import os
import stat
import uuid
from functools import partial  # pylint: disable=unused-import
from tempfile import NamedTemporaryFile
from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
                    Optional, Set, Tuple, Union)

import requests
from cachecontrol import CacheControl
from cachecontrol.caches import FileCache
from schema_salad import validate
from schema_salad.ref_resolver import uri_file_path
from schema_salad.sourceline import SourceLine
from six.moves import urllib
from typing_extensions import Text  # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from .loghandler import _logger
from .stdfsaccess import StdFsAccess, abspath  # pylint: disable=unused-import
from .utils import Directory  # pylint: disable=unused-import
from .utils import convert_pathsep_to_unix, visit_class


CONTENT_LIMIT = 64 * 1024

MapperEnt = collections.namedtuple("MapperEnt", ["resolved", "target", "type", "staged"])

def adjustFiles(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Apply a mapping function to each File path in the object `rec`."""
    if isinstance(rec, MutableMapping):
        if rec.get("class") == "File":
            rec["path"] = op(rec["path"])
        for d in rec:
            adjustFiles(rec[d], op)
    if isinstance(rec, MutableSequence):
        for d in rec:
            adjustFiles(d, op)

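# Illustrative sketch (not part of upstream cwltool): adjustFiles rewrites the
# "path" field of every File object in place, so a prefixing callable could be
# applied to a hypothetical job object like this:
#
#     job = {"class": "File", "path": "input.txt"}
#     adjustFiles(job, lambda p: os.path.join("/stage", p))
#     assert job["path"] == "/stage/input.txt"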

def adjustFileObjs(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Apply an update function to each File object in the object `rec`."""
    visit_class(rec, ("File",), op)


def adjustDirObjs(rec, op):
    # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
    """Apply an update function to each Directory object in the object `rec`."""
    visit_class(rec, ("Directory",), op)

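# Illustrative sketch (not part of upstream cwltool): adjustFileObjs and
# adjustDirObjs pass the whole File/Directory dictionary to the visitor, so
# functools.partial can bind extra arguments; set_field and the output object
# below are hypothetical:
#
#     def set_field(key, value, obj):
#         obj[key] = value
#
#     outputs = {"out": {"class": "File", "location": "file:///tmp/a.txt"}}
#     adjustFileObjs(outputs, partial(set_field, "checked", True))
#     assert outputs["out"]["checked"] is True
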
def normalizeFilesDirs(job):
    # type: (Optional[Union[List[Dict[Text, Any]], MutableMapping[Text, Any], Directory]]) -> None
    def addLocation(d):  # type: (Dict[Text, Any]) -> None
        if "location" not in d:
            if d["class"] == "File" and ("contents" not in d):
                raise validate.ValidationException("Anonymous file object must have 'contents' and 'basename' fields.")
            if d["class"] == "Directory" and ("listing" not in d or "basename" not in d):
                raise validate.ValidationException(
                    "Anonymous directory object must have 'listing' and 'basename' fields.")
            d["location"] = "_:" + Text(uuid.uuid4())
            if "basename" not in d:
                d["basename"] = d["location"][2:]

        parse = urllib.parse.urlparse(d["location"])
        path = parse.path
        # strip trailing slash
        if path.endswith("/"):
            if d["class"] != "Directory":
                raise validate.ValidationException(
                    "location '%s' ends with '/' but is not a Directory" % d["location"])
            path = path.rstrip("/")
            d["location"] = urllib.parse.urlunparse((parse.scheme, parse.netloc, path, parse.params, parse.query, parse.fragment))

        if not d.get("basename"):
            if path.startswith("_:"):
                d["basename"] = Text(path[2:])
            else:
                d["basename"] = Text(os.path.basename(urllib.request.url2pathname(path)))

        if d["class"] == "File":
            nr, ne = os.path.splitext(d["basename"])
            if d.get("nameroot") != nr:
                d["nameroot"] = Text(nr)
            if d.get("nameext") != ne:
                d["nameext"] = Text(ne)

        contents = d.get("contents")
        if contents and len(contents) > CONTENT_LIMIT:
            raise validate.ValidationException("File object contains contents larger than CONTENT_LIMIT (%d bytes)" % CONTENT_LIMIT)

    visit_class(job, ("File", "Directory"), addLocation)

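# Illustrative sketch (not part of upstream cwltool): normalizeFilesDirs fills
# in the derived fields of a File object; given only a location, it adds
# "basename", "nameroot" and "nameext":
#
#     f = {"class": "File", "location": "file:///data/reads.fastq.gz"}
#     normalizeFilesDirs(f)
#     # f now also carries:
#     #   "basename": "reads.fastq.gz", "nameroot": "reads.fastq", "nameext": ".gz"
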
def dedup(listing):  # type: (List[Any]) -> List[Any]
    """Remove duplicate entries and entries already nested inside a Directory's listing."""
    marksub = set()

    def mark(d):  # type: (Dict[Text, Text]) -> None
        marksub.add(d["location"])

    for l in listing:
        if l["class"] == "Directory":
            for e in l.get("listing", []):
                adjustFileObjs(e, mark)
                adjustDirObjs(e, mark)

    dd = []
    markdup = set()  # type: Set[Text]
    for r in listing:
        if r["location"] not in marksub and r["location"] not in markdup:
            dd.append(r)
            markdup.add(r["location"])

    return dd

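# Illustrative sketch (not part of upstream cwltool): dedup drops entries that
# are already reachable through a Directory's listing, as well as repeated
# locations, so only top-level, unique objects remain:
#
#     listing = [
#         {"class": "Directory", "location": "file:///d",
#          "listing": [{"class": "File", "location": "file:///d/a.txt"}]},
#         {"class": "File", "location": "file:///d/a.txt"},   # nested duplicate
#         {"class": "File", "location": "file:///other.txt"},
#     ]
#     assert len(dedup(listing)) == 2  # the nested duplicate is removed
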
def get_listing(fs_access, rec, recursive=True):
    # type: (StdFsAccess, MutableMapping[Text, Any], bool) -> None
    """Expand, in place, the 'listing' of any Directory found in `rec`."""
    if rec.get("class") != "Directory":
        finddirs = []  # type: List[MutableMapping[Text, Text]]
        visit_class(rec, ("Directory",), finddirs.append)
        for f in finddirs:
            get_listing(fs_access, f, recursive=recursive)
        return
    if "listing" in rec:
        return
    listing = []
    loc = rec["location"]
    for ld in fs_access.listdir(loc):
        parse = urllib.parse.urlparse(ld)
        bn = os.path.basename(urllib.request.url2pathname(parse.path))
        if fs_access.isdir(ld):
            ent = {u"class": u"Directory",
                   u"location": ld,
                   u"basename": bn}
            if recursive:
                get_listing(fs_access, ent, recursive)
            listing.append(ent)
        else:
            listing.append({"class": "File", "location": ld, "basename": bn})
    rec["listing"] = listing

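# Illustrative sketch (not part of upstream cwltool): get_listing expands a
# Directory object in place through a StdFsAccess; the base directory and
# location below are hypothetical:
#
#     fs_access = StdFsAccess("/data")
#     d = {"class": "Directory", "location": "file:///data/run1", "basename": "run1"}
#     get_listing(fs_access, d, recursive=True)
#     # d["listing"] is now a list of File/Directory entries for /data/run1
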
def trim_listing(obj):  # type: (Dict[Text, Any]) -> None
    """
    Remove the 'listing' field from Directory objects that are file references.

    It is redundant and potentially expensive to pass fully enumerated Directory
    objects around if not explicitly needed, so delete the 'listing' field when
    it is safe to do so.
    """
    if obj.get("location", "").startswith("file://") and "listing" in obj:
        del obj["listing"]

# Download HTTP files
def downloadHttpFile(httpurl):
    # type: (Text) -> Text
    cache_session = None
    if "XDG_CACHE_HOME" in os.environ:
        directory = os.environ["XDG_CACHE_HOME"]
    elif "HOME" in os.environ:
        directory = os.environ["HOME"]
    else:
        directory = os.path.expanduser('~')

    cache_session = CacheControl(
        requests.Session(),
        cache=FileCache(
            os.path.join(directory, ".cache", "cwltool")))

    r = cache_session.get(httpurl, stream=True)
    with NamedTemporaryFile(mode='wb', delete=False) as f:
        for chunk in r.iter_content(chunk_size=16384):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
    r.close()
    return str(f.name)

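# Illustrative sketch (not part of upstream cwltool): downloadHttpFile fetches
# a remote file through a CacheControl-wrapped requests session (cached under
# a .cache/cwltool directory below $XDG_CACHE_HOME or the home directory) and
# returns the path of a temporary local copy; the URL below is hypothetical:
#
#     local_path = downloadHttpFile("https://example.org/reference.fa")
#     # local_path names a NamedTemporaryFile that the caller must clean up
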
def ensure_writable(path):  # type: (Text) -> None
    """Make `path`, and everything below it if it is a directory, owner-writable."""
    if os.path.isdir(path):
        for root, dirs, files in os.walk(path):
            for name in files:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j, mode | stat.S_IWUSR)
            for name in dirs:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j, mode | stat.S_IWUSR)
    else:
        st = os.stat(path)
        mode = stat.S_IMODE(st.st_mode)
        os.chmod(path, mode | stat.S_IWUSR)

def ensure_non_writable(path):  # type: (Text) -> None
    """Remove all write permission from `path`, and everything below it if it is a directory."""
    if os.path.isdir(path):
        for root, dirs, files in os.walk(path):
            for name in files:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j,
                         mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
            for name in dirs:
                j = os.path.join(root, name)
                st = os.stat(j)
                mode = stat.S_IMODE(st.st_mode)
                os.chmod(j,
                         mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
    else:
        st = os.stat(path)
        mode = stat.S_IMODE(st.st_mode)
        os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)

class PathMapper(object):
    """
    Mapping of files from the relative path provided in the file to a tuple.

    (absolute local path, absolute container path)

    The tao of PathMapper:

    The initializer takes a list of File and Directory objects, a base
    directory (for resolving relative references) and a staging directory
    (where the files are mapped to).

    The purpose of the setup method is to determine where each File or
    Directory should be placed on the target file system (relative to
    stagedir).

    If separateDirs=True, unrelated files will be isolated in their own
    directories under stagedir. If separateDirs=False, files and directories
    will all be placed in stagedir (with the possibility of name
    collisions...)

    The path map maps the "location" of the input Files and Directory objects
    to a tuple (resolved, target, type, staged). The "resolved" field is the
    "real" path on the local file system (after resolving relative paths and
    traversing symlinks). The "target" is the path on the target file system
    (under stagedir). The type is the object type (one of File, Directory,
    CreateFile, WritableFile, CreateWritableFile).

    The latter three (CreateFile, WritableFile, CreateWritableFile) are used by
    InitialWorkDirRequirement to indicate files that are generated on the fly
    (CreateFile and CreateWritableFile, in which case "resolved" holds the file
    contents instead of the path because the file doesn't exist) or copied
    into the output directory so they can be opened for update ("r+" or "a")
    (WritableFile and CreateWritableFile).

    A usage sketch appears in the comment at the end of this module.
    """

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # type: (List[Any], Text, Text, bool) -> None
        """Initialize the PathMapper."""
        self._pathmap = {}  # type: Dict[Text, MapperEnt]
        self.stagedir = stagedir
        self.separateDirs = separateDirs
        self.setup(dedup(referenced_files), basedir)

    def visitlisting(self, listing, stagedir, basedir, copy=False, staged=False):
        # type: (List[Dict[Text, Any]], Text, Text, bool, bool) -> None
        for ld in listing:
            self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy), staged=staged)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[Text, Any], Text, Text, bool, bool) -> None
        tgt = convert_pathsep_to_unix(
            os.path.join(stagedir, obj["basename"]))
        if obj["location"] in self._pathmap:
            return
        if obj["class"] == "Directory":
            if obj["location"].startswith("file://"):
                resolved = uri_file_path(obj["location"])
            else:
                resolved = obj["location"]
            self._pathmap[obj["location"]] = MapperEnt(resolved, tgt, "WritableDirectory" if copy else "Directory", staged)
            if obj["location"].startswith("file://"):
                staged = False
            self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy, staged=staged)
        elif obj["class"] == "File":
            path = obj["location"]
            ab = abspath(path, basedir)
            if "contents" in obj and obj["location"].startswith("_:"):
                self._pathmap[obj["location"]] = MapperEnt(
                    obj["contents"], tgt,
                    "CreateWritableFile" if copy else "CreateFile", staged)
            else:
                with SourceLine(obj, "location", validate.ValidationException, _logger.isEnabledFor(logging.DEBUG)):
                    deref = ab
                    if urllib.parse.urlsplit(deref).scheme in ['http', 'https']:
                        deref = downloadHttpFile(path)
                    else:
                        # Dereference symbolic links
                        st = os.lstat(deref)
                        while stat.S_ISLNK(st.st_mode):
                            rl = os.readlink(deref)
                            deref = rl if os.path.isabs(rl) else os.path.join(
                                os.path.dirname(deref), rl)
                            st = os.lstat(deref)

                    self._pathmap[path] = MapperEnt(
                        deref, tgt, "WritableFile" if copy else "File", staged)
            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir,
                              copy=copy, staged=staged)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], Text) -> None

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        stagedir = self.stagedir
        for fob in referenced_files:
            if self.separateDirs:
                stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
            self.visit(fob, stagedir, basedir, copy=fob.get("writable"), staged=True)

    def mapper(self, src):  # type: (Text) -> MapperEnt
        if u"#" in src:
            i = src.index(u"#")
            p = self._pathmap[src[:i]]
            return MapperEnt(p.resolved, p.target + src[i:], p.type, p.staged)
        return self._pathmap[src]

    def files(self):  # type: () -> List[Text]
        return list(self._pathmap.keys())

    def items(self):  # type: () -> List[Tuple[Text, MapperEnt]]
        return list(self._pathmap.items())

    def reversemap(self,
                   target  # type: Text
                  ):  # type: (...) -> Optional[Tuple[Text, Text]]
        for k, v in self._pathmap.items():
            if v[1] == target:
                return (k, v[0])
        return None

    def update(self, key, resolved, target, ctype, stage):
        # type: (Text, Text, Text, Text, bool) -> MapperEnt
        m = MapperEnt(resolved, target, ctype, stage)
        self._pathmap[key] = m
        return m

    def __contains__(self, key):  # type: (Text) -> bool
        """Test for the presence of the given relative path in this mapper."""
        return key in self._pathmap
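
# Illustrative usage sketch (not part of upstream cwltool; paths are
# hypothetical and must exist locally for the symlink dereference to succeed):
# a PathMapper built from one input File maps its "location" to a MapperEnt
# whose target lives inside an isolated staging directory.
#
#     referenced = [{"class": "File",
#                    "location": "file:///data/sample.bam",
#                    "basename": "sample.bam"}]
#     pm = PathMapper(referenced, basedir="/data", stagedir="/var/lib/cwl")
#     ent = pm.mapper("file:///data/sample.bam")
#     # ent.resolved == "/data/sample.bam" (after following any symlinks)
#     # ent.target   == "/var/lib/cwl/stg<uuid>/sample.bam"
#     # ent.type     == "File" and ent.staged is True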