Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/pluggy/manager.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/pluggy/manager.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,394 @@ +import inspect +import sys +from . import _tracing +from .callers import _Result +from .hooks import HookImpl, _HookRelay, _HookCaller, normalize_hookimpl_opts +import warnings + +if sys.version_info >= (3, 8): + from importlib import metadata as importlib_metadata +else: + import importlib_metadata + + +def _warn_for_function(warning, function): + warnings.warn_explicit( + warning, + type(warning), + lineno=function.__code__.co_firstlineno, + filename=function.__code__.co_filename, + ) + + +class PluginValidationError(Exception): + """ plugin failed validation. + + :param object plugin: the plugin which failed validation, + may be a module or an arbitrary object. + """ + + def __init__(self, plugin, message): + self.plugin = plugin + super(Exception, self).__init__(message) + + +class DistFacade(object): + """Emulate a pkg_resources Distribution""" + + def __init__(self, dist): + self._dist = dist + + @property + def project_name(self): + return self.metadata["name"] + + def __getattr__(self, attr, default=None): + return getattr(self._dist, attr, default) + + def __dir__(self): + return sorted(dir(self._dist) + ["_dist", "project_name"]) + + +class PluginManager(object): + """ Core :py:class:`.PluginManager` class which manages registration + of plugin objects and 1:N hook calling. + + You can register new hooks by calling :py:meth:`add_hookspecs(module_or_class) + <.PluginManager.add_hookspecs>`. + You can register plugin objects (which contain hooks) by calling + :py:meth:`register(plugin) <.PluginManager.register>`. The :py:class:`.PluginManager` + is initialized with a prefix that is searched for in the names of the dict + of registered plugin objects. + + For debugging purposes you can call :py:meth:`.PluginManager.enable_tracing` + which will subsequently send debug information to the trace helper. + """ + + def __init__(self, project_name, implprefix=None): + """If ``implprefix`` is given implementation functions + will be recognized if their name matches the ``implprefix``. """ + self.project_name = project_name + self._name2plugin = {} + self._plugin2hookcallers = {} + self._plugin_distinfo = [] + self.trace = _tracing.TagTracer().get("pluginmanage") + self.hook = _HookRelay() + if implprefix is not None: + warnings.warn( + "Support for the `implprefix` arg is now deprecated and will " + "be removed in an upcoming release. Please use HookimplMarker.", + DeprecationWarning, + stacklevel=2, + ) + self._implprefix = implprefix + self._inner_hookexec = lambda hook, methods, kwargs: hook.multicall( + methods, + kwargs, + firstresult=hook.spec.opts.get("firstresult") if hook.spec else False, + ) + + def _hookexec(self, hook, methods, kwargs): + # called from all hookcaller instances. + # enable_tracing will set its own wrapping function at self._inner_hookexec + return self._inner_hookexec(hook, methods, kwargs) + + def register(self, plugin, name=None): + """ Register a plugin and return its canonical name or ``None`` if the name + is blocked from registering. Raise a :py:class:`ValueError` if the plugin + is already registered. """ + plugin_name = name or self.get_canonical_name(plugin) + + if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers: + if self._name2plugin.get(plugin_name, -1) is None: + return # blocked plugin, return None to indicate no registration + raise ValueError( + "Plugin already registered: %s=%s\n%s" + % (plugin_name, plugin, self._name2plugin) + ) + + # XXX if an error happens we should make sure no state has been + # changed at point of return + self._name2plugin[plugin_name] = plugin + + # register matching hook implementations of the plugin + self._plugin2hookcallers[plugin] = hookcallers = [] + for name in dir(plugin): + hookimpl_opts = self.parse_hookimpl_opts(plugin, name) + if hookimpl_opts is not None: + normalize_hookimpl_opts(hookimpl_opts) + method = getattr(plugin, name) + hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts) + hook = getattr(self.hook, name, None) + if hook is None: + hook = _HookCaller(name, self._hookexec) + setattr(self.hook, name, hook) + elif hook.has_spec(): + self._verify_hook(hook, hookimpl) + hook._maybe_apply_history(hookimpl) + hook._add_hookimpl(hookimpl) + hookcallers.append(hook) + return plugin_name + + def parse_hookimpl_opts(self, plugin, name): + method = getattr(plugin, name) + if not inspect.isroutine(method): + return + try: + res = getattr(method, self.project_name + "_impl", None) + except Exception: + res = {} + if res is not None and not isinstance(res, dict): + # false positive + res = None + # TODO: remove when we drop implprefix in 1.0 + elif res is None and self._implprefix and name.startswith(self._implprefix): + _warn_for_function( + DeprecationWarning( + "The `implprefix` system is deprecated please decorate " + "this function using an instance of HookimplMarker." + ), + method, + ) + res = {} + return res + + def unregister(self, plugin=None, name=None): + """ unregister a plugin object and all its contained hook implementations + from internal data structures. """ + if name is None: + assert plugin is not None, "one of name or plugin needs to be specified" + name = self.get_name(plugin) + + if plugin is None: + plugin = self.get_plugin(name) + + # if self._name2plugin[name] == None registration was blocked: ignore + if self._name2plugin.get(name): + del self._name2plugin[name] + + for hookcaller in self._plugin2hookcallers.pop(plugin, []): + hookcaller._remove_plugin(plugin) + + return plugin + + def set_blocked(self, name): + """ block registrations of the given name, unregister if already registered. """ + self.unregister(name=name) + self._name2plugin[name] = None + + def is_blocked(self, name): + """ return ``True`` if the given plugin name is blocked. """ + return name in self._name2plugin and self._name2plugin[name] is None + + def add_hookspecs(self, module_or_class): + """ add new hook specifications defined in the given ``module_or_class``. + Functions are recognized if they have been decorated accordingly. """ + names = [] + for name in dir(module_or_class): + spec_opts = self.parse_hookspec_opts(module_or_class, name) + if spec_opts is not None: + hc = getattr(self.hook, name, None) + if hc is None: + hc = _HookCaller(name, self._hookexec, module_or_class, spec_opts) + setattr(self.hook, name, hc) + else: + # plugins registered this hook without knowing the spec + hc.set_specification(module_or_class, spec_opts) + for hookfunction in hc.get_hookimpls(): + self._verify_hook(hc, hookfunction) + names.append(name) + + if not names: + raise ValueError( + "did not find any %r hooks in %r" % (self.project_name, module_or_class) + ) + + def parse_hookspec_opts(self, module_or_class, name): + method = getattr(module_or_class, name) + return getattr(method, self.project_name + "_spec", None) + + def get_plugins(self): + """ return the set of registered plugins. """ + return set(self._plugin2hookcallers) + + def is_registered(self, plugin): + """ Return ``True`` if the plugin is already registered. """ + return plugin in self._plugin2hookcallers + + def get_canonical_name(self, plugin): + """ Return canonical name for a plugin object. Note that a plugin + may be registered under a different name which was specified + by the caller of :py:meth:`register(plugin, name) <.PluginManager.register>`. + To obtain the name of an registered plugin use :py:meth:`get_name(plugin) + <.PluginManager.get_name>` instead.""" + return getattr(plugin, "__name__", None) or str(id(plugin)) + + def get_plugin(self, name): + """ Return a plugin or ``None`` for the given name. """ + return self._name2plugin.get(name) + + def has_plugin(self, name): + """ Return ``True`` if a plugin with the given name is registered. """ + return self.get_plugin(name) is not None + + def get_name(self, plugin): + """ Return name for registered plugin or ``None`` if not registered. """ + for name, val in self._name2plugin.items(): + if plugin == val: + return name + + def _verify_hook(self, hook, hookimpl): + if hook.is_historic() and hookimpl.hookwrapper: + raise PluginValidationError( + hookimpl.plugin, + "Plugin %r\nhook %r\nhistoric incompatible to hookwrapper" + % (hookimpl.plugin_name, hook.name), + ) + if hook.spec.warn_on_impl: + _warn_for_function(hook.spec.warn_on_impl, hookimpl.function) + # positional arg checking + notinspec = set(hookimpl.argnames) - set(hook.spec.argnames) + if notinspec: + raise PluginValidationError( + hookimpl.plugin, + "Plugin %r for hook %r\nhookimpl definition: %s\n" + "Argument(s) %s are declared in the hookimpl but " + "can not be found in the hookspec" + % ( + hookimpl.plugin_name, + hook.name, + _formatdef(hookimpl.function), + notinspec, + ), + ) + + def check_pending(self): + """ Verify that all hooks which have not been verified against + a hook specification are optional, otherwise raise :py:class:`.PluginValidationError`.""" + for name in self.hook.__dict__: + if name[0] != "_": + hook = getattr(self.hook, name) + if not hook.has_spec(): + for hookimpl in hook.get_hookimpls(): + if not hookimpl.optionalhook: + raise PluginValidationError( + hookimpl.plugin, + "unknown hook %r in plugin %r" + % (name, hookimpl.plugin), + ) + + def load_setuptools_entrypoints(self, group, name=None): + """ Load modules from querying the specified setuptools ``group``. + + :param str group: entry point group to load plugins + :param str name: if given, loads only plugins with the given ``name``. + :rtype: int + :return: return the number of loaded plugins by this call. + """ + count = 0 + for dist in importlib_metadata.distributions(): + for ep in dist.entry_points: + if ( + ep.group != group + or (name is not None and ep.name != name) + # already registered + or self.get_plugin(ep.name) + or self.is_blocked(ep.name) + ): + continue + plugin = ep.load() + self.register(plugin, name=ep.name) + self._plugin_distinfo.append((plugin, DistFacade(dist))) + count += 1 + return count + + def list_plugin_distinfo(self): + """ return list of distinfo/plugin tuples for all setuptools registered + plugins. """ + return list(self._plugin_distinfo) + + def list_name_plugin(self): + """ return list of name/plugin pairs. """ + return list(self._name2plugin.items()) + + def get_hookcallers(self, plugin): + """ get all hook callers for the specified plugin. """ + return self._plugin2hookcallers.get(plugin) + + def add_hookcall_monitoring(self, before, after): + """ add before/after tracing functions for all hooks + and return an undo function which, when called, + will remove the added tracers. + + ``before(hook_name, hook_impls, kwargs)`` will be called ahead + of all hook calls and receive a hookcaller instance, a list + of HookImpl instances and the keyword arguments for the hook call. + + ``after(outcome, hook_name, hook_impls, kwargs)`` receives the + same arguments as ``before`` but also a :py:class:`pluggy.callers._Result` object + which represents the result of the overall hook call. + """ + oldcall = self._inner_hookexec + + def traced_hookexec(hook, hook_impls, kwargs): + before(hook.name, hook_impls, kwargs) + outcome = _Result.from_call(lambda: oldcall(hook, hook_impls, kwargs)) + after(outcome, hook.name, hook_impls, kwargs) + return outcome.get_result() + + self._inner_hookexec = traced_hookexec + + def undo(): + self._inner_hookexec = oldcall + + return undo + + def enable_tracing(self): + """ enable tracing of hook calls and return an undo function. """ + hooktrace = self.trace.root.get("hook") + + def before(hook_name, methods, kwargs): + hooktrace.root.indent += 1 + hooktrace(hook_name, kwargs) + + def after(outcome, hook_name, methods, kwargs): + if outcome.excinfo is None: + hooktrace("finish", hook_name, "-->", outcome.get_result()) + hooktrace.root.indent -= 1 + + return self.add_hookcall_monitoring(before, after) + + def subset_hook_caller(self, name, remove_plugins): + """ Return a new :py:class:`.hooks._HookCaller` instance for the named method + which manages calls to all registered plugins except the + ones from remove_plugins. """ + orig = getattr(self.hook, name) + plugins_to_remove = [plug for plug in remove_plugins if hasattr(plug, name)] + if plugins_to_remove: + hc = _HookCaller( + orig.name, orig._hookexec, orig.spec.namespace, orig.spec.opts + ) + for hookimpl in orig.get_hookimpls(): + plugin = hookimpl.plugin + if plugin not in plugins_to_remove: + hc._add_hookimpl(hookimpl) + # we also keep track of this hook caller so it + # gets properly removed on plugin unregistration + self._plugin2hookcallers.setdefault(plugin, []).append(hc) + return hc + return orig + + +if hasattr(inspect, "signature"): + + def _formatdef(func): + return "%s%s" % (func.__name__, str(inspect.signature(func))) + + +else: + + def _formatdef(func): + return "%s%s" % ( + func.__name__, + inspect.formatargspec(*inspect.getargspec(func)), + )