comparison lib/python3.8/site-packages/pip/_internal/req/req_uninstall.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 from __future__ import absolute_import
2
3 import csv
4 import functools
5 import logging
6 import os
7 import sys
8 import sysconfig
9
10 from pip._vendor import pkg_resources
11
12 from pip._internal.exceptions import UninstallationError
13 from pip._internal.locations import bin_py, bin_user
14 from pip._internal.utils.compat import WINDOWS, cache_from_source, uses_pycache
15 from pip._internal.utils.logging import indent_log
16 from pip._internal.utils.misc import (
17 FakeFile,
18 ask,
19 dist_in_usersite,
20 dist_is_local,
21 egg_link_path,
22 is_local,
23 normalize_path,
24 renames,
25 rmtree,
26 )
27 from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
28 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
29
30 if MYPY_CHECK_RUNNING:
31 from typing import (
32 Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple,
33 )
34 from pip._vendor.pkg_resources import Distribution
35
36 logger = logging.getLogger(__name__)
37
38
39 def _script_names(dist, script_name, is_gui):
40 # type: (Distribution, str, bool) -> List[str]
41 """Create the fully qualified name of the files created by
42 {console,gui}_scripts for the given ``dist``.
43 Returns the list of file names
44 """
45 if dist_in_usersite(dist):
46 bin_dir = bin_user
47 else:
48 bin_dir = bin_py
49 exe_name = os.path.join(bin_dir, script_name)
50 paths_to_remove = [exe_name]
51 if WINDOWS:
52 paths_to_remove.append(exe_name + '.exe')
53 paths_to_remove.append(exe_name + '.exe.manifest')
54 if is_gui:
55 paths_to_remove.append(exe_name + '-script.pyw')
56 else:
57 paths_to_remove.append(exe_name + '-script.py')
58 return paths_to_remove
59
60
61 def _unique(fn):
62 # type: (Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]
63 @functools.wraps(fn)
64 def unique(*args, **kw):
65 # type: (Any, Any) -> Iterator[Any]
66 seen = set() # type: Set[Any]
67 for item in fn(*args, **kw):
68 if item not in seen:
69 seen.add(item)
70 yield item
71 return unique
72
73
74 @_unique
75 def uninstallation_paths(dist):
76 # type: (Distribution) -> Iterator[str]
77 """
78 Yield all the uninstallation paths for dist based on RECORD-without-.py[co]
79
80 Yield paths to all the files in RECORD. For each .py file in RECORD, add
81 the .pyc and .pyo in the same directory.
82
83 UninstallPathSet.add() takes care of the __pycache__ .py[co].
84 """
85 r = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
86 for row in r:
87 path = os.path.join(dist.location, row[0])
88 yield path
89 if path.endswith('.py'):
90 dn, fn = os.path.split(path)
91 base = fn[:-3]
92 path = os.path.join(dn, base + '.pyc')
93 yield path
94 path = os.path.join(dn, base + '.pyo')
95 yield path
96
97
98 def compact(paths):
99 # type: (Iterable[str]) -> Set[str]
100 """Compact a path set to contain the minimal number of paths
101 necessary to contain all paths in the set. If /a/path/ and
102 /a/path/to/a/file.txt are both in the set, leave only the
103 shorter path."""
104
105 sep = os.path.sep
106 short_paths = set() # type: Set[str]
107 for path in sorted(paths, key=len):
108 should_skip = any(
109 path.startswith(shortpath.rstrip("*")) and
110 path[len(shortpath.rstrip("*").rstrip(sep))] == sep
111 for shortpath in short_paths
112 )
113 if not should_skip:
114 short_paths.add(path)
115 return short_paths
116
117
118 def compress_for_rename(paths):
119 # type: (Iterable[str]) -> Set[str]
120 """Returns a set containing the paths that need to be renamed.
121
122 This set may include directories when the original sequence of paths
123 included every file on disk.
124 """
125 case_map = dict((os.path.normcase(p), p) for p in paths)
126 remaining = set(case_map)
127 unchecked = sorted(set(os.path.split(p)[0]
128 for p in case_map.values()), key=len)
129 wildcards = set() # type: Set[str]
130
131 def norm_join(*a):
132 # type: (str) -> str
133 return os.path.normcase(os.path.join(*a))
134
135 for root in unchecked:
136 if any(os.path.normcase(root).startswith(w)
137 for w in wildcards):
138 # This directory has already been handled.
139 continue
140
141 all_files = set() # type: Set[str]
142 all_subdirs = set() # type: Set[str]
143 for dirname, subdirs, files in os.walk(root):
144 all_subdirs.update(norm_join(root, dirname, d)
145 for d in subdirs)
146 all_files.update(norm_join(root, dirname, f)
147 for f in files)
148 # If all the files we found are in our remaining set of files to
149 # remove, then remove them from the latter set and add a wildcard
150 # for the directory.
151 if not (all_files - remaining):
152 remaining.difference_update(all_files)
153 wildcards.add(root + os.sep)
154
155 return set(map(case_map.__getitem__, remaining)) | wildcards
156
157
158 def compress_for_output_listing(paths):
159 # type: (Iterable[str]) -> Tuple[Set[str], Set[str]]
160 """Returns a tuple of 2 sets of which paths to display to user
161
162 The first set contains paths that would be deleted. Files of a package
163 are not added and the top-level directory of the package has a '*' added
164 at the end - to signify that all it's contents are removed.
165
166 The second set contains files that would have been skipped in the above
167 folders.
168 """
169
170 will_remove = set(paths)
171 will_skip = set()
172
173 # Determine folders and files
174 folders = set()
175 files = set()
176 for path in will_remove:
177 if path.endswith(".pyc"):
178 continue
179 if path.endswith("__init__.py") or ".dist-info" in path:
180 folders.add(os.path.dirname(path))
181 files.add(path)
182
183 # probably this one https://github.com/python/mypy/issues/390
184 _normcased_files = set(map(os.path.normcase, files)) # type: ignore
185
186 folders = compact(folders)
187
188 # This walks the tree using os.walk to not miss extra folders
189 # that might get added.
190 for folder in folders:
191 for dirpath, _, dirfiles in os.walk(folder):
192 for fname in dirfiles:
193 if fname.endswith(".pyc"):
194 continue
195
196 file_ = os.path.join(dirpath, fname)
197 if (os.path.isfile(file_) and
198 os.path.normcase(file_) not in _normcased_files):
199 # We are skipping this file. Add it to the set.
200 will_skip.add(file_)
201
202 will_remove = files | {
203 os.path.join(folder, "*") for folder in folders
204 }
205
206 return will_remove, will_skip
207
208
209 class StashedUninstallPathSet(object):
210 """A set of file rename operations to stash files while
211 tentatively uninstalling them."""
212 def __init__(self):
213 # type: () -> None
214 # Mapping from source file root to [Adjacent]TempDirectory
215 # for files under that directory.
216 self._save_dirs = {} # type: Dict[str, TempDirectory]
217 # (old path, new path) tuples for each move that may need
218 # to be undone.
219 self._moves = [] # type: List[Tuple[str, str]]
220
221 def _get_directory_stash(self, path):
222 # type: (str) -> str
223 """Stashes a directory.
224
225 Directories are stashed adjacent to their original location if
226 possible, or else moved/copied into the user's temp dir."""
227
228 try:
229 save_dir = AdjacentTempDirectory(path) # type: TempDirectory
230 except OSError:
231 save_dir = TempDirectory(kind="uninstall")
232 self._save_dirs[os.path.normcase(path)] = save_dir
233
234 return save_dir.path
235
236 def _get_file_stash(self, path):
237 # type: (str) -> str
238 """Stashes a file.
239
240 If no root has been provided, one will be created for the directory
241 in the user's temp directory."""
242 path = os.path.normcase(path)
243 head, old_head = os.path.dirname(path), None
244 save_dir = None
245
246 while head != old_head:
247 try:
248 save_dir = self._save_dirs[head]
249 break
250 except KeyError:
251 pass
252 head, old_head = os.path.dirname(head), head
253 else:
254 # Did not find any suitable root
255 head = os.path.dirname(path)
256 save_dir = TempDirectory(kind='uninstall')
257 self._save_dirs[head] = save_dir
258
259 relpath = os.path.relpath(path, head)
260 if relpath and relpath != os.path.curdir:
261 return os.path.join(save_dir.path, relpath)
262 return save_dir.path
263
264 def stash(self, path):
265 # type: (str) -> str
266 """Stashes the directory or file and returns its new location.
267 Handle symlinks as files to avoid modifying the symlink targets.
268 """
269 path_is_dir = os.path.isdir(path) and not os.path.islink(path)
270 if path_is_dir:
271 new_path = self._get_directory_stash(path)
272 else:
273 new_path = self._get_file_stash(path)
274
275 self._moves.append((path, new_path))
276 if (path_is_dir and os.path.isdir(new_path)):
277 # If we're moving a directory, we need to
278 # remove the destination first or else it will be
279 # moved to inside the existing directory.
280 # We just created new_path ourselves, so it will
281 # be removable.
282 os.rmdir(new_path)
283 renames(path, new_path)
284 return new_path
285
286 def commit(self):
287 # type: () -> None
288 """Commits the uninstall by removing stashed files."""
289 for _, save_dir in self._save_dirs.items():
290 save_dir.cleanup()
291 self._moves = []
292 self._save_dirs = {}
293
294 def rollback(self):
295 # type: () -> None
296 """Undoes the uninstall by moving stashed files back."""
297 for p in self._moves:
298 logger.info("Moving to %s\n from %s", *p)
299
300 for new_path, path in self._moves:
301 try:
302 logger.debug('Replacing %s from %s', new_path, path)
303 if os.path.isfile(new_path) or os.path.islink(new_path):
304 os.unlink(new_path)
305 elif os.path.isdir(new_path):
306 rmtree(new_path)
307 renames(path, new_path)
308 except OSError as ex:
309 logger.error("Failed to restore %s", new_path)
310 logger.debug("Exception: %s", ex)
311
312 self.commit()
313
314 @property
315 def can_rollback(self):
316 # type: () -> bool
317 return bool(self._moves)
318
319
320 class UninstallPathSet(object):
321 """A set of file paths to be removed in the uninstallation of a
322 requirement."""
323 def __init__(self, dist):
324 # type: (Distribution) -> None
325 self.paths = set() # type: Set[str]
326 self._refuse = set() # type: Set[str]
327 self.pth = {} # type: Dict[str, UninstallPthEntries]
328 self.dist = dist
329 self._moved_paths = StashedUninstallPathSet()
330
331 def _permitted(self, path):
332 # type: (str) -> bool
333 """
334 Return True if the given path is one we are permitted to
335 remove/modify, False otherwise.
336
337 """
338 return is_local(path)
339
340 def add(self, path):
341 # type: (str) -> None
342 head, tail = os.path.split(path)
343
344 # we normalize the head to resolve parent directory symlinks, but not
345 # the tail, since we only want to uninstall symlinks, not their targets
346 path = os.path.join(normalize_path(head), os.path.normcase(tail))
347
348 if not os.path.exists(path):
349 return
350 if self._permitted(path):
351 self.paths.add(path)
352 else:
353 self._refuse.add(path)
354
355 # __pycache__ files can show up after 'installed-files.txt' is created,
356 # due to imports
357 if os.path.splitext(path)[1] == '.py' and uses_pycache:
358 self.add(cache_from_source(path))
359
360 def add_pth(self, pth_file, entry):
361 # type: (str, str) -> None
362 pth_file = normalize_path(pth_file)
363 if self._permitted(pth_file):
364 if pth_file not in self.pth:
365 self.pth[pth_file] = UninstallPthEntries(pth_file)
366 self.pth[pth_file].add(entry)
367 else:
368 self._refuse.add(pth_file)
369
370 def remove(self, auto_confirm=False, verbose=False):
371 # type: (bool, bool) -> None
372 """Remove paths in ``self.paths`` with confirmation (unless
373 ``auto_confirm`` is True)."""
374
375 if not self.paths:
376 logger.info(
377 "Can't uninstall '%s'. No files were found to uninstall.",
378 self.dist.project_name,
379 )
380 return
381
382 dist_name_version = (
383 self.dist.project_name + "-" + self.dist.version
384 )
385 logger.info('Uninstalling %s:', dist_name_version)
386
387 with indent_log():
388 if auto_confirm or self._allowed_to_proceed(verbose):
389 moved = self._moved_paths
390
391 for_rename = compress_for_rename(self.paths)
392
393 for path in sorted(compact(for_rename)):
394 moved.stash(path)
395 logger.debug('Removing file or directory %s', path)
396
397 for pth in self.pth.values():
398 pth.remove()
399
400 logger.info('Successfully uninstalled %s', dist_name_version)
401
402 def _allowed_to_proceed(self, verbose):
403 # type: (bool) -> bool
404 """Display which files would be deleted and prompt for confirmation
405 """
406
407 def _display(msg, paths):
408 # type: (str, Iterable[str]) -> None
409 if not paths:
410 return
411
412 logger.info(msg)
413 with indent_log():
414 for path in sorted(compact(paths)):
415 logger.info(path)
416
417 if not verbose:
418 will_remove, will_skip = compress_for_output_listing(self.paths)
419 else:
420 # In verbose mode, display all the files that are going to be
421 # deleted.
422 will_remove = set(self.paths)
423 will_skip = set()
424
425 _display('Would remove:', will_remove)
426 _display('Would not remove (might be manually added):', will_skip)
427 _display('Would not remove (outside of prefix):', self._refuse)
428 if verbose:
429 _display('Will actually move:', compress_for_rename(self.paths))
430
431 return ask('Proceed (y/n)? ', ('y', 'n')) == 'y'
432
433 def rollback(self):
434 # type: () -> None
435 """Rollback the changes previously made by remove()."""
436 if not self._moved_paths.can_rollback:
437 logger.error(
438 "Can't roll back %s; was not uninstalled",
439 self.dist.project_name,
440 )
441 return
442 logger.info('Rolling back uninstall of %s', self.dist.project_name)
443 self._moved_paths.rollback()
444 for pth in self.pth.values():
445 pth.rollback()
446
447 def commit(self):
448 # type: () -> None
449 """Remove temporary save dir: rollback will no longer be possible."""
450 self._moved_paths.commit()
451
452 @classmethod
453 def from_dist(cls, dist):
454 # type: (Distribution) -> UninstallPathSet
455 dist_path = normalize_path(dist.location)
456 if not dist_is_local(dist):
457 logger.info(
458 "Not uninstalling %s at %s, outside environment %s",
459 dist.key,
460 dist_path,
461 sys.prefix,
462 )
463 return cls(dist)
464
465 if dist_path in {p for p in {sysconfig.get_path("stdlib"),
466 sysconfig.get_path("platstdlib")}
467 if p}:
468 logger.info(
469 "Not uninstalling %s at %s, as it is in the standard library.",
470 dist.key,
471 dist_path,
472 )
473 return cls(dist)
474
475 paths_to_remove = cls(dist)
476 develop_egg_link = egg_link_path(dist)
477 develop_egg_link_egg_info = '{}.egg-info'.format(
478 pkg_resources.to_filename(dist.project_name))
479 egg_info_exists = dist.egg_info and os.path.exists(dist.egg_info)
480 # Special case for distutils installed package
481 distutils_egg_info = getattr(dist._provider, 'path', None)
482
483 # Uninstall cases order do matter as in the case of 2 installs of the
484 # same package, pip needs to uninstall the currently detected version
485 if (egg_info_exists and dist.egg_info.endswith('.egg-info') and
486 not dist.egg_info.endswith(develop_egg_link_egg_info)):
487 # if dist.egg_info.endswith(develop_egg_link_egg_info), we
488 # are in fact in the develop_egg_link case
489 paths_to_remove.add(dist.egg_info)
490 if dist.has_metadata('installed-files.txt'):
491 for installed_file in dist.get_metadata(
492 'installed-files.txt').splitlines():
493 path = os.path.normpath(
494 os.path.join(dist.egg_info, installed_file)
495 )
496 paths_to_remove.add(path)
497 # FIXME: need a test for this elif block
498 # occurs with --single-version-externally-managed/--record outside
499 # of pip
500 elif dist.has_metadata('top_level.txt'):
501 if dist.has_metadata('namespace_packages.txt'):
502 namespaces = dist.get_metadata('namespace_packages.txt')
503 else:
504 namespaces = []
505 for top_level_pkg in [
506 p for p
507 in dist.get_metadata('top_level.txt').splitlines()
508 if p and p not in namespaces]:
509 path = os.path.join(dist.location, top_level_pkg)
510 paths_to_remove.add(path)
511 paths_to_remove.add(path + '.py')
512 paths_to_remove.add(path + '.pyc')
513 paths_to_remove.add(path + '.pyo')
514
515 elif distutils_egg_info:
516 raise UninstallationError(
517 "Cannot uninstall {!r}. It is a distutils installed project "
518 "and thus we cannot accurately determine which files belong "
519 "to it which would lead to only a partial uninstall.".format(
520 dist.project_name,
521 )
522 )
523
524 elif dist.location.endswith('.egg'):
525 # package installed by easy_install
526 # We cannot match on dist.egg_name because it can slightly vary
527 # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
528 paths_to_remove.add(dist.location)
529 easy_install_egg = os.path.split(dist.location)[1]
530 easy_install_pth = os.path.join(os.path.dirname(dist.location),
531 'easy-install.pth')
532 paths_to_remove.add_pth(easy_install_pth, './' + easy_install_egg)
533
534 elif egg_info_exists and dist.egg_info.endswith('.dist-info'):
535 for path in uninstallation_paths(dist):
536 paths_to_remove.add(path)
537
538 elif develop_egg_link:
539 # develop egg
540 with open(develop_egg_link, 'r') as fh:
541 link_pointer = os.path.normcase(fh.readline().strip())
542 assert (link_pointer == dist.location), (
543 'Egg-link %s does not match installed location of %s '
544 '(at %s)' % (link_pointer, dist.project_name, dist.location)
545 )
546 paths_to_remove.add(develop_egg_link)
547 easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
548 'easy-install.pth')
549 paths_to_remove.add_pth(easy_install_pth, dist.location)
550
551 else:
552 logger.debug(
553 'Not sure how to uninstall: %s - Check: %s',
554 dist, dist.location,
555 )
556
557 # find distutils scripts= scripts
558 if dist.has_metadata('scripts') and dist.metadata_isdir('scripts'):
559 for script in dist.metadata_listdir('scripts'):
560 if dist_in_usersite(dist):
561 bin_dir = bin_user
562 else:
563 bin_dir = bin_py
564 paths_to_remove.add(os.path.join(bin_dir, script))
565 if WINDOWS:
566 paths_to_remove.add(os.path.join(bin_dir, script) + '.bat')
567
568 # find console_scripts
569 _scripts_to_remove = []
570 console_scripts = dist.get_entry_map(group='console_scripts')
571 for name in console_scripts.keys():
572 _scripts_to_remove.extend(_script_names(dist, name, False))
573 # find gui_scripts
574 gui_scripts = dist.get_entry_map(group='gui_scripts')
575 for name in gui_scripts.keys():
576 _scripts_to_remove.extend(_script_names(dist, name, True))
577
578 for s in _scripts_to_remove:
579 paths_to_remove.add(s)
580
581 return paths_to_remove
582
583
584 class UninstallPthEntries(object):
585 def __init__(self, pth_file):
586 # type: (str) -> None
587 if not os.path.isfile(pth_file):
588 raise UninstallationError(
589 "Cannot remove entries from nonexistent file %s" % pth_file
590 )
591 self.file = pth_file
592 self.entries = set() # type: Set[str]
593 self._saved_lines = None # type: Optional[List[bytes]]
594
595 def add(self, entry):
596 # type: (str) -> None
597 entry = os.path.normcase(entry)
598 # On Windows, os.path.normcase converts the entry to use
599 # backslashes. This is correct for entries that describe absolute
600 # paths outside of site-packages, but all the others use forward
601 # slashes.
602 # os.path.splitdrive is used instead of os.path.isabs because isabs
603 # treats non-absolute paths with drive letter markings like c:foo\bar
604 # as absolute paths. It also does not recognize UNC paths if they don't
605 # have more than "\\sever\share". Valid examples: "\\server\share\" or
606 # "\\server\share\folder". Python 2.7.8+ support UNC in splitdrive.
607 if WINDOWS and not os.path.splitdrive(entry)[0]:
608 entry = entry.replace('\\', '/')
609 self.entries.add(entry)
610
611 def remove(self):
612 # type: () -> None
613 logger.debug('Removing pth entries from %s:', self.file)
614 with open(self.file, 'rb') as fh:
615 # windows uses '\r\n' with py3k, but uses '\n' with py2.x
616 lines = fh.readlines()
617 self._saved_lines = lines
618 if any(b'\r\n' in line for line in lines):
619 endline = '\r\n'
620 else:
621 endline = '\n'
622 # handle missing trailing newline
623 if lines and not lines[-1].endswith(endline.encode("utf-8")):
624 lines[-1] = lines[-1] + endline.encode("utf-8")
625 for entry in self.entries:
626 try:
627 logger.debug('Removing entry: %s', entry)
628 lines.remove((entry + endline).encode("utf-8"))
629 except ValueError:
630 pass
631 with open(self.file, 'wb') as fh:
632 fh.writelines(lines)
633
634 def rollback(self):
635 # type: () -> bool
636 if self._saved_lines is None:
637 logger.error(
638 'Cannot roll back changes to %s, none were made', self.file
639 )
640 return False
641 logger.debug('Rolling %s back to previous state', self.file)
642 with open(self.file, 'wb') as fh:
643 fh.writelines(self._saved_lines)
644 return True