Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/stdfsaccess.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
"""Abstracted IO access.""" from __future__ import absolute_import import glob import os from io import open from typing import IO, Any, List from schema_salad.ref_resolver import file_uri, uri_file_path from six.moves import urllib from typing_extensions import Text # move to a regular typing import when Python 3.3-3.6 is no longer supported from .utils import onWindows def abspath(src, basedir): # type: (Text, Text) -> Text if src.startswith(u"file://"): abpath = Text(uri_file_path(str(src))) elif urllib.parse.urlsplit(src).scheme in ['http', 'https']: return src else: if basedir.startswith(u"file://"): abpath = src if os.path.isabs(src) else basedir+ '/'+ src else: abpath = src if os.path.isabs(src) else os.path.join(basedir, src) return abpath class StdFsAccess(object): """Local filesystem implementation.""" def __init__(self, basedir): # type: (Text) -> None """Perform operations with respect to a base directory.""" self.basedir = basedir def _abs(self, p): # type: (Text) -> Text return abspath(p, self.basedir) def glob(self, pattern): # type: (Text) -> List[Text] return [file_uri(str(self._abs(l))) for l in glob.glob(self._abs(pattern))] def open(self, fn, mode): # type: (Text, str) -> IO[Any] return open(self._abs(fn), mode) def exists(self, fn): # type: (Text) -> bool return os.path.exists(self._abs(fn)) def size(self, fn): # type: (Text) -> int return os.stat(self._abs(fn)).st_size def isfile(self, fn): # type: (Text) -> bool return os.path.isfile(self._abs(fn)) def isdir(self, fn): # type: (Text) -> bool return os.path.isdir(self._abs(fn)) def listdir(self, fn): # type: (Text) -> List[Text] return [abspath(urllib.parse.quote(str(l)), fn) for l in os.listdir(self._abs(fn))] def join(self, path, *paths): # type: (Text, *Text) -> Text return os.path.join(path, *paths) def realpath(self, path): # type: (Text) -> Text return os.path.realpath(path) # On windows os.path.realpath appends unecessary Drive, here we would avoid that def docker_compatible_realpath(self, path): # type: (Text) -> Text if onWindows(): if path.startswith('/'): return path return '/'+path return self.realpath(path)