Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/galaxy/util/compression_utils.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 import gzip | |
4 import io | |
5 import logging | |
6 import os | |
7 import tarfile | |
8 import zipfile | |
9 | |
10 from galaxy.util.path import safe_relpath | |
11 from .checkers import ( | |
12 bz2, | |
13 is_bz2, | |
14 is_gzip | |
15 ) | |
16 | |
17 log = logging.getLogger(__name__) | |
18 | |
19 | |
def get_fileobj(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and return only the resulting file object.

    Compressed files are opened through a matching reader; text mode
    always decodes as 'utf-8'. The detected compression format reported by
    :func:`get_fileobj_raw` is discarded.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    :returns: the opened file object
    """
    _compressed_format, fileobj = get_fileobj_raw(filename, mode, compressed_formats)
    return fileobj
31 | |
32 | |
def get_fileobj_raw(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and report which compression format (if any) was used.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener. A 't' flag is dropped and the
      deprecated 'U' flag is treated as plain read mode. Without 'b' the
      returned object decodes as 'utf-8'.
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    :returns: tuple ``(compressed_format, fileobj)`` where
      ``compressed_format`` is 'gzip', 'bz2', 'zip' or None for a plain file.
      For a zip archive the file object reads the first member only.
    """
    if compressed_formats is None:
        compressed_formats = ['bz2', 'gzip', 'zip']
    # Remove 't' from mode, which may cause an error for compressed files
    mode = mode.replace('t', '')
    # 'U' mode is deprecated, we open in 'r'. Strip it wherever it appears
    # (e.g. 'rU'), not only when it is the whole mode string: the compressed
    # openers reject it and newer Python versions removed it from open().
    if 'U' in mode:
        mode = mode.replace('U', '')
        if 'r' not in mode:
            mode = 'r' + mode
    compressed_format = None
    if 'gzip' in compressed_formats and is_gzip(filename):
        fh = gzip.GzipFile(filename, mode)
        compressed_format = 'gzip'
    elif 'bz2' in compressed_formats and is_bz2(filename):
        fh = bz2.BZ2File(filename, mode)
        compressed_format = 'bz2'
    elif 'zip' in compressed_formats and zipfile.is_zipfile(filename):
        # Return fileobj for the first file in a zip file.
        # 'b' is not allowed in the ZipFile mode argument
        # since it always opens files in binary mode.
        # For emulating text mode, we will be returning the binary fh in a
        # TextIOWrapper.
        zf_mode = mode.replace('b', '')
        with zipfile.ZipFile(filename, zf_mode) as zh:
            fh = zh.open(zh.namelist()[0], zf_mode)
        compressed_format = 'zip'
    elif 'b' in mode:
        # Plain file, binary mode: no decoding layer.
        return compressed_format, open(filename, mode)
    else:
        # Plain file, text mode.
        return compressed_format, io.open(filename, mode, encoding='utf-8')
    # Compressed readers are binary; wrap them when text was requested.
    if 'b' not in mode:
        return compressed_format, io.TextIOWrapper(fh, encoding='utf-8')
    else:
        return compressed_format, fh
66 | |
67 | |
def file_iter(fname, sep=None):
    """
    Generator over the lines of ``fname``, each split on the C{sep}
    parameter (``str.split`` semantics, so None means any whitespace).

    Lines starting with the C{#} character are skipped. The emptiness test
    only rejects zero-length strings, so a line consisting solely of a
    newline is still yielded (split like any other line).

    >>> lines = [ line for line in file_iter(__file__) ]
    >>> len(lines) != 0
    True
    """
    with get_fileobj(fname) as handle:
        for raw_line in handle:
            if not raw_line or raw_line.startswith('#'):
                continue
            yield raw_line.split(sep)
82 | |
83 | |
class CompressedFile(object):
    """
    Common interface over tar and zip archives.

    Public attributes set by the constructor:

    - ``file_type`` / ``type``: 'tar' or 'zip'
    - ``file_name``: archive base name without its extension (and without a
      trailing '.tar', so 'x.tar.gz' gives 'x')
    - ``archive``: the opened ``tarfile.TarFile`` or ``zipfile.ZipFile``

    Format-specific behaviour lives in ``<operation>_tar`` /
    ``<operation>_zip`` methods that the generic methods dispatch to.
    """

    @staticmethod
    def can_decompress(file_path):
        """Return True if ``file_path`` is recognised as a tar or zip archive."""
        return tarfile.is_tarfile(file_path) or zipfile.is_zipfile(file_path)

    def __init__(self, file_path, mode='r'):
        """
        Detect the archive type of ``file_path`` and open it.

        :param file_path: path to the archive
        :param mode: mode forwarded to the tar/zip opener
        :raises NameError: if no ``open_<file_type>`` method exists
        """
        # NOTE(review): when the file is neither a tar nor a zip archive (or
        # is a zip whose name ends with '.jar') file_type is never assigned
        # and the lookup below fails with AttributeError. Behaviour is kept
        # as is; callers can guard with can_decompress().
        if tarfile.is_tarfile(file_path):
            self.file_type = 'tar'
        elif zipfile.is_zipfile(file_path) and not file_path.endswith('.jar'):
            self.file_type = 'zip'
        self.file_name = os.path.splitext(os.path.basename(file_path))[0]
        if self.file_name.endswith('.tar'):
            self.file_name = os.path.splitext(self.file_name)[0]
        self.type = self.file_type
        method = 'open_%s' % self.file_type
        if hasattr(self, method):
            self.archive = getattr(self, method)(file_path, mode)
        else:
            raise NameError('File type %s specified, no open method found.' % self.file_type)

    @property
    def common_prefix_dir(self):
        """
        Get the common prefix directory for all the files in the archive, if any.

        Returns '' if the archive contains multiple files and/or directories at
        the root of the archive.
        """
        contents = self.getmembers()
        common_prefix = ''
        if len(contents) > 1:
            common_prefix = os.path.commonprefix([self.getname(item) for item in contents])
            # If the common_prefix does not end with a slash, check that is a
            # directory and all other files are contained in it
            if common_prefix and not common_prefix.endswith(os.sep):
                # commonprefix() works character-wise, so the prefix may not
                # name any member at all (e.g. 'data' for 'data1' and
                # 'data2'); getmember() then returns None and must not be
                # handed to isdir().
                prefix_member = self.getmember(common_prefix)
                if prefix_member is not None and self.isdir(prefix_member) \
                        and all(self.getname(item).startswith(common_prefix + os.sep) for item in contents if self.isfile(item)):
                    common_prefix += os.sep
            if not common_prefix.endswith(os.sep):
                common_prefix = ''
        return common_prefix

    def extract(self, path):
        """
        Extract the archive below ``path`` and return where the content landed.

        A single-file archive, or an archive without a common prefix
        directory, is extracted into ``path/<file_name>`` (created if
        missing); otherwise it is extracted directly into ``path``.

        :param path: directory under which to extract
        :returns: absolute path of the extraction directory joined with the
          archive's common prefix directory
        """
        contents = self.getmembers()
        extraction_path = path
        common_prefix_dir = self.common_prefix_dir
        if len(contents) == 1:
            # The archive contains a single file, return the extraction path.
            if self.isfile(contents[0]):
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
                self.archive.extractall(extraction_path, members=self.safemembers())
        else:
            if not common_prefix_dir:
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
            self.archive.extractall(extraction_path, members=self.safemembers())
        # Since .zip files store unix permissions separately, we need to iterate through the zip file
        # and set permissions on extracted members.
        if self.file_type == 'zip':
            for zipped_file in contents:
                filename = self.getname(zipped_file)
                absolute_filepath = os.path.join(extraction_path, filename)
                external_attributes = self.archive.getinfo(filename).external_attr
                # The 2 least significant bytes are irrelevant, the next two contain unix permissions.
                unix_permissions = external_attributes >> 16
                if unix_permissions != 0:
                    if os.path.exists(absolute_filepath):
                        os.chmod(absolute_filepath, unix_permissions)
                    else:
                        log.warning("Unable to change permission on extracted file '%s' as it does not exist", absolute_filepath)
        return os.path.abspath(os.path.join(extraction_path, common_prefix_dir))

    def safemembers(self):
        """
        Generate the archive members that are acceptable for extraction.

        Yields ``TarInfo`` objects for tar archives and member names for zip
        archives, in the form ``extractall(members=...)`` accepts.

        :raises Exception: when a member path is rejected by ``safe_relpath``,
          or a tar link target is rejected by ``safe_relpath`` or does not
          start with the common prefix directory
        """
        members = self.archive
        common_prefix_dir = self.common_prefix_dir
        if self.file_type == "tar":
            for finfo in members:
                if not safe_relpath(finfo.name):
                    raise Exception("Path '%s' is blocked (illegal path)." % finfo.name)
                if finfo.issym() or finfo.islnk():
                    # Resolve the link target relative to the link's own directory.
                    link_target = os.path.join(os.path.dirname(finfo.name), finfo.linkname)
                    if not safe_relpath(link_target) or not os.path.normpath(link_target).startswith(common_prefix_dir):
                        raise Exception("Link '%s' to '%s' is blocked." % (finfo.name, finfo.linkname))
                yield finfo
        elif self.file_type == "zip":
            for name in members.namelist():
                if not safe_relpath(name):
                    raise Exception(name + " is blocked (illegal path).")
                yield name

    def getmembers_tar(self):
        """Return the list of ``TarInfo`` members of the tar archive."""
        return self.archive.getmembers()

    def getmembers_zip(self):
        """Return the list of ``ZipInfo`` members of the zip archive."""
        return self.archive.infolist()

    def getname_tar(self, item):
        """Return the path of a tar member."""
        return item.name

    def getname_zip(self, item):
        """Return the path of a zip member."""
        return item.filename

    def getmember(self, name):
        """Return the member whose name equals ``name``, or None if absent."""
        for member in self.getmembers():
            if self.getname(member) == name:
                return member
        return None

    def getmembers(self):
        """Return all members via the type-specific ``getmembers_*`` method."""
        return getattr(self, 'getmembers_%s' % self.type)()

    def getname(self, member):
        """Return a member's name via the type-specific ``getname_*`` method."""
        return getattr(self, 'getname_%s' % self.type)(member)

    def isdir(self, member):
        """Return whether ``member`` is a directory entry."""
        return getattr(self, 'isdir_%s' % self.type)(member)

    def isdir_tar(self, member):
        """Return whether the tar member is a directory."""
        return member.isdir()

    def isdir_zip(self, member):
        """Return whether the zip member is a directory (name ends with a separator)."""
        return member.filename.endswith(os.sep)

    def isfile(self, member):
        """Return True for every member that is not a directory."""
        return not self.isdir(member)

    def open_tar(self, filepath, mode):
        """Open ``filepath`` as a tar archive with errorlevel=0."""
        return tarfile.open(filepath, mode, errorlevel=0)

    def open_zip(self, filepath, mode):
        """Open ``filepath`` as a zip archive."""
        return zipfile.ZipFile(filepath, mode)

    def zipfile_ok(self, path_to_archive):
        """
        This function is a bit pedantic and not functionally necessary. It checks whether there is
        no file pointing outside of the extraction, because ZipFile.extractall() has some potential
        security holes. See python zipfile documentation for more details.

        :param path_to_archive: path to the zip file to inspect
        :returns: True if every member resolves to the archive's directory
          or below it, False otherwise
        """
        basename = os.path.realpath(os.path.dirname(path_to_archive))
        # Compare against the directory *with* a trailing separator so that a
        # sibling such as '/x/foobar' is not accepted for base '/x/foo'.
        # os.path.join(base, '') leaves a root directory ('/') unchanged.
        contained_prefix = os.path.join(basename, '')
        with zipfile.ZipFile(path_to_archive) as zip_archive:
            for member in zip_archive.namelist():
                member_path = os.path.realpath(os.path.join(basename, member))
                if member_path != basename and not member_path.startswith(contained_prefix):
                    return False
        return True