comparison env/lib/python3.7/site-packages/cwltool/stdfsaccess.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 """Abstracted IO access."""
2 from __future__ import absolute_import
3
4 import glob
5 import os
6 from io import open
7 from typing import IO, Any, List
8
9 from schema_salad.ref_resolver import file_uri, uri_file_path
10 from six.moves import urllib
11 from typing_extensions import Text
12 # move to a regular typing import when Python 3.3-3.6 is no longer supported
13
14 from .utils import onWindows
15
16
17 def abspath(src, basedir): # type: (Text, Text) -> Text
18 if src.startswith(u"file://"):
19 abpath = Text(uri_file_path(str(src)))
20 elif urllib.parse.urlsplit(src).scheme in ['http', 'https']:
21 return src
22 else:
23 if basedir.startswith(u"file://"):
24 abpath = src if os.path.isabs(src) else basedir+ '/'+ src
25 else:
26 abpath = src if os.path.isabs(src) else os.path.join(basedir, src)
27 return abpath
28
29 class StdFsAccess(object):
30 """Local filesystem implementation."""
31
32 def __init__(self, basedir): # type: (Text) -> None
33 """Perform operations with respect to a base directory."""
34 self.basedir = basedir
35
36 def _abs(self, p): # type: (Text) -> Text
37 return abspath(p, self.basedir)
38
39 def glob(self, pattern): # type: (Text) -> List[Text]
40 return [file_uri(str(self._abs(l))) for l in glob.glob(self._abs(pattern))]
41
42 def open(self, fn, mode): # type: (Text, str) -> IO[Any]
43 return open(self._abs(fn), mode)
44
45 def exists(self, fn): # type: (Text) -> bool
46 return os.path.exists(self._abs(fn))
47
48 def size(self, fn): # type: (Text) -> int
49 return os.stat(self._abs(fn)).st_size
50
51 def isfile(self, fn): # type: (Text) -> bool
52 return os.path.isfile(self._abs(fn))
53
54 def isdir(self, fn): # type: (Text) -> bool
55 return os.path.isdir(self._abs(fn))
56
57 def listdir(self, fn): # type: (Text) -> List[Text]
58 return [abspath(urllib.parse.quote(str(l)), fn) for l in os.listdir(self._abs(fn))]
59
60 def join(self, path, *paths): # type: (Text, *Text) -> Text
61 return os.path.join(path, *paths)
62
63 def realpath(self, path): # type: (Text) -> Text
64 return os.path.realpath(path)
65
66 # On windows os.path.realpath appends unecessary Drive, here we would avoid that
67 def docker_compatible_realpath(self, path): # type: (Text) -> Text
68 if onWindows():
69 if path.startswith('/'):
70 return path
71 return '/'+path
72 return self.realpath(path)