Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/galaxy/util/checkers.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 import gzip | |
2 import io | |
3 import re | |
4 import sys | |
5 import tarfile | |
6 import zipfile | |
7 | |
8 from six import ( | |
9 BytesIO, | |
10 StringIO | |
11 ) | |
12 from six.moves import filter | |
13 | |
14 from galaxy import util | |
15 from galaxy.util.image_util import image_type | |
16 | |
if sys.version_info >= (3, 3):
    # Modern interpreters: the standard library module is used as-is.
    import bz2
else:
    # workaround for https://bugs.python.org/issue12591
    gzip.GzipFile.read1 = gzip.GzipFile.read
    try:
        import bz2file as bz2
    except ImportError:
        # Without bz2file we simply lose pbzip2 support and use the stdlib module.
        import bz2
26 | |
# Upper bound on the number of lines check_html() will inspect.
HTML_CHECK_LINES = 100
# Read size used when sampling content: 32768 (32 KiB).
CHUNK_SIZE = 1 << 15
# Case-insensitive patterns for tags whose presence marks content as HTML.
HTML_REGEXPS = tuple(
    re.compile(pattern, re.I)
    for pattern in (
        r"<A\s+[^>]*HREF[^>]+>",
        r"<IFRAME[^>]*>",
        r"<FRAMESET[^>]*>",
        r"<META[\W][^>]*>",
        r"<SCRIPT[^>]*>",
    )
)
36 | |
37 | |
def check_html(name, file_path=True):
    """
    Report whether a file, or a piece of text, contains HTML markup.

    When ``file_path`` is true, ``name`` is a path that is opened as UTF-8
    text; otherwise ``name`` is the content itself, which is passed through
    ``util.unicodify`` and wrapped in a ``StringIO``.

    At most ``HTML_CHECK_LINES`` reads of up to ``CHUNK_SIZE`` characters are
    tested against ``HTML_REGEXPS``. Returns True on the first match, and
    False when nothing matches or when a ``UnicodeDecodeError`` is raised
    while reading.
    """
    if file_path:
        source = io.open(name, "r", encoding='utf-8')
    else:
        source = StringIO(util.unicodify(name))
    try:
        for _ in range(HTML_CHECK_LINES):
            text = source.readline(CHUNK_SIZE)
            if not text:
                # End of input reached before the line budget ran out.
                return False
            for pattern in HTML_REGEXPS:
                if pattern.search(text):
                    return True
        return False
    except UnicodeDecodeError:
        # Content that is not valid UTF-8 is not treated as HTML.
        return False
    finally:
        source.close()
59 | |
60 | |
def check_binary(name, file_path=True):
    """
    Return the verdict of ``util.is_binary`` on the first 1024 bytes of input.

    When ``file_path`` is true, ``name`` is a path opened in binary mode;
    otherwise ``name`` is the raw content and is wrapped in a ``BytesIO``.
    """
    stream = open(name, "rb") if file_path else BytesIO(name)
    try:
        sample = stream.read(1024)
        return util.is_binary(sample)
    finally:
        stream.close()
71 | |
72 | |
def check_gzip(file_path, check_content=True):
    """
    Return a tuple of booleans ``(is_gzipped, is_valid)`` for ``file_path``.

    ``(False, False)`` is returned when the first two bytes do not equal
    ``util.gzip_magic``, or when reading the raw file or the first four
    decompressed bytes raises. A decompressed stream starting with ``.sff``
    is accepted immediately as ``(True, True)``. With ``check_content``
    enabled, the first ``CHUNK_SIZE`` decompressed bytes are passed to
    ``check_html`` and HTML content yields ``(True, False)``.
    """
    not_gzip = (False, False)
    accepted = (True, True)

    # Make sure we have a gzipped file.
    try:
        with open(file_path, "rb") as raw:
            if raw.read(2) != util.gzip_magic:
                return not_gzip
    except Exception:
        return not_gzip

    # Some compressed binary types are supported. Bam should already have been
    # detected as such before this point, so only sff is checked for here.
    try:
        with gzip.open(file_path, 'rb') as fh:
            if fh.read(4) == b'.sff':
                return accepted
    except Exception:
        return not_gzip

    if check_content:
        with gzip.open(file_path, mode='rb') as gzipped_file:
            leading = gzipped_file.read(CHUNK_SIZE)
        # See if we have a compressed HTML file.
        if check_html(leading, file_path=False):
            return (True, False)
    return accepted
103 | |
104 | |
def check_bz2(file_path, check_content=True):
    """
    Return a tuple of booleans ``(is_bz2, is_valid)`` for ``file_path``.

    ``(False, False)`` is returned when the first three bytes do not equal
    ``util.bz2_magic`` or when the file cannot be read. With
    ``check_content`` enabled, the first ``CHUNK_SIZE`` decompressed bytes
    are passed to ``check_html`` and HTML content yields ``(True, False)``.
    """
    try:
        with open(file_path, "rb") as raw:
            if raw.read(3) != util.bz2_magic:
                return (False, False)
    except Exception:
        return (False, False)

    if check_content:
        with bz2.BZ2File(file_path, mode='rb') as bzipped_file:
            leading = bzipped_file.read(CHUNK_SIZE)
        # See if we have a compressed HTML file.
        if check_html(leading, file_path=False):
            return (True, False)
    return (True, True)
123 | |
124 | |
def check_zip(file_path, check_content=True, files=1):
    """
    Return a tuple of booleans ``(is_zipped, is_valid)`` for ``file_path``.

    ``(False, False)`` is returned when ``zipfile.is_zipfile`` rejects the
    path. With ``check_content`` disabled, any zip archive is reported as
    ``(True, True)``. Otherwise the first ``CHUNK_SIZE`` bytes of the leading
    members are passed to ``check_html``, and the archive is reported as
    ``(True, False)`` as soon as one of them contains HTML.

    ``files`` bounds how many members are inspected. The bound is tested
    after a member has been checked, against a zero-based counter, so up to
    ``files + 1`` members are examined.
    """
    if not zipfile.is_zipfile(file_path):
        return (False, False)

    if not check_content:
        return (True, True)

    members = iter_zip(file_path)
    try:
        for filect, (handle, _name) in enumerate(members):
            # Each member stream is opened by iter_zip; release it as soon as
            # the sample has been read instead of leaving it to the collector.
            try:
                chunk = handle.read(CHUNK_SIZE)
            finally:
                handle.close()
            if chunk and check_html(chunk, file_path=False):
                return (True, False)
            if filect >= files:
                break
    finally:
        # Closing the generator unwinds its ``with`` block, which closes the
        # archive even when the loop is left early.
        members.close()
    return (True, True)
141 | |
142 | |
def is_bz2(file_path):
    """
    Return the first element of ``check_bz2`` with content checking disabled.
    """
    return check_bz2(file_path, check_content=False)[0]
146 | |
147 | |
def is_gzip(file_path):
    """
    Return the first element of ``check_gzip`` with content checking disabled.
    """
    return check_gzip(file_path, check_content=False)[0]
151 | |
152 | |
def is_zip(file_path):
    """
    Return the first element of ``check_zip`` with content checking disabled.
    """
    return check_zip(file_path, check_content=False)[0]
156 | |
157 | |
def is_single_file_zip(file_path):
    """
    Return True when the zip archive at ``file_path`` holds at most one file.

    Directory entries (names ending in ``/``) are not counted, which is the
    same filter ``iter_zip`` applies. An archive with no file members still
    returns True, as it did before.

    The previous test, ``i > 1`` on a zero-based index, only rejected
    archives with three or more files, so a two-file archive was reported as
    single-file. Member names are now counted directly, which also avoids
    opening a stream for every member just to count it.
    """
    with zipfile.ZipFile(file_path) as archive:
        file_count = sum(1 for member in archive.namelist() if not member.endswith('/'))
    return file_count <= 1
163 | |
164 | |
def is_tar(file_path):
    """
    Return the result of ``tarfile.is_tarfile`` for ``file_path``.
    """
    verdict = tarfile.is_tarfile(file_path)
    return verdict
167 | |
168 | |
def iter_zip(file_path):
    """
    Yield ``(handle, name)`` for every non-directory member of a zip archive.

    ``handle`` is the object returned by ``ZipFile.open`` for that member and
    ``name`` is the member name. Names ending in ``/`` are skipped. The
    yielded handles are not closed here.
    """
    with zipfile.ZipFile(file_path) as archive:
        for member_name in archive.namelist():
            if member_name.endswith('/'):
                continue
            yield (archive.open(member_name), member_name)
173 | |
174 | |
def check_image(file_path):
    """ Reduce the result of image_type to a plain True/False verdict """
    return bool(image_type(file_path))
180 | |
181 | |
# Names exported by ``from galaxy.util.checkers import *``.
__all__ = (
    # content validators
    'check_binary', 'check_bz2', 'check_gzip',
    'check_html', 'check_image', 'check_zip',
    # format predicates
    'is_gzip', 'is_bz2', 'is_zip',
)