comparison lib/python3.8/site-packages/pip/_internal/vcs/versioncontrol.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 """Handles all VCS (version control) support"""
2
3 from __future__ import absolute_import
4
5 import errno
6 import logging
7 import os
8 import shutil
9 import sys
10
11 from pip._vendor import pkg_resources
12 from pip._vendor.six.moves.urllib import parse as urllib_parse
13
14 from pip._internal.exceptions import BadCommand
15 from pip._internal.utils.compat import samefile
16 from pip._internal.utils.misc import (
17 ask_path_exists,
18 backup_dir,
19 display_path,
20 hide_url,
21 hide_value,
22 rmtree,
23 )
24 from pip._internal.utils.subprocess import call_subprocess, make_command
25 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
26 from pip._internal.utils.urls import get_url_scheme
27
28 if MYPY_CHECK_RUNNING:
29 from typing import (
30 Any, Dict, Iterable, Iterator, List, Mapping, Optional, Text, Tuple,
31 Type, Union
32 )
33 from pip._internal.utils.ui import SpinnerInterface
34 from pip._internal.utils.misc import HiddenText
35 from pip._internal.utils.subprocess import CommandArgs
36
37 AuthInfo = Tuple[Optional[str], Optional[str]]
38
39
# Restrict ``from ... import *`` to the shared VCS registry object.
__all__ = ['vcs']


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
44
45
def is_url(name):
    # type: (Union[str, Text]) -> bool
    """
    Tell whether ``name`` looks like a URL.

    The scheme reported by ``get_url_scheme()`` has to be one of the plain
    transport schemes listed below or a scheme advertised by a registered
    VCS backend. A name without a scheme is never a URL.
    """
    url_scheme = get_url_scheme(name)
    if url_scheme is not None:
        recognised = ['http', 'https', 'file', 'ftp'] + vcs.all_schemes
        return url_scheme in recognised
    return False
55
56
def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None):
    # type: (str, str, str, Optional[str]) -> str
    """
    Build the URL for a VCS requirement.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision, placed after the "@" separator.
      project_name: the (unescaped) project name.
      subdir: optional value for the "subdirectory" fragment entry; the
        entry is left out when this is falsy.
    """
    egg_name = pkg_resources.to_filename(project_name)
    requirement = '{}@{}#egg={}'.format(repo_url, rev, egg_name)
    if not subdir:
        return requirement
    return requirement + '&subdirectory={}'.format(subdir)
72
73
def find_path_to_setup_from_repo_root(location, repo_root):
    # type: (str, str) -> Optional[str]
    """
    Locate the directory holding `setup.py`, walking upwards from `location`.

    Return that directory's path relative to `repo_root`.
    Return None when `setup.py` sits in `repo_root` itself, or when no
    parent directory contains it (a warning is logged in that case).
    """
    candidate = location
    while True:
        if os.path.exists(os.path.join(candidate, 'setup.py')):
            break
        parent = os.path.dirname(candidate)
        if parent == candidate:
            # dirname() stopped changing the path: the filesystem root was
            # reached without finding setup.py.
            logger.warning(
                "Could not find setup.py for directory %s (tried all "
                "parent directories)",
                location,
            )
            return None
        candidate = parent

    if samefile(repo_root, candidate):
        return None
    return os.path.relpath(candidate, repo_root)
100
101
class RemoteNotFoundError(Exception):
    """Signal that a repository has no remote url configured."""
104
105
class RevOptions(object):

    """
    Bundles the revision to install with the extra command options of one
    particular VCS.

    Treat instances as immutable once they are constructed.
    """

    def __init__(
        self,
        vc_class,  # type: Type[VersionControl]
        rev=None,  # type: Optional[str]
        extra_args=None,  # type: Optional[CommandArgs]
    ):
        # type: (...) -> None
        """
        Args:
          vc_class: a VersionControl subclass.
          rev: the name of the revision to install.
          extra_args: a list of extra options (an empty list when omitted).
        """
        self.vc_class = vc_class
        self.rev = rev
        self.extra_args = [] if extra_args is None else extra_args
        self.branch_name = None  # type: Optional[str]

    def __repr__(self):
        # type: () -> str
        vcs_name = self.vc_class.name
        return '<RevOptions {}: rev={!r}>'.format(vcs_name, self.rev)

    @property
    def arg_rev(self):
        # type: () -> Optional[str]
        """
        The revision to pass on the command line: the explicit rev when one
        was given, otherwise the VCS class's default_arg_rev.
        """
        if self.rev is not None:
            return self.rev
        return self.vc_class.default_arg_rev

    def to_args(self):
        # type: () -> CommandArgs
        """
        Build the VCS-specific command arguments: the base revision
        arguments (when there is a revision) followed by the extra args.
        """
        command_args = []  # type: CommandArgs
        revision = self.arg_rev
        if revision is not None:
            command_args.extend(self.vc_class.get_base_rev_args(revision))
        command_args.extend(self.extra_args)
        return command_args

    def to_display(self):
        # type: () -> str
        """
        Return a log-message suffix naming the revision, or an empty string
        when no revision is set.
        """
        if self.rev:
            return ' (to revision {})'.format(self.rev)
        return ''

    def make_new(self, rev):
        # type: (str) -> RevOptions
        """
        Create a new object for another revision, reusing this instance's
        VCS class and extra args.

        Args:
          rev: the name of the revision for the new object.
        """
        return self.vc_class.make_rev_options(
            rev, extra_args=self.extra_args,
        )
177
178
class VcsSupport(object):
    """
    Registry of VCS backend instances, keyed by backend name.

    The registry dict is a class attribute, so it is shared by every
    instance of this class.
    """
    _registry = {}  # type: Dict[str, VersionControl]
    schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp', 'svn']

    def __init__(self):
        # type: () -> None
        # Make urlparse aware of the schemes used by the various version
        # control systems.
        urllib_parse.uses_netloc.extend(self.schemes)
        # Python >= 2.7.4, 3.3 doesn't have uses_fragment
        fragment_schemes = getattr(urllib_parse, 'uses_fragment', None)
        if fragment_schemes:
            fragment_schemes.extend(self.schemes)
        super(VcsSupport, self).__init__()

    def __iter__(self):
        # type: () -> Iterator[str]
        """Iterate over the names of the registered backends."""
        return iter(self._registry)

    @property
    def backends(self):
        # type: () -> List[VersionControl]
        """A fresh list holding every registered backend object."""
        return [backend for backend in self._registry.values()]

    @property
    def dirnames(self):
        # type: () -> List[str]
        """The ``dirname`` attribute of every registered backend."""
        names = []  # type: List[str]
        for backend in self.backends:
            names.append(backend.dirname)
        return names

    @property
    def all_schemes(self):
        # type: () -> List[str]
        """The schemes of all registered backends, flattened into a list."""
        return [
            scheme
            for backend in self.backends
            for scheme in backend.schemes
        ]

    def register(self, cls):
        # type: (Type[VersionControl]) -> None
        """
        Instantiate ``cls`` and store it under ``cls.name``.

        A class without a ``name`` attribute is rejected with a warning; a
        name that is already registered is left as it is.
        """
        if not hasattr(cls, 'name'):
            logger.warning('Cannot register VCS %s', cls.__name__)
            return
        backend_name = cls.name
        if backend_name in self._registry:
            return
        self._registry[backend_name] = cls()
        logger.debug('Registered VCS backend: %s', backend_name)

    def unregister(self, name):
        # type: (str) -> None
        """Drop the backend registered under ``name``, if there is one."""
        self._registry.pop(name, None)

    def get_backend_for_dir(self, location):
        # type: (str) -> Optional[VersionControl]
        """
        Return the first registered VersionControl object whose
        controls_location() accepts the given directory, or None.
        """
        for candidate in self._registry.values():
            if not candidate.controls_location(location):
                continue
            logger.debug('Determine that %s uses VCS: %s',
                         location, candidate.name)
            return candidate
        return None

    def get_backend_for_scheme(self, scheme):
        # type: (str) -> Optional[VersionControl]
        """
        Return the first registered VersionControl object that lists
        ``scheme`` among its schemes, or None.
        """
        matches = (
            backend for backend in self._registry.values()
            if scheme in backend.schemes
        )
        return next(matches, None)

    def get_backend(self, name):
        # type: (str) -> Optional[VersionControl]
        """
        Return the VersionControl object registered under the lower-cased
        ``name``, or None.
        """
        return self._registry.get(name.lower())
259
260
# Shared VcsSupport instance; is_url() reads its all_schemes property.
vcs = VcsSupport()
262
263
class VersionControl(object):
    """
    Common interface for VCS backends.

    Concrete version control systems subclass this and override the methods
    that raise NotImplementedError here.
    """
    name = ''
    dirname = ''
    repo_name = ''
    # List of supported schemes for this Version Control
    schemes = ()  # type: Tuple[str, ...]
    # Iterable of environment variable names to pass to call_subprocess().
    unset_environ = ()  # type: Tuple[str, ...]
    default_arg_rev = None  # type: Optional[str]

    @classmethod
    def should_add_vcs_url_prefix(cls, remote_url):
        # type: (str) -> bool
        """
        Tell whether the vcs prefix (e.g. "git+") has to be added to a
        repository's remote url when it is used in a requirement.

        The prefix is unnecessary when the url already starts with
        "<name>:" (compared case-insensitively).
        """
        own_scheme_prefix = '{}:'.format(cls.name)
        return not remote_url.lower().startswith(own_scheme_prefix)

    @classmethod
    def get_subdirectory(cls, location):
        # type: (str) -> Optional[str]
        """
        Return the path to setup.py, relative to the repo root.
        Return None if setup.py is in the repo root.

        This base implementation always returns None.
        """
        return None

    @classmethod
    def get_requirement_revision(cls, repo_dir):
        # type: (str) -> str
        """
        Return the revision string to put in a requirement; by default
        this is whatever get_revision() reports.
        """
        return cls.get_revision(repo_dir)

    @classmethod
    def get_src_requirement(cls, repo_dir, project_name):
        # type: (str, str) -> Optional[str]
        """
        Build the requirement string that redownloads the files currently
        at the given repository directory.

        Args:
          project_name: the (unescaped) project name.

        The return value has a form similar to the following:

            {repository_url}@{revision}#egg={project_name}

        None is returned when get_remote_url() yields None.
        """
        remote_url = cls.get_remote_url(repo_dir)
        if remote_url is None:
            return None

        if cls.should_add_vcs_url_prefix(remote_url):
            remote_url = '{}+{}'.format(cls.name, remote_url)

        return make_vcs_requirement_url(
            remote_url,
            cls.get_requirement_revision(repo_dir),
            project_name,
            subdir=cls.get_subdirectory(repo_dir),
        )

    @staticmethod
    def get_base_rev_args(rev):
        # type: (str) -> List[str]
        """
        Return the base revision arguments for a vcs command.

        Args:
          rev: the name of a revision to install.  Cannot be None.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    def is_immutable_rev_checkout(self, url, dest):
        # type: (str, str) -> bool
        """
        Return true if the commit hash checked out at dest matches
        the revision in url.

        Always return False, if the VCS does not support immutable commit
        hashes.

        This method does not check if there are local uncommitted changes
        in dest after checkout, as pip currently has no use case for that.
        """
        return False

    @classmethod
    def make_rev_options(cls, rev=None, extra_args=None):
        # type: (Optional[str], Optional[CommandArgs]) -> RevOptions
        """
        Create a RevOptions object bound to this class.

        Args:
          rev: the name of a revision to install.
          extra_args: a list of extra options.
        """
        return RevOptions(cls, rev=rev, extra_args=extra_args)

    @classmethod
    def _is_local_repository(cls, repo):
        # type: (str) -> bool
        """
           posix absolute paths start with os.path.sep,
           win32 ones start with drive (like c:\\folder)
        """
        drive = os.path.splitdrive(repo)[0]
        if repo.startswith(os.path.sep):
            return True
        return bool(drive)

    def export(self, location, url):
        # type: (str, HiddenText) -> None
        """
        Export the repository at the url to the destination location
        i.e. only download the files, without vcs information

        :param url: the repository URL starting with a vcs prefix.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    @classmethod
    def get_netloc_and_auth(cls, netloc, scheme):
        # type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]]
        """
        Parse the repository URL's netloc, and return the new netloc to use
        along with auth information.

        Args:
          netloc: the original repository URL netloc.
          scheme: the repository URL's scheme without the vcs prefix.

        This is mainly for the Subversion class to override, so that auth
        information can be provided via the --username and --password options
        instead of through the URL.  For other subclasses like Git without
        such an option, auth information must stay in the URL.

        Returns: (netloc, (username, password)).
        """
        no_auth = (None, None)
        return netloc, no_auth

    @classmethod
    def get_url_rev_and_auth(cls, url):
        # type: (str) -> Tuple[str, Optional[str], AuthInfo]
        """
        Split a repository URL into the URL to use, the revision and the
        auth info.

        Returns: (url, rev, (username, password)).

        Raises ValueError when the URL scheme has no "<vcs>+" prefix.
        """
        split = urllib_parse.urlsplit(url)
        if '+' not in split.scheme:
            raise ValueError(
                "Sorry, {!r} is a malformed VCS url. "
                "The format is <vcs>+<protocol>://<url>, "
                "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
            )
        # Remove the vcs prefix.
        transport = split.scheme.split('+', 1)[1]
        netloc, user_pass = cls.get_netloc_and_auth(split.netloc, transport)

        # Everything after the last "@" in the path names the revision.
        head, separator, tail = split.path.rpartition('@')
        if separator:
            path, rev = head, tail
        else:
            path, rev = split.path, None

        # The fragment is dropped from the rebuilt URL.
        clean_url = urllib_parse.urlunsplit(
            (transport, netloc, path, split.query, '')
        )
        return clean_url, rev, user_pass

    @staticmethod
    def make_rev_args(username, password):
        # type: (Optional[str], Optional[HiddenText]) -> CommandArgs
        """
        Return the RevOptions "extra arguments" to use in obtain().

        This base implementation returns an empty list.
        """
        return []

    def get_url_rev_options(self, url):
        # type: (HiddenText) -> Tuple[HiddenText, RevOptions]
        """
        Return the URL and RevOptions object to use in obtain() and in
        some cases export(), as a tuple (url, rev_options).
        """
        secret_url, rev, (username, secret_password) = (
            self.get_url_rev_and_auth(url.secret)
        )
        # Only wrap the password when the URL actually carried one.
        password = (
            hide_value(secret_password)
            if secret_password is not None else None
        )  # type: Optional[HiddenText]
        rev_options = self.make_rev_options(
            rev, extra_args=self.make_rev_args(username, password),
        )

        return hide_url(secret_url), rev_options

    @staticmethod
    def normalize_url(url):
        # type: (str) -> str
        """
        Normalize a URL for comparison by unquoting it and removing any
        trailing slash.
        """
        unquoted = urllib_parse.unquote(url)
        return unquoted.rstrip('/')

    @classmethod
    def compare_urls(cls, url1, url2):
        # type: (str, str) -> bool
        """
        Compare two repo URLs for identity, ignoring incidental differences.
        """
        first = cls.normalize_url(url1)
        second = cls.normalize_url(url2)
        return first == second

    def fetch_new(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Fetch a revision from a repository, in the case that this is the
        first fetch from the repository.

        Args:
          dest: the directory to fetch the repository to.
          rev_options: a RevOptions object.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    def switch(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Switch the repo at ``dest`` to point to ``URL``.

        Args:
          rev_options: a RevOptions object.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    def update(self, dest, url, rev_options):
        # type: (str, HiddenText, RevOptions) -> None
        """
        Update an already-existing repo to the given ``rev_options``.

        Args:
          rev_options: a RevOptions object.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    @classmethod
    def is_commit_id_equal(cls, dest, name):
        # type: (str, Optional[str]) -> bool
        """
        Return whether the id of the current commit equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    def obtain(self, dest, url):
        # type: (str, HiddenText) -> None
        """
        Install or update in editable mode the package represented by this
        VersionControl object.

        :param dest: the repository directory in which to install or update.
        :param url: the repository URL starting with a vcs prefix.
        """
        url, rev_options = self.get_url_rev_options(url)

        # Nothing at dest yet: a plain first fetch is all that is needed.
        if not os.path.exists(dest):
            self.fetch_new(dest, url, rev_options)
            return

        rev_display = rev_options.to_display()
        if not self.is_repository_directory(dest):
            logger.warning(
                'Directory %s already exists, and is not a %s %s.',
                dest,
                self.name,
                self.repo_name,
            )
            # https://github.com/python/mypy/issues/1174
            prompt_text, choices = (
                '(i)gnore, (w)ipe, (b)ackup ',  # type: ignore
                ('i', 'w', 'b'),
            )
        else:
            existing_url = self.get_remote_url(dest)
            if self.compare_urls(existing_url, url.secret):
                logger.debug(
                    '%s in %s exists, and has correct URL (%s)',
                    self.repo_name.title(),
                    display_path(dest),
                    url,
                )
                if self.is_commit_id_equal(dest, rev_options.rev):
                    logger.info('Skipping because already up-to-date.')
                else:
                    logger.info(
                        'Updating %s %s%s',
                        display_path(dest),
                        self.repo_name,
                        rev_display,
                    )
                    self.update(dest, url, rev_options)
                return

            # Same VCS but a different remote: switching becomes an option.
            logger.warning(
                '%s %s in %s exists with URL %s',
                self.name,
                self.repo_name,
                display_path(dest),
                existing_url,
            )
            prompt_text, choices = (
                '(s)witch, (i)gnore, (w)ipe, (b)ackup ',
                ('s', 'i', 'w', 'b'),
            )

        logger.warning(
            'The plan is to install the %s repository %s',
            self.name,
            url,
        )
        response = ask_path_exists('What to do? %s' % prompt_text, choices)

        if response == 'a':
            sys.exit(-1)
        elif response == 'w':
            logger.warning('Deleting %s', display_path(dest))
            rmtree(dest)
            self.fetch_new(dest, url, rev_options)
        elif response == 'b':
            dest_dir = backup_dir(dest)
            logger.warning(
                'Backing up %s to %s', display_path(dest), dest_dir,
            )
            shutil.move(dest, dest_dir)
            self.fetch_new(dest, url, rev_options)
        elif response == 's':
            logger.info(
                'Switching %s %s to %s%s',
                self.repo_name,
                display_path(dest),
                url,
                rev_display,
            )
            self.switch(dest, url, rev_options)
        # Any other response (i.e. "i") means: do nothing.

    def unpack(self, location, url):
        # type: (str, HiddenText) -> None
        """
        Clean up current location and download the url repository
        (and vcs infos) into location

        :param url: the repository URL starting with a vcs prefix.
        """
        if os.path.exists(location):
            rmtree(location)
        self.obtain(location, url=url)

    @classmethod
    def get_remote_url(cls, location):
        # type: (str) -> str
        """
        Return the url used at location

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    @classmethod
    def get_revision(cls, location):
        # type: (str) -> str
        """
        Return the current commit id of the files at the given location.

        Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    @classmethod
    def run_command(
        cls,
        cmd,  # type: Union[List[str], CommandArgs]
        show_stdout=True,  # type: bool
        cwd=None,  # type: Optional[str]
        on_returncode='raise',  # type: str
        extra_ok_returncodes=None,  # type: Optional[Iterable[int]]
        command_desc=None,  # type: Optional[str]
        extra_environ=None,  # type: Optional[Mapping[str, Any]]
        spinner=None,  # type: Optional[SpinnerInterface]
        log_failed_cmd=True  # type: bool
    ):
        # type: (...) -> Text
        """
        Run a VCS subcommand
        This is simply a wrapper around call_subprocess that adds the VCS
        command name, and checks that the VCS is available

        Raises BadCommand when the OS reports that the executable does not
        exist; any other OSError propagates unchanged.
        """
        full_cmd = make_command(cls.name, *cmd)
        try:
            return call_subprocess(
                full_cmd,
                show_stdout,
                cwd,
                on_returncode=on_returncode,
                extra_ok_returncodes=extra_ok_returncodes,
                command_desc=command_desc,
                extra_environ=extra_environ,
                unset_environ=cls.unset_environ,
                spinner=spinner,
                log_failed_cmd=log_failed_cmd,
            )
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise  # re-raise exception if a different error occurred
            # errno.ENOENT = no such file or directory
            # In other words, the VCS executable isn't available
            raise BadCommand(
                'Cannot find command %r - do you have '
                '%r installed and in your '
                'PATH?' % (cls.name, cls.name))

    @classmethod
    def is_repository_directory(cls, path):
        # type: (str) -> bool
        """
        Return whether a directory path is a repository directory, i.e.
        whether it contains an entry named after ``cls.dirname``.
        """
        logger.debug('Checking in %s for %s (%s)...',
                     path, cls.dirname, cls.name)
        marker = os.path.join(path, cls.dirname)
        return os.path.exists(marker)

    @classmethod
    def controls_location(cls, location):
        # type: (str) -> bool
        """
        Check if a location is controlled by the vcs.
        It is meant to be overridden to implement smarter detection
        mechanisms for specific vcs.

        This can do more than is_repository_directory() alone.  For example,
        the Git override checks that Git is actually available.
        """
        return cls.is_repository_directory(location)