Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/pip/_internal/operations/check.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 """Validation of dependencies of packages | |
2 """ | |
3 | |
4 import logging | |
5 from collections import namedtuple | |
6 | |
7 from pip._vendor.packaging.utils import canonicalize_name | |
8 from pip._vendor.pkg_resources import RequirementParseError | |
9 | |
10 from pip._internal.distributions import ( | |
11 make_distribution_for_install_requirement, | |
12 ) | |
13 from pip._internal.utils.misc import get_installed_distributions | |
14 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
15 | |
16 logger = logging.getLogger(__name__) | |
17 | |
18 if MYPY_CHECK_RUNNING: | |
19 from pip._internal.req.req_install import InstallRequirement | |
20 from typing import ( | |
21 Any, Callable, Dict, Optional, Set, Tuple, List | |
22 ) | |
23 | |
24 # Shorthands | |
25 PackageSet = Dict[str, 'PackageDetails'] | |
26 Missing = Tuple[str, Any] | |
27 Conflicting = Tuple[str, str, Any] | |
28 | |
29 MissingDict = Dict[str, List[Missing]] | |
30 ConflictingDict = Dict[str, List[Conflicting]] | |
31 CheckResult = Tuple[MissingDict, ConflictingDict] | |
32 | |
# Record stored for each package in a package set: the distribution's version
# and the requirements it declares.
PackageDetails = namedtuple('PackageDetails', ('version', 'requires'))
34 | |
35 | |
def create_package_set_from_installed(**kwargs):
    # type: (**Any) -> Tuple[PackageSet, bool]
    """Build a PackageSet from the distributions currently installed.

    Keyword arguments are forwarded unchanged to
    ``get_installed_distributions``. When none are given, every installed
    package is considered (``local_only=False`` and an empty ``skip``).

    Returns a ``(package_set, problems)`` pair. ``package_set`` maps each
    canonicalized project name to its ``PackageDetails``. ``problems`` is
    True when the requirements of at least one distribution could not be
    parsed; no entry is stored for such a distribution.
    """
    # Default to using all packages installed on the system
    if not kwargs:
        kwargs = {"local_only": False, "skip": ()}

    package_set = {}
    problems = False
    for dist in get_installed_distributions(**kwargs):
        name = canonicalize_name(dist.project_name)
        try:
            package_set[name] = PackageDetails(dist.version, dist.requires())
        except RequirementParseError as e:
            # Don't crash on broken metadata. Report through the module-level
            # logger rather than the root logger so the record carries this
            # module's name and follows its logging configuration.
            logger.warning("Error parsing requirements for %s: %s", name, e)
            problems = True
    return package_set, problems
55 | |
56 | |
def check_package_set(package_set, should_ignore=None):
    # type: (PackageSet, Optional[Callable[[str], bool]]) -> CheckResult
    """Report missing and conflicting dependencies within a package set.

    ``should_ignore``, when given, is called with each package name; packages
    for which it returns a true value are not checked.

    Returns ``(missing, conflicting)``. Both are dicts keyed by the name of
    the package whose requirements were examined. The values are lists,
    sorted by string representation, of ``(name, requirement)`` tuples and
    ``(name, installed_version, requirement)`` tuples respectively. Packages
    without problems have no key.
    """
    missing = {}
    conflicting = {}

    for package_name in package_set:
        if should_ignore is not None and should_ignore(package_name):
            continue

        # Problems found among the requirements of package_name
        absent = set()  # type: Set[Missing]
        clashing = set()  # type: Set[Conflicting]

        for req in package_set[package_name].requires:
            name = canonicalize_name(req.project_name)  # type: str

            if name in package_set:
                # Present in the set: the only possible problem is a version
                # that the requirement's specifier does not accept.
                version = package_set[name].version  # type: str
                if not req.specifier.contains(version, prereleases=True):
                    clashing.add((name, version, req))
            elif req.marker is None or req.marker.evaluate():
                # Absent from the set, and either unconditional or guarded by
                # a marker that evaluates true.
                absent.add((name, req))

        if absent:
            missing[package_name] = sorted(absent, key=str)
        if clashing:
            conflicting[package_name] = sorted(clashing, key=str)

    return missing, conflicting
102 | |
103 | |
def check_install_conflicts(to_install):
    # type: (List[InstallRequirement]) -> Tuple[PackageSet, CheckResult]
    """Check whether the dependency graph would be consistent after
    installing the given requirements.

    Returns the simulated package set together with the result of
    ``check_package_set`` on it, limited to the names produced by
    ``_create_whitelist``.
    """
    # Begin with what is installed now, then overlay the requested installs.
    package_set, _ = create_package_set_from_installed()

    # Only warn about directly-dependent packages; create a whitelist of them
    whitelist = _create_whitelist(
        _simulate_installation_of(to_install, package_set), package_set
    )

    def not_whitelisted(name):
        # type: (str) -> bool
        return name not in whitelist

    result = check_package_set(package_set, should_ignore=not_whitelisted)
    return package_set, result
123 | |
124 | |
125 def _simulate_installation_of(to_install, package_set): | |
126 # type: (List[InstallRequirement], PackageSet) -> Set[str] | |
127 """Computes the version of packages after installing to_install. | |
128 """ | |
129 | |
130 # Keep track of packages that were installed | |
131 installed = set() | |
132 | |
133 # Modify it as installing requirement_set would (assuming no errors) | |
134 for inst_req in to_install: | |
135 abstract_dist = make_distribution_for_install_requirement(inst_req) | |
136 dist = abstract_dist.get_pkg_resources_distribution() | |
137 | |
138 name = canonicalize_name(dist.key) | |
139 package_set[name] = PackageDetails(dist.version, dist.requires()) | |
140 | |
141 installed.add(name) | |
142 | |
143 return installed | |
144 | |
145 | |
146 def _create_whitelist(would_be_installed, package_set): | |
147 # type: (Set[str], PackageSet) -> Set[str] | |
148 packages_affected = set(would_be_installed) | |
149 | |
150 for package_name in package_set: | |
151 if package_name in packages_affected: | |
152 continue | |
153 | |
154 for req in package_set[package_name].requires: | |
155 if canonicalize_name(req.name) in packages_affected: | |
156 packages_affected.add(package_name) | |
157 break | |
158 | |
159 return packages_affected |