Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/zipp.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 import io | |
2 import posixpath | |
3 import zipfile | |
4 import itertools | |
5 import contextlib | |
6 import sys | |
7 | |
8 if sys.version_info < (3, 7): | |
9 from collections import OrderedDict | |
10 else: | |
11 OrderedDict = dict | |
12 | |
13 | |
14 def _parents(path): | |
15 """ | |
16 Given a path with elements separated by | |
17 posixpath.sep, generate all parents of that path. | |
18 | |
19 >>> list(_parents('b/d')) | |
20 ['b'] | |
21 >>> list(_parents('/b/d/')) | |
22 ['/b'] | |
23 >>> list(_parents('b/d/f/')) | |
24 ['b/d', 'b'] | |
25 >>> list(_parents('b')) | |
26 [] | |
27 >>> list(_parents('')) | |
28 [] | |
29 """ | |
30 return itertools.islice(_ancestry(path), 1, None) | |
31 | |
32 | |
33 def _ancestry(path): | |
34 """ | |
35 Given a path with elements separated by | |
36 posixpath.sep, generate all elements of that path | |
37 | |
38 >>> list(_ancestry('b/d')) | |
39 ['b/d', 'b'] | |
40 >>> list(_ancestry('/b/d/')) | |
41 ['/b/d', '/b'] | |
42 >>> list(_ancestry('b/d/f/')) | |
43 ['b/d/f', 'b/d', 'b'] | |
44 >>> list(_ancestry('b')) | |
45 ['b'] | |
46 >>> list(_ancestry('')) | |
47 [] | |
48 """ | |
49 path = path.rstrip(posixpath.sep) | |
50 while path and path != posixpath.sep: | |
51 yield path | |
52 path, tail = posixpath.split(path) | |
53 | |
54 | |
55 _dedupe = OrderedDict.fromkeys | |
56 """Deduplicate an iterable in original order""" | |
57 | |
58 | |
59 def _difference(minuend, subtrahend): | |
60 """ | |
61 Return items in minuend not in subtrahend, retaining order | |
62 with O(1) lookup. | |
63 """ | |
64 return itertools.filterfalse(set(subtrahend).__contains__, minuend) | |
65 | |
66 | |
67 class CompleteDirs(zipfile.ZipFile): | |
68 """ | |
69 A ZipFile subclass that ensures that implied directories | |
70 are always included in the namelist. | |
71 """ | |
72 | |
73 @staticmethod | |
74 def _implied_dirs(names): | |
75 parents = itertools.chain.from_iterable(map(_parents, names)) | |
76 as_dirs = (p + posixpath.sep for p in parents) | |
77 return _dedupe(_difference(as_dirs, names)) | |
78 | |
79 def namelist(self): | |
80 names = super(CompleteDirs, self).namelist() | |
81 return names + list(self._implied_dirs(names)) | |
82 | |
83 def _name_set(self): | |
84 return set(self.namelist()) | |
85 | |
86 def resolve_dir(self, name): | |
87 """ | |
88 If the name represents a directory, return that name | |
89 as a directory (with the trailing slash). | |
90 """ | |
91 names = self._name_set() | |
92 dirname = name + '/' | |
93 dir_match = name not in names and dirname in names | |
94 return dirname if dir_match else name | |
95 | |
96 @classmethod | |
97 def make(cls, source): | |
98 """ | |
99 Given a source (filename or zipfile), return an | |
100 appropriate CompleteDirs subclass. | |
101 """ | |
102 if isinstance(source, CompleteDirs): | |
103 return source | |
104 | |
105 if not isinstance(source, zipfile.ZipFile): | |
106 return cls(_pathlib_compat(source)) | |
107 | |
108 # Only allow for FastPath when supplied zipfile is read-only | |
109 if 'r' not in source.mode: | |
110 cls = CompleteDirs | |
111 | |
112 res = cls.__new__(cls) | |
113 vars(res).update(vars(source)) | |
114 return res | |
115 | |
116 | |
117 class FastLookup(CompleteDirs): | |
118 """ | |
119 ZipFile subclass to ensure implicit | |
120 dirs exist and are resolved rapidly. | |
121 """ | |
122 def namelist(self): | |
123 with contextlib.suppress(AttributeError): | |
124 return self.__names | |
125 self.__names = super(FastLookup, self).namelist() | |
126 return self.__names | |
127 | |
128 def _name_set(self): | |
129 with contextlib.suppress(AttributeError): | |
130 return self.__lookup | |
131 self.__lookup = super(FastLookup, self)._name_set() | |
132 return self.__lookup | |
133 | |
134 | |
135 def _pathlib_compat(path): | |
136 """ | |
137 For path-like objects, convert to a filename for compatibility | |
138 on Python 3.6.1 and earlier. | |
139 """ | |
140 try: | |
141 return path.__fspath__() | |
142 except AttributeError: | |
143 return str(path) | |
144 | |
145 | |
146 class Path: | |
147 """ | |
148 A pathlib-compatible interface for zip files. | |
149 | |
150 Consider a zip file with this structure:: | |
151 | |
152 . | |
153 ├── a.txt | |
154 └── b | |
155 ├── c.txt | |
156 └── d | |
157 └── e.txt | |
158 | |
159 >>> data = io.BytesIO() | |
160 >>> zf = zipfile.ZipFile(data, 'w') | |
161 >>> zf.writestr('a.txt', 'content of a') | |
162 >>> zf.writestr('b/c.txt', 'content of c') | |
163 >>> zf.writestr('b/d/e.txt', 'content of e') | |
164 >>> zf.filename = 'abcde.zip' | |
165 | |
166 Path accepts the zipfile object itself or a filename | |
167 | |
168 >>> root = Path(zf) | |
169 | |
170 From there, several path operations are available. | |
171 | |
172 Directory iteration (including the zip file itself): | |
173 | |
174 >>> a, b = root.iterdir() | |
175 >>> a | |
176 Path('abcde.zip', 'a.txt') | |
177 >>> b | |
178 Path('abcde.zip', 'b/') | |
179 | |
180 name property: | |
181 | |
182 >>> b.name | |
183 'b' | |
184 | |
185 join with divide operator: | |
186 | |
187 >>> c = b / 'c.txt' | |
188 >>> c | |
189 Path('abcde.zip', 'b/c.txt') | |
190 >>> c.name | |
191 'c.txt' | |
192 | |
193 Read text: | |
194 | |
195 >>> c.read_text() | |
196 'content of c' | |
197 | |
198 existence: | |
199 | |
200 >>> c.exists() | |
201 True | |
202 >>> (b / 'missing.txt').exists() | |
203 False | |
204 | |
205 Coercion to string: | |
206 | |
207 >>> str(c) | |
208 'abcde.zip/b/c.txt' | |
209 """ | |
210 | |
211 __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})" | |
212 | |
213 def __init__(self, root, at=""): | |
214 self.root = FastLookup.make(root) | |
215 self.at = at | |
216 | |
217 def open(self, mode='r', *args, pwd=None, **kwargs): | |
218 """ | |
219 Open this entry as text or binary following the semantics | |
220 of ``pathlib.Path.open()`` by passing arguments through | |
221 to io.TextIOWrapper(). | |
222 """ | |
223 if self.is_dir(): | |
224 raise IsADirectoryError(self) | |
225 zip_mode = mode[0] | |
226 if not self.exists() and zip_mode == 'r': | |
227 raise FileNotFoundError(self) | |
228 stream = self.root.open(self.at, zip_mode, pwd=pwd) | |
229 if 'b' in mode: | |
230 if args or kwargs: | |
231 raise ValueError("encoding args invalid for binary operation") | |
232 return stream | |
233 return io.TextIOWrapper(stream, *args, **kwargs) | |
234 | |
235 @property | |
236 def name(self): | |
237 return posixpath.basename(self.at.rstrip("/")) | |
238 | |
239 def read_text(self, *args, **kwargs): | |
240 with self.open('r', *args, **kwargs) as strm: | |
241 return strm.read() | |
242 | |
243 def read_bytes(self): | |
244 with self.open('rb') as strm: | |
245 return strm.read() | |
246 | |
247 def _is_child(self, path): | |
248 return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/") | |
249 | |
250 def _next(self, at): | |
251 return Path(self.root, at) | |
252 | |
253 def is_dir(self): | |
254 return not self.at or self.at.endswith("/") | |
255 | |
256 def is_file(self): | |
257 return not self.is_dir() | |
258 | |
259 def exists(self): | |
260 return self.at in self.root._name_set() | |
261 | |
262 def iterdir(self): | |
263 if not self.is_dir(): | |
264 raise ValueError("Can't listdir a file") | |
265 subs = map(self._next, self.root.namelist()) | |
266 return filter(self._is_child, subs) | |
267 | |
268 def __str__(self): | |
269 return posixpath.join(self.root.filename, self.at) | |
270 | |
271 def __repr__(self): | |
272 return self.__repr.format(self=self) | |
273 | |
274 def joinpath(self, add): | |
275 next = posixpath.join(self.at, _pathlib_compat(add)) | |
276 return self._next(self.root.resolve_dir(next)) | |
277 | |
278 __truediv__ = joinpath | |
279 | |
280 @property | |
281 def parent(self): | |
282 parent_at = posixpath.dirname(self.at.rstrip('/')) | |
283 if parent_at: | |
284 parent_at += '/' | |
285 return self._next(parent_at) |