Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/pip/_internal/req/req_uninstall.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author | guerler |
---|---|
date | Mon, 27 Jul 2020 03:47:31 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9e54283cc701 |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 import csv | |
4 import functools | |
5 import logging | |
6 import os | |
7 import sys | |
8 import sysconfig | |
9 | |
10 from pip._vendor import pkg_resources | |
11 | |
12 from pip._internal.exceptions import UninstallationError | |
13 from pip._internal.locations import bin_py, bin_user | |
14 from pip._internal.utils.compat import WINDOWS, cache_from_source, uses_pycache | |
15 from pip._internal.utils.logging import indent_log | |
16 from pip._internal.utils.misc import ( | |
17 FakeFile, | |
18 ask, | |
19 dist_in_usersite, | |
20 dist_is_local, | |
21 egg_link_path, | |
22 is_local, | |
23 normalize_path, | |
24 renames, | |
25 rmtree, | |
26 ) | |
27 from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory | |
28 from pip._internal.utils.typing import MYPY_CHECK_RUNNING | |
29 | |
30 if MYPY_CHECK_RUNNING: | |
31 from typing import ( | |
32 Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, | |
33 ) | |
34 from pip._vendor.pkg_resources import Distribution | |
35 | |
36 logger = logging.getLogger(__name__) | |
37 | |
38 | |
def _script_names(dist, script_name, is_gui):
    # type: (Distribution, str, bool) -> List[str]
    """List every file that a {console,gui}_scripts entry point named
    ``script_name`` may have produced for ``dist``.

    The script is looked up in the user bin directory when ``dist`` lives in
    the user site, otherwise in the environment's bin directory. On Windows
    the launcher executable, its manifest and the ``-script`` wrapper are
    listed as well.
    """
    bin_dir = bin_user if dist_in_usersite(dist) else bin_py
    exe_name = os.path.join(bin_dir, script_name)
    if not WINDOWS:
        return [exe_name]

    # GUI entry points get a .pyw wrapper, console ones a .py wrapper.
    wrapper_suffix = '-script.pyw' if is_gui else '-script.py'
    return [
        exe_name,
        exe_name + '.exe',
        exe_name + '.exe.manifest',
        exe_name + wrapper_suffix,
    ]
59 | |
60 | |
def _unique(fn):
    # type: (Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]
    """Decorate an iterator-returning callable so that each distinct item
    is yielded only the first time it appears (items must be hashable)."""
    @functools.wraps(fn)
    def unique(*args, **kw):
        # type: (Any, Any) -> Iterator[Any]
        emitted = set()  # type: Set[Any]
        for candidate in fn(*args, **kw):
            if candidate in emitted:
                continue
            emitted.add(candidate)
            yield candidate
    return unique
72 | |
73 | |
@_unique
def uninstallation_paths(dist):
    # type: (Distribution) -> Iterator[str]
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.py[co]

    Every path listed in RECORD is yielded, joined onto ``dist.location``.
    Each ``.py`` entry is followed by the ``.pyc`` and ``.pyo`` names next to
    it in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].
    """
    record = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
    for entry in record:
        recorded = os.path.join(dist.location, entry[0])
        yield recorded
        if not recorded.endswith('.py'):
            continue
        directory, filename = os.path.split(recorded)
        stem = filename[:-3]  # drop the '.py' suffix
        for suffix in ('.pyc', '.pyo'):
            yield os.path.join(directory, stem + suffix)
96 | |
97 | |
def compact(paths):
    # type: (Iterable[str]) -> Set[str]
    """Compact a path set to contain the minimal number of paths
    necessary to contain all paths in the set. If /a/path/ and
    /a/path/to/a/file.txt are both in the set, leave only the
    shorter path.

    A kept path may end in ``*`` (a wildcard); the wildcard is ignored when
    deciding whether a longer path lives underneath it. Duplicate entries
    in ``paths`` are tolerated and collapse to a single element.
    """

    sep = os.path.sep

    def is_covered(path, shortpath):
        # type: (str, str) -> bool
        prefix = shortpath.rstrip("*")
        if not path.startswith(prefix):
            return False
        # The character right after the directory part must be a separator,
        # so that /a/path does not swallow /a/pathological.
        boundary = len(prefix.rstrip(sep))
        # boundary == len(path) happens only when ``path`` repeats an entry
        # that was already kept; indexing there would raise IndexError.
        return boundary < len(path) and path[boundary] == sep

    short_paths = set()  # type: Set[str]
    # Shortest first, so that a parent is always seen before its children.
    for path in sorted(paths, key=len):
        if not any(is_covered(path, kept) for kept in short_paths):
            short_paths.add(path)
    return short_paths
116 | |
117 | |
def compress_for_rename(paths):
    # type: (Iterable[str]) -> Set[str]
    """Returns a set containing the paths that need to be renamed.

    This set may include directories when the original sequence of paths
    included every file on disk: such a directory is returned once, with a
    trailing separator, instead of each of its files.
    """
    # normcased path -> path as originally given
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    # Parent directories of all paths, shortest first so that an outer
    # directory is examined before the directories nested inside it.
    unchecked = sorted(
        {os.path.split(p)[0] for p in case_map.values()}, key=len
    )
    wildcards = set()  # type: Set[str]

    for root in unchecked:
        if not root:
            # A bare file name has no directory part. os.walk('') yields
            # nothing, which would otherwise look like "every file is being
            # removed" and add a wildcard for the filesystem root.
            continue

        normcased_root = os.path.normcase(root)
        if any(normcased_root.startswith(os.path.normcase(w))
               for w in wildcards):
            # This directory has already been handled.
            continue

        all_files = set()  # type: Set[str]
        for dirname, _subdirs, files in os.walk(root):
            # ``dirname`` already starts with ``root``; joining ``root`` in
            # front of it again would duplicate a relative root.
            all_files.update(
                os.path.normcase(os.path.join(dirname, f)) for f in files
            )
        # If all the files we found are in our remaining set of files to
        # remove, then remove them from the latter set and add a wildcard
        # for the directory.
        if not (all_files - remaining):
            remaining.difference_update(all_files)
            wildcards.add(root + os.sep)

    return {case_map[p] for p in remaining} | wildcards
156 | |
157 | |
def compress_for_output_listing(paths):
    # type: (Iterable[str]) -> Tuple[Set[str], Set[str]]
    """Returns a tuple of 2 sets of which paths to display to user

    The first set contains the paths that would be deleted: every given path
    except ``.pyc`` files, plus a ``<folder>/*`` entry for each package or
    ``.dist-info`` folder, to signify that all its contents are removed.

    The second set contains files found on disk inside those folders that
    are not among the given paths, i.e. files that would be skipped.
    """
    # .pyc files are never listed individually.
    files = {p for p in set(paths) if not p.endswith(".pyc")}

    # A folder is listed when it holds an __init__.py or is (inside) a
    # .dist-info directory.
    folders = compact({
        os.path.dirname(p) for p in files
        if p.endswith("__init__.py") or ".dist-info" in p
    })

    # probably this one https://github.com/python/mypy/issues/390
    known = set(map(os.path.normcase, files))  # type: ignore

    # This walks the tree using os.walk to not miss extra folders
    # that might get added.
    will_skip = set()
    for folder in folders:
        for dirpath, _, dirfiles in os.walk(folder):
            for fname in dirfiles:
                if fname.endswith(".pyc"):
                    continue
                candidate = os.path.join(dirpath, fname)
                if not os.path.isfile(candidate):
                    continue
                if os.path.normcase(candidate) in known:
                    continue
                # Present on disk but not slated for removal.
                will_skip.add(candidate)

    wildcard_folders = {os.path.join(folder, "*") for folder in folders}
    return files | wildcard_folders, will_skip
207 | |
208 | |
class StashedUninstallPathSet(object):
    """A set of file rename operations to stash files while
    tentatively uninstalling them.

    Each stashed path is recorded so that the whole operation can later be
    committed (the stash directories are cleaned up) or rolled back (the
    stashed copies are moved back to where they came from).
    """
    def __init__(self):
        # type: () -> None
        # Mapping from source file root to [Adjacent]TempDirectory
        # for files under that directory.
        self._save_dirs = {}  # type: Dict[str, TempDirectory]
        # (old path, new path) tuples for each move that may need
        # to be undone.
        self._moves = []  # type: List[Tuple[str, str]]

    def _get_directory_stash(self, path):
        # type: (str) -> str
        """Stashes a directory.

        Directories are stashed adjacent to their original location if
        possible, or else moved/copied into the user's temp dir.

        Returns the path of the stash directory registered for ``path``."""

        try:
            save_dir = AdjacentTempDirectory(path)  # type: TempDirectory
        except OSError:
            save_dir = TempDirectory(kind="uninstall")
        self._save_dirs[os.path.normcase(path)] = save_dir

        return save_dir.path

    def _get_file_stash(self, path):
        # type: (str) -> str
        """Stashes a file.

        The closest ancestor directory that already has a stash directory is
        reused. If no root has been provided, one will be created for the
        file's directory in the user's temp directory.

        Returns the location inside the stash directory for ``path``."""
        path = os.path.normcase(path)
        head, old_head = os.path.dirname(path), None
        save_dir = None

        # Walk up the ancestors; dirname() stops changing at the top.
        while head != old_head:
            try:
                save_dir = self._save_dirs[head]
                break
            except KeyError:
                pass
            head, old_head = os.path.dirname(head), head
        else:
            # Did not find any suitable root
            head = os.path.dirname(path)
            save_dir = TempDirectory(kind='uninstall')
            self._save_dirs[head] = save_dir

        relpath = os.path.relpath(path, head)
        if relpath and relpath != os.path.curdir:
            return os.path.join(save_dir.path, relpath)
        return save_dir.path

    def stash(self, path):
        # type: (str) -> str
        """Stashes the directory or file and returns its new location.
        Handle symlinks as files to avoid modifying the symlink targets.
        """
        path_is_dir = os.path.isdir(path) and not os.path.islink(path)
        if path_is_dir:
            new_path = self._get_directory_stash(path)
        else:
            new_path = self._get_file_stash(path)

        # Recorded before the move is attempted; rollback() copes with a
        # move that never happened.
        self._moves.append((path, new_path))
        if (path_is_dir and os.path.isdir(new_path)):
            # If we're moving a directory, we need to
            # remove the destination first or else it will be
            # moved to inside the existing directory.
            # We just created new_path ourselves, so it will
            # be removable.
            os.rmdir(new_path)
        renames(path, new_path)
        return new_path

    def commit(self):
        # type: () -> None
        """Commits the uninstall by removing stashed files."""
        for save_dir in self._save_dirs.values():
            save_dir.cleanup()
        self._moves = []
        self._save_dirs = {}

    def rollback(self):
        # type: () -> None
        """Undoes the uninstall by moving stashed files back.

        Failures to restore an individual path are logged and do not stop
        the remaining paths from being restored.
        """
        for original, stashed in self._moves:
            logger.info("Moving to %s\n from %s", original, stashed)

        for original, stashed in self._moves:
            if not os.path.lexists(stashed):
                # The move into the stash never completed, so there is
                # nothing to restore from. Whatever is at the original
                # location must not be deleted.
                logger.error("Failed to restore %s", original)
                logger.debug(
                    "Exception: stashed copy %s does not exist", stashed
                )
                continue
            try:
                logger.debug('Replacing %s from %s', original, stashed)
                # Clear whatever now occupies the original location.
                if os.path.isfile(original) or os.path.islink(original):
                    os.unlink(original)
                elif os.path.isdir(original):
                    rmtree(original)
                renames(stashed, original)
            except OSError as ex:
                logger.error("Failed to restore %s", original)
                logger.debug("Exception: %s", ex)

        self.commit()

    @property
    def can_rollback(self):
        # type: () -> bool
        """True while at least one move is recorded."""
        return bool(self._moves)
318 | |
319 | |
class UninstallPathSet(object):
    """A set of file paths to be removed in the uninstallation of a
    requirement.

    Paths are collected with add() / add_pth(), moved aside by remove(), and
    then either discarded for good by commit() or put back by rollback().
    """
    def __init__(self, dist):
        # type: (Distribution) -> None
        # Paths that will be removed.
        self.paths = set()  # type: Set[str]
        # Existing paths that failed the _permitted() check.
        self._refuse = set()  # type: Set[str]
        # .pth file path -> entries to strip from that file.
        self.pth = {}  # type: Dict[str, UninstallPthEntries]
        self.dist = dist
        self._moved_paths = StashedUninstallPathSet()

    def _permitted(self, path):
        # type: (str) -> bool
        """
        Return True if the given path is one we are permitted to
        remove/modify, False otherwise.

        """
        return is_local(path)

    def add(self, path):
        # type: (str) -> None
        """Register ``path`` for removal if it exists.

        Permitted paths go to ``self.paths``, others to ``self._refuse``.
        For a ``.py`` file the matching ``__pycache__`` file is registered
        too when pycache is in use.
        """
        head, tail = os.path.split(path)

        # we normalize the head to resolve parent directory symlinks, but not
        # the tail, since we only want to uninstall symlinks, not their targets
        path = os.path.join(normalize_path(head), os.path.normcase(tail))

        if not os.path.exists(path):
            return
        if self._permitted(path):
            self.paths.add(path)
        else:
            self._refuse.add(path)

        # __pycache__ files can show up after 'installed-files.txt' is created,
        # due to imports
        if os.path.splitext(path)[1] == '.py' and uses_pycache:
            self.add(cache_from_source(path))

    def add_pth(self, pth_file, entry):
        # type: (str, str) -> None
        """Register ``entry`` for removal from the .pth file ``pth_file``.

        If the file is not permitted it is recorded in ``self._refuse``
        instead.
        """
        pth_file = normalize_path(pth_file)
        if self._permitted(pth_file):
            if pth_file not in self.pth:
                self.pth[pth_file] = UninstallPthEntries(pth_file)
            self.pth[pth_file].add(entry)
        else:
            self._refuse.add(pth_file)

    def remove(self, auto_confirm=False, verbose=False):
        # type: (bool, bool) -> None
        """Remove paths in ``self.paths`` with confirmation (unless
        ``auto_confirm`` is True)."""

        if not self.paths:
            logger.info(
                "Can't uninstall '%s'. No files were found to uninstall.",
                self.dist.project_name,
            )
            return

        dist_name_version = (
            self.dist.project_name + "-" + self.dist.version
        )
        logger.info('Uninstalling %s:', dist_name_version)

        with indent_log():
            if auto_confirm or self._allowed_to_proceed(verbose):
                moved = self._moved_paths

                for_rename = compress_for_rename(self.paths)

                # Paths are stashed rather than deleted, so that
                # rollback() can restore them.
                for path in sorted(compact(for_rename)):
                    moved.stash(path)
                    logger.debug('Removing file or directory %s', path)

                for pth in self.pth.values():
                    pth.remove()

                logger.info('Successfully uninstalled %s', dist_name_version)

    def _allowed_to_proceed(self, verbose):
        # type: (bool) -> bool
        """Display which files would be deleted and prompt for confirmation
        """

        def _display(msg, paths):
            # type: (str, Iterable[str]) -> None
            if not paths:
                return

            logger.info(msg)
            with indent_log():
                for path in sorted(compact(paths)):
                    logger.info(path)

        if not verbose:
            will_remove, will_skip = compress_for_output_listing(self.paths)
        else:
            # In verbose mode, display all the files that are going to be
            # deleted.
            will_remove = set(self.paths)
            will_skip = set()

        _display('Would remove:', will_remove)
        _display('Would not remove (might be manually added):', will_skip)
        _display('Would not remove (outside of prefix):', self._refuse)
        if verbose:
            _display('Will actually move:', compress_for_rename(self.paths))

        return ask('Proceed (y/n)? ', ('y', 'n')) == 'y'

    def rollback(self):
        # type: () -> None
        """Rollback the changes previously made by remove()."""
        if not self._moved_paths.can_rollback:
            logger.error(
                "Can't roll back %s; was not uninstalled",
                self.dist.project_name,
            )
            return
        logger.info('Rolling back uninstall of %s', self.dist.project_name)
        self._moved_paths.rollback()
        for pth in self.pth.values():
            pth.rollback()

    def commit(self):
        # type: () -> None
        """Remove temporary save dir: rollback will no longer be possible."""
        self._moved_paths.commit()

    @classmethod
    def from_dist(cls, dist):
        # type: (Distribution) -> UninstallPathSet
        """Build the set of paths to remove for ``dist``.

        An empty set is returned when ``dist`` is not local to the
        environment or is located in the standard library. Raises
        UninstallationError for a distutils installed project.
        """
        dist_path = normalize_path(dist.location)
        if not dist_is_local(dist):
            logger.info(
                "Not uninstalling %s at %s, outside environment %s",
                dist.key,
                dist_path,
                sys.prefix,
            )
            return cls(dist)

        if dist_path in {p for p in {sysconfig.get_path("stdlib"),
                                     sysconfig.get_path("platstdlib")}
                         if p}:
            logger.info(
                "Not uninstalling %s at %s, as it is in the standard library.",
                dist.key,
                dist_path,
            )
            return cls(dist)

        paths_to_remove = cls(dist)
        develop_egg_link = egg_link_path(dist)
        develop_egg_link_egg_info = '{}.egg-info'.format(
            pkg_resources.to_filename(dist.project_name))
        egg_info_exists = dist.egg_info and os.path.exists(dist.egg_info)
        # Special case for distutils installed package
        distutils_egg_info = getattr(dist._provider, 'path', None)

        # The order of the uninstall cases matters: in the case of 2 installs
        # of the same package, pip needs to uninstall the currently detected
        # version
        if (egg_info_exists and dist.egg_info.endswith('.egg-info') and
                not dist.egg_info.endswith(develop_egg_link_egg_info)):
            # if dist.egg_info.endswith(develop_egg_link_egg_info), we
            # are in fact in the develop_egg_link case
            paths_to_remove.add(dist.egg_info)
            if dist.has_metadata('installed-files.txt'):
                for installed_file in dist.get_metadata(
                        'installed-files.txt').splitlines():
                    path = os.path.normpath(
                        os.path.join(dist.egg_info, installed_file)
                    )
                    paths_to_remove.add(path)
            # FIXME: need a test for this elif block
            # occurs with --single-version-externally-managed/--record outside
            # of pip
            elif dist.has_metadata('top_level.txt'):
                if dist.has_metadata('namespace_packages.txt'):
                    # Split into lines so that the membership test below
                    # compares whole names. Testing against the raw text
                    # would be a substring match ("foo" in "foobar").
                    namespaces = dist.get_metadata(
                        'namespace_packages.txt').splitlines()
                else:
                    namespaces = []
                for top_level_pkg in [
                        p for p
                        in dist.get_metadata('top_level.txt').splitlines()
                        if p and p not in namespaces]:
                    path = os.path.join(dist.location, top_level_pkg)
                    paths_to_remove.add(path)
                    paths_to_remove.add(path + '.py')
                    paths_to_remove.add(path + '.pyc')
                    paths_to_remove.add(path + '.pyo')

        elif distutils_egg_info:
            raise UninstallationError(
                "Cannot uninstall {!r}. It is a distutils installed project "
                "and thus we cannot accurately determine which files belong "
                "to it which would lead to only a partial uninstall.".format(
                    dist.project_name,
                )
            )

        elif dist.location.endswith('.egg'):
            # package installed by easy_install
            # We cannot match on dist.egg_name because it can slightly vary
            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
            paths_to_remove.add(dist.location)
            easy_install_egg = os.path.split(dist.location)[1]
            easy_install_pth = os.path.join(os.path.dirname(dist.location),
                                            'easy-install.pth')
            paths_to_remove.add_pth(easy_install_pth, './' + easy_install_egg)

        elif egg_info_exists and dist.egg_info.endswith('.dist-info'):
            for path in uninstallation_paths(dist):
                paths_to_remove.add(path)

        elif develop_egg_link:
            # develop egg
            with open(develop_egg_link, 'r') as fh:
                link_pointer = os.path.normcase(fh.readline().strip())
            # NOTE: assert statements are skipped when Python runs with -O.
            assert (link_pointer == dist.location), (
                'Egg-link %s does not match installed location of %s '
                '(at %s)' % (link_pointer, dist.project_name, dist.location)
            )
            paths_to_remove.add(develop_egg_link)
            easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
                                            'easy-install.pth')
            paths_to_remove.add_pth(easy_install_pth, dist.location)

        else:
            logger.debug(
                'Not sure how to uninstall: %s - Check: %s',
                dist, dist.location,
            )

        # find distutils scripts= scripts
        if dist.has_metadata('scripts') and dist.metadata_isdir('scripts'):
            # The bin directory is the same for every script of the dist.
            bin_dir = bin_user if dist_in_usersite(dist) else bin_py
            for script in dist.metadata_listdir('scripts'):
                script_path = os.path.join(bin_dir, script)
                paths_to_remove.add(script_path)
                if WINDOWS:
                    paths_to_remove.add(script_path + '.bat')

        # find console_scripts, then gui_scripts
        for group, is_gui in (('console_scripts', False),
                              ('gui_scripts', True)):
            for name in dist.get_entry_map(group=group):
                for script_file in _script_names(dist, name, is_gui):
                    paths_to_remove.add(script_file)

        return paths_to_remove
582 | |
583 | |
class UninstallPthEntries(object):
    """Entries to delete from one ``.pth`` file, with the ability to put
    the file back the way it was."""
    def __init__(self, pth_file):
        # type: (str) -> None
        """Raise UninstallationError if ``pth_file`` is not an existing
        file."""
        if not os.path.isfile(pth_file):
            raise UninstallationError(
                "Cannot remove entries from nonexistent file %s" % pth_file
            )
        self.file = pth_file
        self.entries = set()  # type: Set[str]
        # Content of the file as it was before remove() changed it;
        # None until remove() has run.
        self._saved_lines = None  # type: Optional[List[bytes]]

    def add(self, entry):
        # type: (str) -> None
        """Register ``entry`` as a line to delete from the file."""
        entry = os.path.normcase(entry)
        # On Windows, os.path.normcase converts the entry to use
        # backslashes.  This is correct for entries that describe absolute
        # paths outside of site-packages, but all the others use forward
        # slashes.
        # os.path.splitdrive is used instead of os.path.isabs because isabs
        # treats non-absolute paths with drive letter markings like c:foo\bar
        # as absolute paths. It also does not recognize UNC paths if they don't
        # have more than "\\server\share". Valid examples: "\\server\share\" or
        # "\\server\share\folder". Python 2.7.8+ support UNC in splitdrive.
        if WINDOWS and not os.path.splitdrive(entry)[0]:
            entry = entry.replace('\\', '/')
        self.entries.add(entry)

    def remove(self):
        # type: () -> None
        """Rewrite the file without the registered entries.

        The previous content is kept so that rollback() can restore it.
        Entries that are not present in the file are ignored.
        """
        logger.debug('Removing pth entries from %s:', self.file)
        with open(self.file, 'rb') as fh:
            # windows uses '\r\n' with py3k, but uses '\n' with py2.x
            lines = fh.readlines()
        # Keep a copy: ``lines`` is modified in place below, and
        # rollback() must write back the untouched original content.
        self._saved_lines = list(lines)
        if any(b'\r\n' in line for line in lines):
            endline = '\r\n'
        else:
            endline = '\n'
        endline_bytes = endline.encode("utf-8")
        # handle missing trailing newline
        if lines and not lines[-1].endswith(endline_bytes):
            lines[-1] = lines[-1] + endline_bytes
        for entry in self.entries:
            try:
                logger.debug('Removing entry: %s', entry)
                lines.remove((entry + endline).encode("utf-8"))
            except ValueError:
                # The entry is not in the file; nothing to remove.
                pass
        with open(self.file, 'wb') as fh:
            fh.writelines(lines)

    def rollback(self):
        # type: () -> bool
        """Write back the content saved by remove().

        Returns False (and logs an error) if remove() has not run, True
        otherwise.
        """
        if self._saved_lines is None:
            logger.error(
                'Cannot roll back changes to %s, none were made', self.file
            )
            return False
        logger.debug('Rolling %s back to previous state', self.file)
        with open(self.file, 'wb') as fh:
            fh.writelines(self._saved_lines)
        return True