comparison lib/python3.8/site-packages/wheel/wheelfile.py @ 0:9e54283cc701 draft

"planemo upload commit d12c32a45bcd441307e632fca6d9af7d60289d44"
author guerler
date Mon, 27 Jul 2020 03:47:31 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:9e54283cc701
1 from __future__ import print_function
2
3 import csv
4 import hashlib
5 import os.path
6 import re
7 import stat
8 import time
9 from collections import OrderedDict
10 from distutils import log as logger
11 from zipfile import ZIP_DEFLATED, ZipInfo, ZipFile
12
13 from wheel.cli import WheelError
14 from wheel.util import urlsafe_b64decode, as_unicode, native, urlsafe_b64encode, as_bytes, StringIO
15
# Non-greedy matching of an optional build number may be too clever (more
# invalid wheel filenames will match). Separate regex for .dist-info?
# Parses PEP 427 wheel filenames, e.g. "pip-20.0.2-py2.py3-none-any.whl",
# into the named groups: namever, name, ver, build (optional), pyver, abi, plat.
WHEEL_INFO_RE = re.compile(
    r"""^(?P<namever>(?P<name>.+?)-(?P<ver>.+?))(-(?P<build>\d[^-]*))?
     -(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)\.whl$""",
    re.VERBOSE)
22
23
def get_zipinfo_datetime(timestamp=None):
    """Return a (year, month, day, hour, minute, second) tuple for ZipInfo.

    Some applications need reproducible .whl files, but they can't do this
    without forcing the timestamp of the individual ZipInfo objects. See
    issue #143.  The SOURCE_DATE_EPOCH environment variable, when set,
    overrides *timestamp* (which itself defaults to the current time).
    """
    timestamp = int(os.environ.get('SOURCE_DATE_EPOCH', timestamp or time.time()))
    # The ZIP format (and zipfile.ZipInfo) cannot represent timestamps before
    # 1980-01-01; without this clamp, e.g. SOURCE_DATE_EPOCH=0 would make
    # ZipFile.writestr() raise ValueError when the entry is written.
    timestamp = max(timestamp, 315532800)  # 1980-01-01 00:00:00 UTC
    return time.gmtime(timestamp)[0:6]
29
30
class WheelFile(ZipFile):
    """A ZipFile derivative class that also reads SHA-256 hashes from
    .dist-info/RECORD and checks any read files against those.
    """

    # Hash algorithm applied to entries added in write mode (recorded in RECORD).
    _default_algorithm = hashlib.sha256

    def __init__(self, file, mode='r', compression=ZIP_DEFLATED):
        """Open *file* as a wheel archive.

        :param file: path to a ``.whl`` file; the basename must parse against
            WHEEL_INFO_RE, since the filename encodes the distribution metadata
        :param mode: ``'r'`` (verify hashes from RECORD) or a write mode
            (hashes are accumulated for RECORD generation in close())
        :raises WheelError: if the filename is not a valid wheel filename, or
            (in read mode) if the archive has no RECORD, or RECORD lists an
            unsupported or weak hash algorithm
        """
        basename = os.path.basename(file)
        self.parsed_filename = WHEEL_INFO_RE.match(basename)
        if not basename.endswith('.whl') or self.parsed_filename is None:
            raise WheelError("Bad wheel filename {!r}".format(basename))

        ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True)

        # Archive-internal paths of the metadata directory and its RECORD manifest.
        self.dist_info_path = '{}.dist-info'.format(self.parsed_filename.group('namever'))
        self.record_path = self.dist_info_path + '/RECORD'
        # archive name -> (algorithm, digest).  In read mode the digest is raw
        # bytes decoded from RECORD; in write mode (see writestr) it is a
        # urlsafe-base64 string.  Insertion order is preserved for RECORD output.
        self._file_hashes = OrderedDict()
        # archive name -> uncompressed size in bytes (populated in write mode only)
        self._file_sizes = {}
        if mode == 'r':
            # Ignore RECORD and any embedded wheel signatures
            self._file_hashes[self.record_path] = None, None
            self._file_hashes[self.record_path + '.jws'] = None, None
            self._file_hashes[self.record_path + '.p7s'] = None, None

            # Fill in the expected hashes by reading them from RECORD
            try:
                record = self.open(self.record_path)
            except KeyError:
                raise WheelError('Missing {} file'.format(self.record_path))

            with record:
                for line in record:
                    line = line.decode('utf-8')
                    # RECORD rows are "path,algorithm=b64digest,size"; rsplit
                    # keeps any commas that appear inside the path intact.
                    path, hash_sum, size = line.rsplit(u',', 2)
                    if hash_sum:
                        algorithm, hash_sum = hash_sum.split(u'=')
                        try:
                            # Probe hashlib so unknown algorithms fail here,
                            # not later when the member is actually read.
                            hashlib.new(algorithm)
                        except ValueError:
                            raise WheelError('Unsupported hash algorithm: {}'.format(algorithm))

                        if algorithm.lower() in {'md5', 'sha1'}:
                            raise WheelError(
                                'Weak hash algorithm ({}) is not permitted by PEP 427'
                                .format(algorithm))

                        self._file_hashes[path] = (
                            algorithm, urlsafe_b64decode(hash_sum.encode('ascii')))

    def open(self, name_or_info, mode="r", pwd=None):
        """Open an archive member; in read mode, verify its hash from RECORD.

        :param name_or_info: member name or a ZipInfo instance
        :raises WheelError: if the member has no RECORD entry, or (raised
            lazily, while the returned file object is being read to its end)
            if the content does not match the recorded hash
        """
        def _update_crc(newdata, eof=None):
            # Replacement for the extracted file object's _update_crc: performs
            # the original CRC bookkeeping, then additionally feeds the data
            # into running_hash and compares digests once EOF is reached.
            if eof is None:
                eof = ef._eof
                update_crc_orig(newdata)
            else:  # Python 2
                update_crc_orig(newdata, eof)

            running_hash.update(newdata)
            if eof and running_hash.digest() != expected_hash:
                raise WheelError("Hash mismatch for file '{}'".format(native(ef_name)))

        ef = ZipFile.open(self, name_or_info, mode, pwd)
        ef_name = as_unicode(name_or_info.filename if isinstance(name_or_info, ZipInfo)
                             else name_or_info)
        # Directory entries (trailing '/') carry no data and are never hashed.
        if mode == 'r' and not ef_name.endswith('/'):
            if ef_name not in self._file_hashes:
                raise WheelError("No hash found for file '{}'".format(native(ef_name)))

            algorithm, expected_hash = self._file_hashes[ef_name]
            # expected_hash is None for RECORD and signature files (see __init__).
            if expected_hash is not None:
                # Monkey patch the _update_crc method to also check for the hash from RECORD
                running_hash = hashlib.new(algorithm)
                update_crc_orig, ef._update_crc = ef._update_crc, _update_crc

        return ef

    def write_files(self, base_dir):
        """Recursively add every file under *base_dir* to the archive.

        Files under a ``.dist-info`` directory are deferred and written last
        (in sorted order); an existing RECORD file is skipped because close()
        regenerates it.  Traversal order is made deterministic by sorting
        directory and file names, to help reproducible builds.
        """
        logger.info("creating '%s' and adding '%s' to it", self.filename, base_dir)
        deferred = []
        for root, dirnames, filenames in os.walk(base_dir):
            # Sort the directory names so that `os.walk` will walk them in a
            # defined order on the next iteration.
            dirnames.sort()
            for name in sorted(filenames):
                path = os.path.normpath(os.path.join(root, name))
                if os.path.isfile(path):
                    arcname = os.path.relpath(path, base_dir).replace(os.path.sep, '/')
                    if arcname == self.record_path:
                        pass  # never copy a stale RECORD; close() writes a fresh one
                    elif root.endswith('.dist-info'):
                        deferred.append((path, arcname))
                    else:
                        self.write(path, arcname)

        deferred.sort()
        for path, arcname in deferred:
            self.write(path, arcname)

    def write(self, filename, arcname=None, compress_type=None):
        """Add the file *filename* to the archive under the name *arcname*.

        Unlike ZipFile.write, this builds the ZipInfo explicitly so that the
        timestamp honors SOURCE_DATE_EPOCH (via get_zipinfo_datetime) and the
        source file's type/permission bits are preserved.
        """
        with open(filename, 'rb') as f:
            st = os.fstat(f.fileno())
            data = f.read()

        zinfo = ZipInfo(arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime))
        # Store the POSIX file type and permission bits in the high 16 bits of
        # the external attributes, as Unix zip tools expect.
        zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16
        zinfo.compress_type = compress_type or self.compression
        self.writestr(zinfo, data, compress_type)

    def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
        """Write *bytes* into the archive and record its hash and size.

        RECORD itself is excluded from the bookkeeping so that it never lists
        a hash of its own content.
        """
        ZipFile.writestr(self, zinfo_or_arcname, bytes, compress_type)
        fname = (zinfo_or_arcname.filename if isinstance(zinfo_or_arcname, ZipInfo)
                 else zinfo_or_arcname)
        logger.info("adding '%s'", fname)
        if fname != self.record_path:
            hash_ = self._default_algorithm(bytes)
            # Stored already urlsafe-base64 encoded, ready for RECORD output.
            self._file_hashes[fname] = hash_.name, native(urlsafe_b64encode(hash_.digest()))
            self._file_sizes[fname] = len(bytes)

    def close(self):
        """Generate and write RECORD (write mode only), then close the archive."""
        # Write RECORD
        if self.fp is not None and self.mode == 'w' and self._file_hashes:
            data = StringIO()
            writer = csv.writer(data, delimiter=',', quotechar='"', lineterminator='\n')
            writer.writerows((
                (
                    fname,
                    algorithm + "=" + hash_,
                    self._file_sizes[fname]
                )
                for fname, (algorithm, hash_) in self._file_hashes.items()
            ))
            # RECORD's own row has empty hash and size fields.
            writer.writerow((format(self.record_path), "", ""))
            zinfo = ZipInfo(native(self.record_path), date_time=get_zipinfo_datetime())
            zinfo.compress_type = self.compression
            zinfo.external_attr = 0o664 << 16
            self.writestr(zinfo, as_bytes(data.getvalue()))

        ZipFile.close(self)