Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/pathmapper.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
from __future__ import absolute_import import collections import logging import os import stat import uuid from functools import partial # pylint: disable=unused-import from tempfile import NamedTemporaryFile from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence, Optional, Set, Tuple, Union) import requests from cachecontrol import CacheControl from cachecontrol.caches import FileCache from schema_salad import validate from schema_salad.ref_resolver import uri_file_path from schema_salad.sourceline import SourceLine from six.moves import urllib from typing_extensions import Text # pylint: disable=unused-import # move to a regular typing import when Python 3.3-3.6 is no longer supported from .loghandler import _logger from .stdfsaccess import StdFsAccess, abspath # pylint: disable=unused-import from .utils import Directory # pylint: disable=unused-import from .utils import convert_pathsep_to_unix, visit_class CONTENT_LIMIT = 64 * 1024 MapperEnt = collections.namedtuple("MapperEnt", ["resolved", "target", "type", "staged"]) def adjustFiles(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply a mapping function to each File path in the object `rec`.""" if isinstance(rec, MutableMapping): if rec.get("class") == "File": rec["path"] = op(rec["path"]) for d in rec: adjustFiles(rec[d], op) if isinstance(rec, MutableSequence): for d in rec: adjustFiles(d, op) def adjustFileObjs(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply an update function to each File object in the object `rec`.""" visit_class(rec, ("File",), op) def adjustDirObjs(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None """Apply an update function to each Directory object in the object `rec`.""" visit_class(rec, ("Directory",), op) def normalizeFilesDirs(job): # type: (Optional[Union[List[Dict[Text, Any]], MutableMapping[Text, Any], Directory]]) -> None def addLocation(d): # type: (Dict[Text, Any]) -> None if "location" not in d: if d["class"] == "File" and ("contents" not in d): raise validate.ValidationException("Anonymous file object must have 'contents' and 'basename' fields.") if d["class"] == "Directory" and ("listing" not in d or "basename" not in d): raise validate.ValidationException( "Anonymous directory object must have 'listing' and 'basename' fields.") d["location"] = "_:" + Text(uuid.uuid4()) if "basename" not in d: d["basename"] = d["location"][2:] parse = urllib.parse.urlparse(d["location"]) path = parse.path # strip trailing slash if path.endswith("/"): if d["class"] != "Directory": raise validate.ValidationException( "location '%s' ends with '/' but is not a Directory" % d["location"]) path = path.rstrip("/") d["location"] = urllib.parse.urlunparse((parse.scheme, parse.netloc, path, parse.params, parse.query, parse.fragment)) if not d.get("basename"): if path.startswith("_:"): d["basename"] = Text(path[2:]) else: d["basename"] = Text(os.path.basename(urllib.request.url2pathname(path))) if d["class"] == "File": nr, ne = os.path.splitext(d["basename"]) if d.get("nameroot") != nr: d["nameroot"] = Text(nr) if d.get("nameext") != ne: d["nameext"] = Text(ne) contents = d.get("contents") if contents and len(contents) > CONTENT_LIMIT: if len(contents) > CONTENT_LIMIT: raise validate.ValidationException("File object contains contents with number of bytes that exceeds CONTENT_LIMIT length (%d)" % CONTENT_LIMIT) visit_class(job, ("File", "Directory"), addLocation) def dedup(listing): # type: (List[Any]) -> List[Any] marksub = set() def mark(d): # type: (Dict[Text, Text]) -> None marksub.add(d["location"]) for l in listing: if l["class"] == "Directory": for e in l.get("listing", []): adjustFileObjs(e, mark) adjustDirObjs(e, mark) dd = [] markdup = set() # type: Set[Text] for r in listing: if r["location"] not in marksub and r["location"] not in markdup: dd.append(r) markdup.add(r["location"]) return dd def get_listing(fs_access, rec, recursive=True): # type: (StdFsAccess, MutableMapping[Text, Any], bool) -> None if rec.get("class") != "Directory": finddirs = [] # type: List[MutableMapping[Text, Text]] visit_class(rec, ("Directory",), finddirs.append) for f in finddirs: get_listing(fs_access, f, recursive=recursive) return if "listing" in rec: return listing = [] loc = rec["location"] for ld in fs_access.listdir(loc): parse = urllib.parse.urlparse(ld) bn = os.path.basename(urllib.request.url2pathname(parse.path)) if fs_access.isdir(ld): ent = {u"class": u"Directory", u"location": ld, u"basename": bn} if recursive: get_listing(fs_access, ent, recursive) listing.append(ent) else: listing.append({"class": "File", "location": ld, "basename": bn}) rec["listing"] = listing def trim_listing(obj): # type: (Dict[Text, Any]) -> None """ Remove 'listing' field from Directory objects that are file references. It redundant and potentially expensive to pass fully enumerated Directory objects around if not explicitly needed, so delete the 'listing' field when it is safe to do so. """ if obj.get("location", "").startswith("file://") and "listing" in obj: del obj["listing"] # Download http Files def downloadHttpFile(httpurl): # type: (Text) -> Text cache_session = None if "XDG_CACHE_HOME" in os.environ: directory = os.environ["XDG_CACHE_HOME"] elif "HOME" in os.environ: directory = os.environ["HOME"] else: directory = os.path.expanduser('~') cache_session = CacheControl( requests.Session(), cache=FileCache( os.path.join(directory, ".cache", "cwltool"))) r = cache_session.get(httpurl, stream=True) with NamedTemporaryFile(mode='wb', delete=False) as f: for chunk in r.iter_content(chunk_size=16384): if chunk: # filter out keep-alive new chunks f.write(chunk) r.close() return str(f.name) def ensure_writable(path): # type: (Text) -> None if os.path.isdir(path): for root, dirs, files in os.walk(path): for name in files: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode | stat.S_IWUSR) for name in dirs: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode | stat.S_IWUSR) else: st = os.stat(path) mode = stat.S_IMODE(st.st_mode) os.chmod(path, mode | stat.S_IWUSR) def ensure_non_writable(path): # type: (Text) -> None if os.path.isdir(path): for root, dirs, files in os.walk(path): for name in files: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) for name in dirs: j = os.path.join(root, name) st = os.stat(j) mode = stat.S_IMODE(st.st_mode) os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) else: st = os.stat(path) mode = stat.S_IMODE(st.st_mode) os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) class PathMapper(object): """ Mapping of files from relative path provided in the file to a tuple. (absolute local path, absolute container path) The tao of PathMapper: The initializer takes a list of File and Directory objects, a base directory (for resolving relative references) and a staging directory (where the files are mapped to). The purpose of the setup method is to determine where each File or Directory should be placed on the target file system (relative to stagedir). If separatedirs=True, unrelated files will be isolated in their own directories under stagedir. If separatedirs=False, files and directories will all be placed in stagedir (with the possibility for name collisions...) The path map maps the "location" of the input Files and Directory objects to a tuple (resolved, target, type). The "resolved" field is the "real" path on the local file system (after resolving relative paths and traversing symlinks). The "target" is the path on the target file system (under stagedir). The type is the object type (one of File, Directory, CreateFile, WritableFile, CreateWritableFile). The latter three (CreateFile, WritableFile, CreateWritableFile) are used by InitialWorkDirRequirement to indicate files that are generated on the fly (CreateFile and CreateWritableFile, in this case "resolved" holds the file contents instead of the path because they file doesn't exist) or copied into the output directory so they can be opened for update ("r+" or "a") (WritableFile and CreateWritableFile). """ def __init__(self, referenced_files, basedir, stagedir, separateDirs=True): # type: (List[Any], Text, Text, bool) -> None """Initialize the PathMapper.""" self._pathmap = {} # type: Dict[Text, MapperEnt] self.stagedir = stagedir self.separateDirs = separateDirs self.setup(dedup(referenced_files), basedir) def visitlisting(self, listing, stagedir, basedir, copy=False, staged=False): # type: (List[Dict[Text, Any]], Text, Text, bool, bool) -> None for ld in listing: self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy), staged=staged) def visit(self, obj, stagedir, basedir, copy=False, staged=False): # type: (Dict[Text, Any], Text, Text, bool, bool) -> None tgt = convert_pathsep_to_unix( os.path.join(stagedir, obj["basename"])) if obj["location"] in self._pathmap: return if obj["class"] == "Directory": if obj["location"].startswith("file://"): resolved = uri_file_path(obj["location"]) else: resolved = obj["location"] self._pathmap[obj["location"]] = MapperEnt(resolved, tgt, "WritableDirectory" if copy else "Directory", staged) if obj["location"].startswith("file://"): staged = False self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy, staged=staged) elif obj["class"] == "File": path = obj["location"] ab = abspath(path, basedir) if "contents" in obj and obj["location"].startswith("_:"): self._pathmap[obj["location"]] = MapperEnt( obj["contents"], tgt, "CreateWritableFile" if copy else "CreateFile", staged) else: with SourceLine(obj, "location", validate.ValidationException, _logger.isEnabledFor(logging.DEBUG)): deref = ab if urllib.parse.urlsplit(deref).scheme in ['http', 'https']: deref = downloadHttpFile(path) else: # Dereference symbolic links st = os.lstat(deref) while stat.S_ISLNK(st.st_mode): rl = os.readlink(deref) deref = rl if os.path.isabs(rl) else os.path.join( os.path.dirname(deref), rl) st = os.lstat(deref) self._pathmap[path] = MapperEnt( deref, tgt, "WritableFile" if copy else "File", staged) self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir, copy=copy, staged=staged) def setup(self, referenced_files, basedir): # type: (List[Any], Text) -> None # Go through each file and set the target to its own directory along # with any secondary files. stagedir = self.stagedir for fob in referenced_files: if self.separateDirs: stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) self.visit(fob, stagedir, basedir, copy=fob.get("writable"), staged=True) def mapper(self, src): # type: (Text) -> MapperEnt if u"#" in src: i = src.index(u"#") p = self._pathmap[src[:i]] return MapperEnt(p.resolved, p.target + src[i:], p.type, p.staged) return self._pathmap[src] def files(self): # type: () -> List[Text] return list(self._pathmap.keys()) def items(self): # type: () -> List[Tuple[Text, MapperEnt]] return list(self._pathmap.items()) def reversemap(self, target # type: Text ): # type: (...) -> Optional[Tuple[Text, Text]] for k, v in self._pathmap.items(): if v[1] == target: return (k, v[0]) return None def update(self, key, resolved, target, ctype, stage): # type: (Text, Text, Text, Text, bool) -> MapperEnt m = MapperEnt(resolved, target, ctype, stage) self._pathmap[key] = m return m def __contains__(self, key): # type: (Text) -> bool """Test for the presence of the given relative path in this mapper.""" return key in self._pathmap