diff lib/python3.8/site-packages/pip/_internal/index/collector.py @ 1:64071f2a4cf0 draft default tip

Deleted selected files
author guerler
date Mon, 27 Jul 2020 03:55:49 -0400
parents 9e54283cc701
children
line wrap: on
line diff
--- a/lib/python3.8/site-packages/pip/_internal/index/collector.py	Mon Jul 27 03:47:31 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,544 +0,0 @@
-"""
-The main purpose of this module is to expose LinkCollector.collect_links().
-"""
-
-import cgi
-import itertools
-import logging
-import mimetypes
-import os
-from collections import OrderedDict
-
-from pip._vendor import html5lib, requests
-from pip._vendor.distlib.compat import unescape
-from pip._vendor.requests.exceptions import HTTPError, RetryError, SSLError
-from pip._vendor.six.moves.urllib import parse as urllib_parse
-from pip._vendor.six.moves.urllib import request as urllib_request
-
-from pip._internal.models.link import Link
-from pip._internal.utils.filetypes import ARCHIVE_EXTENSIONS
-from pip._internal.utils.misc import redact_auth_from_url
-from pip._internal.utils.typing import MYPY_CHECK_RUNNING
-from pip._internal.utils.urls import path_to_url, url_to_path
-from pip._internal.vcs import is_url, vcs
-
-if MYPY_CHECK_RUNNING:
-    from typing import (
-        Callable, Iterable, List, MutableMapping, Optional, Sequence, Tuple,
-        Union,
-    )
-    import xml.etree.ElementTree
-
-    from pip._vendor.requests import Response
-
-    from pip._internal.models.search_scope import SearchScope
-    from pip._internal.network.session import PipSession
-
-    HTMLElement = xml.etree.ElementTree.Element
-    ResponseHeaders = MutableMapping[str, str]
-
-
-logger = logging.getLogger(__name__)
-
-
-def _match_vcs_scheme(url):
-    # type: (str) -> Optional[str]
-    """Look for VCS schemes in the URL.
-
-    Returns the matched VCS scheme, or None if there's no match.
-    """
-    for scheme in vcs.schemes:
-        if url.lower().startswith(scheme) and url[len(scheme)] in '+:':
-            return scheme
-    return None
-
-
-def _is_url_like_archive(url):
-    # type: (str) -> bool
-    """Return whether the URL looks like an archive.
-    """
-    filename = Link(url).filename
-    for bad_ext in ARCHIVE_EXTENSIONS:
-        if filename.endswith(bad_ext):
-            return True
-    return False
-
-
-class _NotHTML(Exception):
-    def __init__(self, content_type, request_desc):
-        # type: (str, str) -> None
-        super(_NotHTML, self).__init__(content_type, request_desc)
-        self.content_type = content_type
-        self.request_desc = request_desc
-
-
-def _ensure_html_header(response):
-    # type: (Response) -> None
-    """Check the Content-Type header to ensure the response contains HTML.
-
-    Raises `_NotHTML` if the content type is not text/html.
-    """
-    content_type = response.headers.get("Content-Type", "")
-    if not content_type.lower().startswith("text/html"):
-        raise _NotHTML(content_type, response.request.method)
-
-
-class _NotHTTP(Exception):
-    pass
-
-
-def _ensure_html_response(url, session):
-    # type: (str, PipSession) -> None
-    """Send a HEAD request to the URL, and ensure the response contains HTML.
-
-    Raises `_NotHTTP` if the URL is not available for a HEAD request, or
-    `_NotHTML` if the content type is not text/html.
-    """
-    scheme, netloc, path, query, fragment = urllib_parse.urlsplit(url)
-    if scheme not in {'http', 'https'}:
-        raise _NotHTTP()
-
-    resp = session.head(url, allow_redirects=True)
-    resp.raise_for_status()
-
-    _ensure_html_header(resp)
-
-
-def _get_html_response(url, session):
-    # type: (str, PipSession) -> Response
-    """Access an HTML page with GET, and return the response.
-
-    This consists of three parts:
-
-    1. If the URL looks suspiciously like an archive, send a HEAD first to
-       check the Content-Type is HTML, to avoid downloading a large file.
-       Raise `_NotHTTP` if the content type cannot be determined, or
-       `_NotHTML` if it is not HTML.
-    2. Actually perform the request. Raise HTTP exceptions on network failures.
-    3. Check the Content-Type header to make sure we got HTML, and raise
-       `_NotHTML` otherwise.
-    """
-    if _is_url_like_archive(url):
-        _ensure_html_response(url, session=session)
-
-    logger.debug('Getting page %s', redact_auth_from_url(url))
-
-    resp = session.get(
-        url,
-        headers={
-            "Accept": "text/html",
-            # We don't want to blindly returned cached data for
-            # /simple/, because authors generally expecting that
-            # twine upload && pip install will function, but if
-            # they've done a pip install in the last ~10 minutes
-            # it won't. Thus by setting this to zero we will not
-            # blindly use any cached data, however the benefit of
-            # using max-age=0 instead of no-cache, is that we will
-            # still support conditional requests, so we will still
-            # minimize traffic sent in cases where the page hasn't
-            # changed at all, we will just always incur the round
-            # trip for the conditional GET now instead of only
-            # once per 10 minutes.
-            # For more information, please see pypa/pip#5670.
-            "Cache-Control": "max-age=0",
-        },
-    )
-    resp.raise_for_status()
-
-    # The check for archives above only works if the url ends with
-    # something that looks like an archive. However that is not a
-    # requirement of an url. Unless we issue a HEAD request on every
-    # url we cannot know ahead of time for sure if something is HTML
-    # or not. However we can check after we've downloaded it.
-    _ensure_html_header(resp)
-
-    return resp
-
-
-def _get_encoding_from_headers(headers):
-    # type: (ResponseHeaders) -> Optional[str]
-    """Determine if we have any encoding information in our headers.
-    """
-    if headers and "Content-Type" in headers:
-        content_type, params = cgi.parse_header(headers["Content-Type"])
-        if "charset" in params:
-            return params['charset']
-    return None
-
-
-def _determine_base_url(document, page_url):
-    # type: (HTMLElement, str) -> str
-    """Determine the HTML document's base URL.
-
-    This looks for a ``<base>`` tag in the HTML document. If present, its href
-    attribute denotes the base URL of anchor tags in the document. If there is
-    no such tag (or if it does not have a valid href attribute), the HTML
-    file's URL is used as the base URL.
-
-    :param document: An HTML document representation. The current
-        implementation expects the result of ``html5lib.parse()``.
-    :param page_url: The URL of the HTML document.
-    """
-    for base in document.findall(".//base"):
-        href = base.get("href")
-        if href is not None:
-            return href
-    return page_url
-
-
-def _clean_link(url):
-    # type: (str) -> str
-    """Makes sure a link is fully encoded.  That is, if a ' ' shows up in
-    the link, it will be rewritten to %20 (while not over-quoting
-    % or other characters)."""
-    # Split the URL into parts according to the general structure
-    # `scheme://netloc/path;parameters?query#fragment`. Note that the
-    # `netloc` can be empty and the URI will then refer to a local
-    # filesystem path.
-    result = urllib_parse.urlparse(url)
-    # In both cases below we unquote prior to quoting to make sure
-    # nothing is double quoted.
-    if result.netloc == "":
-        # On Windows the path part might contain a drive letter which
-        # should not be quoted. On Linux where drive letters do not
-        # exist, the colon should be quoted. We rely on urllib.request
-        # to do the right thing here.
-        path = urllib_request.pathname2url(
-            urllib_request.url2pathname(result.path))
-    else:
-        # In addition to the `/` character we protect `@` so that
-        # revision strings in VCS URLs are properly parsed.
-        path = urllib_parse.quote(urllib_parse.unquote(result.path), safe="/@")
-    return urllib_parse.urlunparse(result._replace(path=path))
-
-
-def _create_link_from_element(
-    anchor,    # type: HTMLElement
-    page_url,  # type: str
-    base_url,  # type: str
-):
-    # type: (...) -> Optional[Link]
-    """
-    Convert an anchor element in a simple repository page to a Link.
-    """
-    href = anchor.get("href")
-    if not href:
-        return None
-
-    url = _clean_link(urllib_parse.urljoin(base_url, href))
-    pyrequire = anchor.get('data-requires-python')
-    pyrequire = unescape(pyrequire) if pyrequire else None
-
-    yanked_reason = anchor.get('data-yanked')
-    if yanked_reason:
-        # This is a unicode string in Python 2 (and 3).
-        yanked_reason = unescape(yanked_reason)
-
-    link = Link(
-        url,
-        comes_from=page_url,
-        requires_python=pyrequire,
-        yanked_reason=yanked_reason,
-    )
-
-    return link
-
-
-def parse_links(page):
-    # type: (HTMLPage) -> Iterable[Link]
-    """
-    Parse an HTML document, and yield its anchor elements as Link objects.
-    """
-    document = html5lib.parse(
-        page.content,
-        transport_encoding=page.encoding,
-        namespaceHTMLElements=False,
-    )
-
-    url = page.url
-    base_url = _determine_base_url(document, url)
-    for anchor in document.findall(".//a"):
-        link = _create_link_from_element(
-            anchor,
-            page_url=url,
-            base_url=base_url,
-        )
-        if link is None:
-            continue
-        yield link
-
-
-class HTMLPage(object):
-    """Represents one page, along with its URL"""
-
-    def __init__(
-        self,
-        content,   # type: bytes
-        encoding,  # type: Optional[str]
-        url,       # type: str
-    ):
-        # type: (...) -> None
-        """
-        :param encoding: the encoding to decode the given content.
-        :param url: the URL from which the HTML was downloaded.
-        """
-        self.content = content
-        self.encoding = encoding
-        self.url = url
-
-    def __str__(self):
-        # type: () -> str
-        return redact_auth_from_url(self.url)
-
-
-def _handle_get_page_fail(
-    link,  # type: Link
-    reason,  # type: Union[str, Exception]
-    meth=None  # type: Optional[Callable[..., None]]
-):
-    # type: (...) -> None
-    if meth is None:
-        meth = logger.debug
-    meth("Could not fetch URL %s: %s - skipping", link, reason)
-
-
-def _make_html_page(response):
-    # type: (Response) -> HTMLPage
-    encoding = _get_encoding_from_headers(response.headers)
-    return HTMLPage(response.content, encoding=encoding, url=response.url)
-
-
-def _get_html_page(link, session=None):
-    # type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
-    if session is None:
-        raise TypeError(
-            "_get_html_page() missing 1 required keyword argument: 'session'"
-        )
-
-    url = link.url.split('#', 1)[0]
-
-    # Check for VCS schemes that do not support lookup as web pages.
-    vcs_scheme = _match_vcs_scheme(url)
-    if vcs_scheme:
-        logger.debug('Cannot look at %s URL %s', vcs_scheme, link)
-        return None
-
-    # Tack index.html onto file:// URLs that point to directories
-    scheme, _, path, _, _, _ = urllib_parse.urlparse(url)
-    if (scheme == 'file' and os.path.isdir(urllib_request.url2pathname(path))):
-        # add trailing slash if not present so urljoin doesn't trim
-        # final segment
-        if not url.endswith('/'):
-            url += '/'
-        url = urllib_parse.urljoin(url, 'index.html')
-        logger.debug(' file: URL is directory, getting %s', url)
-
-    try:
-        resp = _get_html_response(url, session=session)
-    except _NotHTTP:
-        logger.debug(
-            'Skipping page %s because it looks like an archive, and cannot '
-            'be checked by HEAD.', link,
-        )
-    except _NotHTML as exc:
-        logger.debug(
-            'Skipping page %s because the %s request got Content-Type: %s',
-            link, exc.request_desc, exc.content_type,
-        )
-    except HTTPError as exc:
-        _handle_get_page_fail(link, exc)
-    except RetryError as exc:
-        _handle_get_page_fail(link, exc)
-    except SSLError as exc:
-        reason = "There was a problem confirming the ssl certificate: "
-        reason += str(exc)
-        _handle_get_page_fail(link, reason, meth=logger.info)
-    except requests.ConnectionError as exc:
-        _handle_get_page_fail(link, "connection error: %s" % exc)
-    except requests.Timeout:
-        _handle_get_page_fail(link, "timed out")
-    else:
-        return _make_html_page(resp)
-    return None
-
-
-def _remove_duplicate_links(links):
-    # type: (Iterable[Link]) -> List[Link]
-    """
-    Return a list of links, with duplicates removed and ordering preserved.
-    """
-    # We preserve the ordering when removing duplicates because we can.
-    return list(OrderedDict.fromkeys(links))
-
-
-def group_locations(locations, expand_dir=False):
-    # type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
-    """
-    Divide a list of locations into two groups: "files" (archives) and "urls."
-
-    :return: A pair of lists (files, urls).
-    """
-    files = []
-    urls = []
-
-    # puts the url for the given file path into the appropriate list
-    def sort_path(path):
-        # type: (str) -> None
-        url = path_to_url(path)
-        if mimetypes.guess_type(url, strict=False)[0] == 'text/html':
-            urls.append(url)
-        else:
-            files.append(url)
-
-    for url in locations:
-
-        is_local_path = os.path.exists(url)
-        is_file_url = url.startswith('file:')
-
-        if is_local_path or is_file_url:
-            if is_local_path:
-                path = url
-            else:
-                path = url_to_path(url)
-            if os.path.isdir(path):
-                if expand_dir:
-                    path = os.path.realpath(path)
-                    for item in os.listdir(path):
-                        sort_path(os.path.join(path, item))
-                elif is_file_url:
-                    urls.append(url)
-                else:
-                    logger.warning(
-                        "Path '{0}' is ignored: "
-                        "it is a directory.".format(path),
-                    )
-            elif os.path.isfile(path):
-                sort_path(path)
-            else:
-                logger.warning(
-                    "Url '%s' is ignored: it is neither a file "
-                    "nor a directory.", url,
-                )
-        elif is_url(url):
-            # Only add url with clear scheme
-            urls.append(url)
-        else:
-            logger.warning(
-                "Url '%s' is ignored. It is either a non-existing "
-                "path or lacks a specific scheme.", url,
-            )
-
-    return files, urls
-
-
-class CollectedLinks(object):
-
-    """
-    Encapsulates the return value of a call to LinkCollector.collect_links().
-
-    The return value includes both URLs to project pages containing package
-    links, as well as individual package Link objects collected from other
-    sources.
-
-    This info is stored separately as:
-
-    (1) links from the configured file locations,
-    (2) links from the configured find_links, and
-    (3) urls to HTML project pages, as described by the PEP 503 simple
-        repository API.
-    """
-
-    def __init__(
-        self,
-        files,         # type: List[Link]
-        find_links,    # type: List[Link]
-        project_urls,  # type: List[Link]
-    ):
-        # type: (...) -> None
-        """
-        :param files: Links from file locations.
-        :param find_links: Links from find_links.
-        :param project_urls: URLs to HTML project pages, as described by
-            the PEP 503 simple repository API.
-        """
-        self.files = files
-        self.find_links = find_links
-        self.project_urls = project_urls
-
-
-class LinkCollector(object):
-
-    """
-    Responsible for collecting Link objects from all configured locations,
-    making network requests as needed.
-
-    The class's main method is its collect_links() method.
-    """
-
-    def __init__(
-        self,
-        session,       # type: PipSession
-        search_scope,  # type: SearchScope
-    ):
-        # type: (...) -> None
-        self.search_scope = search_scope
-        self.session = session
-
-    @property
-    def find_links(self):
-        # type: () -> List[str]
-        return self.search_scope.find_links
-
-    def fetch_page(self, location):
-        # type: (Link) -> Optional[HTMLPage]
-        """
-        Fetch an HTML page containing package links.
-        """
-        return _get_html_page(location, session=self.session)
-
-    def collect_links(self, project_name):
-        # type: (str) -> CollectedLinks
-        """Find all available links for the given project name.
-
-        :return: All the Link objects (unfiltered), as a CollectedLinks object.
-        """
-        search_scope = self.search_scope
-        index_locations = search_scope.get_index_urls_locations(project_name)
-        index_file_loc, index_url_loc = group_locations(index_locations)
-        fl_file_loc, fl_url_loc = group_locations(
-            self.find_links, expand_dir=True,
-        )
-
-        file_links = [
-            Link(url) for url in itertools.chain(index_file_loc, fl_file_loc)
-        ]
-
-        # We trust every directly linked archive in find_links
-        find_link_links = [Link(url, '-f') for url in self.find_links]
-
-        # We trust every url that the user has given us whether it was given
-        # via --index-url or --find-links.
-        # We want to filter out anything that does not have a secure origin.
-        url_locations = [
-            link for link in itertools.chain(
-                (Link(url) for url in index_url_loc),
-                (Link(url) for url in fl_url_loc),
-            )
-            if self.session.is_secure_origin(link)
-        ]
-
-        url_locations = _remove_duplicate_links(url_locations)
-        lines = [
-            '{} location(s) to search for versions of {}:'.format(
-                len(url_locations), project_name,
-            ),
-        ]
-        for link in url_locations:
-            lines.append('* {}'.format(link))
-        logger.debug('\n'.join(lines))
-
-        return CollectedLinks(
-            files=file_links,
-            find_links=find_link_links,
-            project_urls=url_locations,
-        )