diff env/lib/python3.7/site-packages/cwltool/pathmapper.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/pathmapper.py	Thu May 14 14:56:58 2020 -0400
@@ -0,0 +1,350 @@
+from __future__ import absolute_import
+
+import collections
+import logging
+import os
+import stat
+import uuid
+from functools import partial  # pylint: disable=unused-import
+from tempfile import NamedTemporaryFile
+from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
+                    Optional, Set, Tuple, Union)
+
+import requests
+from cachecontrol import CacheControl
+from cachecontrol.caches import FileCache
+from schema_salad import validate
+from schema_salad.ref_resolver import uri_file_path
+from schema_salad.sourceline import SourceLine
+from six.moves import urllib
+from typing_extensions import Text  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .loghandler import _logger
+from .stdfsaccess import StdFsAccess, abspath  # pylint: disable=unused-import
+from .utils import Directory  # pylint: disable=unused-import
+from .utils import convert_pathsep_to_unix, visit_class
+
+
+CONTENT_LIMIT = 64 * 1024
+
+MapperEnt = collections.namedtuple("MapperEnt", ["resolved", "target", "type", "staged"])
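+# Each MapperEnt records: "resolved", the real local path (or the literal file
+# contents for CreateFile/CreateWritableFile entries); "target", the path under
+# the staging directory; "type", one of File, Directory, CreateFile,
+# WritableFile or CreateWritableFile; and "staged", whether the entry should
+# actually be staged.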
+
+
+def adjustFiles(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
+    """Apply a mapping function to each File path in the object `rec`."""
+    if isinstance(rec, MutableMapping):
+        if rec.get("class") == "File":
+            rec["path"] = op(rec["path"])
+        for d in rec:
+            adjustFiles(rec[d], op)
+    if isinstance(rec, MutableSequence):
+        for d in rec:
+            adjustFiles(d, op)
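+
+# Illustrative usage (hypothetical input): given
+#     rec = {"class": "File", "path": "data/input.txt"}
+# calling adjustFiles(rec, os.path.abspath) rewrites rec["path"] in place to an
+# absolute path, recursing into any nested mappings and sequences.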
+
+
+def adjustFileObjs(rec, op):  # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
+    """Apply an update function to each File object in the object `rec`."""
+    visit_class(rec, ("File",), op)
+
+def adjustDirObjs(rec, op):
+    # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
+    """Apply an update function to each Directory object in the object `rec`."""
+    visit_class(rec, ("Directory",), op)
+
+def normalizeFilesDirs(job):
+    # type: (Optional[Union[List[Dict[Text, Any]], MutableMapping[Text, Any], Directory]]) -> None
+    """Ensure each File/Directory object in `job` has 'location' and 'basename' fields (plus 'nameroot'/'nameext' for Files)."""
+    def addLocation(d):  # type: (Dict[Text, Any]) -> None
+        if "location" not in d:
+            if d["class"] == "File" and ("contents" not in d):
+                raise validate.ValidationException("Anonymous file object must have 'contents' and 'basename' fields.")
+            if d["class"] == "Directory" and ("listing" not in d or "basename" not in d):
+                raise validate.ValidationException(
+                    "Anonymous directory object must have 'listing' and 'basename' fields.")
+            d["location"] = "_:" + Text(uuid.uuid4())
+            if "basename" not in d:
+                d["basename"] = d["location"][2:]
+
+        parse = urllib.parse.urlparse(d["location"])
+        path = parse.path
+        # strip trailing slash
+        if path.endswith("/"):
+            if d["class"] != "Directory":
+                raise validate.ValidationException(
+                    "location '%s' ends with '/' but is not a Directory" % d["location"])
+            path = path.rstrip("/")
+            d["location"] = urllib.parse.urlunparse((parse.scheme, parse.netloc, path, parse.params, parse.query, parse.fragment))
+
+        if not d.get("basename"):
+            if path.startswith("_:"):
+                d["basename"] = Text(path[2:])
+            else:
+                d["basename"] = Text(os.path.basename(urllib.request.url2pathname(path)))
+
+        if d["class"] == "File":
+            nr, ne = os.path.splitext(d["basename"])
+            if d.get("nameroot") != nr:
+                d["nameroot"] = Text(nr)
+            if d.get("nameext") != ne:
+                d["nameext"] = Text(ne)
+
+            contents = d.get("contents")
+            if contents and len(contents) > CONTENT_LIMIT:
+                raise validate.ValidationException(
+                    "File object contains 'contents' larger than CONTENT_LIMIT (%d bytes)." % CONTENT_LIMIT)
+
+    visit_class(job, ("File", "Directory"), addLocation)
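+
+# Illustrative sketch (hypothetical input): an anonymous File such as
+#     f = {"class": "File", "contents": "hello"}
+# passed through normalizeFilesDirs(f) gains a "_:<uuid>" location, plus
+# "basename", "nameroot" and "nameext" fields derived from that location.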
+
+
+def dedup(listing):  # type: (List[Any]) -> List[Any]
+    """Remove duplicate entries, and entries already contained within another Directory entry's listing."""
+    marksub = set()
+
+    def mark(d):  # type: (Dict[Text, Text]) -> None
+        marksub.add(d["location"])
+
+    for l in listing:
+        if l["class"] == "Directory":
+            for e in l.get("listing", []):
+                adjustFileObjs(e, mark)
+                adjustDirObjs(e, mark)
+
+    dd = []
+    markdup = set()  # type: Set[Text]
+    for r in listing:
+        if r["location"] not in marksub and r["location"] not in markdup:
+            dd.append(r)
+            markdup.add(r["location"])
+
+    return dd
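+
+# Illustrative sketch (hypothetical listing): when a Directory entry already
+# contains a File in its "listing", dedup() drops the separate File entry:
+#     dedup([{"class": "Directory", "location": "file:///d",
+#             "listing": [{"class": "File", "location": "file:///d/a.txt"}]},
+#            {"class": "File", "location": "file:///d/a.txt"}])
+# returns only the Directory entry.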
+
+def get_listing(fs_access, rec, recursive=True):
+    # type: (StdFsAccess, MutableMapping[Text, Any], bool) -> None
+    """Expand, in place, the 'listing' field of any Directory object in `rec` by enumerating the filesystem."""
+    if rec.get("class") != "Directory":
+        finddirs = []  # type: List[MutableMapping[Text, Text]]
+        visit_class(rec, ("Directory",), finddirs.append)
+        for f in finddirs:
+            get_listing(fs_access, f, recursive=recursive)
+        return
+    if "listing" in rec:
+        return
+    listing = []
+    loc = rec["location"]
+    for ld in fs_access.listdir(loc):
+        parse = urllib.parse.urlparse(ld)
+        bn = os.path.basename(urllib.request.url2pathname(parse.path))
+        if fs_access.isdir(ld):
+            ent = {u"class": u"Directory",
+                   u"location": ld,
+                   u"basename": bn}
+            if recursive:
+                get_listing(fs_access, ent, recursive)
+            listing.append(ent)
+        else:
+            listing.append({"class": "File", "location": ld, "basename": bn})
+    rec["listing"] = listing
+
+def trim_listing(obj):  # type: (Dict[Text, Any]) -> None
+    """
+    Remove 'listing' field from Directory objects that are file references.
+
+    It is redundant and potentially expensive to pass fully enumerated Directory
+    objects around if not explicitly needed, so delete the 'listing' field when
+    it is safe to do so.
+    """
+    if obj.get("location", "").startswith("file://") and "listing" in obj:
+        del obj["listing"]
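+
+# For example (hypothetical object): a Directory with
+#     {"class": "Directory", "location": "file:///data/ref", "listing": [...]}
+# has its "listing" removed, since it can be re-enumerated from disk, while a
+# Directory with an anonymous "_:" location keeps its listing.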
+
+def downloadHttpFile(httpurl):
+    # type: (Text) -> Text
+    """Download an http(s) file to a local temporary file and return its path."""
+    if "XDG_CACHE_HOME" in os.environ:
+        directory = os.environ["XDG_CACHE_HOME"]
+    elif "HOME" in os.environ:
+        directory = os.environ["HOME"]
+    else:
+        directory = os.path.expanduser('~')
+
+    cache_session = CacheControl(
+        requests.Session(),
+        cache=FileCache(
+            os.path.join(directory, ".cache", "cwltool")))
+
+    r = cache_session.get(httpurl, stream=True)
+    with NamedTemporaryFile(mode='wb', delete=False) as f:
+        for chunk in r.iter_content(chunk_size=16384):
+            if chunk:  # filter out keep-alive chunks
+                f.write(chunk)
+    r.close()
+    return str(f.name)
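+
+# Illustrative usage (hypothetical URL): the HTTP response is cached via
+# CacheControl under $XDG_CACHE_HOME/.cache/cwltool (falling back to
+# $HOME/.cache/cwltool), and the body is written to a NamedTemporaryFile whose
+# path is returned:
+#     local_path = downloadHttpFile("https://example.com/reference.fa")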
+
+def ensure_writable(path):  # type: (Text) -> None
+    """Add the owner-write bit to `path`, recursing through its contents if it is a directory."""
+    if os.path.isdir(path):
+        for root, dirs, files in os.walk(path):
+            for name in files:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j, mode | stat.S_IWUSR)
+            for name in dirs:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j, mode | stat.S_IWUSR)
+    else:
+        st = os.stat(path)
+        mode = stat.S_IMODE(st.st_mode)
+        os.chmod(path, mode | stat.S_IWUSR)
+
+def ensure_non_writable(path):  # type: (Text) -> None
+    """Remove the user, group and other write bits from `path`, recursing through its contents if it is a directory."""
+    if os.path.isdir(path):
+        for root, dirs, files in os.walk(path):
+            for name in files:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j,
+                         mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+            for name in dirs:
+                j = os.path.join(root, name)
+                st = os.stat(j)
+                mode = stat.S_IMODE(st.st_mode)
+                os.chmod(j,
+                         mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+    else:
+        st = os.stat(path)
+        mode = stat.S_IMODE(st.st_mode)
+        os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+
+class PathMapper(object):
+    """
+    Mapping of files from relative path provided in the file to a tuple.
+
+    (absolute local path, absolute container path)
+
+    The tao of PathMapper:
+
+    The initializer takes a list of File and Directory objects, a base
+    directory (for resolving relative references) and a staging directory
+    (where the files are mapped to).
+
+    The purpose of the setup method is to determine where each File or
+    Directory should be placed on the target file system (relative to
+    stagedir).
+
+    If separatedirs=True, unrelated files will be isolated in their own
+    directories under stagedir. If separatedirs=False, files and directories
+    will all be placed in stagedir (with the possibility for name
+    collisions...)
+
+    The path map maps the "location" of the input Files and Directory objects
+    to a tuple (resolved, target, type). The "resolved" field is the "real"
+    path on the local file system (after resolving relative paths and
+    traversing symlinks). The "target" is the path on the target file system
+    (under stagedir). The type is the object type (one of File, Directory,
+    CreateFile, WritableFile, CreateWritableFile).
+
+    The latter three (CreateFile, WritableFile, CreateWritableFile) are used by
+    InitialWorkDirRequirement to indicate files that are generated on the fly
+    (CreateFile and CreateWritableFile, in this case "resolved" holds the file
+    contents instead of the path because they file doesn't exist) or copied
+    into the output directory so they can be opened for update ("r+" or "a")
+    (WritableFile and CreateWritableFile).
+
+    """
+
+    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
+        # type: (List[Any], Text, Text, bool) -> None
+        """Initialize the PathMapper."""
+        self._pathmap = {}  # type: Dict[Text, MapperEnt]
+        self.stagedir = stagedir
+        self.separateDirs = separateDirs
+        self.setup(dedup(referenced_files), basedir)
+
+    def visitlisting(self, listing, stagedir, basedir, copy=False, staged=False):
+        # type: (List[Dict[Text, Any]], Text, Text, bool, bool) -> None
+        for ld in listing:
+            self.visit(ld, stagedir, basedir, copy=ld.get("writable", copy), staged=staged)
+
+    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
+        # type: (Dict[Text, Any], Text, Text, bool, bool) -> None
+        tgt = convert_pathsep_to_unix(
+            os.path.join(stagedir, obj["basename"]))
+        if obj["location"] in self._pathmap:
+            return
+        if obj["class"] == "Directory":
+            if obj["location"].startswith("file://"):
+                resolved = uri_file_path(obj["location"])
+            else:
+                resolved = obj["location"]
+            self._pathmap[obj["location"]] = MapperEnt(resolved, tgt, "WritableDirectory" if copy else "Directory", staged)
+            if obj["location"].startswith("file://"):
+                staged = False
+            self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy, staged=staged)
+        elif obj["class"] == "File":
+            path = obj["location"]
+            ab = abspath(path, basedir)
+            if "contents" in obj and obj["location"].startswith("_:"):
+                self._pathmap[obj["location"]] = MapperEnt(
+                    obj["contents"], tgt,
+                    "CreateWritableFile" if copy else "CreateFile", staged)
+            else:
+                with SourceLine(obj, "location", validate.ValidationException, _logger.isEnabledFor(logging.DEBUG)):
+                    deref = ab
+                    if urllib.parse.urlsplit(deref).scheme in ['http', 'https']:
+                        deref = downloadHttpFile(path)
+                    else:
+                        # Dereference symbolic links
+                        st = os.lstat(deref)
+                        while stat.S_ISLNK(st.st_mode):
+                            rl = os.readlink(deref)
+                            deref = rl if os.path.isabs(rl) else os.path.join(
+                                os.path.dirname(deref), rl)
+                            st = os.lstat(deref)
+
+                    self._pathmap[path] = MapperEnt(
+                        deref, tgt, "WritableFile" if copy else "File", staged)
+            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir,
+                              copy=copy, staged=staged)
+
+    def setup(self, referenced_files, basedir):
+        # type: (List[Any], Text) -> None
+
+        # Go through each file and set the target to its own directory along
+        # with any secondary files.
+        stagedir = self.stagedir
+        for fob in referenced_files:
+            if self.separateDirs:
+                stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
+            self.visit(fob, stagedir, basedir, copy=fob.get("writable"), staged=True)
+
+    def mapper(self, src):  # type: (Text) -> MapperEnt
+        """Return the MapperEnt for src; any '#fragment' suffix on src is appended to the entry's target."""
+        if u"#" in src:
+            i = src.index(u"#")
+            p = self._pathmap[src[:i]]
+            return MapperEnt(p.resolved, p.target + src[i:], p.type, p.staged)
+        return self._pathmap[src]
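+
+    # Illustrative behaviour (hypothetical entry): if "file:///in/a.txt" is
+    # mapped to target "/stage/a.txt", then mapper("file:///in/a.txt#frag")
+    # returns a MapperEnt whose target is "/stage/a.txt#frag", with resolved,
+    # type and staged taken from the base entry.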
+
+    def files(self):  # type: () -> List[Text]
+        return list(self._pathmap.keys())
+
+    def items(self):  # type: () -> List[Tuple[Text, MapperEnt]]
+        return list(self._pathmap.items())
+
+    def reversemap(self,
+                   target  # type: Text
+                  ):  # type: (...) -> Optional[Tuple[Text, Text]]
+        for k, v in self._pathmap.items():
+            if v[1] == target:
+                return (k, v[0])
+        return None
+
+    def update(self, key, resolved, target, ctype, stage):
+        # type: (Text, Text, Text, Text, bool) -> MapperEnt
+        m = MapperEnt(resolved, target, ctype, stage)
+        self._pathmap[key] = m
+        return m
+
+    def __contains__(self, key):  # type: (Text) -> bool
+        """Test for the presence of the given relative path in this mapper."""
+        return key in self._pathmap
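+
+
+# Illustrative end-to-end sketch (hypothetical paths; assumes /inputs/reads.fq
+# exists locally):
+#     files = [{"class": "File", "location": "file:///inputs/reads.fq",
+#               "basename": "reads.fq"}]
+#     pm = PathMapper(files, basedir="/inputs", stagedir="/var/spool/cwl")
+#     ent = pm.mapper("file:///inputs/reads.fq")
+#     # ent.resolved == "/inputs/reads.fq"; ent.target lies under a per-file
+#     # "stg<uuid>" directory inside /var/spool/cwl (separateDirs=True default).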