Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/galaxy/util/object_wrapper.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/galaxy/util/object_wrapper.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,498 @@ +""" +Classes for wrapping Objects and Sanitizing string output. +""" +from __future__ import absolute_import + +import collections +import inspect +import logging +import string +import sys +from numbers import Number +from types import ( + BuiltinFunctionType, + BuiltinMethodType, + CodeType, + FrameType, + FunctionType, + GeneratorType, + GetSetDescriptorType, + MemberDescriptorType, + MethodType, + ModuleType, + TracebackType, +) + +try: + from types import NoneType +except ImportError: + NoneType = type(None) +try: + from types import NotImplementedType +except ImportError: + NotImplementedType = type(NotImplemented) + +try: + from types import EllipsisType +except ImportError: + EllipsisType = type(Ellipsis) + +try: + from types import XRangeType +except ImportError: + XRangeType = range + +try: + from types import SliceType +except ImportError: + SliceType = slice + +try: + from types import ( + BufferType, + DictProxyType + ) +except ImportError: + # Py3 doesn't have these concepts, just treat them like SliceType that + # so they are __WRAP_NO_SUBCLASS__. + BufferType = SliceType + DictProxyType = SliceType + +from six.moves import ( + copyreg as copy_reg, + UserDict +) + +from galaxy.util import sanitize_lists_to_string as _sanitize_lists_to_string + +log = logging.getLogger(__name__) + +# Define different behaviors for different types, see also: https://docs.python.org/2/library/types.html + +# Known Callable types +__CALLABLE_TYPES__ = (FunctionType, MethodType, GeneratorType, CodeType, BuiltinFunctionType, BuiltinMethodType, ) + +# Always wrap these types without attempting to subclass +__WRAP_NO_SUBCLASS__ = (ModuleType, XRangeType, SliceType, BufferType, TracebackType, FrameType, DictProxyType, + GetSetDescriptorType, MemberDescriptorType) + __CALLABLE_TYPES__ + +# Don't wrap or sanitize. +__DONT_SANITIZE_TYPES__ = (Number, bool, NoneType, NotImplementedType, EllipsisType, bytearray, ) + +# Don't wrap, but do sanitize. +__DONT_WRAP_TYPES__ = tuple() # ( basestring, ) so that we can get the unsanitized string, we will now wrap basestring instances + +# Wrap contents, but not the container +__WRAP_SEQUENCES__ = (tuple, list, ) +__WRAP_SETS__ = (set, frozenset, ) +__WRAP_MAPPINGS__ = (dict, UserDict, ) + + +# Define the set of characters that are not sanitized, and define a set of mappings for those that are. +# characters that are valid +VALID_CHARACTERS = set(string.ascii_letters + string.digits + " -=_.()/+*^,:?!@") + +# characters that are allowed but need to be escaped +CHARACTER_MAP = {'>': '__gt__', + '<': '__lt__', + "'": '__sq__', + '"': '__dq__', + '[': '__ob__', + ']': '__cb__', + '{': '__oc__', + '}': '__cc__', + '\n': '__cn__', + '\r': '__cr__', + '\t': '__tc__', + '#': '__pd__'} + +INVALID_CHARACTER = "X" + +if sys.version_info > (3, 0): + # __coerce__ doesn't do anything under Python anyway. + def coerce(x, y): + return x + + +def cmp(x, y): + # Builtin in Python 2, but not Python 3. + return (x > y) - (x < y) + + +def sanitize_lists_to_string(values, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP, invalid_character=INVALID_CHARACTER): + return _sanitize_lists_to_string(values, valid_characters=valid_characters, character_map=character_map, invalid_character=invalid_character) + + +def wrap_with_safe_string(value, no_wrap_classes=None): + """ + Recursively wrap values that should be wrapped. + """ + + def __do_wrap(value): + if isinstance(value, SafeStringWrapper): + # Only ever wrap one-layer + return value + if isinstance(value, collections.Callable): + safe_class = CallableSafeStringWrapper + else: + safe_class = SafeStringWrapper + if isinstance(value, no_wrap_classes): + return value + if isinstance(value, __DONT_WRAP_TYPES__): + return sanitize_lists_to_string(value, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP) + if isinstance(value, __WRAP_NO_SUBCLASS__): + return safe_class(value, safe_string_wrapper_function=__do_wrap) + for this_type in __WRAP_SEQUENCES__ + __WRAP_SETS__: + if isinstance(value, this_type): + return this_type(list(map(__do_wrap, value))) + for this_type in __WRAP_MAPPINGS__: + if isinstance(value, this_type): + # Wrap both key and value + return this_type((__do_wrap(x[0]), __do_wrap(x[1])) for x in value.items()) + # Create a dynamic class that joins SafeStringWrapper with the object being wrapped. + # This allows e.g. isinstance to continue to work. + try: + wrapped_class_name = value.__name__ + wrapped_class = value + except Exception: + wrapped_class_name = value.__class__.__name__ + wrapped_class = value.__class__ + value_mod = inspect.getmodule(value) + if value_mod: + wrapped_class_name = "%s.%s" % (value_mod.__name__, wrapped_class_name) + wrapped_class_name = "SafeStringWrapper(%s:%s)" % (wrapped_class_name, ",".join(sorted(map(str, no_wrap_classes)))) + do_wrap_func_name = "__do_wrap_%s" % (wrapped_class_name) + do_wrap_func = __do_wrap + global_dict = globals() + if wrapped_class_name in global_dict: + # Check to see if we have created a wrapper for this class yet, if so, reuse + wrapped_class = global_dict.get(wrapped_class_name) + do_wrap_func = global_dict.get(do_wrap_func_name, __do_wrap) + else: + try: + wrapped_class = type(wrapped_class_name, (safe_class, wrapped_class, ), {}) + except TypeError as e: + # Fail-safe for when a class cannot be dynamically subclassed. + log.warning("Unable to create dynamic subclass for %s, %s: %s", type(value), value, e) + wrapped_class = type(wrapped_class_name, (safe_class, ), {}) + if wrapped_class not in (SafeStringWrapper, CallableSafeStringWrapper): + # Save this wrapper for reuse and pickling/copying + global_dict[wrapped_class_name] = wrapped_class + do_wrap_func.__name__ = do_wrap_func_name + global_dict[do_wrap_func_name] = do_wrap_func + + def pickle_safe_object(safe_object): + return (wrapped_class, (safe_object.unsanitized, do_wrap_func, )) + # Set pickle and copy properties + copy_reg.pickle(wrapped_class, pickle_safe_object, do_wrap_func) + return wrapped_class(value, safe_string_wrapper_function=do_wrap_func) + + # Determine classes not to wrap + if no_wrap_classes: + if not isinstance(no_wrap_classes, (tuple, list)): + no_wrap_classes = [no_wrap_classes] + no_wrap_classes = list(no_wrap_classes) + list(__DONT_SANITIZE_TYPES__) + [SafeStringWrapper] + else: + no_wrap_classes = list(__DONT_SANITIZE_TYPES__) + [SafeStringWrapper] + no_wrap_classes = tuple(set(sorted(no_wrap_classes, key=str))) + return __do_wrap(value) + + +# N.B. refer to e.g. https://docs.python.org/2/reference/datamodel.html for information on Python's Data Model. + + +class SafeStringWrapper(object): + """ + Class that wraps and sanitizes any provided value's attributes + that will attempt to be cast into a string. + + Attempts to mimic behavior of original class, including operands. + + To ensure proper handling of e.g. subclass checks, the *wrap_with_safe_string()* + method should be used. + + This wrapping occurs in a recursive/parasitic fashion, as all called attributes of + the originally wrapped object will also be wrapped and sanitized, unless the attribute + is of a type found in __DONT_SANITIZE_TYPES__ + __DONT_WRAP_TYPES__, where e.g. ~(strings + will still be sanitized, but not wrapped), and e.g. integers will have neither. + """ + __UNSANITIZED_ATTRIBUTE_NAME__ = 'unsanitized' + __NO_WRAP_NAMES__ = ['__safe_string_wrapper_function__', '__class__', __UNSANITIZED_ATTRIBUTE_NAME__] + + def __new__(cls, *arg, **kwd): + # We need to define a __new__ since, we are subclassing from e.g. immutable str, which internally sets data + # that will be used when other + this (this + other is handled by __add__) + try: + sanitized_value = sanitize_lists_to_string(arg[0], valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP) + return super(SafeStringWrapper, cls).__new__(cls, sanitized_value) + except TypeError: + # Class to be wrapped takes no parameters. + # This is pefectly normal for mutable types. + return super(SafeStringWrapper, cls).__new__(cls) + + def __init__(self, value, safe_string_wrapper_function=wrap_with_safe_string): + self.unsanitized = value + self.__safe_string_wrapper_function__ = safe_string_wrapper_function + + def __str__(self): + return sanitize_lists_to_string(self.unsanitized, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP) + + def __repr__(self): + return "%s object at %x on: %s" % (sanitize_lists_to_string(self.__class__.__name__, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP), id(self), sanitize_lists_to_string(repr(self.unsanitized), valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP)) + + def __lt__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.unsanitized < other + + def __le__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.unsanitized <= other + + def __eq__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.unsanitized == other + + def __ne__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.unsanitized != other + + def __gt__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.unsanitized > other + + def __ge__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.unsanitized >= other + + def __cmp__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return cmp(self.unsanitized, other) + + # Do not implement __rcmp__, python 2.2 < 2.6 + + def __hash__(self): + return hash(self.unsanitized) + + def __bool__(self): + return bool(self.unsanitized) + __nonzero__ = __bool__ + + # Do not implement __unicode__, we will rely on __str__ + + def __getattr__(self, name): + if name in SafeStringWrapper.__NO_WRAP_NAMES__: + # FIXME: is this ever reached? + return object.__getattribute__(self, name) + return self.__safe_string_wrapper_function__(getattr(self.unsanitized, name)) + + def __setattr__(self, name, value): + if name in SafeStringWrapper.__NO_WRAP_NAMES__: + return object.__setattr__(self, name, value) + return setattr(self.unsanitized, name, value) + + def __delattr__(self, name): + if name in SafeStringWrapper.__NO_WRAP_NAMES__: + return object.__delattr__(self, name) + return delattr(self.unsanitized, name) + + def __getattribute__(self, name): + if name in SafeStringWrapper.__NO_WRAP_NAMES__: + return object.__getattribute__(self, name) + return self.__safe_string_wrapper_function__(getattr(object.__getattribute__(self, 'unsanitized'), name)) + + # Skip Descriptors + + # Skip __slots__ + + # Don't need to define a metaclass, we'll use the helper function to handle with subclassing for e.g. isinstance() + + # Revisit: + # __instancecheck__ + # __subclasscheck__ + # We are using a helper class to create dynamic subclasses to handle class checks + + # We address __call__ as needed based upon unsanitized, through the use of a CallableSafeStringWrapper class + + def __len__(self): + original_value = self.unsanitized + while isinstance(original_value, SafeStringWrapper): + original_value = self.unsanitized + return len(self.unsanitized) + + def __getitem__(self, key): + return self.__safe_string_wrapper_function__(self.unsanitized[key]) + + def __setitem__(self, key, value): + while isinstance(value, SafeStringWrapper): + value = value.unsanitized + self.unsanitized[key] = value + + def __delitem__(self, key): + del self.unsanitized[key] + + def __iter__(self): + return iter(map(self.__safe_string_wrapper_function__, iter(self.unsanitized))) + + # Do not implement __reversed__ + + def __contains__(self, item): + # FIXME: Do we need to consider if item is/isn't or does/doesn't contain SafeStringWrapper? + # When considering e.g. nested lists/dicts/etc, this gets complicated + while isinstance(item, SafeStringWrapper): + item = item.unsanitized + return item in self.unsanitized + + # Not sure that we need these slice methods, but will provide anyway + def __getslice__(self, i, j): + return self.__safe_string_wrapper_function__(self.unsanitized[i:j]) + + def __setslice__(self, i, j, value): + self.unsanitized[i:j] = value + + def __delslice__(self, i, j): + del self.unsanitized[i:j] + + def __add__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized + other) + + def __sub__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized - other) + + def __mul__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized * other) + + def __floordiv__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized // other) + + def __mod__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized % other) + + def __divmod__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(divmod(self.unsanitized, other)) + + def __pow__(self, *other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(pow(self.unsanitized, *other)) + + def __lshift__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized << other) + + def __rshift__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized >> other) + + def __and__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized & other) + + def __xor__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized ^ other) + + def __or__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized | other) + + def __div__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized / other) + + def __truediv__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(self.unsanitized / other) + + # The only reflected operand that we will define is __rpow__, due to coercion rules complications as per docs + def __rpow__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return self.__safe_string_wrapper_function__(pow(other, self.unsanitized)) + + # Do not implement in-place operands + + def __neg__(self): + return self.__safe_string_wrapper_function__(-self.unsanitized) + + def __pos__(self): + return self.__safe_string_wrapper_function__(+self.unsanitized) + + def __abs__(self): + return self.__safe_string_wrapper_function__(abs(self.unsanitized)) + + def __invert__(self): + return self.__safe_string_wrapper_function__(~self.unsanitized) + + def __complex__(self): + return self.__safe_string_wrapper_function__(complex(self.unsanitized)) + + def __int__(self): + return int(self.unsanitized) + + def __float__(self): + return float(self.unsanitized) + + def __oct__(self): + return oct(self.unsanitized) + + def __hex__(self): + return hex(self.unsanitized) + + def __index__(self): + return self.unsanitized.index() + + def __coerce__(self, other): + while isinstance(other, SafeStringWrapper): + other = other.unsanitized + return coerce(self.unsanitized, other) + + def __enter__(self): + return self.unsanitized.__enter__() + + def __exit__(self, *args): + return self.unsanitized.__exit__(*args) + + +class CallableSafeStringWrapper(SafeStringWrapper): + + def __call__(self, *args, **kwds): + return self.__safe_string_wrapper_function__(self.unsanitized(*args, **kwds)) + + +# Enable pickling/deepcopy +def pickle_SafeStringWrapper(safe_object): + args = (safe_object.unsanitized, ) + cls = SafeStringWrapper + if isinstance(safe_object, CallableSafeStringWrapper): + cls = CallableSafeStringWrapper + return (cls, args) + + +copy_reg.pickle(SafeStringWrapper, pickle_SafeStringWrapper, wrap_with_safe_string) +copy_reg.pickle(CallableSafeStringWrapper, pickle_SafeStringWrapper, wrap_with_safe_string)