diff planemo/lib/python3.7/site-packages/cwltool/stdfsaccess.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/cwltool/stdfsaccess.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,72 @@
+"""Abstracted IO access."""
+from __future__ import absolute_import
+
+import glob
+import os
+from io import open
+from typing import IO, Any, List
+
+from schema_salad.ref_resolver import file_uri, uri_file_path
+from six.moves import urllib
+from typing_extensions import Text
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .utils import onWindows
+
+
+def abspath(src, basedir):  # type: (Text, Text) -> Text
+    if src.startswith(u"file://"):
+        abpath = Text(uri_file_path(str(src)))
+    elif urllib.parse.urlsplit(src).scheme in ['http', 'https']:
+        return src
+    else:
+        if basedir.startswith(u"file://"):
+            abpath = src if os.path.isabs(src) else basedir+ '/'+ src
+        else:
+            abpath = src if os.path.isabs(src) else os.path.join(basedir, src)
+    return abpath
+
+class StdFsAccess(object):
+    """Local filesystem implementation."""
+
+    def __init__(self, basedir):  # type: (Text) -> None
+        """Perform operations with respect to a base directory."""
+        self.basedir = basedir
+
+    def _abs(self, p):  # type: (Text) -> Text
+        return abspath(p, self.basedir)
+
+    def glob(self, pattern):  # type: (Text) -> List[Text]
+        return [file_uri(str(self._abs(l))) for l in glob.glob(self._abs(pattern))]
+
+    def open(self, fn, mode):  # type: (Text, str) -> IO[Any]
+        return open(self._abs(fn), mode)
+
+    def exists(self, fn):  # type: (Text) -> bool
+        return os.path.exists(self._abs(fn))
+
+    def size(self, fn):    # type: (Text) -> int
+        return os.stat(self._abs(fn)).st_size
+
+    def isfile(self, fn):  # type: (Text) -> bool
+        return os.path.isfile(self._abs(fn))
+
+    def isdir(self, fn):  # type: (Text) -> bool
+        return os.path.isdir(self._abs(fn))
+
+    def listdir(self, fn):  # type: (Text) -> List[Text]
+        return [abspath(urllib.parse.quote(str(l)), fn) for l in os.listdir(self._abs(fn))]
+
+    def join(self, path, *paths):  # type: (Text, *Text) -> Text
+        return os.path.join(path, *paths)
+
+    def realpath(self, path):  # type: (Text) -> Text
+        return os.path.realpath(path)
+
+    # On windows os.path.realpath appends unecessary Drive, here we would avoid that
+    def docker_compatible_realpath(self, path):  # type: (Text) -> Text
+        if onWindows():
+            if path.startswith('/'):
+                return path
+            return '/'+path
+        return self.realpath(path)