Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/schema_salad/fetcher.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Resource fetching.""" | |
2 import logging | |
3 import os | |
4 import re | |
5 import sys | |
6 import urllib | |
7 from typing import List, Optional | |
8 | |
9 import requests | |
10 | |
11 from .exceptions import ValidationException | |
12 from .utils import CacheType | |
13 | |
14 _re_drive = re.compile(r"/([a-zA-Z]):") | |
15 _logger = logging.getLogger("salad") | |
16 | |
17 | |
class Fetcher:
    """Interface for objects that retrieve and resolve schema resources.

    Subclasses are expected to override ``fetch_text``, ``check_exists``
    and ``urljoin``; the versions defined here only raise
    ``NotImplementedError``.
    """

    # URL schemes reported by ``supported_schemes``.
    schemes = ["file", "http", "https", "mailto"]

    def __init__(
        self,
        cache: CacheType,
        session: Optional[requests.sessions.Session],
    ) -> None:
        """Accept a cache and an optional HTTP session; the base class stores neither."""

    def fetch_text(self, url: str, content_types: Optional[List[str]] = None) -> str:
        """Return the content found at ``url`` as text (must be overridden)."""
        raise NotImplementedError

    def check_exists(self, url: str) -> bool:
        """Report whether ``url`` refers to an existing resource (must be overridden)."""
        raise NotImplementedError

    def urljoin(self, base_url: str, url: str) -> str:
        """Resolve ``url`` against ``base_url`` (must be overridden)."""
        raise NotImplementedError

    def supported_schemes(self) -> List[str]:
        """Return the list of URL schemes handled by this fetcher."""
        return self.schemes
39 | |
40 | |
class DefaultFetcher(Fetcher):
    """Fetcher that reads ``file`` URLs from disk and ``http(s)`` URLs via a session."""

    def __init__(
        self,
        cache: CacheType,
        session: Optional[requests.sessions.Session],
    ) -> None:
        """Keep references to the cache and the (possibly absent) HTTP session.

        :param cache: mapping consulted by ``fetch_text`` and ``check_exists``
            before doing any I/O; ``check_exists`` also records successful
            HTTP probes in it.
        :param session: session used for ``http``/``https`` requests. When it
            is ``None`` those schemes are not handled.
        """
        self.cache = cache
        self.session = session

    def fetch_text(self, url: str, content_types: Optional[List[str]] = None) -> str:
        """Retrieve the given resource as a string.

        :param url: ``file``, ``http`` or ``https`` URL to read.
        :param content_types: acceptable media types. They are sent in the
            ``Accept`` header, and a warning is logged if the response
            declares a different media type.
        :returns: the cached text if the cache holds a string for ``url``,
            otherwise the freshly fetched text.
        :raises ValidationException: if the request or file read fails, or if
            the URL scheme is not handled.
        """
        result = self.cache.get(url, None)
        # The cache may also hold non-text markers (check_exists stores True),
        # so only a string counts as cached content.
        if isinstance(result, str):
            return result

        split = urllib.parse.urlsplit(url)
        scheme, path = split.scheme, split.path

        if scheme in ["http", "https"] and self.session is not None:
            try:
                headers = {}
                if content_types:
                    headers["Accept"] = ", ".join(content_types) + ", */*;q=0.8"
                resp = self.session.get(url, headers=headers)
                resp.raise_for_status()
            except Exception as e:
                raise ValidationException(f"Error fetching {url}: {e}") from e
            if content_types and "content-type" in resp.headers:
                # Drop any parameters (e.g. "; charset=utf-8") and surrounding
                # whitespace. Media types are case-insensitive, so compare
                # them lower-cased to avoid spurious warnings.
                content_type = resp.headers["content-type"].split(";")[0].strip()
                expected = {ct.strip().lower() for ct in content_types}
                if content_type.lower() not in expected:
                    _logger.warning(
                        f"While fetching {url}, got content-type of "
                        f"'{content_type}'. Expected one of {content_types}."
                    )
            return resp.text
        if scheme == "file":
            try:
                # On Windows, url.path will be /drive:/path ; on Unix systems,
                # /path. As we want drive:/path instead of /drive:/path on Windows,
                # remove the leading /.
                if os.path.isabs(
                    path[1:]
                ):  # check whether the path is still absolute without the leading /
                    path = path[1:]
                with open(
                    urllib.request.url2pathname(str(path)), encoding="utf-8"
                ) as fp:
                    return str(fp.read())

            except OSError as err:
                # When the error already names the path, its own message is
                # enough; otherwise add the URL for context.
                if err.filename == path:
                    raise ValidationException(str(err)) from err
                else:
                    raise ValidationException(f"Error reading {url}: {err}") from err
        raise ValidationException(f"Unsupported scheme in url: {url}")

    def check_exists(self, url: str) -> bool:
        """Report whether ``url`` refers to an existing resource.

        :returns: ``True`` for cached URLs, for ``http(s)`` URLs whose HEAD
            request succeeds, for existing local files and for any ``mailto``
            URL; ``False`` when the HEAD request fails or the file is missing.
        :raises ValidationException: if the URL scheme is not handled.
        """
        if url in self.cache:
            return True

        split = urllib.parse.urlsplit(url)
        scheme, path = split.scheme, split.path

        if scheme in ["http", "https"] and self.session is not None:
            try:
                resp = self.session.head(url)
                resp.raise_for_status()
            except Exception:
                # Any failure to probe the URL is reported as "does not exist".
                return False
            self.cache[url] = True
            return True
        if scheme == "file":
            return os.path.exists(urllib.request.url2pathname(str(path)))
        if scheme == "mailto":
            return True
        raise ValidationException(f"Unsupported scheme in url: {url}")

    def urljoin(self, base_url: str, url: str) -> str:
        """Resolve ``url`` relative to ``base_url``.

        Identifiers starting with ``_:`` are returned unchanged. On Windows,
        drive letters in file references get special handling.

        :raises ValidationException: if a non-``file`` base would resolve to a
            ``file`` URL or, on Windows, to a drive-letter path.
        """
        if url.startswith("_:"):
            return url

        basesplit = urllib.parse.urlsplit(base_url)
        # Note that this might split "C:/path" with "C" as the URI scheme.
        split = urllib.parse.urlsplit(url)
        if basesplit.scheme and basesplit.scheme != "file" and split.scheme == "file":
            raise ValidationException(
                f"Not resolving potential remote exploit {url} from base {base_url}"
            )

        if sys.platform == "win32":
            if base_url == url:
                return url

            # A one-letter "scheme" is taken to be a Windows drive letter.
            has_drive = len(split.scheme) == 1

            if basesplit.scheme == "file":
                # Special handling of relative file references on Windows
                # as urllib seems to not be quite up to the job

                # netloc MIGHT appear in equivalents of UNC Strings
                # \\server1.example.com\path as
                # file:///server1.example.com/path
                # https://tools.ietf.org/html/rfc8089#appendix-E.3.2
                # (TODO: test this)
                netloc = split.netloc or basesplit.netloc

                # Check if url is a local path like "C:/Users/fred"
                # or actually an absolute URI like http://example.com/fred
                if has_drive:
                    # Assume split.scheme is actually a drive, e.g. "C:"
                    # so we'll recombine into a path
                    path_with_drive = urllib.parse.urlunsplit(
                        (split.scheme, "", split.path, "", "")
                    )
                    # Compose new file:/// URI with path_with_drive
                    # .. carrying over any #fragment (?query just in case..)
                    return urllib.parse.urlunsplit(
                        ("file", netloc, path_with_drive, split.query, split.fragment)
                    )
                if (
                    not split.scheme
                    and not netloc
                    and split.path
                    and split.path.startswith("/")
                ):
                    # Relative - but does it have a drive?
                    base_drive = _re_drive.match(basesplit.path)
                    drive = _re_drive.match(split.path)
                    if base_drive and not drive:
                        # Keep drive letter from base_url
                        # https://tools.ietf.org/html/rfc8089#appendix-E.2.1
                        # e.g. urljoin("file:///D:/bar/a.txt", "/foo/b.txt")
                        #          == file:///D:/foo/b.txt
                        path_with_drive = f"/{base_drive.group(1)}:{split.path}"
                        return urllib.parse.urlunsplit(
                            (
                                "file",
                                netloc,
                                path_with_drive,
                                split.query,
                                split.fragment,
                            )
                        )

                # else: fall-through to resolve as relative URI
            elif has_drive:
                # Base is http://something but url is C:/something - which urllib
                # would wrongly resolve as an absolute path that could later be used
                # to access local files
                raise ValidationException(
                    f"Not resolving potential remote exploit {url} from base {base_url}"
                )

        return urllib.parse.urljoin(base_url, url)