Mercurial > repos > guerler > hhblits
comparison lib/python3.8/site-packages/setuptools/archive_util.py @ 0:9e54283cc701 draft
"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
| author | guerler |
|---|---|
| date | Mon, 27 Jul 2020 03:47:31 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:9e54283cc701 |
|---|---|
| 1 """Utilities for extracting common archive formats""" | |
| 2 | |
| 3 import zipfile | |
| 4 import tarfile | |
| 5 import os | |
| 6 import shutil | |
| 7 import posixpath | |
| 8 import contextlib | |
| 9 from distutils.errors import DistutilsError | |
| 10 | |
| 11 from pkg_resources import ensure_directory | |
| 12 | |
| 13 __all__ = [ | |
| 14 "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter", | |
| 15 "UnrecognizedFormat", "extraction_drivers", "unpack_directory", | |
| 16 ] | |
| 17 | |
| 18 | |
| 19 class UnrecognizedFormat(DistutilsError): | |
| 20 """Couldn't recognize the archive type""" | |
| 21 | |
| 22 | |
| 23 def default_filter(src, dst): | |
| 24 """The default progress/filter callback; returns True for all files""" | |
| 25 return dst | |
| 26 | |
| 27 | |
| 28 def unpack_archive( | |
| 29 filename, extract_dir, progress_filter=default_filter, | |
| 30 drivers=None): | |
| 31 """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat`` | |
| 32 | |
| 33 `progress_filter` is a function taking two arguments: a source path | |
| 34 internal to the archive ('/'-separated), and a filesystem path where it | |
| 35 will be extracted. The callback must return the desired extract path | |
| 36 (which may be the same as the one passed in), or else ``None`` to skip | |
| 37 that file or directory. The callback can thus be used to report on the | |
| 38 progress of the extraction, as well as to filter the items extracted or | |
| 39 alter their extraction paths. | |
| 40 | |
| 41 `drivers`, if supplied, must be a non-empty sequence of functions with the | |
| 42 same signature as this function (minus the `drivers` argument), that raise | |
| 43 ``UnrecognizedFormat`` if they do not support extracting the designated | |
| 44 archive type. The `drivers` are tried in sequence until one is found that | |
| 45 does not raise an error, or until all are exhausted (in which case | |
| 46 ``UnrecognizedFormat`` is raised). If you do not supply a sequence of | |
| 47 drivers, the module's ``extraction_drivers`` constant will be used, which | |
| 48 means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that | |
| 49 order. | |
| 50 """ | |
| 51 for driver in drivers or extraction_drivers: | |
| 52 try: | |
| 53 driver(filename, extract_dir, progress_filter) | |
| 54 except UnrecognizedFormat: | |
| 55 continue | |
| 56 else: | |
| 57 return | |
| 58 else: | |
| 59 raise UnrecognizedFormat( | |
| 60 "Not a recognized archive type: %s" % filename | |
| 61 ) | |
| 62 | |
| 63 | |
| 64 def unpack_directory(filename, extract_dir, progress_filter=default_filter): | |
| 65 """"Unpack" a directory, using the same interface as for archives | |
| 66 | |
| 67 Raises ``UnrecognizedFormat`` if `filename` is not a directory | |
| 68 """ | |
| 69 if not os.path.isdir(filename): | |
| 70 raise UnrecognizedFormat("%s is not a directory" % filename) | |
| 71 | |
| 72 paths = { | |
| 73 filename: ('', extract_dir), | |
| 74 } | |
| 75 for base, dirs, files in os.walk(filename): | |
| 76 src, dst = paths[base] | |
| 77 for d in dirs: | |
| 78 paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d) | |
| 79 for f in files: | |
| 80 target = os.path.join(dst, f) | |
| 81 target = progress_filter(src + f, target) | |
| 82 if not target: | |
| 83 # skip non-files | |
| 84 continue | |
| 85 ensure_directory(target) | |
| 86 f = os.path.join(base, f) | |
| 87 shutil.copyfile(f, target) | |
| 88 shutil.copystat(f, target) | |
| 89 | |
| 90 | |
| 91 def unpack_zipfile(filename, extract_dir, progress_filter=default_filter): | |
| 92 """Unpack zip `filename` to `extract_dir` | |
| 93 | |
| 94 Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined | |
| 95 by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation | |
| 96 of the `progress_filter` argument. | |
| 97 """ | |
| 98 | |
| 99 if not zipfile.is_zipfile(filename): | |
| 100 raise UnrecognizedFormat("%s is not a zip file" % (filename,)) | |
| 101 | |
| 102 with zipfile.ZipFile(filename) as z: | |
| 103 for info in z.infolist(): | |
| 104 name = info.filename | |
| 105 | |
| 106 # don't extract absolute paths or ones with .. in them | |
| 107 if name.startswith('/') or '..' in name.split('/'): | |
| 108 continue | |
| 109 | |
| 110 target = os.path.join(extract_dir, *name.split('/')) | |
| 111 target = progress_filter(name, target) | |
| 112 if not target: | |
| 113 continue | |
| 114 if name.endswith('/'): | |
| 115 # directory | |
| 116 ensure_directory(target) | |
| 117 else: | |
| 118 # file | |
| 119 ensure_directory(target) | |
| 120 data = z.read(info.filename) | |
| 121 with open(target, 'wb') as f: | |
| 122 f.write(data) | |
| 123 unix_attributes = info.external_attr >> 16 | |
| 124 if unix_attributes: | |
| 125 os.chmod(target, unix_attributes) | |
| 126 | |
| 127 | |
| 128 def unpack_tarfile(filename, extract_dir, progress_filter=default_filter): | |
| 129 """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir` | |
| 130 | |
| 131 Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined | |
| 132 by ``tarfile.open()``). See ``unpack_archive()`` for an explanation | |
| 133 of the `progress_filter` argument. | |
| 134 """ | |
| 135 try: | |
| 136 tarobj = tarfile.open(filename) | |
| 137 except tarfile.TarError: | |
| 138 raise UnrecognizedFormat( | |
| 139 "%s is not a compressed or uncompressed tar file" % (filename,) | |
| 140 ) | |
| 141 with contextlib.closing(tarobj): | |
| 142 # don't do any chowning! | |
| 143 tarobj.chown = lambda *args: None | |
| 144 for member in tarobj: | |
| 145 name = member.name | |
| 146 # don't extract absolute paths or ones with .. in them | |
| 147 if not name.startswith('/') and '..' not in name.split('/'): | |
| 148 prelim_dst = os.path.join(extract_dir, *name.split('/')) | |
| 149 | |
| 150 # resolve any links and to extract the link targets as normal | |
| 151 # files | |
| 152 while member is not None and ( | |
| 153 member.islnk() or member.issym()): | |
| 154 linkpath = member.linkname | |
| 155 if member.issym(): | |
| 156 base = posixpath.dirname(member.name) | |
| 157 linkpath = posixpath.join(base, linkpath) | |
| 158 linkpath = posixpath.normpath(linkpath) | |
| 159 member = tarobj._getmember(linkpath) | |
| 160 | |
| 161 if member is not None and (member.isfile() or member.isdir()): | |
| 162 final_dst = progress_filter(name, prelim_dst) | |
| 163 if final_dst: | |
| 164 if final_dst.endswith(os.sep): | |
| 165 final_dst = final_dst[:-1] | |
| 166 try: | |
| 167 # XXX Ugh | |
| 168 tarobj._extract_member(member, final_dst) | |
| 169 except tarfile.ExtractError: | |
| 170 # chown/chmod/mkfifo/mknode/makedev failed | |
| 171 pass | |
| 172 return True | |
| 173 | |
| 174 | |
| 175 extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile |
