Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/operations/check.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9e54283cc701 |
---|---|
1 """Validation of dependencies of packages | |
2 """ | |
3 | |
4 # The following comment should be removed at some point in the future. | |
5 # mypy: strict-optional=False | |
6 # mypy: disallow-untyped-defs=False | |
7 | |
8 import logging | |
9 from collections import namedtuple | |
10 | |
11 from pip._vendor.packaging.utils import canonicalize_name | |
12 from pip._vendor.pkg_resources import RequirementParseError | |
13 | |
14 from pip._internal.distributions import ( | |
15 make_distribution_for_install_requirement, | |
16 ) | |
17 from pip._internal.utils.misc import get_installed_distributions | |
18 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
19 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The names below are referenced only from "# type:" comments in this module,
# so they are bound only when MYPY_CHECK_RUNNING is set.
if MYPY_CHECK_RUNNING:
    from typing import (
        Any, Callable, Dict, List, Optional, Set, Tuple
    )

    from pip._internal.req.req_install import InstallRequirement

    # Shorthands
    # Package name -> PackageDetails for that package.
    PackageSet = Dict[str, 'PackageDetails']
    # (dependency name, requirement) for a dependency that is not present.
    Missing = Tuple[str, Any]
    # (dependency name, present version, requirement) for a version clash.
    Conflicting = Tuple[str, str, Any]

    # Per-package problem lists, keyed by the name of the depending package.
    MissingDict = Dict[str, List[Missing]]
    ConflictingDict = Dict[str, List[Conflicting]]
    CheckResult = Tuple[MissingDict, ConflictingDict]

# Version of a package together with the requirements it declares.
PackageDetails = namedtuple('PackageDetails', ('version', 'requires'))
38 | |
39 | |
def create_package_set_from_installed(**kwargs):
    # type: (**Any) -> Tuple[PackageSet, bool]
    """Build a PackageSet describing the installed distributions.

    Keyword arguments are forwarded to ``get_installed_distributions()``.
    When none are given, ``local_only=False`` and ``skip=()`` are used.

    Returns a ``(package_set, problems)`` pair. ``problems`` is True when
    the requirements of at least one distribution could not be parsed; such
    distributions are logged and left out of ``package_set``.
    """
    if not kwargs:
        # No filters supplied: default to every package on the system.
        kwargs = {"local_only": False, "skip": ()}

    found = {}
    had_parse_errors = False
    for installed_dist in get_installed_distributions(**kwargs):
        canonical = canonicalize_name(installed_dist.project_name)
        try:
            details = PackageDetails(
                installed_dist.version, installed_dist.requires()
            )
        except RequirementParseError as exc:
            # Broken metadata must not abort the whole scan.
            logger.warning(
                "Error parsing requirements for %s: %s", canonical, exc
            )
            had_parse_errors = True
        else:
            found[canonical] = details
    return found, had_parse_errors
59 | |
60 | |
def check_package_set(package_set, should_ignore=None):
    # type: (PackageSet, Optional[Callable[[str], bool]]) -> CheckResult
    """Report missing and conflicting dependencies within a package set.

    ``should_ignore``, when given, is called with each package name; packages
    for which it returns a true value are not inspected. When omitted, every
    package is inspected.

    Returns a ``(missing, conflicting)`` pair of dicts keyed by package name.
    ``missing[name]`` is a list of ``(dependency_name, requirement)`` tuples
    and ``conflicting[name]`` is a list of
    ``(dependency_name, present_version, requirement)`` tuples. Each list is
    sorted by the ``str()`` of its entries, and packages without problems
    have no entry.
    """
    missing = {}
    conflicting = {}

    for package_name, details in package_set.items():
        if should_ignore is not None and should_ignore(package_name):
            continue

        # Problems found among the dependencies of package_name.
        absent = set()  # type: Set[Missing]
        clashing = set()  # type: Set[Conflicting]

        for req in details.requires:
            dep_name = canonicalize_name(req.project_name)  # type: str

            if dep_name not in package_set:
                # An absent dependency counts as missing unless the
                # requirement has a marker that evaluates to false.
                if req.marker is None or req.marker.evaluate():
                    absent.add((dep_name, req))
                continue

            # The dependency is present: check its version against the
            # requirement's specifier, allowing pre-releases.
            present_version = package_set[dep_name].version  # type: str
            if not req.specifier.contains(present_version, prereleases=True):
                clashing.add((dep_name, present_version, req))

        if absent:
            missing[package_name] = sorted(absent, key=str)
        if clashing:
            conflicting[package_name] = sorted(clashing, key=str)

    return missing, conflicting
106 | |
107 | |
def check_install_conflicts(to_install):
    # type: (List[InstallRequirement]) -> Tuple[PackageSet, CheckResult]
    """Check whether the dependency graph would be consistent after
    installing the given requirements.

    Returns ``(package_set, check_result)``: the simulated post-install
    package set, and the result of ``check_package_set()`` restricted to the
    packages selected by ``_create_whitelist()``.
    """
    # Begin with what is installed now. The "problems" flag is not used here.
    package_set, _ = create_package_set_from_installed()

    # Overlay the requirements that are about to be installed.
    would_be_installed = _simulate_installation_of(to_install, package_set)

    # Only warn about directly-dependent packages; create a whitelist of them
    whitelist = _create_whitelist(would_be_installed, package_set)

    def not_whitelisted(name):
        return name not in whitelist

    result = check_package_set(package_set, should_ignore=not_whitelisted)
    return package_set, result
127 | |
128 | |
129 def _simulate_installation_of(to_install, package_set): | |
130 # type: (List[InstallRequirement], PackageSet) -> Set[str] | |
131 """Computes the version of packages after installing to_install. | |
132 """ | |
133 | |
134 # Keep track of packages that were installed | |
135 installed = set() | |
136 | |
137 # Modify it as installing requirement_set would (assuming no errors) | |
138 for inst_req in to_install: | |
139 abstract_dist = make_distribution_for_install_requirement(inst_req) | |
140 dist = abstract_dist.get_pkg_resources_distribution() | |
141 | |
142 name = canonicalize_name(dist.key) | |
143 package_set[name] = PackageDetails(dist.version, dist.requires()) | |
144 | |
145 installed.add(name) | |
146 | |
147 return installed | |
148 | |
149 | |
150 def _create_whitelist(would_be_installed, package_set): | |
151 # type: (Set[str], PackageSet) -> Set[str] | |
152 packages_affected = set(would_be_installed) | |
153 | |
154 for package_name in package_set: | |
155 if package_name in packages_affected: | |
156 continue | |
157 | |
158 for req in package_set[package_name].requires: | |
159 if canonicalize_name(req.name) in packages_affected: | |
160 packages_affected.add(package_name) | |
161 break | |
162 | |
163 return packages_affected |