diff env/lib/python3.7/site-packages/cwltool/pathmapper.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/pathmapper.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,350 +0,0 @@
-from __future__ import absolute_import
-
-import collections
-import logging
-import os
-import stat
-import uuid
-from functools import partial  # pylint: disable=unused-import
-from tempfile import NamedTemporaryFile
-from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
-                    Optional, Set, Tuple, Union)
-
-import requests
-from cachecontrol import CacheControl
-from cachecontrol.caches import FileCache
-from schema_salad import validate
-from schema_salad.ref_resolver import uri_file_path
-from schema_salad.sourceline import SourceLine
-from six.moves import urllib
-from typing_extensions import Text  # pylint: disable=unused-import
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from .loghandler import _logger
-from .stdfsaccess import StdFsAccess, abspath  # pylint: disable=unused-import
-from .utils import Directory  # pylint: disable=unused-import
-from .utils import convert_pathsep_to_unix, visit_class
-
-
-CONTENT_LIMIT = 64 * 1024
-
-MapperEnt = collections.namedtuple("MapperEnt", ["resolved", "target", "type", "staged"])
-
-
-def adjustFiles(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
-    """Apply a mapping function to each File path in the object `rec`."""
-    if isinstance(rec, MutableMapping):
-        if rec.get("class") == "File":
-            rec["path"] = op(rec["path"])
-        for d in rec:
-            adjustFiles(rec[d], op)
-    if isinstance(rec, MutableSequence):
-        for d in rec:
-            adjustFiles(d, op)
-
-
-def adjustFileObjs(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
-    """Apply an update function to each File object in the object `rec`."""
-    visit_class(rec, ("File",), op)
-
-def adjustDirObjs(rec, op):
-    # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
-    """Apply an update function to each Directory object in the object `rec`."""
-    visit_class(rec, ("Directory",), op)
-
-def normalizeFilesDirs(job):
-    # type: (Optional[Union[List[Dict[Text, Any]], MutableMapping[Text, Any], Directory]]) -> None
-    def addLocation(d):  # type: (Dict[Text, Any]) -> None
-        if "location" not in d:
-            if d["class"] == "File" and ("contents" not in d):
-                raise validate.ValidationException("Anonymous file object must have 'contents' and 'basename' fields.")
-            if d["class"] == "Directory" and ("listing" not in d or "basename" not in d):
-                raise validate.ValidationException(
-                    "Anonymous directory object must have 'listing' and 'basename' fields.")
-            d["location"] = "_:" + Text(uuid.uuid4())
-            if "basename" not in d:
-                d["basename"] = d["location"][2:]
-
-        parse = urllib.parse.urlparse(d["location"])
-        path = parse.path
-        # strip trailing slash
-        if path.endswith("/"):
-            if d["class"] != "Directory":
-                raise validate.ValidationException(
-                    "location '%s' ends with '/' but is not a Directory" % d["location"])
-            path = path.rstrip("/")
-            d["location"] = urllib.parse.urlunparse((parse.scheme, parse.netloc, path, parse.params, parse.query, parse.fragment))
-
-        if not d.get("basename"):
-            if path.startswith("_:"):
-                d["basename"] = Text(path[2:])
-            else:
-                d["basename"] = Text(os.path.basename(urllib.request.url2pathname(path)))
-
-        if d["class"] == "File":
-            nr, ne = os.path.splitext(d["basename"])
-            if d.get("nameroot") != nr:
-                d["nameroot"] = Text(nr)
-            if d.get("nameext") != ne:
-                d["nameext"] = Text(ne)
-
-            contents = d.get("contents")
-            if contents and len(contents) > CONTENT_LIMIT:
-                if len(contents) > CONTENT_LIMIT:
-                    raise validate.ValidationException("File object contains contents with number of bytes that exceeds CONTENT_LIMIT length (%d)" % CONTENT_LIMIT)
-
-    visit_class(job, ("File", "Directory"), addLocation)
-
-
-def dedup(listing):  # type: (List[Any]) -> List[Any]
-    marksub = set()
-
-    def mark(d):  # type: (Dict[Text, Text]) -> None
-        marksub.add(d["location"])
-
-    for l in listing:
-        if l["class"] == "Directory":
-            for e in l.get("listing", []):
-                adjustFileObjs(e, mark)
-                adjustDirObjs(e, mark)
-
-    dd = []
-    markdup = set()  # type: Set[Text]
-    for r in listing:
-        if r["location"] not in marksub and r["location"] not in markdup:
-            dd.append(r)
-            markdup.add(r["location"])
-
-    return dd
-
-def get_listing(fs_access, rec, recursive=True):
-    # type: (StdFsAccess, MutableMapping[Text, Any], bool) -> None
-    if rec.get("class") != "Directory":
-        finddirs = []  # type: List[MutableMapping[Text, Text]]
-        visit_class(rec, ("Directory",), finddirs.append)
-        for f in finddirs:
-            get_listing(fs_access, f, recursive=recursive)
-        return
-    if "listing" in rec:
-        return
-    listing = []
-    loc = rec["location"]
-    for ld in fs_access.listdir(loc):
-        parse = urllib.parse.urlparse(ld)
-        bn = os.path.basename(urllib.request.url2pathname(parse.path))
-        if fs_access.isdir(ld):
-            ent = {u"class": u"Directory",
-                   u"location": ld,
-                   u"basename": bn}
-            if recursive:
-                get_listing(fs_access, ent, recursive)
-            listing.append(ent)
-        else:
-            listing.append({"class": "File", "location": ld, "basename": bn})
-    rec["listing"] = listing
-
-def trim_listing(obj):  # type: (Dict[Text, Any]) -> None
-    """
-    Remove 'listing' field from Directory objects that are file references.
-
-    It redundant and potentially expensive to pass fully enumerated Directory
-    objects around if not explicitly needed, so delete the 'listing' field when
-    it is safe to do so.
-    """
-    if obj.get("location", "").startswith("file://") and "listing" in obj:
-        del obj["listing"]
-
-# Download http Files
-def downloadHttpFile(httpurl):
-    # type: (Text) -> Text
-    cache_session = None
-    if "XDG_CACHE_HOME" in os.environ:
-        directory = os.environ["XDG_CACHE_HOME"]
-    elif "HOME" in os.environ:
-        directory = os.environ["HOME"]
-    else:
-        directory = os.path.expanduser('~')
-
-    cache_session = CacheControl(
-        requests.Session(),
-        cache=FileCache(
-            os.path.join(directory, ".cache", "cwltool")))
-
-    r = cache_session.get(httpurl, stream=True)
-    with NamedTemporaryFile(mode='wb', delete=False) as f:
-        for chunk in r.iter_content(chunk_size=16384):
-            if chunk:  # filter out keep-alive new chunks
-                f.write(chunk)
-    r.close()
-    return str(f.name)
-
-def ensure_writable(path):  # type: (Text) -> None
-    if os.path.isdir(path):
-        for root, dirs, files in os.walk(path):
-            for name in files:
-                j = os.path.join(root, name)
-                st = os.stat(j)
-                mode = stat.S_IMODE(st.st_mode)
-                os.chmod(j, mode | stat.S_IWUSR)
-            for name in dirs:
-                j = os.path.join(root, name)
-                st = os.stat(j)
-                mode = stat.S_IMODE(st.st_mode)
-                os.chmod(j, mode | stat.S_IWUSR)
-    else:
-        st = os.stat(path)
-        mode = stat.S_IMODE(st.st_mode)
-        os.chmod(path, mode | stat.S_IWUSR)
-
-def ensure_non_writable(path):  # type: (Text) -> None
-    if os.path.isdir(path):
-        for root, dirs, files in os.walk(path):
-            for name in files:
-                j = os.path.join(root, name)
-                st = os.stat(j)
-                mode = stat.S_IMODE(st.st_mode)
-                os.chmod(j,
-                         mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
-            for name in dirs:
-                j = os.path.join(root, name)
-                st = os.stat(j)
-                mode = stat.S_IMODE(st.st_mode)
-                os.chmod(j,
-                         mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
-    else:
-        st = os.stat(path)
-        mode = stat.S_IMODE(st.st_mode)
-        os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
-
-class PathMapper(object):
-    """
-    Mapping of files from relative path provided in the file to a tuple.
-
-    (absolute local path, absolute container path)
-
-    The tao of PathMapper:
-
-    The initializer takes a list of File and Directory objects, a base
-    directory (for resolving relative references) and a staging directory
-    (where the files are mapped to).
-
-    The purpose of the setup method is to determine where each File or
-    Directory should be placed on the target file system (relative to
-    stagedir).
-
-    If separatedirs=True, unrelated files will be isolated in their own
-    directories under stagedir. If separatedirs=False, files and directories
-    will all be placed in stagedir (with the possibility for name
-    collisions...)
-
-    The path map maps the "location" of the input Files and Directory objects
-    to a tuple (resolved, target, type). The "resolved" field is the "real"
-    path on the local file system (after resolving relative paths and
-    traversing symlinks). The "target" is the path on the target file system
-    (under stagedir). The type is the object type (one of File, Directory,
-    CreateFile, WritableFile, CreateWritableFile).
-
-    The latter three (CreateFile, WritableFile, CreateWritableFile) are used by
-    InitialWorkDirRequirement to indicate files that are generated on the fly
-    (CreateFile and CreateWritableFile, in this case "resolved" holds the file
-    contents instead of the path because they file doesn't exist) or copied
-    into the output directory so they can be opened for update ("r+" or "a")
-    (WritableFile and CreateWritableFile).
-
-    """
-
-    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
-        # type: (List[Any], Text, Text, bool) -> None
-        """Initialize the PathMapper."""
-        self._pathmap = {}  # type: Dict[Text, MapperEnt]
-        self.stagedir = stagedir
-        self.separateDirs = separateDirs
-        self.setup(dedup(referenced_files), basedir)
-
-    def visitlisting(self, listing, stagedir, basedir, copy=False, staged=False):
-        # type: (List[Dict[Text, Any]], Text, Text, bool, bool) -> None
-        for ld in listing:
-            self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy), staged=staged)
-
-    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
-        # type: (Dict[Text, Any], Text, Text, bool, bool) -> None
-        tgt = convert_pathsep_to_unix(
-            os.path.join(stagedir, obj["basename"]))
-        if obj["location"] in self._pathmap:
-            return
-        if obj["class"] == "Directory":
-            if obj["location"].startswith("file://"):
-                resolved = uri_file_path(obj["location"])
-            else:
-                resolved = obj["location"]
-            self._pathmap[obj["location"]] = MapperEnt(resolved, tgt, "WritableDirectory" if copy else "Directory", staged)
-            if obj["location"].startswith("file://"):
-                staged = False
-            self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy, staged=staged)
-        elif obj["class"] == "File":
-            path = obj["location"]
-            ab = abspath(path, basedir)
-            if "contents" in obj and obj["location"].startswith("_:"):
-                self._pathmap[obj["location"]] = MapperEnt(
-                    obj["contents"], tgt,
-                    "CreateWritableFile" if copy else "CreateFile", staged)
-            else:
-                with SourceLine(obj, "location", validate.ValidationException, _logger.isEnabledFor(logging.DEBUG)):
-                    deref = ab
-                    if urllib.parse.urlsplit(deref).scheme in ['http', 'https']:
-                        deref = downloadHttpFile(path)
-                    else:
-                        # Dereference symbolic links
-                        st = os.lstat(deref)
-                        while stat.S_ISLNK(st.st_mode):
-                            rl = os.readlink(deref)
-                            deref = rl if os.path.isabs(rl) else os.path.join(
-                                os.path.dirname(deref), rl)
-                            st = os.lstat(deref)
-
-                    self._pathmap[path] = MapperEnt(
-                        deref, tgt, "WritableFile" if copy else "File", staged)
-            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir,
-                              copy=copy, staged=staged)
-
-    def setup(self, referenced_files, basedir):
-        # type: (List[Any], Text) -> None
-
-        # Go through each file and set the target to its own directory along
-        # with any secondary files.
-        stagedir = self.stagedir
-        for fob in referenced_files:
-            if self.separateDirs:
-                stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
-            self.visit(fob, stagedir, basedir, copy=fob.get("writable"), staged=True)
-
-    def mapper(self, src):  # type: (Text) -> MapperEnt
-        if u"#" in src:
-            i = src.index(u"#")
-            p = self._pathmap[src[:i]]
-            return MapperEnt(p.resolved, p.target + src[i:], p.type, p.staged)
-        return self._pathmap[src]
-
-    def files(self):  # type: () -> List[Text]
-        return list(self._pathmap.keys())
-
-    def items(self):  # type: () -> List[Tuple[Text, MapperEnt]]
-        return list(self._pathmap.items())
-
-    def reversemap(self,
-                   target  # type: Text
-                  ):  # type: (...) -> Optional[Tuple[Text, Text]]
-        for k, v in self._pathmap.items():
-            if v[1] == target:
-                return (k, v[0])
-        return None
-
-    def update(self, key, resolved, target, ctype, stage):
-        # type: (Text, Text, Text, Text, bool) -> MapperEnt
-        m = MapperEnt(resolved, target, ctype, stage)
-        self._pathmap[key] = m
-        return m
-
-    def __contains__(self, key):  # type: (Text) -> bool
-        """Test for the presence of the given relative path in this mapper."""
-        return key in self._pathmap