Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/utils.py @ 3:758bc20232e8 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:20:52 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
"""Shared functions and other definitions.""" from __future__ import absolute_import import collections import os import platform import random import shutil import string import sys import tempfile from functools import partial # pylint: disable=unused-import from typing import (IO, Any, AnyStr, Callable, # pylint: disable=unused-import Dict, Iterable, List, MutableMapping, MutableSequence, Optional, Union) import pkg_resources from mypy_extensions import TypedDict from schema_salad.utils import json_dump, json_dumps # pylint: disable=unused-import from six.moves import urllib, zip_longest from typing_extensions import Deque, Text # pylint: disable=unused-import # move to a regular typing import when Python 3.3-3.6 is no longer supported from pathlib2 import Path # no imports from cwltool allowed if os.name == 'posix': if sys.version_info < (3, 5): import subprocess32 as subprocess # nosec # pylint: disable=unused-import else: import subprocess # nosec # pylint: disable=unused-import else: import subprocess # type: ignore # nosec windows_default_container_id = "frolvlad/alpine-bash" Directory = TypedDict('Directory', {'class': Text, 'listing': List[Dict[Text, Text]], 'basename': Text}) DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep processes_to_kill = collections.deque() # type: Deque[subprocess.Popen] def versionstring(): # type: () -> Text """Version of CWLtool used to execute the workflow.""" pkg = pkg_resources.require("cwltool") if pkg: return u"%s %s" % (sys.argv[0], pkg[0].version) return u"%s %s" % (sys.argv[0], "unknown version") def aslist(l): # type: (Any) -> MutableSequence[Any] """Wrap any non-MutableSequence/list in a list.""" if isinstance(l, MutableSequence): return l return [l] def copytree_with_merge(src, dst): # type: (Text, Text) -> None if not os.path.exists(dst): os.makedirs(dst) shutil.copystat(src, dst) lst = os.listdir(src) for item in lst: spath = os.path.join(src, item) dpath = os.path.join(dst, item) if os.path.isdir(spath): copytree_with_merge(spath, dpath) else: shutil.copy2(spath, dpath) def docker_windows_path_adjust(path): # type: (Text) -> Text r""" Adjust only windows paths for Docker. The docker run command treats them as unix paths. Example: 'C:\Users\foo to /C/Users/foo (Docker for Windows) or /c/Users/foo (Docker toolbox). """ if onWindows(): split = path.split(':') if len(split) == 2: if platform.win32_ver()[0] in ('7', '8'): # type: ignore split[0] = split[0].lower() # Docker toolbox uses lowecase windows Drive letters else: split[0] = split[0].capitalize() # Docker for Windows uses uppercase windows Drive letters path = ':'.join(split) path = path.replace(':', '').replace('\\', '/') return path if path[0] == '/' else '/' + path return path def docker_windows_reverse_path_adjust(path): # type: (Text) -> (Text) r""" Change docker path (only on windows os) appropriately back to Windows path. Example: /C/Users/foo to C:\Users\foo """ if path is not None and onWindows(): if path[0] == '/': path = path[1:] else: raise ValueError("not a docker path") splitpath = path.split('/') splitpath[0] = splitpath[0]+':' return '\\'.join(splitpath) return path def docker_windows_reverse_fileuri_adjust(fileuri): # type: (Text) -> (Text) r""" Convert fileuri to be MS Windows comptabile, if needed. On docker in windows fileuri do not contain : in path To convert this file uri to windows compatible add : after drive letter, so file:///E/var becomes file:///E:/var """ if fileuri is not None and onWindows(): if urllib.parse.urlsplit(fileuri).scheme == "file": filesplit = fileuri.split("/") if filesplit[3][-1] != ':': filesplit[3] = filesplit[3]+':' return '/'.join(filesplit) return fileuri raise ValueError("not a file URI") return fileuri def onWindows(): # type: () -> (bool) """Check if we are on Windows OS.""" return os.name == 'nt' def convert_pathsep_to_unix(path): # type: (Text) -> (Text) """ Convert path seperators to unix style. On windows os.path.join would use backslash to join path, since we would use these paths in Docker we would convert it to use forward slashes: / """ if path is not None and onWindows(): return path.replace('\\', '/') return path def cmp_like_py2(dict1, dict2): # type: (Dict[Text, Any], Dict[Text, Any]) -> int """ Compare in the same manner as Python2. Comparision function to be used in sorting as python3 doesn't allow sorting of different types like str() and int(). This function re-creates sorting nature in py2 of heterogeneous list of `int` and `str` """ # extract lists from both dicts first, second = dict1["position"], dict2["position"] # iterate through both list till max of their size for i, j in zip_longest(first, second): if i == j: continue # in case 1st list is smaller # should come first in sorting if i is None: return -1 # if 1st list is longer, # it should come later in sort elif j is None: return 1 # if either of the list contains str element # at any index, both should be str before comparing if isinstance(i, str) or isinstance(j, str): return 1 if str(i) > str(j) else -1 # int comparison otherwise return 1 if i > j else -1 # if both lists are equal return 0 def bytes2str_in_dicts(inp # type: Union[MutableMapping[Text, Any], MutableSequence[Any], Any] ): # type: (...) -> Union[Text, MutableSequence[Any], MutableMapping[Text, Any]] """ Convert any present byte string to unicode string, inplace. input is a dict of nested dicts and lists """ # if input is dict, recursively call for each value if isinstance(inp, MutableMapping): for k in inp: inp[k] = bytes2str_in_dicts(inp[k]) return inp # if list, iterate through list and fn call # for all its elements if isinstance(inp, MutableSequence): for idx, value in enumerate(inp): inp[idx] = bytes2str_in_dicts(value) return inp # if value is bytes, return decoded string, elif isinstance(inp, bytes): return inp.decode('utf-8') # simply return elements itself return inp def visit_class(rec, cls, op): # type: (Any, Iterable[Any], Union[Callable[..., Any], partial[Any]]) -> None """Apply a function to with "class" in cls.""" if isinstance(rec, MutableMapping): if "class" in rec and rec.get("class") in cls: op(rec) for d in rec: visit_class(rec[d], cls, op) if isinstance(rec, MutableSequence): for d in rec: visit_class(d, cls, op) def visit_field(rec, field, op): # type: (Any, Text, Union[Callable[..., Any], partial[Any]]) -> None """Apply a function to mapping with 'field'.""" if isinstance(rec, MutableMapping): if field in rec: rec[field] = op(rec[field]) for d in rec: visit_field(rec[d], field, op) if isinstance(rec, MutableSequence): for d in rec: visit_field(d, field, op) def random_outdir(): # type: () -> Text """Return the random directory name chosen to use for tool / workflow output.""" # compute this once and store it as a function attribute - each subsequent call will return the same value if not hasattr(random_outdir, 'outdir'): random_outdir.outdir = '/' + ''.join([random.choice(string.ascii_letters) for _ in range(6)]) # type: ignore # nosec return random_outdir.outdir # type: ignore # # Simple multi-platform (fcntl/msvrt) file locking wrapper # try: import fcntl def shared_file_lock(fd): # type: (IO[Any]) -> None fcntl.flock(fd.fileno(), fcntl.LOCK_SH) def upgrade_lock(fd): # type: (IO[Any]) -> None fcntl.flock(fd.fileno(), fcntl.LOCK_EX) except ImportError: import msvcrt def shared_file_lock(fd): # type: (IO[Any]) -> None msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024) # type: ignore def upgrade_lock(fd): # type: (IO[Any]) -> None pass