diff env/lib/python3.7/site-packages/galaxy/util/compression_utils.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/galaxy/util/compression_utils.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,219 @@
+from __future__ import absolute_import
+
+import gzip
+import io
+import logging
+import os
+import tarfile
+import zipfile
+
+from galaxy.util.path import safe_relpath
+from .checkers import (
+    bz2,
+    is_bz2,
+    is_gzip
+)
+
+log = logging.getLogger(__name__)
+
+
+def get_fileobj(filename, mode="r", compressed_formats=None):
+    """
+    Returns a fileobj. If the file is compressed, return an appropriate file
+    reader. In text mode, always use 'utf-8' encoding.
+
+    :param filename: path to file that should be opened
+    :param mode: mode to pass to opener
+    :param compressed_formats: list of allowed compressed file formats among
+      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
+    """
+    return get_fileobj_raw(filename, mode, compressed_formats)[1]
+
+
+def get_fileobj_raw(filename, mode="r", compressed_formats=None):
+    if compressed_formats is None:
+        compressed_formats = ['bz2', 'gzip', 'zip']
+    # Remove 't' from mode, which may cause an error for compressed files
+    mode = mode.replace('t', '')
+    # 'U' mode is deprecated, we open in 'r'.
+    if mode == 'U':
+        mode = 'r'
+    compressed_format = None
+    if 'gzip' in compressed_formats and is_gzip(filename):
+        fh = gzip.GzipFile(filename, mode)
+        compressed_format = 'gzip'
+    elif 'bz2' in compressed_formats and is_bz2(filename):
+        fh = bz2.BZ2File(filename, mode)
+        compressed_format = 'bz2'
+    elif 'zip' in compressed_formats and zipfile.is_zipfile(filename):
+        # Return fileobj for the first file in a zip file.
+        # 'b' is not allowed in the ZipFile mode argument
+        # since it always opens files in binary mode.
+        # For emulating text mode, we will be returning the binary fh in a
+        # TextIOWrapper.
+        zf_mode = mode.replace('b', '')
+        with zipfile.ZipFile(filename, zf_mode) as zh:
+            fh = zh.open(zh.namelist()[0], zf_mode)
+        compressed_format = 'zip'
+    elif 'b' in mode:
+        return compressed_format, open(filename, mode)
+    else:
+        return compressed_format, io.open(filename, mode, encoding='utf-8')
+    if 'b' not in mode:
+        return compressed_format, io.TextIOWrapper(fh, encoding='utf-8')
+    else:
+        return compressed_format, fh
+
+
+class CompressedFile(object):
+
+    @staticmethod
+    def can_decompress(file_path):
+        return tarfile.is_tarfile(file_path) or zipfile.is_zipfile(file_path)
+
+    def __init__(self, file_path, mode='r'):
+        if tarfile.is_tarfile(file_path):
+            self.file_type = 'tar'
+        elif zipfile.is_zipfile(file_path) and not file_path.endswith('.jar'):
+            self.file_type = 'zip'
+        self.file_name = os.path.splitext(os.path.basename(file_path))[0]
+        if self.file_name.endswith('.tar'):
+            self.file_name = os.path.splitext(self.file_name)[0]
+        self.type = self.file_type
+        method = 'open_%s' % self.file_type
+        if hasattr(self, method):
+            self.archive = getattr(self, method)(file_path, mode)
+        else:
+            raise NameError('File type %s specified, no open method found.' % self.file_type)
+
+    @property
+    def common_prefix_dir(self):
+        """
+        Get the common prefix directory for all the files in the archive, if any.
+
+        Returns '' if the archive contains multiple files and/or directories at
+        the root of the archive.
+        """
+        contents = self.getmembers()
+        common_prefix = ''
+        if len(contents) > 1:
+            common_prefix = os.path.commonprefix([self.getname(item) for item in contents])
+            # If the common_prefix does not end with a slash, check that is a
+            # directory and all other files are contained in it
+            if len(common_prefix) >= 1 and not common_prefix.endswith(os.sep) and self.isdir(self.getmember(common_prefix)) \
+                    and all(self.getname(item).startswith(common_prefix + os.sep) for item in contents if self.isfile(item)):
+                common_prefix += os.sep
+            if not common_prefix.endswith(os.sep):
+                common_prefix = ''
+        return common_prefix
+
+    def extract(self, path):
+        '''Determine the path to which the archive should be extracted.'''
+        contents = self.getmembers()
+        extraction_path = path
+        common_prefix_dir = self.common_prefix_dir
+        if len(contents) == 1:
+            # The archive contains a single file, return the extraction path.
+            if self.isfile(contents[0]):
+                extraction_path = os.path.join(path, self.file_name)
+                if not os.path.exists(extraction_path):
+                    os.makedirs(extraction_path)
+                self.archive.extractall(extraction_path, members=self.safemembers())
+        else:
+            if not common_prefix_dir:
+                extraction_path = os.path.join(path, self.file_name)
+                if not os.path.exists(extraction_path):
+                    os.makedirs(extraction_path)
+            self.archive.extractall(extraction_path, members=self.safemembers())
+        # Since .zip files store unix permissions separately, we need to iterate through the zip file
+        # and set permissions on extracted members.
+        if self.file_type == 'zip':
+            for zipped_file in contents:
+                filename = self.getname(zipped_file)
+                absolute_filepath = os.path.join(extraction_path, filename)
+                external_attributes = self.archive.getinfo(filename).external_attr
+                # The 2 least significant bytes are irrelevant, the next two contain unix permissions.
+                unix_permissions = external_attributes >> 16
+                if unix_permissions != 0:
+                    if os.path.exists(absolute_filepath):
+                        os.chmod(absolute_filepath, unix_permissions)
+                    else:
+                        log.warning("Unable to change permission on extracted file '%s' as it does not exist" % absolute_filepath)
+        return os.path.abspath(os.path.join(extraction_path, common_prefix_dir))
+
+    def safemembers(self):
+        members = self.archive
+        common_prefix_dir = self.common_prefix_dir
+        if self.file_type == "tar":
+            for finfo in members:
+                if not safe_relpath(finfo.name):
+                    raise Exception("Path '%s' is blocked (illegal path)." % finfo.name)
+                if finfo.issym() or finfo.islnk():
+                    link_target = os.path.join(os.path.dirname(finfo.name), finfo.linkname)
+                    if not safe_relpath(link_target) or not os.path.normpath(link_target).startswith(common_prefix_dir):
+                        raise Exception("Link '%s' to '%s' is blocked." % (finfo.name, finfo.linkname))
+                yield finfo
+        elif self.file_type == "zip":
+            for name in members.namelist():
+                if not safe_relpath(name):
+                    raise Exception(name + " is blocked (illegal path).")
+                yield name
+
+    def getmembers_tar(self):
+        return self.archive.getmembers()
+
+    def getmembers_zip(self):
+        return self.archive.infolist()
+
+    def getname_tar(self, item):
+        return item.name
+
+    def getname_zip(self, item):
+        return item.filename
+
+    def getmember(self, name):
+        for member in self.getmembers():
+            if self.getname(member) == name:
+                return member
+
+    def getmembers(self):
+        return getattr(self, 'getmembers_%s' % self.type)()
+
+    def getname(self, member):
+        return getattr(self, 'getname_%s' % self.type)(member)
+
+    def isdir(self, member):
+        return getattr(self, 'isdir_%s' % self.type)(member)
+
+    def isdir_tar(self, member):
+        return member.isdir()
+
+    def isdir_zip(self, member):
+        if member.filename.endswith(os.sep):
+            return True
+        return False
+
+    def isfile(self, member):
+        if not self.isdir(member):
+            return True
+        return False
+
+    def open_tar(self, filepath, mode):
+        return tarfile.open(filepath, mode, errorlevel=0)
+
+    def open_zip(self, filepath, mode):
+        return zipfile.ZipFile(filepath, mode)
+
+    def zipfile_ok(self, path_to_archive):
+        """
+        This function is a bit pedantic and not functionally necessary.  It checks whether there is
+        no file pointing outside of the extraction, because ZipFile.extractall() has some potential
+        security holes.  See python zipfile documentation for more details.
+        """
+        basename = os.path.realpath(os.path.dirname(path_to_archive))
+        zip_archive = zipfile.ZipFile(path_to_archive)
+        for member in zip_archive.namelist():
+            member_path = os.path.realpath(os.path.join(basename, member))
+            if not member_path.startswith(basename):
+                return False
+        return True