comparison planemo/lib/python3.7/site-packages/setuptools/archive_util.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 """Utilities for extracting common archive formats"""
2
3 import zipfile
4 import tarfile
5 import os
6 import shutil
7 import posixpath
8 import contextlib
9 from distutils.errors import DistutilsError
10
11 from pkg_resources import ensure_directory
12
13 __all__ = [
14 "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
15 "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
16 ]
17
18
19 class UnrecognizedFormat(DistutilsError):
20 """Couldn't recognize the archive type"""
21
22
23 def default_filter(src, dst):
24 """The default progress/filter callback; returns True for all files"""
25 return dst
26
27
28 def unpack_archive(filename, extract_dir, progress_filter=default_filter,
29 drivers=None):
30 """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
31
32 `progress_filter` is a function taking two arguments: a source path
33 internal to the archive ('/'-separated), and a filesystem path where it
34 will be extracted. The callback must return the desired extract path
35 (which may be the same as the one passed in), or else ``None`` to skip
36 that file or directory. The callback can thus be used to report on the
37 progress of the extraction, as well as to filter the items extracted or
38 alter their extraction paths.
39
40 `drivers`, if supplied, must be a non-empty sequence of functions with the
41 same signature as this function (minus the `drivers` argument), that raise
42 ``UnrecognizedFormat`` if they do not support extracting the designated
43 archive type. The `drivers` are tried in sequence until one is found that
44 does not raise an error, or until all are exhausted (in which case
45 ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
46 drivers, the module's ``extraction_drivers`` constant will be used, which
47 means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
48 order.
49 """
50 for driver in drivers or extraction_drivers:
51 try:
52 driver(filename, extract_dir, progress_filter)
53 except UnrecognizedFormat:
54 continue
55 else:
56 return
57 else:
58 raise UnrecognizedFormat(
59 "Not a recognized archive type: %s" % filename
60 )
61
62
63 def unpack_directory(filename, extract_dir, progress_filter=default_filter):
64 """"Unpack" a directory, using the same interface as for archives
65
66 Raises ``UnrecognizedFormat`` if `filename` is not a directory
67 """
68 if not os.path.isdir(filename):
69 raise UnrecognizedFormat("%s is not a directory" % filename)
70
71 paths = {
72 filename: ('', extract_dir),
73 }
74 for base, dirs, files in os.walk(filename):
75 src, dst = paths[base]
76 for d in dirs:
77 paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
78 for f in files:
79 target = os.path.join(dst, f)
80 target = progress_filter(src + f, target)
81 if not target:
82 # skip non-files
83 continue
84 ensure_directory(target)
85 f = os.path.join(base, f)
86 shutil.copyfile(f, target)
87 shutil.copystat(f, target)
88
89
90 def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
91 """Unpack zip `filename` to `extract_dir`
92
93 Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
94 by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
95 of the `progress_filter` argument.
96 """
97
98 if not zipfile.is_zipfile(filename):
99 raise UnrecognizedFormat("%s is not a zip file" % (filename,))
100
101 with zipfile.ZipFile(filename) as z:
102 for info in z.infolist():
103 name = info.filename
104
105 # don't extract absolute paths or ones with .. in them
106 if name.startswith('/') or '..' in name.split('/'):
107 continue
108
109 target = os.path.join(extract_dir, *name.split('/'))
110 target = progress_filter(name, target)
111 if not target:
112 continue
113 if name.endswith('/'):
114 # directory
115 ensure_directory(target)
116 else:
117 # file
118 ensure_directory(target)
119 data = z.read(info.filename)
120 with open(target, 'wb') as f:
121 f.write(data)
122 unix_attributes = info.external_attr >> 16
123 if unix_attributes:
124 os.chmod(target, unix_attributes)
125
126
127 def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
128 """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
129
130 Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
131 by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
132 of the `progress_filter` argument.
133 """
134 try:
135 tarobj = tarfile.open(filename)
136 except tarfile.TarError:
137 raise UnrecognizedFormat(
138 "%s is not a compressed or uncompressed tar file" % (filename,)
139 )
140 with contextlib.closing(tarobj):
141 # don't do any chowning!
142 tarobj.chown = lambda *args: None
143 for member in tarobj:
144 name = member.name
145 # don't extract absolute paths or ones with .. in them
146 if not name.startswith('/') and '..' not in name.split('/'):
147 prelim_dst = os.path.join(extract_dir, *name.split('/'))
148
149 # resolve any links and to extract the link targets as normal
150 # files
151 while member is not None and (member.islnk() or member.issym()):
152 linkpath = member.linkname
153 if member.issym():
154 base = posixpath.dirname(member.name)
155 linkpath = posixpath.join(base, linkpath)
156 linkpath = posixpath.normpath(linkpath)
157 member = tarobj._getmember(linkpath)
158
159 if member is not None and (member.isfile() or member.isdir()):
160 final_dst = progress_filter(name, prelim_dst)
161 if final_dst:
162 if final_dst.endswith(os.sep):
163 final_dst = final_dst[:-1]
164 try:
165 # XXX Ugh
166 tarobj._extract_member(member, final_dst)
167 except tarfile.ExtractError:
168 # chown/chmod/mkfifo/mknode/makedev failed
169 pass
170 return True
171
172
173 extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile