Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/galaxy/util/checkers.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
comparison
equal
deleted
inserted
replaced
| 4:79f47841a781 | 5:9b1c78e6ba9c |
|---|---|
| 1 import gzip | |
| 2 import re | |
| 3 import sys | |
| 4 import tarfile | |
| 5 import zipfile | |
| 6 | |
| 7 from six import BytesIO | |
| 8 from six.moves import filter | |
| 9 | |
| 10 from galaxy import util | |
| 11 from galaxy.util.image_util import image_type | |
| 12 | |
| 13 if sys.version_info < (3, 3): | |
| 14 gzip.GzipFile.read1 = gzip.GzipFile.read # workaround for https://bugs.python.org/issue12591 | |
| 15 try: | |
| 16 import bz2file as bz2 | |
| 17 except ImportError: | |
| 18 # If bz2file is unavailable, just fallback to not having pbzip2 support. | |
| 19 import bz2 | |
| 20 else: | |
| 21 import bz2 | |
| 22 | |
| 23 HTML_CHECK_LINES = 100 | |
| 24 | |
| 25 | |
def check_html(file_path, chunk=None):
    """Return True if the leading lines of the data contain HTML-looking tags.

    :param file_path: path of the file to scan; only opened when ``chunk`` is None.
    :param chunk: optional already-read data. If it has a ``splitlines``
        method it is split into lines, otherwise it is iterated as-is.
    :returns: True as soon as a line matches an anchor-with-href, iframe,
        frameset, meta or script tag (case-insensitive), else False.

    Scanning stops once the line counter exceeds ``HTML_CHECK_LINES`` (when
    that constant is truthy), so at most ``HTML_CHECK_LINES + 1`` lines are
    inspected.
    """
    if chunk is None:
        temp = open(file_path, mode='rb')
    elif hasattr(chunk, "splitlines"):
        temp = chunk.splitlines()
    else:
        temp = chunk
    html_patterns = (
        re.compile(r"<A\s+[^>]*HREF[^>]+>", re.I),
        re.compile(r"<IFRAME[^>]*>", re.I),
        re.compile(r"<FRAMESET[^>]*>", re.I),
        re.compile(r"<META[\W][^>]*>", re.I),
        re.compile(r"<SCRIPT[^>]*>", re.I),
    )
    try:
        # TODO: Potentially reading huge lines into string here, this should be
        # reworked.
        for lineno, line in enumerate(temp, 1):
            line = util.unicodify(line)
            if any(pattern.search(line) for pattern in html_patterns):
                return True
            if HTML_CHECK_LINES and (lineno > HTML_CHECK_LINES):
                break
        return False
    finally:
        # Only the handle opened here needs closing; the finally also covers
        # the case where decoding or reading a line raises.
        if chunk is None:
            temp.close()
| 54 | |
| 55 | |
def check_binary(name, file_path=True):
    """Report whether the first 1024 bytes are classified as binary.

    :param name: a path to open when ``file_path`` is truthy, otherwise the
        raw bytes to inspect directly.
    :param file_path: selects how ``name`` is interpreted (see above).
    :returns: the result of ``util.is_binary`` on the leading 1024 bytes.
    """
    stream = open(name, "rb") if file_path else BytesIO(name)
    with stream:
        head = stream.read(1024)
        return util.is_binary(head)
| 66 | |
| 67 | |
def check_gzip(file_path, check_content=True):
    """Return a tuple of booleans ``(is_gzipped, is_valid)`` for ``file_path``.

    :param file_path: path of the file to inspect.
    :param check_content: when False, skip the HTML scan of the decompressed
        data and report any gzip file as valid.
    :returns: ``(False, False)`` if the file cannot be read, does not start
        with ``util.gzip_magic``, or its first decompressed bytes cannot be
        read; ``(True, False)`` if the first 32 KiB of decompressed data look
        like HTML; ``(True, True)`` otherwise.
    """
    # Make sure we have a gzipped file
    try:
        with open(file_path, "rb") as temp:
            magic_check = temp.read(2)
        if magic_check != util.gzip_magic:
            return (False, False)
    except Exception:
        return (False, False)
    # We support some binary data types, so check if the compressed binary file is valid
    # If the file is Bam, it should already have been detected as such, so we'll just check
    # for sff format.
    try:
        with gzip.open(file_path, 'rb') as fh:
            header = fh.read(4)
        if header == b'.sff':
            return (True, True)
    except Exception:
        return (False, False)

    if not check_content:
        return (True, True)

    CHUNK_SIZE = 2 ** 15  # 32 KiB
    # The context manager closes the file even when decompression fails midway.
    with gzip.GzipFile(file_path, mode='rb') as gzipped_file:
        chunk = gzipped_file.read(CHUNK_SIZE)
    # See if we have a compressed HTML file
    if check_html(file_path, chunk=chunk):
        return (True, False)
    return (True, True)
| 100 | |
| 101 | |
def check_bz2(file_path, check_content=True):
    """Return a tuple of booleans ``(is_bzipped, is_valid)`` for ``file_path``.

    :param file_path: path of the file to inspect.
    :param check_content: when False, skip the HTML scan of the decompressed
        data and report any bz2 file as valid.
    :returns: ``(False, False)`` if the file cannot be read or does not start
        with ``util.bz2_magic``; ``(True, False)`` if the first 32 KiB of
        decompressed data look like HTML; ``(True, True)`` otherwise.
    """
    try:
        with open(file_path, "rb") as temp:
            magic_check = temp.read(3)
        if magic_check != util.bz2_magic:
            return (False, False)
    except Exception:
        return (False, False)

    if not check_content:
        return (True, True)

    CHUNK_SIZE = 2 ** 15  # 32 KiB
    # The context manager closes the file even when decompression fails midway.
    with bz2.BZ2File(file_path, mode='rb') as bzipped_file:
        chunk = bzipped_file.read(CHUNK_SIZE)
    # See if we have a compressed HTML file
    if check_html(file_path, chunk=chunk):
        return (True, False)
    return (True, True)
| 122 | |
| 123 | |
def check_zip(file_path, check_content=True, files=1):
    """Return a tuple of booleans ``(is_zipped, is_valid)`` for ``file_path``.

    :param file_path: path of the file to inspect.
    :param check_content: when False, skip the HTML scan of the members and
        report any zip file as valid.
    :param files: index of the last member to inspect; the loop stops after
        the member at this zero-based index, so up to ``files + 1`` members
        are scanned.
    :returns: ``(False, False)`` if ``zipfile.is_zipfile`` rejects the file;
        ``(True, False)`` if the first 32 KiB of an inspected member look
        like HTML; ``(True, True)`` otherwise.
    """
    if not zipfile.is_zipfile(file_path):
        return (False, False)

    if not check_content:
        return (True, True)

    CHUNK_SIZE = 2 ** 15  # 32 KiB
    members = iter_zip(file_path)
    try:
        for filect, (handle, _name) in enumerate(members):
            # Each member handle is opened by iter_zip; close it once read.
            try:
                chunk = handle.read(CHUNK_SIZE)
            finally:
                handle.close()
            if chunk and check_html(file_path, chunk):
                return (True, False)
            if filect >= files:
                break
    finally:
        # Finish the generator now so the archive it holds open is released
        # even when we stop before exhausting it.
        members.close()
    return (True, True)
| 141 | |
| 142 | |
def is_bz2(file_path):
    """Return the first element of ``check_bz2`` without scanning content."""
    return check_bz2(file_path, check_content=False)[0]
| 146 | |
| 147 | |
def is_gzip(file_path):
    """Return the first element of ``check_gzip`` without scanning content."""
    return check_gzip(file_path, check_content=False)[0]
| 151 | |
| 152 | |
def is_zip(file_path):
    """Return the first element of ``check_zip`` without scanning content."""
    return check_zip(file_path, check_content=False)[0]
| 156 | |
| 157 | |
def is_single_file_zip(file_path):
    """Return True if the zip archive holds at most one non-directory member.

    Entries whose name ends with ``/`` are treated as directories and are
    not counted. Only the archive's name listing is consulted; no member is
    opened.

    :param file_path: path of the zip archive to inspect.
    :returns: False when two or more file members are present, else True
        (an archive with no file members also yields True).
    """
    with zipfile.ZipFile(file_path) as archive:
        file_members = [name for name in archive.namelist() if not name.endswith('/')]
    return len(file_members) <= 1
| 163 | |
| 164 | |
def is_tar(file_path):
    """Return whatever ``tarfile.is_tarfile`` reports for ``file_path``."""
    looks_like_tar = tarfile.is_tarfile(file_path)
    return looks_like_tar
| 167 | |
| 168 | |
def iter_zip(file_path):
    """Yield ``(handle, name)`` for every non-directory member of a zip file.

    Names ending in ``/`` are skipped. Each handle comes from
    ``ZipFile.open`` and is not closed here, so the consumer is responsible
    for closing it.
    """
    with zipfile.ZipFile(file_path) as archive:
        for member_name in archive.namelist():
            if member_name.endswith('/'):
                continue
            yield (archive.open(member_name), member_name)
| 173 | |
| 174 | |
def check_image(file_path):
    """ Collapse the result of image_type into a plain True/False verdict """
    return bool(image_type(file_path))
| 180 | |
| 181 | |
| 182 __all__ = ( | |
| 183 'check_binary', | |
| 184 'check_bz2', | |
| 185 'check_gzip', | |
| 186 'check_html', | |
| 187 'check_image', | |
| 188 'check_zip', | |
| 189 'is_gzip', | |
| 190 'is_bz2', | |
| 191 'is_zip', | |
| 192 ) |
