Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/psutil/_compat.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/psutil/_compat.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,424 @@ +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Module which provides compatibility with older Python versions. +This is more future-compatible rather than the opposite (prefer latest +Python 3 way of doing things). +""" + +import collections +import errno +import functools +import os +import sys +import types + +__all__ = [ + # constants + "PY3", + # builtins + "long", "range", "super", "unicode", "basestring", + # literals + "u", "b", + # collections module + "lru_cache", + # shutil module + "which", "get_terminal_size", + # python 3 exceptions + "FileNotFoundError", "PermissionError", "ProcessLookupError", + "InterruptedError", "ChildProcessError", "FileExistsError"] + + +PY3 = sys.version_info[0] == 3 +_SENTINEL = object() + +if PY3: + long = int + xrange = range + unicode = str + basestring = str + range = range + + def u(s): + return s + + def b(s): + return s.encode("latin-1") +else: + long = long + range = xrange + unicode = unicode + basestring = basestring + + def u(s): + return unicode(s, "unicode_escape") + + def b(s): + return s + + +# --- builtins + + +# Python 3 super(). +# Taken from "future" package. +# Credit: Ryan Kelly +if PY3: + super = super +else: + _builtin_super = super + + def super(type_=_SENTINEL, type_or_obj=_SENTINEL, framedepth=1): + """Like Python 3 builtin super(). If called without any arguments + it attempts to infer them at runtime. + """ + if type_ is _SENTINEL: + f = sys._getframe(framedepth) + try: + # Get the function's first positional argument. + type_or_obj = f.f_locals[f.f_code.co_varnames[0]] + except (IndexError, KeyError): + raise RuntimeError('super() used in a function with no args') + try: + # Get the MRO so we can crawl it. + mro = type_or_obj.__mro__ + except (AttributeError, RuntimeError): + try: + mro = type_or_obj.__class__.__mro__ + except AttributeError: + raise RuntimeError('super() used in a non-newstyle class') + for type_ in mro: + # Find the class that owns the currently-executing method. + for meth in type_.__dict__.values(): + # Drill down through any wrappers to the underlying func. + # This handles e.g. classmethod() and staticmethod(). + try: + while not isinstance(meth, types.FunctionType): + if isinstance(meth, property): + # Calling __get__ on the property will invoke + # user code which might throw exceptions or + # have side effects + meth = meth.fget + else: + try: + meth = meth.__func__ + except AttributeError: + meth = meth.__get__(type_or_obj, type_) + except (AttributeError, TypeError): + continue + if meth.func_code is f.f_code: + break # found + else: + # Not found. Move onto the next class in MRO. + continue + break # found + else: + raise RuntimeError('super() called outside a method') + + # Dispatch to builtin super(). + if type_or_obj is not _SENTINEL: + return _builtin_super(type_, type_or_obj) + return _builtin_super(type_) + + +# --- exceptions + + +if PY3: + FileNotFoundError = FileNotFoundError # NOQA + PermissionError = PermissionError # NOQA + ProcessLookupError = ProcessLookupError # NOQA + InterruptedError = InterruptedError # NOQA + ChildProcessError = ChildProcessError # NOQA + FileExistsError = FileExistsError # NOQA +else: + # https://github.com/PythonCharmers/python-future/blob/exceptions/ + # src/future/types/exceptions/pep3151.py + import platform + + def _instance_checking_exception(base_exception=Exception): + def wrapped(instance_checker): + class TemporaryClass(base_exception): + + def __init__(self, *args, **kwargs): + if len(args) == 1 and isinstance(args[0], TemporaryClass): + unwrap_me = args[0] + for attr in dir(unwrap_me): + if not attr.startswith('__'): + setattr(self, attr, getattr(unwrap_me, attr)) + else: + super(TemporaryClass, self).__init__(*args, **kwargs) + + class __metaclass__(type): + def __instancecheck__(cls, inst): + return instance_checker(inst) + + def __subclasscheck__(cls, classinfo): + value = sys.exc_info()[1] + return isinstance(value, cls) + + TemporaryClass.__name__ = instance_checker.__name__ + TemporaryClass.__doc__ = instance_checker.__doc__ + return TemporaryClass + + return wrapped + + @_instance_checking_exception(EnvironmentError) + def FileNotFoundError(inst): + return getattr(inst, 'errno', _SENTINEL) == errno.ENOENT + + @_instance_checking_exception(EnvironmentError) + def ProcessLookupError(inst): + return getattr(inst, 'errno', _SENTINEL) == errno.ESRCH + + @_instance_checking_exception(EnvironmentError) + def PermissionError(inst): + return getattr(inst, 'errno', _SENTINEL) in ( + errno.EACCES, errno.EPERM) + + @_instance_checking_exception(EnvironmentError) + def InterruptedError(inst): + return getattr(inst, 'errno', _SENTINEL) == errno.EINTR + + @_instance_checking_exception(EnvironmentError) + def ChildProcessError(inst): + return getattr(inst, 'errno', _SENTINEL) == errno.ECHILD + + @_instance_checking_exception(EnvironmentError) + def FileExistsError(inst): + return getattr(inst, 'errno', _SENTINEL) == errno.EEXIST + + if platform.python_implementation() != "CPython": + try: + raise OSError(errno.EEXIST, "perm") + except FileExistsError: + pass + except OSError: + raise RuntimeError( + "broken or incompatible Python implementation, see: " + "https://github.com/giampaolo/psutil/issues/1659") + + +# --- stdlib additions + + +# py 3.2 functools.lru_cache +# Taken from: http://code.activestate.com/recipes/578078 +# Credit: Raymond Hettinger +try: + from functools import lru_cache +except ImportError: + try: + from threading import RLock + except ImportError: + from dummy_threading import RLock + + _CacheInfo = collections.namedtuple( + "CacheInfo", ["hits", "misses", "maxsize", "currsize"]) + + class _HashedSeq(list): + __slots__ = 'hashvalue' + + def __init__(self, tup, hash=hash): + self[:] = tup + self.hashvalue = hash(tup) + + def __hash__(self): + return self.hashvalue + + def _make_key(args, kwds, typed, + kwd_mark=(object(), ), + fasttypes=set((int, str, frozenset, type(None))), + sorted=sorted, tuple=tuple, type=type, len=len): + key = args + if kwds: + sorted_items = sorted(kwds.items()) + key += kwd_mark + for item in sorted_items: + key += item + if typed: + key += tuple(type(v) for v in args) + if kwds: + key += tuple(type(v) for k, v in sorted_items) + elif len(key) == 1 and type(key[0]) in fasttypes: + return key[0] + return _HashedSeq(key) + + def lru_cache(maxsize=100, typed=False): + """Least-recently-used cache decorator, see: + http://docs.python.org/3/library/functools.html#functools.lru_cache + """ + def decorating_function(user_function): + cache = dict() + stats = [0, 0] + HITS, MISSES = 0, 1 + make_key = _make_key + cache_get = cache.get + _len = len + lock = RLock() + root = [] + root[:] = [root, root, None, None] + nonlocal_root = [root] + PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 + if maxsize == 0: + def wrapper(*args, **kwds): + result = user_function(*args, **kwds) + stats[MISSES] += 1 + return result + elif maxsize is None: + def wrapper(*args, **kwds): + key = make_key(args, kwds, typed) + result = cache_get(key, root) + if result is not root: + stats[HITS] += 1 + return result + result = user_function(*args, **kwds) + cache[key] = result + stats[MISSES] += 1 + return result + else: + def wrapper(*args, **kwds): + if kwds or typed: + key = make_key(args, kwds, typed) + else: + key = args + lock.acquire() + try: + link = cache_get(key) + if link is not None: + root, = nonlocal_root + link_prev, link_next, key, result = link + link_prev[NEXT] = link_next + link_next[PREV] = link_prev + last = root[PREV] + last[NEXT] = root[PREV] = link + link[PREV] = last + link[NEXT] = root + stats[HITS] += 1 + return result + finally: + lock.release() + result = user_function(*args, **kwds) + lock.acquire() + try: + root, = nonlocal_root + if key in cache: + pass + elif _len(cache) >= maxsize: + oldroot = root + oldroot[KEY] = key + oldroot[RESULT] = result + root = nonlocal_root[0] = oldroot[NEXT] + oldkey = root[KEY] + root[KEY] = root[RESULT] = None + del cache[oldkey] + cache[key] = oldroot + else: + last = root[PREV] + link = [last, root, key, result] + last[NEXT] = root[PREV] = cache[key] = link + stats[MISSES] += 1 + finally: + lock.release() + return result + + def cache_info(): + """Report cache statistics""" + lock.acquire() + try: + return _CacheInfo(stats[HITS], stats[MISSES], maxsize, + len(cache)) + finally: + lock.release() + + def cache_clear(): + """Clear the cache and cache statistics""" + lock.acquire() + try: + cache.clear() + root = nonlocal_root[0] + root[:] = [root, root, None, None] + stats[:] = [0, 0] + finally: + lock.release() + + wrapper.__wrapped__ = user_function + wrapper.cache_info = cache_info + wrapper.cache_clear = cache_clear + return functools.update_wrapper(wrapper, user_function) + + return decorating_function + + +# python 3.3 +try: + from shutil import which +except ImportError: + def which(cmd, mode=os.F_OK | os.X_OK, path=None): + """Given a command, mode, and a PATH string, return the path which + conforms to the given mode on the PATH, or None if there is no such + file. + + `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result + of os.environ.get("PATH"), or can be overridden with a custom search + path. + """ + def _access_check(fn, mode): + return (os.path.exists(fn) and os.access(fn, mode) and + not os.path.isdir(fn)) + + if os.path.dirname(cmd): + if _access_check(cmd, mode): + return cmd + return None + + if path is None: + path = os.environ.get("PATH", os.defpath) + if not path: + return None + path = path.split(os.pathsep) + + if sys.platform == "win32": + if os.curdir not in path: + path.insert(0, os.curdir) + + pathext = os.environ.get("PATHEXT", "").split(os.pathsep) + if any(cmd.lower().endswith(ext.lower()) for ext in pathext): + files = [cmd] + else: + files = [cmd + ext for ext in pathext] + else: + files = [cmd] + + seen = set() + for dir in path: + normdir = os.path.normcase(dir) + if normdir not in seen: + seen.add(normdir) + for thefile in files: + name = os.path.join(dir, thefile) + if _access_check(name, mode): + return name + return None + + +# python 3.3 +try: + from shutil import get_terminal_size +except ImportError: + def get_terminal_size(fallback=(80, 24)): + try: + import fcntl + import termios + import struct + except ImportError: + return fallback + else: + try: + # This should work on Linux. + res = struct.unpack( + 'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234')) + return (res[1], res[0]) + except Exception: + return fallback