Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/galaxy/util/compression_utils.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
comparison
equal
deleted
inserted
replaced
1:75ca89e9b81c | 2:6af9afd405e9 |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 import gzip | |
4 import io | |
5 import logging | |
6 import os | |
7 import tarfile | |
8 import zipfile | |
9 | |
10 from galaxy.util.path import safe_relpath | |
11 from .checkers import ( | |
12 bz2, | |
13 is_bz2, | |
14 is_gzip | |
15 ) | |
16 | |
17 log = logging.getLogger(__name__) | |
18 | |
19 | |
def get_fileobj(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and return just the file object.

    This delegates to :func:`get_fileobj_raw` and discards the detected
    compression format, keeping only the second element of the returned
    pair. Compressed files get an appropriate reader, and text mode always
    uses 'utf-8' encoding.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    """
    _, fileobj = get_fileobj_raw(filename, mode, compressed_formats)
    return fileobj
31 | |
32 | |
def get_fileobj_raw(filename, mode="r", compressed_formats=None):
    """
    Open ``filename``, transparently decompressing it when allowed.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener. A 't' flag is dropped, and the
      deprecated universal-newlines flag 'U' is mapped to plain read mode.
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    :returns: a ``(compressed_format, fileobj)`` tuple. ``compressed_format``
      is 'gzip', 'bz2', 'zip', or None when the file is opened as-is. For a
      zip archive the file object reads the first member of the archive.
      Unless 'b' is in ``mode`` the file object yields text decoded as
      'utf-8'.
    """
    if compressed_formats is None:
        compressed_formats = ['bz2', 'gzip', 'zip']
    # Remove 't' from mode, which may cause an error for compressed files
    mode = mode.replace('t', '')
    # 'U' mode is deprecated, we open in 'r'. This also covers combined
    # spellings such as 'rU' or 'rbU', which the compressed-file openers
    # reject. Combinations with a writing flag were never valid and are left
    # untouched so that the opener still raises for them.
    if 'U' in mode and not set(mode) & set('wxa+'):
        mode = 'r' + mode.replace('U', '').replace('r', '')
    compressed_format = None
    if 'gzip' in compressed_formats and is_gzip(filename):
        fh = gzip.GzipFile(filename, mode)
        compressed_format = 'gzip'
    elif 'bz2' in compressed_formats and is_bz2(filename):
        fh = bz2.BZ2File(filename, mode)
        compressed_format = 'bz2'
    elif 'zip' in compressed_formats and zipfile.is_zipfile(filename):
        # Return fileobj for the first file in a zip file.
        # 'b' is not allowed in the ZipFile mode argument
        # since it always opens files in binary mode.
        # For emulating text mode, we will be returning the binary fh in a
        # TextIOWrapper.
        zf_mode = mode.replace('b', '')
        with zipfile.ZipFile(filename, zf_mode) as zh:
            fh = zh.open(zh.namelist()[0], zf_mode)
        compressed_format = 'zip'
    elif 'b' in mode:
        # Uncompressed, binary: plain built-in open.
        return compressed_format, open(filename, mode)
    else:
        # Uncompressed, text: always decode as utf-8.
        return compressed_format, io.open(filename, mode, encoding='utf-8')
    # Compressed readers are binary; wrap them when text was requested.
    if 'b' not in mode:
        return compressed_format, io.TextIOWrapper(fh, encoding='utf-8')
    else:
        return compressed_format, fh
66 | |
67 | |
class CompressedFile(object):
    """
    Uniform wrapper around a tar or zip archive.

    The archive kind is detected from the file content, and the per-kind
    operations (``open_*``, ``getmembers_*``, ``getname_*``, ``isdir_*``) are
    dispatched by name through ``self.type``.
    """

    @staticmethod
    def can_decompress(file_path):
        """Return True if ``file_path`` is a tar or a zip archive."""
        return tarfile.is_tarfile(file_path) or zipfile.is_zipfile(file_path)

    def __init__(self, file_path, mode='r'):
        """
        Detect the archive type of ``file_path`` and open it.

        :param file_path: path to a tar or zip archive; zip files whose name
          ends with '.jar' are not treated as zip archives
        :param mode: mode handed to the tar/zip opener
        :raises AttributeError: if the file is neither a tar archive nor an
          accepted zip archive (``file_type`` is never assigned in that case)
        :raises NameError: if no ``open_<file_type>`` method exists
        """
        if tarfile.is_tarfile(file_path):
            self.file_type = 'tar'
        elif zipfile.is_zipfile(file_path) and not file_path.endswith('.jar'):
            self.file_type = 'zip'
        # Archive name without its extension; a trailing '.tar' left over
        # from names like 'x.tar.gz' is stripped as well.
        self.file_name = os.path.splitext(os.path.basename(file_path))[0]
        if self.file_name.endswith('.tar'):
            self.file_name = os.path.splitext(self.file_name)[0]
        self.type = self.file_type
        method = 'open_%s' % self.file_type
        if hasattr(self, method):
            self.archive = getattr(self, method)(file_path, mode)
        else:
            raise NameError('File type %s specified, no open method found.' % self.file_type)

    @property
    def common_prefix_dir(self):
        """
        Get the common prefix directory for all the files in the archive, if any.

        Returns '' if the archive contains multiple files and/or directories at
        the root of the archive.
        """
        contents = self.getmembers()
        common_prefix = ''
        if len(contents) > 1:
            common_prefix = os.path.commonprefix([self.getname(item) for item in contents])
            # If the common_prefix does not end with a slash, check that is a
            # directory and all other files are contained in it
            if len(common_prefix) >= 1 and not common_prefix.endswith(os.sep):
                # The textual prefix need not be a member at all (e.g. 'abc'
                # for 'abc1' and 'abc2'); getmember() returns None then.
                prefix_member = self.getmember(common_prefix)
                if prefix_member is not None and self.isdir(prefix_member) \
                        and all(self.getname(item).startswith(common_prefix + os.sep) for item in contents if self.isfile(item)):
                    common_prefix += os.sep
            if not common_prefix.endswith(os.sep):
                common_prefix = ''
        return common_prefix

    def extract(self, path):
        """
        Extract the archive below ``path`` and return where its content is.

        A single-file archive, or a multi-member archive without a common
        prefix directory, is extracted into ``path/<archive name>``; otherwise
        the archive is extracted directly into ``path``.

        :param path: directory under which the archive is extracted
        :returns: absolute path of the extraction directory joined with the
          archive's common prefix directory
        """
        contents = self.getmembers()
        extraction_path = path
        common_prefix_dir = self.common_prefix_dir
        if len(contents) == 1:
            # The archive contains a single file, return the extraction path.
            if self.isfile(contents[0]):
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
                self.archive.extractall(extraction_path, members=self.safemembers())
        else:
            if not common_prefix_dir:
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
            self.archive.extractall(extraction_path, members=self.safemembers())
        # Since .zip files store unix permissions separately, we need to iterate through the zip file
        # and set permissions on extracted members.
        if self.file_type == 'zip':
            for zipped_file in contents:
                filename = self.getname(zipped_file)
                absolute_filepath = os.path.join(extraction_path, filename)
                external_attributes = self.archive.getinfo(filename).external_attr
                # The 2 least significant bytes are irrelevant, the next two contain unix permissions.
                unix_permissions = external_attributes >> 16
                if unix_permissions != 0:
                    if os.path.exists(absolute_filepath):
                        os.chmod(absolute_filepath, unix_permissions)
                    else:
                        log.warning("Unable to change permission on extracted file '%s' as it does not exist", absolute_filepath)
        return os.path.abspath(os.path.join(extraction_path, common_prefix_dir))

    def safemembers(self):
        """
        Yield the archive members that are safe to extract.

        Yields TarInfo objects for tar archives and member names for zip
        archives.

        :raises Exception: if a member path is rejected by ``safe_relpath``,
          or if a tar link target is rejected by ``safe_relpath`` or does not
          start with the common prefix directory
        """
        members = self.archive
        common_prefix_dir = self.common_prefix_dir
        if self.file_type == "tar":
            for finfo in members:
                if not safe_relpath(finfo.name):
                    raise Exception("Path '%s' is blocked (illegal path)." % finfo.name)
                if finfo.issym() or finfo.islnk():
                    # Resolve the link target relative to the member's directory.
                    link_target = os.path.join(os.path.dirname(finfo.name), finfo.linkname)
                    if not safe_relpath(link_target) or not os.path.normpath(link_target).startswith(common_prefix_dir):
                        raise Exception("Link '%s' to '%s' is blocked." % (finfo.name, finfo.linkname))
                yield finfo
        elif self.file_type == "zip":
            for name in members.namelist():
                if not safe_relpath(name):
                    raise Exception(name + " is blocked (illegal path).")
                yield name

    def getmembers_tar(self):
        """Return the TarInfo members of the tar archive."""
        return self.archive.getmembers()

    def getmembers_zip(self):
        """Return the ZipInfo members of the zip archive."""
        return self.archive.infolist()

    def getname_tar(self, item):
        """Return the name of a tar member."""
        return item.name

    def getname_zip(self, item):
        """Return the name of a zip member."""
        return item.filename

    def getmember(self, name):
        """Return the first member called ``name``, or None if there is none."""
        for member in self.getmembers():
            if self.getname(member) == name:
                return member
        return None

    def getmembers(self):
        """Return all members, dispatching on the archive type."""
        return getattr(self, 'getmembers_%s' % self.type)()

    def getname(self, member):
        """Return the name of ``member``, dispatching on the archive type."""
        return getattr(self, 'getname_%s' % self.type)(member)

    def isdir(self, member):
        """Return whether ``member`` is a directory, dispatching on the archive type."""
        return getattr(self, 'isdir_%s' % self.type)(member)

    def isdir_tar(self, member):
        """Return whether a tar member is a directory."""
        return member.isdir()

    def isdir_zip(self, member):
        """Return whether a zip member is a directory (its name ends with the path separator)."""
        return member.filename.endswith(os.sep)

    def isfile(self, member):
        """Return True for every member that is not a directory."""
        return not self.isdir(member)

    def open_tar(self, filepath, mode):
        """Open ``filepath`` as a tar archive with ``errorlevel=0``."""
        return tarfile.open(filepath, mode, errorlevel=0)

    def open_zip(self, filepath, mode):
        """Open ``filepath`` as a zip archive."""
        return zipfile.ZipFile(filepath, mode)

    def zipfile_ok(self, path_to_archive):
        """
        This function is a bit pedantic and not functionally necessary. It checks whether there is
        no file pointing outside of the extraction, because ZipFile.extractall() has some potential
        security holes. See python zipfile documentation for more details.

        :param path_to_archive: path to the zip archive to inspect
        :returns: False if any member would resolve outside the directory
          containing the archive, True otherwise
        """
        basename = os.path.realpath(os.path.dirname(path_to_archive))
        # Compare against the directory plus a trailing separator so that a
        # sibling such as '/tmp/xy' is not mistaken for being inside '/tmp/x'.
        contained_prefix = os.path.join(basename, '')
        with zipfile.ZipFile(path_to_archive) as zip_archive:
            for member in zip_archive.namelist():
                member_path = os.path.realpath(os.path.join(basename, member))
                if member_path != basename and not member_path.startswith(contained_prefix):
                    return False
        return True