Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/utils.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/cwltool/utils.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,265 @@ +"""Shared functions and other definitions.""" +from __future__ import absolute_import + +import collections +import os +import platform +import random +import shutil +import string +import sys +import tempfile +from functools import partial # pylint: disable=unused-import +from typing import (IO, Any, AnyStr, Callable, # pylint: disable=unused-import + Dict, Iterable, List, MutableMapping, MutableSequence, + Optional, Union) + +import pkg_resources +from mypy_extensions import TypedDict +from schema_salad.utils import json_dump, json_dumps # pylint: disable=unused-import +from six.moves import urllib, zip_longest +from typing_extensions import Deque, Text # pylint: disable=unused-import +# move to a regular typing import when Python 3.3-3.6 is no longer supported + +from pathlib2 import Path + +# no imports from cwltool allowed +if os.name == 'posix': + if sys.version_info < (3, 5): + import subprocess32 as subprocess # nosec # pylint: disable=unused-import + else: + import subprocess # nosec # pylint: disable=unused-import +else: + import subprocess # type: ignore # nosec + +windows_default_container_id = "frolvlad/alpine-bash" + +Directory = TypedDict('Directory', + {'class': Text, 'listing': List[Dict[Text, Text]], + 'basename': Text}) + +DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep + +processes_to_kill = collections.deque() # type: Deque[subprocess.Popen] + +def versionstring(): + # type: () -> Text + """Version of CWLtool used to execute the workflow.""" + pkg = pkg_resources.require("cwltool") + if pkg: + return u"%s %s" % (sys.argv[0], pkg[0].version) + return u"%s %s" % (sys.argv[0], "unknown version") + +def aslist(l): # type: (Any) -> MutableSequence[Any] + """Wrap any non-MutableSequence/list in a list.""" + if isinstance(l, MutableSequence): + return l + return [l] + +def copytree_with_merge(src, dst): # type: (Text, Text) -> None + if not os.path.exists(dst): + os.makedirs(dst) + shutil.copystat(src, dst) + lst = os.listdir(src) + for item in lst: + spath = os.path.join(src, item) + dpath = os.path.join(dst, item) + if os.path.isdir(spath): + copytree_with_merge(spath, dpath) + else: + shutil.copy2(spath, dpath) + +def docker_windows_path_adjust(path): + # type: (Text) -> Text + r""" + Adjust only windows paths for Docker. + + The docker run command treats them as unix paths. + + Example: 'C:\Users\foo to /C/Users/foo (Docker for Windows) or /c/Users/foo + (Docker toolbox). + """ + if onWindows(): + split = path.split(':') + if len(split) == 2: + if platform.win32_ver()[0] in ('7', '8'): # type: ignore + split[0] = split[0].lower() # Docker toolbox uses lowecase windows Drive letters + else: + split[0] = split[0].capitalize() + # Docker for Windows uses uppercase windows Drive letters + path = ':'.join(split) + path = path.replace(':', '').replace('\\', '/') + return path if path[0] == '/' else '/' + path + return path + + +def docker_windows_reverse_path_adjust(path): + # type: (Text) -> (Text) + r""" + Change docker path (only on windows os) appropriately back to Windows path. + + Example: /C/Users/foo to C:\Users\foo + """ + if path is not None and onWindows(): + if path[0] == '/': + path = path[1:] + else: + raise ValueError("not a docker path") + splitpath = path.split('/') + splitpath[0] = splitpath[0]+':' + return '\\'.join(splitpath) + return path + + +def docker_windows_reverse_fileuri_adjust(fileuri): + # type: (Text) -> (Text) + r""" + Convert fileuri to be MS Windows comptabile, if needed. + + On docker in windows fileuri do not contain : in path + To convert this file uri to windows compatible add : after drive letter, + so file:///E/var becomes file:///E:/var + """ + if fileuri is not None and onWindows(): + if urllib.parse.urlsplit(fileuri).scheme == "file": + filesplit = fileuri.split("/") + if filesplit[3][-1] != ':': + filesplit[3] = filesplit[3]+':' + return '/'.join(filesplit) + return fileuri + raise ValueError("not a file URI") + return fileuri + + +def onWindows(): + # type: () -> (bool) + """Check if we are on Windows OS.""" + return os.name == 'nt' + + +def convert_pathsep_to_unix(path): # type: (Text) -> (Text) + """ + Convert path seperators to unix style. + + On windows os.path.join would use backslash to join path, since we would + use these paths in Docker we would convert it to use forward slashes: / + """ + if path is not None and onWindows(): + return path.replace('\\', '/') + return path + +def cmp_like_py2(dict1, dict2): # type: (Dict[Text, Any], Dict[Text, Any]) -> int + """ + Compare in the same manner as Python2. + + Comparision function to be used in sorting as python3 doesn't allow sorting + of different types like str() and int(). + This function re-creates sorting nature in py2 of heterogeneous list of + `int` and `str` + """ + # extract lists from both dicts + first, second = dict1["position"], dict2["position"] + # iterate through both list till max of their size + for i, j in zip_longest(first, second): + if i == j: + continue + # in case 1st list is smaller + # should come first in sorting + if i is None: + return -1 + # if 1st list is longer, + # it should come later in sort + elif j is None: + return 1 + + # if either of the list contains str element + # at any index, both should be str before comparing + if isinstance(i, str) or isinstance(j, str): + return 1 if str(i) > str(j) else -1 + # int comparison otherwise + return 1 if i > j else -1 + # if both lists are equal + return 0 + + +def bytes2str_in_dicts(inp # type: Union[MutableMapping[Text, Any], MutableSequence[Any], Any] + ): + # type: (...) -> Union[Text, MutableSequence[Any], MutableMapping[Text, Any]] + """ + Convert any present byte string to unicode string, inplace. + + input is a dict of nested dicts and lists + """ + # if input is dict, recursively call for each value + if isinstance(inp, MutableMapping): + for k in inp: + inp[k] = bytes2str_in_dicts(inp[k]) + return inp + + # if list, iterate through list and fn call + # for all its elements + if isinstance(inp, MutableSequence): + for idx, value in enumerate(inp): + inp[idx] = bytes2str_in_dicts(value) + return inp + + # if value is bytes, return decoded string, + elif isinstance(inp, bytes): + return inp.decode('utf-8') + + # simply return elements itself + return inp + + +def visit_class(rec, cls, op): + # type: (Any, Iterable[Any], Union[Callable[..., Any], partial[Any]]) -> None + """Apply a function to with "class" in cls.""" + if isinstance(rec, MutableMapping): + if "class" in rec and rec.get("class") in cls: + op(rec) + for d in rec: + visit_class(rec[d], cls, op) + if isinstance(rec, MutableSequence): + for d in rec: + visit_class(d, cls, op) + +def visit_field(rec, field, op): + # type: (Any, Text, Union[Callable[..., Any], partial[Any]]) -> None + """Apply a function to mapping with 'field'.""" + if isinstance(rec, MutableMapping): + if field in rec: + rec[field] = op(rec[field]) + for d in rec: + visit_field(rec[d], field, op) + if isinstance(rec, MutableSequence): + for d in rec: + visit_field(d, field, op) + + +def random_outdir(): # type: () -> Text + """Return the random directory name chosen to use for tool / workflow output.""" + # compute this once and store it as a function attribute - each subsequent call will return the same value + if not hasattr(random_outdir, 'outdir'): + random_outdir.outdir = '/' + ''.join([random.choice(string.ascii_letters) for _ in range(6)]) # type: ignore # nosec + return random_outdir.outdir # type: ignore + +# +# Simple multi-platform (fcntl/msvrt) file locking wrapper +# +try: + import fcntl + + def shared_file_lock(fd): # type: (IO[Any]) -> None + fcntl.flock(fd.fileno(), fcntl.LOCK_SH) + + def upgrade_lock(fd): # type: (IO[Any]) -> None + fcntl.flock(fd.fileno(), fcntl.LOCK_EX) + +except ImportError: + import msvcrt + + def shared_file_lock(fd): # type: (IO[Any]) -> None + msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024) # type: ignore + + def upgrade_lock(fd): # type: (IO[Any]) -> None + pass