comparison env/lib/python3.7/site-packages/galaxy/util/compression_utils.py @ 2:6af9afd405e9 draft

"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author shellac
date Thu, 14 May 2020 14:56:58 -0400
parents 26e78fe6e8c4
children
comparison
equal deleted inserted replaced
1:75ca89e9b81c 2:6af9afd405e9
1 from __future__ import absolute_import
2
3 import gzip
4 import io
5 import logging
6 import os
7 import tarfile
8 import zipfile
9
10 from galaxy.util.path import safe_relpath
11 from .checkers import (
12 bz2,
13 is_bz2,
14 is_gzip
15 )
16
17 log = logging.getLogger(__name__)
18
19
def get_fileobj(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and hand back just the file object.

    This is a convenience wrapper around ``get_fileobj_raw`` that drops the
    detected compression format and keeps only the opened file object. When
    the file is compressed an appropriate decompressing reader is returned.
    In text mode the 'utf-8' encoding is always used.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    :returns: the opened file object
    """
    _compressed_format, fileobj = get_fileobj_raw(filename, mode, compressed_formats)
    return fileobj
31
32
def get_fileobj_raw(filename, mode="r", compressed_formats=None):
    """
    Open ``filename``, transparently decompressing it when it is stored in one
    of the allowed compressed formats.

    In text mode (no 'b' in ``mode``) the returned object always decodes with
    the 'utf-8' encoding. For zip archives only the first member of the
    archive is opened.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener. A 't' flag is dropped, and the
      deprecated 'U' flag (alone or combined, e.g. 'rU') is treated as plain
      read mode.
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    :returns: a ``(compressed_format, fileobj)`` tuple where
      ``compressed_format`` is 'gzip', 'bz2', 'zip', or None when the file
      was opened as a regular uncompressed file
    """
    if compressed_formats is None:
        compressed_formats = ['bz2', 'gzip', 'zip']
    # Remove 't' from mode, which may cause an error for compressed files
    mode = mode.replace('t', '')
    # 'U' mode is deprecated and rejected by the gzip/zip openers (and by
    # open() itself on recent Python versions), so strip it wherever it
    # appears and make sure we still end up in read mode.
    if 'U' in mode:
        mode = mode.replace('U', '')
        if 'r' not in mode:
            mode = 'r' + mode
    compressed_format = None
    if 'gzip' in compressed_formats and is_gzip(filename):
        fh = gzip.GzipFile(filename, mode)
        compressed_format = 'gzip'
    elif 'bz2' in compressed_formats and is_bz2(filename):
        fh = bz2.BZ2File(filename, mode)
        compressed_format = 'bz2'
    elif 'zip' in compressed_formats and zipfile.is_zipfile(filename):
        # Return fileobj for the first file in a zip file.
        # 'b' is not allowed in the ZipFile mode argument
        # since it always opens files in binary mode.
        # For emulating text mode, we will be returning the binary fh in a
        # TextIOWrapper.
        zf_mode = mode.replace('b', '')
        with zipfile.ZipFile(filename, zf_mode) as zh:
            fh = zh.open(zh.namelist()[0], zf_mode)
        compressed_format = 'zip'
    elif 'b' in mode:
        # Uncompressed file, binary mode.
        return compressed_format, open(filename, mode)
    else:
        # Uncompressed file, text mode.
        return compressed_format, io.open(filename, mode, encoding='utf-8')
    # Compressed readers are binary; wrap them when text mode was requested.
    if 'b' not in mode:
        return compressed_format, io.TextIOWrapper(fh, encoding='utf-8')
    else:
        return compressed_format, fh
66
67
class CompressedFile(object):
    """
    Uniform wrapper around a tar or zip archive.

    The archive type is detected on construction and the type-specific
    operations (listing members, reading member names, directory checks,
    opening) are dispatched to ``*_tar`` / ``*_zip`` methods by name.
    """

    @staticmethod
    def can_decompress(file_path):
        """Return True if ``file_path`` is recognised as a tar or zip archive."""
        return tarfile.is_tarfile(file_path) or zipfile.is_zipfile(file_path)

    def __init__(self, file_path, mode='r'):
        """
        Detect the archive type of ``file_path`` and open it.

        :param file_path: path to the archive
        :param mode: mode passed to the underlying tar/zip opener
        :raises NameError: if the file is not a supported archive (neither a
          tar file nor a non-``.jar`` zip file)
        """
        if tarfile.is_tarfile(file_path):
            self.file_type = 'tar'
        elif zipfile.is_zipfile(file_path) and not file_path.endswith('.jar'):
            self.file_type = 'zip'
        else:
            # Unsupported input: record that explicitly so the lookup below
            # raises the intended NameError rather than an AttributeError
            # on a missing attribute.
            self.file_type = None
        # Archive name without its extension ('.tar.gz' style double
        # extensions are stripped in two steps).
        self.file_name = os.path.splitext(os.path.basename(file_path))[0]
        if self.file_name.endswith('.tar'):
            self.file_name = os.path.splitext(self.file_name)[0]
        self.type = self.file_type
        method = 'open_%s' % self.file_type
        if hasattr(self, method):
            self.archive = getattr(self, method)(file_path, mode)
        else:
            raise NameError('File type %s specified, no open method found.' % self.file_type)

    @property
    def common_prefix_dir(self):
        """
        Get the common prefix directory for all the files in the archive, if any.

        Returns '' if the archive contains multiple files and/or directories at
        the root of the archive.
        """
        contents = self.getmembers()
        common_prefix = ''
        if len(contents) > 1:
            common_prefix = os.path.commonprefix([self.getname(item) for item in contents])
            # If the common_prefix does not end with a slash, check that is a
            # directory and all other files are contained in it
            if len(common_prefix) >= 1 and not common_prefix.endswith(os.sep):
                # The common string prefix need not be an actual member
                # (e.g. 'data1.txt' and 'data2.txt' share 'data'), in which
                # case getmember() returns None and there is no common dir.
                prefix_member = self.getmember(common_prefix)
                if prefix_member is not None and self.isdir(prefix_member) \
                        and all(self.getname(item).startswith(common_prefix + os.sep) for item in contents if self.isfile(item)):
                    common_prefix += os.sep
            if not common_prefix.endswith(os.sep):
                common_prefix = ''
        return common_prefix

    def extract(self, path):
        """
        Extract the archive under ``path`` and return the resulting directory.

        When the archive has a single file member, or several members without
        a common prefix directory, the members are extracted into a
        sub-directory of ``path`` named after the archive. Otherwise they are
        extracted directly into ``path``. For zip archives the unix
        permissions stored in the archive are applied to the extracted files.

        :param path: directory under which to extract
        :returns: absolute path of the extraction directory joined with the
          archive's common prefix directory (if any)
        """
        contents = self.getmembers()
        extraction_path = path
        common_prefix_dir = self.common_prefix_dir
        if len(contents) == 1:
            # The archive contains a single file, return the extraction path.
            if self.isfile(contents[0]):
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
                self.archive.extractall(extraction_path, members=self.safemembers())
        else:
            if not common_prefix_dir:
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
            self.archive.extractall(extraction_path, members=self.safemembers())
        # Since .zip files store unix permissions separately, we need to iterate through the zip file
        # and set permissions on extracted members.
        if self.file_type == 'zip':
            for zipped_file in contents:
                filename = self.getname(zipped_file)
                absolute_filepath = os.path.join(extraction_path, filename)
                external_attributes = self.archive.getinfo(filename).external_attr
                # The 2 least significant bytes are irrelevant, the next two contain unix permissions.
                unix_permissions = external_attributes >> 16
                if unix_permissions != 0:
                    if os.path.exists(absolute_filepath):
                        os.chmod(absolute_filepath, unix_permissions)
                    else:
                        log.warning("Unable to change permission on extracted file '%s' as it does not exist", absolute_filepath)
        return os.path.abspath(os.path.join(extraction_path, common_prefix_dir))

    def safemembers(self):
        """
        Yield the archive members that are safe to extract.

        Yields ``TarInfo`` objects for tar archives and member names for zip
        archives.

        :raises Exception: if a member path is rejected by ``safe_relpath``,
          or (tar only) if a symbolic/hard link target is rejected by
          ``safe_relpath`` or does not start with the common prefix directory
        """
        members = self.archive
        common_prefix_dir = self.common_prefix_dir
        if self.file_type == "tar":
            for finfo in members:
                if not safe_relpath(finfo.name):
                    raise Exception("Path '%s' is blocked (illegal path)." % finfo.name)
                if finfo.issym() or finfo.islnk():
                    # Resolve the link target relative to the link's own directory.
                    link_target = os.path.join(os.path.dirname(finfo.name), finfo.linkname)
                    if not safe_relpath(link_target) or not os.path.normpath(link_target).startswith(common_prefix_dir):
                        raise Exception("Link '%s' to '%s' is blocked." % (finfo.name, finfo.linkname))
                yield finfo
        elif self.file_type == "zip":
            for name in members.namelist():
                if not safe_relpath(name):
                    raise Exception(name + " is blocked (illegal path).")
                yield name

    def getmembers_tar(self):
        """Return the list of ``TarInfo`` members of the tar archive."""
        return self.archive.getmembers()

    def getmembers_zip(self):
        """Return the list of ``ZipInfo`` members of the zip archive."""
        return self.archive.infolist()

    def getname_tar(self, item):
        """Return the name of a tar member."""
        return item.name

    def getname_zip(self, item):
        """Return the name of a zip member."""
        return item.filename

    def getmember(self, name):
        """Return the first member whose name equals ``name``, or None if there is none."""
        for member in self.getmembers():
            if self.getname(member) == name:
                return member
        return None

    def getmembers(self):
        """Return all members of the archive, dispatching on the archive type."""
        return getattr(self, 'getmembers_%s' % self.type)()

    def getname(self, member):
        """Return the name of ``member``, dispatching on the archive type."""
        return getattr(self, 'getname_%s' % self.type)(member)

    def isdir(self, member):
        """Return True if ``member`` is a directory, dispatching on the archive type."""
        return getattr(self, 'isdir_%s' % self.type)(member)

    def isdir_tar(self, member):
        """Return True if the tar member is a directory."""
        return member.isdir()

    def isdir_zip(self, member):
        """Return True if the zip member's name ends with the path separator."""
        return member.filename.endswith(os.sep)

    def isfile(self, member):
        """Return True if ``member`` is not a directory."""
        return not self.isdir(member)

    def open_tar(self, filepath, mode):
        """Open ``filepath`` as a tar archive with ``errorlevel=0``."""
        return tarfile.open(filepath, mode, errorlevel=0)

    def open_zip(self, filepath, mode):
        """Open ``filepath`` as a zip archive."""
        return zipfile.ZipFile(filepath, mode)

    def zipfile_ok(self, path_to_archive):
        """
        This function is a bit pedantic and not functionally necessary. It checks whether there is
        no file pointing outside of the extraction, because ZipFile.extractall() has some potential
        security holes. See python zipfile documentation for more details.

        :param path_to_archive: path to the zip archive to inspect
        :returns: True if every member resolves to the archive's directory or
          a location below it, False otherwise
        """
        basename = os.path.realpath(os.path.dirname(path_to_archive))
        # Compare against the directory *with* a trailing separator so that a
        # sibling such as '<dir>evil' is not mistaken for a child of '<dir>'.
        contained_prefix = os.path.join(basename, '')
        with zipfile.ZipFile(path_to_archive) as zip_archive:
            for member in zip_archive.namelist():
                member_path = os.path.realpath(os.path.join(basename, member))
                if member_path != basename and not member_path.startswith(contained_prefix):
                    return False
        return True
219 return True