comparison planemo/lib/python3.7/site-packages/galaxy/util/compression_utils.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 from __future__ import absolute_import
2
3 import gzip
4 import io
5 import logging
6 import os
7 import tarfile
8 import zipfile
9
10 from galaxy.util.path import safe_relpath
11 from .checkers import (
12 bz2,
13 is_bz2,
14 is_gzip
15 )
16
17 log = logging.getLogger(__name__)
18
19
def get_fileobj(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and return only the file object.

    This is a convenience wrapper around :func:`get_fileobj_raw` that drops
    the detected compression format and hands back the opened file object.
    Compressed files are opened with a matching reader; text mode always
    uses 'utf-8' encoding.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    """
    _compressed_format, fileobj = get_fileobj_raw(filename, mode, compressed_formats)
    return fileobj
31
32
def get_fileobj_raw(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and report which compression format (if any) was used.

    The file is probed for each allowed compressed format in the order
    gzip, bz2, zip. For a zip archive the file object of the *first* member
    is returned. In text mode the returned object always decodes 'utf-8'.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener; 't' and the deprecated 'U' flag are
      stripped before the mode is handed to the underlying opener
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    :returns: tuple ``(compressed_format, fileobj)`` where ``compressed_format``
      is 'gzip', 'bz2', 'zip' or None for an uncompressed file
    """
    if compressed_formats is None:
        compressed_formats = ['bz2', 'gzip', 'zip']
    # Remove 't' from mode, which may cause an error for compressed files
    mode = mode.replace('t', '')
    # 'U' mode is deprecated (and rejected by GzipFile, ZipFile and by open()
    # on recent Python versions), so drop it wherever it appears ('U', 'rU',
    # 'Ub', ...) and open for reading instead.
    if 'U' in mode:
        mode = mode.replace('U', '')
        if 'r' not in mode:
            mode = 'r' + mode
    compressed_format = None
    if 'gzip' in compressed_formats and is_gzip(filename):
        fh = gzip.GzipFile(filename, mode)
        compressed_format = 'gzip'
    elif 'bz2' in compressed_formats and is_bz2(filename):
        fh = bz2.BZ2File(filename, mode)
        compressed_format = 'bz2'
    elif 'zip' in compressed_formats and zipfile.is_zipfile(filename):
        # Return fileobj for the first file in a zip file.
        # 'b' is not allowed in the ZipFile mode argument
        # since it always opens files in binary mode.
        # For emulating text mode, we will be returning the binary fh in a
        # TextIOWrapper.
        zf_mode = mode.replace('b', '')
        with zipfile.ZipFile(filename, zf_mode) as zh:
            fh = zh.open(zh.namelist()[0], zf_mode)
        compressed_format = 'zip'
    elif 'b' in mode:
        # Uncompressed, binary: plain open without any decoding.
        return compressed_format, open(filename, mode)
    else:
        # Uncompressed, text: always decode as utf-8.
        return compressed_format, io.open(filename, mode, encoding='utf-8')
    if 'b' not in mode:
        # The compressed readers are binary; wrap them to emulate text mode.
        return compressed_format, io.TextIOWrapper(fh, encoding='utf-8')
    return compressed_format, fh
66
67
def file_iter(fname, sep=None):
    """
    Generator over the split lines of a file.

    The file is opened through :func:`get_fileobj`. Every line whose first
    character is not C{#} is yielded as the list produced by splitting it on
    the C{sep} parameter. Lines read as an empty string are skipped; note that
    a blank line in the file still carries its newline character and is
    therefore not an empty string.

    >>> lines = [ line for line in file_iter(__file__) ]
    >>> len(lines) != 0
    True
    """
    with get_fileobj(fname) as handle:
        for raw_line in handle:
            # Guard clause: drop empty strings and comment lines.
            if not raw_line or raw_line[0] == '#':
                continue
            yield raw_line.split(sep)
82
83
class CompressedFile(object):
    """
    Uniform wrapper around a tar or zip archive.

    The archive kind is detected on construction and member access
    (``getmembers``, ``getname``, ``isdir``, ...) is dispatched to the
    ``*_tar`` / ``*_zip`` implementation matching ``self.type``.
    """

    @staticmethod
    def can_decompress(file_path):
        """Return True if ``file_path`` is recognised as a tar or zip archive."""
        return tarfile.is_tarfile(file_path) or zipfile.is_zipfile(file_path)

    def __init__(self, file_path, mode='r'):
        """
        Detect the archive type of ``file_path`` and open it.

        :param file_path: path to the archive
        :param mode: mode passed to the tarfile / zipfile opener
        :raises NameError: if the file is neither a tar archive nor a
          (non ``.jar``) zip archive, i.e. no ``open_<type>`` method exists
        """
        # Start from None so that an unsupported file reaches the NameError
        # below instead of failing with AttributeError on a missing attribute.
        self.file_type = None
        if tarfile.is_tarfile(file_path):
            self.file_type = 'tar'
        elif zipfile.is_zipfile(file_path) and not file_path.endswith('.jar'):
            self.file_type = 'zip'
        # Archive name without its extension; '.tar.gz' style names lose both.
        self.file_name = os.path.splitext(os.path.basename(file_path))[0]
        if self.file_name.endswith('.tar'):
            self.file_name = os.path.splitext(self.file_name)[0]
        self.type = self.file_type
        method = 'open_%s' % self.file_type
        if hasattr(self, method):
            self.archive = getattr(self, method)(file_path, mode)
        else:
            raise NameError('File type %s specified, no open method found.' % self.file_type)

    @property
    def common_prefix_dir(self):
        """
        Get the common prefix directory for all the files in the archive, if any.

        Returns '' if the archive contains multiple files and/or directories at
        the root of the archive.
        """
        contents = self.getmembers()
        common_prefix = ''
        if len(contents) > 1:
            common_prefix = os.path.commonprefix([self.getname(item) for item in contents])
            # If the common_prefix does not end with a slash, check that is a
            # directory and all other files are contained in it
            if common_prefix and not common_prefix.endswith(os.sep):
                # commonprefix() works character-wise, so the prefix may not
                # name any member at all (e.g. 'file' for file1/file2);
                # getmember() returns None in that case.
                prefix_member = self.getmember(common_prefix)
                if prefix_member is not None and self.isdir(prefix_member) \
                        and all(self.getname(item).startswith(common_prefix + os.sep) for item in contents if self.isfile(item)):
                    common_prefix += os.sep
            if not common_prefix.endswith(os.sep):
                common_prefix = ''
        return common_prefix

    def extract(self, path):
        """
        Extract the archive below ``path`` and return the resulting directory.

        A single-file archive, or an archive without a common prefix
        directory, is extracted into ``<path>/<archive name>``; an archive
        whose members share a common prefix directory is extracted directly
        into ``path``. For zip archives the unix permissions stored in the
        archive are applied to the extracted members afterwards.

        :param path: directory under which the archive is extracted
        :returns: absolute path of the extraction directory joined with the
          common prefix directory (if any)
        """
        contents = self.getmembers()
        extraction_path = path
        common_prefix_dir = self.common_prefix_dir
        if len(contents) == 1:
            # The archive contains a single file, return the extraction path.
            if self.isfile(contents[0]):
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
                self.archive.extractall(extraction_path, members=self.safemembers())
        else:
            if not common_prefix_dir:
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
            self.archive.extractall(extraction_path, members=self.safemembers())
        # Since .zip files store unix permissions separately, we need to iterate through the zip file
        # and set permissions on extracted members.
        if self.file_type == 'zip':
            for zipped_file in contents:
                filename = self.getname(zipped_file)
                absolute_filepath = os.path.join(extraction_path, filename)
                external_attributes = self.archive.getinfo(filename).external_attr
                # The 2 least significant bytes are irrelevant, the next two contain unix permissions.
                unix_permissions = external_attributes >> 16
                if unix_permissions != 0:
                    if os.path.exists(absolute_filepath):
                        os.chmod(absolute_filepath, unix_permissions)
                    else:
                        log.warning("Unable to change permission on extracted file '%s' as it does not exist", absolute_filepath)
        return os.path.abspath(os.path.join(extraction_path, common_prefix_dir))

    def safemembers(self):
        """
        Yield the archive members after checking that their paths are safe.

        Tar members are yielded as member objects, zip members as names.

        :raises Exception: if a member path is rejected by ``safe_relpath`` or
          a tar link points to a rejected target or outside the common prefix
          directory
        """
        members = self.archive
        common_prefix_dir = self.common_prefix_dir
        if self.file_type == "tar":
            for finfo in members:
                if not safe_relpath(finfo.name):
                    raise Exception("Path '%s' is blocked (illegal path)." % finfo.name)
                if finfo.issym() or finfo.islnk():
                    # Resolve the link target relative to the directory of the link itself.
                    link_target = os.path.join(os.path.dirname(finfo.name), finfo.linkname)
                    if not safe_relpath(link_target) or not os.path.normpath(link_target).startswith(common_prefix_dir):
                        raise Exception("Link '%s' to '%s' is blocked." % (finfo.name, finfo.linkname))
                yield finfo
        elif self.file_type == "zip":
            for name in members.namelist():
                if not safe_relpath(name):
                    raise Exception(name + " is blocked (illegal path).")
                yield name

    def getmembers_tar(self):
        """Return the member objects of the tar archive."""
        return self.archive.getmembers()

    def getmembers_zip(self):
        """Return the info objects of the zip archive."""
        return self.archive.infolist()

    def getname_tar(self, item):
        """Return the name of a tar member."""
        return item.name

    def getname_zip(self, item):
        """Return the file name of a zip member."""
        return item.filename

    def getmember(self, name):
        """Return the member called ``name``, or None if there is no such member."""
        for member in self.getmembers():
            if self.getname(member) == name:
                return member
        return None

    def getmembers(self):
        """Return all members, dispatching on the archive type."""
        return getattr(self, 'getmembers_%s' % self.type)()

    def getname(self, member):
        """Return the name of ``member``, dispatching on the archive type."""
        return getattr(self, 'getname_%s' % self.type)(member)

    def isdir(self, member):
        """Return True if ``member`` is a directory, dispatching on the archive type."""
        return getattr(self, 'isdir_%s' % self.type)(member)

    def isdir_tar(self, member):
        """Return True if the tar member is a directory."""
        return member.isdir()

    def isdir_zip(self, member):
        """Return True if the zip member's file name ends with the path separator."""
        return member.filename.endswith(os.sep)

    def isfile(self, member):
        """Return True if ``member`` is not a directory."""
        return not self.isdir(member)

    def open_tar(self, filepath, mode):
        """Open ``filepath`` as a tar archive (``errorlevel=0``)."""
        return tarfile.open(filepath, mode, errorlevel=0)

    def open_zip(self, filepath, mode):
        """Open ``filepath`` as a zip archive."""
        return zipfile.ZipFile(filepath, mode)

    def zipfile_ok(self, path_to_archive):
        """
        This function is a bit pedantic and not functionally necessary. It checks whether there is
        no file pointing outside of the extraction, because ZipFile.extractall() has some potential
        security holes. See python zipfile documentation for more details.

        :param path_to_archive: path to the zip archive to check
        :returns: False if any member would resolve outside the directory
          containing the archive, True otherwise
        """
        basename = os.path.realpath(os.path.dirname(path_to_archive))
        # Compare against the directory *with* a trailing separator so that a
        # sibling such as '<basename>_other' is not mistaken for a path inside it.
        contained_prefix = os.path.join(basename, '')
        # Context manager makes sure the archive handle is closed again.
        with zipfile.ZipFile(path_to_archive) as zip_archive:
            for member in zip_archive.namelist():
                member_path = os.path.realpath(os.path.join(basename, member))
                if member_path != basename and not member_path.startswith(contained_prefix):
                    return False
        return True