comparison lib/python3.8/site-packages/pip/_internal/req/req_tracker.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 # The following comment should be removed at some point in the future.
2 # mypy: strict-optional=False
3
4 from __future__ import absolute_import
5
6 import contextlib
7 import errno
8 import hashlib
9 import logging
10 import os
11
12 from pip._vendor import contextlib2
13
14 from pip._internal.utils.temp_dir import TempDirectory
15 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
16
17 if MYPY_CHECK_RUNNING:
18 from types import TracebackType
19 from typing import Dict, Iterator, Optional, Set, Type, Union
20 from pip._internal.req.req_install import InstallRequirement
21 from pip._internal.models.link import Link
22
23 logger = logging.getLogger(__name__)
24
25
26 @contextlib.contextmanager
27 def update_env_context_manager(**changes):
28 # type: (str) -> Iterator[None]
29 target = os.environ
30
31 # Save values from the target and change them.
32 non_existent_marker = object()
33 saved_values = {} # type: Dict[str, Union[object, str]]
34 for name, new_value in changes.items():
35 try:
36 saved_values[name] = target[name]
37 except KeyError:
38 saved_values[name] = non_existent_marker
39 target[name] = new_value
40
41 try:
42 yield
43 finally:
44 # Restore original values in the target.
45 for name, original_value in saved_values.items():
46 if original_value is non_existent_marker:
47 del target[name]
48 else:
49 assert isinstance(original_value, str) # for mypy
50 target[name] = original_value
51
52
53 @contextlib.contextmanager
54 def get_requirement_tracker():
55 # type: () -> Iterator[RequirementTracker]
56 root = os.environ.get('PIP_REQ_TRACKER')
57 with contextlib2.ExitStack() as ctx:
58 if root is None:
59 root = ctx.enter_context(
60 TempDirectory(kind='req-tracker')
61 ).path
62 ctx.enter_context(update_env_context_manager(PIP_REQ_TRACKER=root))
63 logger.debug("Initialized build tracking at %s", root)
64
65 with RequirementTracker(root) as tracker:
66 yield tracker
67
68
69 class RequirementTracker(object):
70
71 def __init__(self, root):
72 # type: (str) -> None
73 self._root = root
74 self._entries = set() # type: Set[InstallRequirement]
75 logger.debug("Created build tracker: %s", self._root)
76
77 def __enter__(self):
78 # type: () -> RequirementTracker
79 logger.debug("Entered build tracker: %s", self._root)
80 return self
81
82 def __exit__(
83 self,
84 exc_type, # type: Optional[Type[BaseException]]
85 exc_val, # type: Optional[BaseException]
86 exc_tb # type: Optional[TracebackType]
87 ):
88 # type: (...) -> None
89 self.cleanup()
90
91 def _entry_path(self, link):
92 # type: (Link) -> str
93 hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
94 return os.path.join(self._root, hashed)
95
96 def add(self, req):
97 # type: (InstallRequirement) -> None
98 """Add an InstallRequirement to build tracking.
99 """
100
101 # Get the file to write information about this requirement.
102 entry_path = self._entry_path(req.link)
103
104 # Try reading from the file. If it exists and can be read from, a build
105 # is already in progress, so a LookupError is raised.
106 try:
107 with open(entry_path) as fp:
108 contents = fp.read()
109 except IOError as e:
110 # if the error is anything other than "file does not exist", raise.
111 if e.errno != errno.ENOENT:
112 raise
113 else:
114 message = '%s is already being built: %s' % (req.link, contents)
115 raise LookupError(message)
116
117 # If we're here, req should really not be building already.
118 assert req not in self._entries
119
120 # Start tracking this requirement.
121 with open(entry_path, 'w') as fp:
122 fp.write(str(req))
123 self._entries.add(req)
124
125 logger.debug('Added %s to build tracker %r', req, self._root)
126
127 def remove(self, req):
128 # type: (InstallRequirement) -> None
129 """Remove an InstallRequirement from build tracking.
130 """
131
132 # Delete the created file and the corresponding entries.
133 os.unlink(self._entry_path(req.link))
134 self._entries.remove(req)
135
136 logger.debug('Removed %s from build tracker %r', req, self._root)
137
138 def cleanup(self):
139 # type: () -> None
140 for req in set(self._entries):
141 self.remove(req)
142
143 logger.debug("Removed build tracker: %r", self._root)
144
145 @contextlib.contextmanager
146 def track(self, req):
147 # type: (InstallRequirement) -> Iterator[None]
148 self.add(req)
149 yield
150 self.remove(req)