Mercurial > repos > guerler > hhblits
diff lib/python3.8/site-packages/pip/_internal/req/req_set.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/python3.8/site-packages/pip/_internal/req/req_set.py Mon Jul 27 03:47:31 2020 -0400 @@ -0,0 +1,209 @@ +# The following comment should be removed at some point in the future. +# mypy: strict-optional=False + +from __future__ import absolute_import + +import logging +from collections import OrderedDict + +from pip._vendor.packaging.utils import canonicalize_name + +from pip._internal import pep425tags +from pip._internal.exceptions import InstallationError +from pip._internal.models.wheel import Wheel +from pip._internal.utils.logging import indent_log +from pip._internal.utils.typing import MYPY_CHECK_RUNNING + +if MYPY_CHECK_RUNNING: + from typing import Dict, Iterable, List, Optional, Tuple + from pip._internal.req.req_install import InstallRequirement + + +logger = logging.getLogger(__name__) + + +class RequirementSet(object): + + def __init__(self, check_supported_wheels=True): + # type: (bool) -> None + """Create a RequirementSet. + """ + + self.requirements = OrderedDict() # type: Dict[str, InstallRequirement] # noqa: E501 + self.check_supported_wheels = check_supported_wheels + + self.unnamed_requirements = [] # type: List[InstallRequirement] + self.successfully_downloaded = [] # type: List[InstallRequirement] + self.reqs_to_cleanup = [] # type: List[InstallRequirement] + + def __str__(self): + # type: () -> str + requirements = sorted( + (req for req in self.requirements.values() if not req.comes_from), + key=lambda req: canonicalize_name(req.name), + ) + return ' '.join(str(req.req) for req in requirements) + + def __repr__(self): + # type: () -> str + requirements = sorted( + self.requirements.values(), + key=lambda req: canonicalize_name(req.name), + ) + + format_string = '<{classname} object; {count} requirement(s): {reqs}>' + return format_string.format( + classname=self.__class__.__name__, + count=len(requirements), + reqs=', '.join(str(req.req) for req in requirements), + ) + + def add_unnamed_requirement(self, install_req): + # type: (InstallRequirement) -> None + assert not install_req.name + self.unnamed_requirements.append(install_req) + + def add_named_requirement(self, install_req): + # type: (InstallRequirement) -> None + assert install_req.name + + project_name = canonicalize_name(install_req.name) + self.requirements[project_name] = install_req + + def add_requirement( + self, + install_req, # type: InstallRequirement + parent_req_name=None, # type: Optional[str] + extras_requested=None # type: Optional[Iterable[str]] + ): + # type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]] # noqa: E501 + """Add install_req as a requirement to install. + + :param parent_req_name: The name of the requirement that needed this + added. The name is used because when multiple unnamed requirements + resolve to the same name, we could otherwise end up with dependency + links that point outside the Requirements set. parent_req must + already be added. Note that None implies that this is a user + supplied requirement, vs an inferred one. + :param extras_requested: an iterable of extras used to evaluate the + environment markers. + :return: Additional requirements to scan. That is either [] if + the requirement is not applicable, or [install_req] if the + requirement is applicable and has just been added. + """ + # If the markers do not match, ignore this requirement. + if not install_req.match_markers(extras_requested): + logger.info( + "Ignoring %s: markers '%s' don't match your environment", + install_req.name, install_req.markers, + ) + return [], None + + # If the wheel is not supported, raise an error. + # Should check this after filtering out based on environment markers to + # allow specifying different wheels based on the environment/OS, in a + # single requirements file. + if install_req.link and install_req.link.is_wheel: + wheel = Wheel(install_req.link.filename) + tags = pep425tags.get_supported() + if (self.check_supported_wheels and not wheel.supported(tags)): + raise InstallationError( + "%s is not a supported wheel on this platform." % + wheel.filename + ) + + # This next bit is really a sanity check. + assert install_req.is_direct == (parent_req_name is None), ( + "a direct req shouldn't have a parent and also, " + "a non direct req should have a parent" + ) + + # Unnamed requirements are scanned again and the requirement won't be + # added as a dependency until after scanning. + if not install_req.name: + self.add_unnamed_requirement(install_req) + return [install_req], None + + try: + existing_req = self.get_requirement(install_req.name) + except KeyError: + existing_req = None + + has_conflicting_requirement = ( + parent_req_name is None and + existing_req and + not existing_req.constraint and + existing_req.extras == install_req.extras and + existing_req.req.specifier != install_req.req.specifier + ) + if has_conflicting_requirement: + raise InstallationError( + "Double requirement given: %s (already in %s, name=%r)" + % (install_req, existing_req, install_req.name) + ) + + # When no existing requirement exists, add the requirement as a + # dependency and it will be scanned again after. + if not existing_req: + self.add_named_requirement(install_req) + # We'd want to rescan this requirement later + return [install_req], install_req + + # Assume there's no need to scan, and that we've already + # encountered this for scanning. + if install_req.constraint or not existing_req.constraint: + return [], existing_req + + does_not_satisfy_constraint = ( + install_req.link and + not ( + existing_req.link and + install_req.link.path == existing_req.link.path + ) + ) + if does_not_satisfy_constraint: + self.reqs_to_cleanup.append(install_req) + raise InstallationError( + "Could not satisfy constraints for '%s': " + "installation from path or url cannot be " + "constrained to a version" % install_req.name, + ) + # If we're now installing a constraint, mark the existing + # object for real installation. + existing_req.constraint = False + existing_req.extras = tuple(sorted( + set(existing_req.extras) | set(install_req.extras) + )) + logger.debug( + "Setting %s extras to: %s", + existing_req, existing_req.extras, + ) + # Return the existing requirement for addition to the parent and + # scanning again. + return [existing_req], existing_req + + def has_requirement(self, name): + # type: (str) -> bool + project_name = canonicalize_name(name) + + return ( + project_name in self.requirements and + not self.requirements[project_name].constraint + ) + + def get_requirement(self, name): + # type: (str) -> InstallRequirement + project_name = canonicalize_name(name) + + if project_name in self.requirements: + return self.requirements[project_name] + + raise KeyError("No project with the name %r" % name) + + def cleanup_files(self): + # type: () -> None + """Clean up files, remove builds.""" + logger.debug('Cleaning up...') + with indent_log(): + for req in self.reqs_to_cleanup: + req.remove_temporary_source()