comparison env/lib/python3.7/site-packages/psutil/_compat.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Module which provides compatibility with older Python versions."""
6
7 import collections
8 import errno
9 import functools
10 import os
11 import sys
12
13 __all__ = ["PY3", "long", "xrange", "unicode", "basestring", "u", "b",
14 "lru_cache", "which", "get_terminal_size",
15 "FileNotFoundError", "PermissionError", "ProcessLookupError",
16 "InterruptedError", "ChildProcessError", "FileExistsError"]
17
# True when running on Python 3 or any later major version.  ">=" is used
# instead of "==" so that a future major release keeps taking the modern
# branches below rather than the Python 2 ones, whose builtins (long,
# xrange, unicode, basestring) do not exist there.
PY3 = sys.version_info[0] >= 3

if PY3:
    # Python 2 type names mapped onto their Python 3 counterparts.
    long = int
    xrange = range
    unicode = str
    basestring = str

    def u(s):
        """Return `s` unchanged (no conversion is applied on Python 3)."""
        return s

    def b(s):
        """Return `s` encoded to bytes with the latin-1 codec."""
        return s.encode("latin-1")
else:
    # Rebind the Python 2 builtins so they are importable from this module.
    long = long
    xrange = xrange
    unicode = unicode
    basestring = basestring

    def u(s):
        """Return `s` decoded to unicode with the "unicode_escape" codec."""
        return unicode(s, "unicode_escape")

    def b(s):
        """Return `s` unchanged (no conversion is applied on Python 2)."""
        return s
42
43
44 # --- exceptions
45
46
if PY3:
    # These exception classes are builtins here; rebinding them makes the
    # same names importable from this module.
    FileNotFoundError = FileNotFoundError  # NOQA
    PermissionError = PermissionError  # NOQA
    ProcessLookupError = ProcessLookupError  # NOQA
    InterruptedError = InterruptedError  # NOQA
    ChildProcessError = ChildProcessError  # NOQA
    FileExistsError = FileExistsError  # NOQA
else:
    # https://github.com/PythonCharmers/python-future/blob/exceptions/
    # src/future/types/exceptions/pep3151.py
    import platform

    # Default for getattr() below: an object that never equals an errno.
    _singleton = object()

    def instance_checking_exception(base_exception=Exception):
        """Return a decorator that turns a predicate into an exception class.

        The decorated function receives an exception instance and returns
        a truth value; the generated class (a subclass of `base_exception`)
        uses it to answer isinstance() checks, and takes over the
        predicate's __name__ and __doc__.
        """
        def wrapped(instance_checker):
            class TemporaryClass(base_exception):

                def __init__(self, *args, **kwargs):
                    wraps_one = (len(args) == 1 and
                                 isinstance(args[0], TemporaryClass))
                    if not wraps_one:
                        super(TemporaryClass, self).__init__(*args, **kwargs)
                        return
                    # Built from a single matching exception: copy its
                    # non-dunder attributes onto this instance.
                    source = args[0]
                    for name in dir(source):
                        if name.startswith('__'):
                            continue
                        setattr(self, name, getattr(source, name))

                class __metaclass__(type):
                    def __instancecheck__(cls, inst):
                        # isinstance() is delegated to the predicate.
                        return instance_checker(inst)

                    def __subclasscheck__(cls, classinfo):
                        # Decided from the exception currently being
                        # handled, not from `classinfo`.
                        return isinstance(sys.exc_info()[1], cls)

            TemporaryClass.__name__ = instance_checker.__name__
            TemporaryClass.__doc__ = instance_checker.__doc__
            return TemporaryClass

        return wrapped

    # Each predicate below matches on the `errno` attribute of the instance.
    # No docstrings are added on purpose: they would become the __doc__ of
    # the generated exception classes.

    @instance_checking_exception(EnvironmentError)
    def FileNotFoundError(inst):
        code = getattr(inst, 'errno', _singleton)
        return code == errno.ENOENT

    @instance_checking_exception(EnvironmentError)
    def ProcessLookupError(inst):
        code = getattr(inst, 'errno', _singleton)
        return code == errno.ESRCH

    @instance_checking_exception(EnvironmentError)
    def PermissionError(inst):
        code = getattr(inst, 'errno', _singleton)
        return code in (errno.EACCES, errno.EPERM)

    @instance_checking_exception(EnvironmentError)
    def InterruptedError(inst):
        code = getattr(inst, 'errno', _singleton)
        return code == errno.EINTR

    @instance_checking_exception(EnvironmentError)
    def ChildProcessError(inst):
        code = getattr(inst, 'errno', _singleton)
        return code == errno.ECHILD

    @instance_checking_exception(EnvironmentError)
    def FileExistsError(inst):
        code = getattr(inst, 'errno', _singleton)
        return code == errno.EEXIST

    if platform.python_implementation() != "CPython":
        # Import-time self-test on non-CPython interpreters: an OSError
        # carrying EEXIST must be caught by the emulated FileExistsError.
        try:
            raise OSError(errno.EEXIST, "perm")
        except FileExistsError:
            pass
        except OSError:
            raise RuntimeError(
                "broken / incompatible Python implementation, see: "
                "https://github.com/giampaolo/psutil/issues/1659")
122
123
124 # --- stdlib additions
125
126
127 # py 3.2 functools.lru_cache
128 # Taken from: http://code.activestate.com/recipes/578078
129 # Credit: Raymond Hettinger
try:
    from functools import lru_cache
except ImportError:
    try:
        from threading import RLock
    except ImportError:
        from dummy_threading import RLock

    _CacheInfo = collections.namedtuple(
        "CacheInfo", ["hits", "misses", "maxsize", "currsize"])

    class _HashedSeq(list):
        """List holding a cache key, with its hash computed once up front."""
        __slots__ = 'hashvalue'

        def __init__(self, tup, hash=hash):
            self[:] = tup
            self.hashvalue = hash(tup)

        def __hash__(self):
            return self.hashvalue

    def _make_key(args, kwds, typed,
                  kwd_mark=(object(), ),
                  fasttypes=set((int, str, frozenset, type(None))),
                  sorted=sorted, tuple=tuple, type=type, len=len):
        """Build a cache key from the call's arguments.

        The key starts as `args`; when `kwds` is non-empty, `kwd_mark` and
        the sorted keyword items are appended.  With `typed` true the types
        of all argument values are appended as well.  Otherwise, a key made
        of a single value whose type is in `fasttypes` is returned as that
        bare value.  In every other case the key is wrapped in _HashedSeq.
        """
        key = args
        if kwds:
            sorted_items = sorted(kwds.items())
            key += kwd_mark
            for item in sorted_items:
                key += item
        if typed:
            key += tuple(type(v) for v in args)
            if kwds:
                key += tuple(type(v) for k, v in sorted_items)
        elif len(key) == 1 and type(key[0]) in fasttypes:
            return key[0]
        return _HashedSeq(key)

    def lru_cache(maxsize=100, typed=False):
        """Least-recently-used cache decorator, see:
        http://docs.python.org/3/library/functools.html#functools.lru_cache

        `maxsize` == 0 disables caching (calls are only counted as misses),
        `maxsize` None caches without bound and without eviction, any other
        value bounds the number of cached results.  `typed` is passed on to
        the key builder.  The returned wrapper exposes `cache_info()`,
        `cache_clear()` and `__wrapped__`.
        """
        def decorating_function(user_function):
            cache = dict()
            stats = [0, 0]  # [hits, misses]; a list so closures can mutate it
            HITS, MISSES = 0, 1
            make_key = _make_key
            cache_get = cache.get
            _len = len
            lock = RLock()
            # `root` is the sentinel of a circular doubly linked list whose
            # links are [PREV, NEXT, KEY, RESULT]; it starts pointing at
            # itself.
            root = []
            root[:] = [root, root, None, None]
            # One-element list so the closures can rebind the current root.
            nonlocal_root = [root]
            PREV, NEXT, KEY, RESULT = 0, 1, 2, 3
            if maxsize == 0:
                def wrapper(*args, **kwds):
                    result = user_function(*args, **kwds)
                    stats[MISSES] += 1
                    return result
            elif maxsize is None:
                def wrapper(*args, **kwds):
                    key = make_key(args, kwds, typed)
                    # `root` doubles as the "not found" marker.
                    result = cache_get(key, root)
                    if result is not root:
                        stats[HITS] += 1
                        return result
                    result = user_function(*args, **kwds)
                    cache[key] = result
                    stats[MISSES] += 1
                    return result
            else:
                def wrapper(*args, **kwds):
                    if kwds or typed:
                        key = make_key(args, kwds, typed)
                    else:
                        key = args
                    with lock:
                        link = cache_get(key)
                        if link is not None:
                            # Hit: unlink the entry and re-insert it just
                            # before the root.
                            root, = nonlocal_root
                            link_prev, link_next, key, result = link
                            link_prev[NEXT] = link_next
                            link_next[PREV] = link_prev
                            last = root[PREV]
                            last[NEXT] = root[PREV] = link
                            link[PREV] = last
                            link[NEXT] = root
                            stats[HITS] += 1
                            return result
                    # The user function is called with the lock released.
                    result = user_function(*args, **kwds)
                    with lock:
                        root, = nonlocal_root
                        if key in cache:
                            # The key was stored while the lock was
                            # released; keep the existing entry.
                            pass
                        elif _len(cache) >= maxsize:
                            # Full: store the new entry in the old root and
                            # make the following link the new (emptied)
                            # root, dropping that link's key from the cache.
                            oldroot = root
                            oldroot[KEY] = key
                            oldroot[RESULT] = result
                            root = nonlocal_root[0] = oldroot[NEXT]
                            oldkey = root[KEY]
                            root[KEY] = root[RESULT] = None
                            del cache[oldkey]
                            cache[key] = oldroot
                        else:
                            # Room left: add a new link just before the root.
                            last = root[PREV]
                            link = [last, root, key, result]
                            last[NEXT] = root[PREV] = cache[key] = link
                        stats[MISSES] += 1
                    return result

            def cache_info():
                """Report cache statistics"""
                with lock:
                    return _CacheInfo(stats[HITS], stats[MISSES], maxsize,
                                      len(cache))

            def cache_clear():
                """Clear the cache and cache statistics"""
                with lock:
                    cache.clear()
                    root = nonlocal_root[0]
                    root[:] = [root, root, None, None]
                    stats[:] = [0, 0]

            wrapper.__wrapped__ = user_function
            wrapper.cache_info = cache_info
            wrapper.cache_clear = cache_clear
            return functools.update_wrapper(wrapper, user_function)

        return decorating_function
273
274
275 # python 3.3
try:
    from shutil import which
except ImportError:
    def which(cmd, mode=os.F_OK | os.X_OK, path=None):
        """Locate `cmd` on a search path and return its location.

        Returns the first candidate that exists, passes os.access() for
        `mode` and is not a directory, or None when nothing matches.

        `mode` defaults to os.F_OK | os.X_OK.  `path` is a search string
        separated by os.pathsep; when None it is taken from the PATH
        environment variable, with os.defpath as the default.  A `cmd`
        that has a directory component is checked as given and the search
        path is not consulted.
        """
        def _usable(candidate):
            # Must exist, satisfy `mode`, and not be a directory.
            if not os.path.exists(candidate):
                return False
            if not os.access(candidate, mode):
                return False
            return not os.path.isdir(candidate)

        if os.path.dirname(cmd):
            return cmd if _usable(cmd) else None

        if path is None:
            path = os.environ.get("PATH", os.defpath)
        if not path:
            return None
        search_dirs = path.split(os.pathsep)

        if sys.platform != "win32":
            candidates = [cmd]
        else:
            # On Windows the current directory is searched first.
            if os.curdir not in search_dirs:
                search_dirs.insert(0, os.curdir)

            # Try each PATHEXT extension, unless `cmd` already ends with
            # one of them.
            extensions = os.environ.get("PATHEXT", "").split(os.pathsep)
            if any(cmd.lower().endswith(ext.lower()) for ext in extensions):
                candidates = [cmd]
            else:
                candidates = [cmd + ext for ext in extensions]

        visited = set()
        for directory in search_dirs:
            normalized = os.path.normcase(directory)
            if normalized in visited:
                # Each directory is examined only once.
                continue
            visited.add(normalized)
            for filename in candidates:
                full_path = os.path.join(directory, filename)
                if _usable(full_path):
                    return full_path
        return None
325
326
327 # python 3.3
try:
    from shutil import get_terminal_size
except ImportError:
    def get_terminal_size(fallback=(80, 24)):
        """Return the terminal size as a 2-tuple, or `fallback`.

        The size is read with a TIOCGWINSZ ioctl on file descriptor 1 and
        the two unpacked shorts are returned in swapped order, i.e.
        (second, first).  `fallback` is returned unchanged when fcntl,
        termios or struct cannot be imported, or when the query raises.
        """
        try:
            import fcntl
            import termios
            import struct
        except ImportError:
            return fallback

        try:
            # This should work on Linux.
            packed = fcntl.ioctl(1, termios.TIOCGWINSZ, '1234')
            first, second = struct.unpack('hh', packed)
        except Exception:
            # Best effort: any failure means the size is unknown.
            return fallback
        return (second, first)