Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/stdfsaccess.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 """Abstracted IO access.""" | |
2 from __future__ import absolute_import | |
3 | |
4 import glob | |
5 import os | |
6 from io import open | |
7 from typing import IO, Any, List | |
8 | |
9 from schema_salad.ref_resolver import file_uri, uri_file_path | |
10 from six.moves import urllib | |
11 from typing_extensions import Text | |
12 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
13 | |
14 from .utils import onWindows | |
15 | |
16 | |
def abspath(src, basedir):  # type: (Text, Text) -> Text
    """Resolve ``src`` to an absolute location, using ``basedir`` if needed.

    * A ``file://`` URI is converted to a path with ``uri_file_path``.
    * A URL whose scheme is ``http`` or ``https`` is returned unchanged.
    * An absolute path (per ``os.path.isabs``) is returned unchanged.
    * Anything else is appended to ``basedir``: with a literal ``/`` when
      ``basedir`` is itself a ``file://`` URI, otherwise via ``os.path.join``.
    """
    if src.startswith(u"file://"):
        return Text(uri_file_path(str(src)))

    if urllib.parse.urlsplit(src).scheme in ('http', 'https'):
        return src

    # Inspect basedir before src so the evaluation order stays the same.
    base_is_file_uri = basedir.startswith(u"file://")
    if os.path.isabs(src):
        return src
    if base_is_file_uri:
        # URIs always use forward slashes, whatever the host platform.
        return basedir + '/' + src
    return os.path.join(basedir, src)
28 | |
class StdFsAccess(object):
    """File access backed by the local filesystem."""

    def __init__(self, basedir):  # type: (Text) -> None
        """Remember the base directory that relative names are resolved against."""
        self.basedir = basedir

    def _abs(self, p):  # type: (Text) -> Text
        """Resolve ``p`` against ``self.basedir`` using ``abspath``."""
        return abspath(p, self.basedir)

    def glob(self, pattern):  # type: (Text) -> List[Text]
        """Return ``file_uri`` results for everything matching ``pattern``."""
        matches = glob.glob(self._abs(pattern))
        return [file_uri(str(self._abs(match))) for match in matches]

    def open(self, fn, mode):  # type: (Text, str) -> IO[Any]
        """Open the resolved ``fn`` with the given ``mode``."""
        target = self._abs(fn)
        return open(target, mode)

    def exists(self, fn):  # type: (Text) -> bool
        """Report whether the resolved ``fn`` exists."""
        target = self._abs(fn)
        return os.path.exists(target)

    def size(self, fn):  # type: (Text) -> int
        """Return the ``st_size`` reported by ``os.stat`` for the resolved ``fn``."""
        info = os.stat(self._abs(fn))
        return info.st_size

    def isfile(self, fn):  # type: (Text) -> bool
        """Report whether the resolved ``fn`` is a regular file."""
        target = self._abs(fn)
        return os.path.isfile(target)

    def isdir(self, fn):  # type: (Text) -> bool
        """Report whether the resolved ``fn`` is a directory."""
        target = self._abs(fn)
        return os.path.isdir(target)

    def listdir(self, fn):  # type: (Text) -> List[Text]
        """List the entries of ``fn``, each URL-quoted and resolved against ``fn``."""
        entries = os.listdir(self._abs(fn))
        return [
            abspath(urllib.parse.quote(str(entry)), fn)
            for entry in entries
        ]

    def join(self, path, *paths):  # type: (Text, *Text) -> Text
        """Join path components with ``os.path.join``."""
        return os.path.join(path, *paths)

    def realpath(self, path):  # type: (Text) -> Text
        """Return ``os.path.realpath`` of ``path``."""
        return os.path.realpath(path)

    def docker_compatible_realpath(self, path):  # type: (Text) -> Text
        """Return a real path for ``path``, special-casing Windows.

        On Windows os.path.realpath prepends an unnecessary drive, so there
        the path is only given a leading ``/`` (if it lacks one) instead.
        """
        if not onWindows():
            return self.realpath(path)
        return path if path.startswith('/') else '/' + path