Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/galaxy/util/checkers.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line source
import gzip import re import sys import tarfile import zipfile from six import BytesIO from six.moves import filter from galaxy import util from galaxy.util.image_util import image_type if sys.version_info < (3, 3): gzip.GzipFile.read1 = gzip.GzipFile.read # workaround for https://bugs.python.org/issue12591 try: import bz2file as bz2 except ImportError: # If bz2file is unavailable, just fallback to not having pbzip2 support. import bz2 else: import bz2 HTML_CHECK_LINES = 100 def check_html(file_path, chunk=None): if chunk is None: temp = open(file_path, mode='rb') elif hasattr(chunk, "splitlines"): temp = chunk.splitlines() else: temp = chunk regexp1 = re.compile(r"<A\s+[^>]*HREF[^>]+>", re.I) regexp2 = re.compile(r"<IFRAME[^>]*>", re.I) regexp3 = re.compile(r"<FRAMESET[^>]*>", re.I) regexp4 = re.compile(r"<META[\W][^>]*>", re.I) regexp5 = re.compile(r"<SCRIPT[^>]*>", re.I) lineno = 0 # TODO: Potentially reading huge lines into string here, this should be # reworked. for line in temp: line = util.unicodify(line) lineno += 1 matches = regexp1.search(line) or regexp2.search(line) or regexp3.search(line) or regexp4.search(line) or regexp5.search(line) if matches: if chunk is None: temp.close() return True if HTML_CHECK_LINES and (lineno > HTML_CHECK_LINES): break if chunk is None: temp.close() return False def check_binary(name, file_path=True): # Handles files if file_path is True or text if file_path is False if file_path: temp = open(name, "rb") else: temp = BytesIO(name) try: return util.is_binary(temp.read(1024)) finally: temp.close() def check_gzip(file_path, check_content=True): # This method returns a tuple of booleans representing ( is_gzipped, is_valid ) # Make sure we have a gzipped file try: with open(file_path, "rb") as temp: magic_check = temp.read(2) if magic_check != util.gzip_magic: return (False, False) except Exception: return (False, False) # We support some binary data types, so check if the compressed binary file is valid # If the file is Bam, it should already have been detected as such, so we'll just check # for sff format. try: with gzip.open(file_path, 'rb') as fh: header = fh.read(4) if header == b'.sff': return (True, True) except Exception: return(False, False) if not check_content: return (True, True) CHUNK_SIZE = 2 ** 15 # 32Kb gzipped_file = gzip.GzipFile(file_path, mode='rb') chunk = gzipped_file.read(CHUNK_SIZE) gzipped_file.close() # See if we have a compressed HTML file if check_html(file_path, chunk=chunk): return (True, False) return (True, True) def check_bz2(file_path, check_content=True): try: with open(file_path, "rb") as temp: magic_check = temp.read(3) if magic_check != util.bz2_magic: return (False, False) except Exception: return(False, False) if not check_content: return (True, True) CHUNK_SIZE = 2 ** 15 # reKb bzipped_file = bz2.BZ2File(file_path, mode='rb') chunk = bzipped_file.read(CHUNK_SIZE) bzipped_file.close() # See if we have a compressed HTML file if check_html(file_path, chunk=chunk): return (True, False) return (True, True) def check_zip(file_path, check_content=True, files=1): if not zipfile.is_zipfile(file_path): return (False, False) if not check_content: return (True, True) CHUNK_SIZE = 2 ** 15 # 32Kb chunk = None for filect, member in enumerate(iter_zip(file_path)): handle, name = member chunk = handle.read(CHUNK_SIZE) if chunk and check_html(file_path, chunk): return (True, False) if filect >= files: break return (True, True) def is_bz2(file_path): is_bz2, is_valid = check_bz2(file_path, check_content=False) return is_bz2 def is_gzip(file_path): is_gzipped, is_valid = check_gzip(file_path, check_content=False) return is_gzipped def is_zip(file_path): is_zipped, is_valid = check_zip(file_path, check_content=False) return is_zipped def is_single_file_zip(file_path): for i, member in enumerate(iter_zip(file_path)): if i > 1: return False return True def is_tar(file_path): return tarfile.is_tarfile(file_path) def iter_zip(file_path): with zipfile.ZipFile(file_path) as z: for f in filter(lambda x: not x.endswith('/'), z.namelist()): yield (z.open(f), f) def check_image(file_path): """ Simple wrapper around image_type to yield a True/False verdict """ if image_type(file_path): return True return False __all__ = ( 'check_binary', 'check_bz2', 'check_gzip', 'check_html', 'check_image', 'check_zip', 'is_gzip', 'is_bz2', 'is_zip', )