Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/schema_salad/sourceline.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
comparison
equal
deleted
inserted
replaced
4:79f47841a781 | 5:9b1c78e6ba9c |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 import itertools | |
4 import os | |
5 import re | |
6 import traceback | |
7 from typing import ( | |
8 Any, | |
9 AnyStr, | |
10 Dict, | |
11 List, | |
12 MutableMapping, | |
13 MutableSequence, | |
14 Optional, | |
15 Pattern, | |
16 Tuple, | |
17 Type, | |
18 Union, | |
19 ) | |
20 | |
21 import six | |
22 from future.utils import raise_from | |
23 from typing_extensions import Text # pylint: disable=unused-import | |
24 | |
25 import ruamel.yaml | |
26 from ruamel.yaml.comments import CommentedBase, CommentedMap, CommentedSeq | |
27 | |
# Text is imported from typing_extensions above; move it to a regular typing
# import when Python 3.3-3.6 is no longer supported.
29 | |
30 | |
# Matches a message line that starts with a "<anything>:<digits>:<digits>: "
# location prefix.
#   group(1): the shortest such prefix, including its trailing space
#   group(2): everything after the prefix
#   group(3): the run of spaces at the start of group(2)
#   group(4): the remainder of group(2) after those spaces
lineno_re = re.compile(u"^(.*?:[0-9]+:[0-9]+: )(( *)(.*))")
32 | |
33 | |
def regex_chunk(lines, regex):
    # type: (List[str], Pattern[str]) -> List[List[str]]
    """Group ``lines`` into chunks that each begin with a line matching ``regex``.

    Every line for which ``regex.match`` succeeds opens a new chunk; the
    non-matching lines that follow it are appended to that chunk.  Lines that
    come before the first matching line are discarded.

    :param lines: the lines to partition.
    :param regex: compiled pattern tested against each line with ``match``.
    :returns: the chunks in input order; an empty list when no line matches.
    """
    chunks = []  # type: List[List[str]]
    for line in lines:
        if regex.match(line):
            chunks.append([line])
        elif chunks:
            # Continuation of the most recent chunk.  Lines seen before any
            # match reach neither branch and are therefore dropped.
            chunks[-1].append(line)
    return chunks
45 | |
46 | |
def chunk_messages(message):  # type: (str) -> List[Tuple[int, str]]
    """Split a multi-line message into ``(indent, "<place> <text>")`` entries.

    The message is cut into chunks that start with a ``file:line:col:``
    prefix (via ``regex_chunk``).  Within a chunk, if any line starts a
    ``*`` bullet item, one entry is produced per bullet item; otherwise a
    single entry is produced for the whole chunk.  In every entry the bullet
    marker at the start is removed and runs of whitespace are collapsed to a
    single space.

    :param message: the message text to split.
    :returns: list of tuples; the first element is the number of whitespace
        characters that separated the location prefix from the text.
    """
    file_regex = re.compile(r"^(.+:\d+:\d+:)(\s+)(.+)$")
    item_regex = re.compile(r"^\s*\*\s+")
    entries = []
    for chunk in regex_chunk(message.splitlines(), file_regex):
        header = file_regex.match(chunk[0])
        if not header:
            continue
        place, gap, first_text = header.groups()
        depth = len(gap)
        body = [first_text] + chunk[1:]

        # Either one group per bullet item, or the whole body as one group.
        if any(item_regex.match(line) for line in body):
            groups = regex_chunk(body, item_regex)
        else:
            groups = [body]

        for group in groups:
            text = re.sub(item_regex, "", "\n".join(group))
            entries.append((depth, place + " " + re.sub(r"[\n\s]+", " ", text)))
    return entries
67 | |
68 | |
def reformat_yaml_exception_message(message):  # type: (str) -> str
    """Rewrite a YAML error message into ``file:line:column: text`` lines.

    The message is interpreted by its number of lines:

    * 4 lines: a context message and its ``in "<file>", line N, column M``
      location, followed by a problem message and its location.  Both are
      reported; the problem text is pushed right by two extra spaces.
    * 3 lines: the first line is skipped, the remaining two are a problem
      message and its location.
    * any other count with at least 2 lines: the first two lines are treated
      as a problem message and its location.

    A ``file://<current working directory>/`` prefix is stripped from file
    names.  Message/location pairs whose location line does not have the
    expected form are omitted.

    :param message: the raw exception text.
    :returns: the reformatted lines joined with newlines; the empty string
        when nothing could be extracted (including messages with fewer than
        two lines).
    """
    line_regex = re.compile(r'^\s+in "(.+)", line (\d+), column (\d+)$')
    fname_regex = re.compile(r"^file://" + re.escape(os.getcwd()) + "/")
    msgs = message.splitlines()
    ret = []
    # Default for every shape other than the 4-line one; previously this was
    # left unbound, which raised NameError for e.g. two-line messages.
    nblanks = 0

    if len(msgs) == 3:
        msgs = msgs[1:]
    elif len(msgs) == 4:
        c_msg = msgs[0]
        match = line_regex.match(msgs[1])
        if match:
            c_file, c_line, c_column = match.groups()
            c_file = fname_regex.sub("", c_file)
            ret.append("{}:{}:{}: {}".format(c_file, c_line, c_column, c_msg))

        msgs = msgs[2:]
        nblanks = 2

    if len(msgs) < 2:
        # Not enough lines for a message/location pair.
        return "\n".join(ret)

    p_msg = msgs[0]
    match = line_regex.match(msgs[1])
    if match:
        p_file, p_line, p_column = match.groups()
        p_file = fname_regex.sub("", p_file)
        ret.append(
            "{}:{}:{}:{} {}".format(p_file, p_line, p_column, " " * nblanks, p_msg)
        )
    return "\n".join(ret)
98 | |
99 | |
def _add_lc_filename(r, source):
    # type: (ruamel.yaml.comments.CommentedBase, AnyStr) -> None
    """Recursively store ``source`` as ``lc.filename`` on a document tree.

    ``r.lc.filename`` is set when ``r`` is a ruamel ``CommentedBase``.  The
    function then descends into the elements of a mutable sequence or the
    values of a mutable mapping; anything else ends the recursion.
    """
    if isinstance(r, ruamel.yaml.comments.CommentedBase):
        r.lc.filename = source

    if isinstance(r, MutableSequence):
        children = iter(r)
    elif isinstance(r, MutableMapping):
        children = six.itervalues(r)
    else:
        return

    for child in children:
        _add_lc_filename(child, source)
111 | |
112 | |
def relname(source):  # type: (Text) -> Text
    """Return ``source`` as a path relative to the current directory.

    Only strings starting with ``file://`` are rewritten: the scheme prefix
    is removed and the remainder is passed through ``os.path.relpath``.  Any
    other string is returned unchanged.
    """
    scheme = "file://"
    if not source.startswith(scheme):
        return source
    return os.path.relpath(source[len(scheme):])
118 | |
119 | |
def add_lc_filename(r, source):
    # type: (ruamel.yaml.comments.CommentedBase, Text) -> None
    """Tag the document tree ``r`` with the file name derived from ``source``.

    ``source`` is first converted with ``relname`` and the result is then
    applied to ``r`` and its descendants by ``_add_lc_filename``.
    """
    display_name = relname(source)
    _add_lc_filename(r, display_name)
124 | |
125 | |
def reflow_all(text, maxline=None):  # type: (Text, Optional[int]) -> Text
    """Wrap every location-prefixed line of ``text`` to ``maxline`` columns.

    Lines matching ``lineno_re`` have their message part wrapped with
    ``reflow``; each resulting piece is emitted behind the location prefix,
    padded so that all prefixes in ``text`` share the width of the longest
    one.  Lines without a location prefix are passed through unchanged.

    :param text: the message text.
    :param maxline: target width; when ``None`` it is read from the
        ``COLUMNS`` environment variable, falling back to 100.
    :returns: the rewrapped text.
    """
    if maxline is None:
        maxline = int(os.environ.get("COLUMNS", "100"))

    # Match each line once and reuse the result for both passes.
    rows = [(line, lineno_re.match(line)) for line in text.splitlines()]
    maxno = max([0] + [len(found.group(1)) for _, found in rows if found])
    maxno_text = maxline - maxno

    out = []
    for line, found in rows:
        if not found:
            out.append(line)
            continue
        padded_prefix = found.group(1).ljust(maxno, " ")
        pieces = reflow(found.group(2), maxno_text, found.group(3)).splitlines()
        out.extend(padded_prefix + piece for piece in pieces)
    return "\n".join(out)
146 | |
147 | |
def reflow(text, maxline, shift=""):  # type: (Text, int, Text) -> Text
    """Wrap ``text`` at spaces so that lines stay under ``maxline`` characters.

    The text is broken at the last space found before column ``maxline``.
    When there is no such space (or the only one is at position 0), the break
    is made at the next space instead, so a single over-long word is kept
    whole.  The space at the break is consumed.  Every continuation line is
    prefixed with ``shift``; the prefix does not count towards ``maxline``.

    Implemented as a loop rather than recursion so that very long messages
    cannot exceed the interpreter's recursion limit.

    :param text: the text to wrap (expected to be a single line).
    :param maxline: maximum width; values below 20 are raised to 20.
    :param shift: string placed at the start of each continuation line.
    :returns: the wrapped text, lines joined by newlines.
    """
    if maxline < 20:
        maxline = 20

    pieces = []
    rest = text
    while len(rest) > maxline:
        sp = rest.rfind(" ", 0, maxline)
        if sp < 1:
            # No usable break inside the window: look forward for the next
            # space (starting after a leading space, if that is what we hit).
            sp = rest.find(" ", sp + 1)
            if sp == -1:
                # No space left at all; the remainder stays on one line.
                break
        pieces.append(rest[:sp])
        rest = rest[sp + 1:]
    pieces.append(rest)
    return ("\n" + shift).join(pieces)
162 | |
163 | |
def indent(
    v, nolead=False, shift=u" ", bullet=u" "
):  # type: (Text, bool, Text, Text) -> Text
    """Indent the lines of ``v``.

    With ``nolead`` true the first line is left untouched and every later
    line is prefixed with ``shift``.  An empty ``v`` yields an empty string.

    Otherwise the first line is prefixed with ``bullet`` and every later line
    with ``shift``.  For lines that carry a ``file:line:col: `` location
    prefix (as recognised by ``lineno_re``) the bullet or shift is inserted
    after that prefix rather than before it.

    :param v: text to indent; may contain several lines.
    :param nolead: when true, do not indent the first line.
    :param shift: prefix for every line after the first.
    :param bullet: prefix for the first line (unused when ``nolead`` is true).
    :returns: the indented lines joined by newlines.
    """
    lines = v.splitlines()
    if nolead:
        # Join the first line with the rest so they are newline-separated;
        # slicing (rather than indexing) also keeps empty input from raising.
        return u"\n".join(lines[:1] + [shift + l for l in lines[1:]])

    def lineno(i, l):  # type: (int, Text) -> Text
        lead = bullet if i == 0 else shift
        r = lineno_re.match(l)
        if r is not None:
            return r.group(1) + lead + r.group(2)
        return lead + l

    return u"\n".join([lineno(i, l) for i, l in enumerate(lines)])
179 | |
180 | |
def bullets(textlist, bul):  # type: (List[Text], Text) -> Text
    """Render ``textlist`` as a bulleted list.

    A list with exactly one entry is returned as that entry, without a
    bullet.  Otherwise every entry is passed through ``indent`` with ``bul``
    as its bullet and the results are joined with newlines.
    """
    if len(textlist) != 1:
        return "\n".join([indent(entry, bullet=bul) for entry in textlist])
    return textlist[0]
186 | |
187 | |
def strip_duplicated_lineno(text):  # type: (Text) -> Text
    """Blank out location prefixes that repeat the previous one.

    Same as `strip_dup_lineno` but without reflow: when a line's
    ``file:line:col: `` prefix equals the most recently seen prefix, the
    prefix is replaced by the same number of spaces.  Lines without a prefix
    are kept as they are and do not reset the remembered prefix.
    """
    previous = None
    out = []
    for line in text.splitlines():
        found = lineno_re.match(line)
        if found and found.group(1) == previous:
            out.append(" " * len(previous) + found.group(2))
            continue
        if found:
            previous = found.group(1)
        out.append(line)
    return "\n".join(out)
203 | |
204 | |
def strip_dup_lineno(text, maxline=None):  # type: (Text, Optional[int]) -> Text
    """Blank out repeated location prefixes and wrap the message text.

    Each line with a ``file:line:col: `` prefix has its message wrapped with
    ``reflow``.  The first line of a run sharing the same prefix keeps the
    prefix (padded to the width of the longest prefix in ``text``); later
    lines of the run get spaces in its place.  Lines without a prefix are
    passed through unchanged.

    :param text: the message text.
    :param maxline: target width; when ``None`` it is read from the
        ``COLUMNS`` environment variable, falling back to 100.
    :returns: the rewritten text.
    """
    if maxline is None:
        maxline = int(os.environ.get("COLUMNS", "100"))

    # Match each line once and reuse the result for both passes.
    rows = [(line, lineno_re.match(line)) for line in text.splitlines()]
    maxno = max([0] + [len(found.group(1)) for _, found in rows if found])

    previous = None
    out = []
    for line, found in rows:
        if not found:
            out.append(line)
            continue
        prefix, body, lead = found.group(1), found.group(2), found.group(3)
        # Continuation lines hang under the message text: prefix column plus
        # the message's own leading spaces.
        hang = maxno + len(lead)
        if prefix == previous:
            # NOTE(review): this branch wraps at maxline - maxno while the
            # branch below wraps at maxline - hang; kept exactly as found.
            wrapped = reflow(body, maxline - maxno, " " * hang)
            out.append(" " * maxno + wrapped)
        else:
            wrapped = reflow(body, maxline - hang, " " * hang)
            previous = prefix
            out.append(prefix + " " * (maxno - len(prefix)) + wrapped)
    return "\n".join(out)
231 | |
232 | |
def cmap(
    d,  # type: Union[int, float, str, Text, Dict[Text, Any], List[Dict[Text, Any]]]
    lc=None,  # type: Optional[List[int]]
    fn=None,  # type: Optional[Text]
):  # type: (...) -> Union[int, float, str, Text, CommentedMap, CommentedSeq]
    """Recursively convert ``d`` into ruamel ``CommentedMap``/``CommentedSeq``.

    * A ``CommentedMap`` or ``CommentedSeq`` is processed in place (its
      values are replaced by their converted form) and the same object is
      returned.
    * Any other mutable mapping is copied into a new ``CommentedMap``, with
      keys inserted in sorted order.
    * Any other mutable sequence is copied into a new ``CommentedSeq``.
    * Every other value is returned unchanged.

    :param d: the value to convert.
    :param lc: line/column data to attach to entries that have none of their
        own; defaults to ``[0, 0, 0, 0]``.
    :param fn: file name to record on newly created containers and to pass
        down to children that have none of their own; defaults to ``"test"``.
    :returns: the converted value.
    """
    if lc is None:
        lc = [0, 0, 0, 0]
    if fn is None:
        fn = "test"

    if isinstance(d, CommentedMap):
        # Prefer the file name already recorded on this map.
        fn = d.lc.filename if hasattr(d.lc, "filename") else fn
        for k, v in six.iteritems(d):
            # Use the position recorded for this key when there is one,
            # otherwise pass the inherited position down.
            if d.lc.data is not None and k in d.lc.data:
                d[k] = cmap(v, lc=d.lc.data[k], fn=fn)
            else:
                d[k] = cmap(v, lc, fn=fn)
        return d
    if isinstance(d, CommentedSeq):
        # Same as the CommentedMap case, keyed by list index.
        fn = d.lc.filename if hasattr(d.lc, "filename") else fn
        for k2, v2 in enumerate(d):
            if d.lc.data is not None and k2 in d.lc.data:
                d[k2] = cmap(v2, lc=d.lc.data[k2], fn=fn)
            else:
                d[k2] = cmap(v2, lc, fn=fn)
        return d
    if isinstance(d, MutableMapping):
        cm = CommentedMap()
        for k in sorted(d.keys()):
            v = d[k]
            if isinstance(v, CommentedBase):
                # The value carries its own position (and possibly file name).
                uselc = [v.lc.line, v.lc.col, v.lc.line, v.lc.col]
                vfn = v.lc.filename if hasattr(v.lc, "filename") else fn
            else:
                uselc = lc
                vfn = fn
            cm[k] = cmap(v, lc=uselc, fn=vfn)
            cm.lc.add_kv_line_col(k, uselc)
            # NOTE(review): nesting reconstructed from a flattened listing;
            # confirm this assignment belongs inside the loop (as written, an
            # empty mapping gets no lc.filename).
            cm.lc.filename = fn
        return cm
    if isinstance(d, MutableSequence):
        cs = CommentedSeq()
        for k3, v3 in enumerate(d):
            if isinstance(v3, CommentedBase):
                # The element carries its own position (and possibly file name).
                uselc = [v3.lc.line, v3.lc.col, v3.lc.line, v3.lc.col]
                vfn = v3.lc.filename if hasattr(v3.lc, "filename") else fn
            else:
                uselc = lc
                vfn = fn
            cs.append(cmap(v3, lc=uselc, fn=vfn))
            cs.lc.add_kv_line_col(k3, uselc)
            # NOTE(review): same nesting caveat as in the mapping branch.
            cs.lc.filename = fn
        return cs
    else:
        return d
288 | |
289 | |
class SourceLine(object):
    """Context manager that adds source-location prefixes to error messages.

    Any exception raised inside the ``with`` body is replaced by one built
    with ``makeError`` and raised through ``raise_from`` with the original
    exception as its cause.
    """

    def __init__(self, item, key=None, raise_type=six.text_type, include_traceback=False):
        # type: (Any, Optional[Any], Union[Type[six.text_type], Type[Exception]], bool) -> None
        """Remember the object (and optional key within it) to report on.

        :param item: object whose ``lc`` attribute, if any, gives the location.
        :param key: key or index into ``item`` whose position should be used.
        :param raise_type: callable used by ``makeError`` to build the result.
        :param include_traceback: on Python 2, embed the formatted traceback
            of the original exception in the message.
        """
        self.item = item
        self.key = key
        self.raise_type = raise_type
        self.include_traceback = include_traceback

    def __enter__(self):  # type: () -> SourceLine
        """Return the manager itself."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # type: (Any, Any, Any) -> None
        """Re-raise a pending exception with location information added."""
        if not exc_value:
            return
        if self.include_traceback and six.PY2:
            # Python2 doesn't actually have chained exceptions, so
            # fake it by injecting the backtrace into the message.
            detail = "\n".join(traceback.format_exception(exc_type, exc_value, tb))
        else:
            detail = six.text_type(exc_value)
        raise_from(self.makeError(detail), exc_value)

    def file(self):  # type: () -> Optional[Text]
        """Return ``item.lc.filename`` as text, or ``None`` when unavailable."""
        if not hasattr(self.item, "lc"):
            return None
        if not hasattr(self.item.lc, "filename"):
            return None
        return Text(self.item.lc.filename)

    def start(self):  # type: () -> Optional[Tuple[int, int]]
        """Return the ``(line, column)`` of the item or key, each plus one.

        The position recorded for ``key`` in ``item.lc.data`` is used when
        present; otherwise the position of ``item`` itself.  A missing line
        or column counts as 0 before the increment.  Returns ``None`` when no
        file name is known.
        """
        if self.file() is None:
            return None
        has_key_position = (
            self.key is not None
            and self.item.lc.data is not None
            and self.key in self.item.lc.data
        )
        if has_key_position:
            line = self.item.lc.data[self.key][0]
            col = self.item.lc.data[self.key][1]
        else:
            line = self.item.lc.line
            col = self.item.lc.col
        return ((line or 0) + 1, (col or 0) + 1)

    def end(self):  # type: () -> Optional[Tuple[int, int]]
        """Always return ``None``."""
        return None

    def makeLead(self):  # type: () -> Text
        """Return the ``file:line:col:`` prefix, or ``""`` without a file."""
        if not self.file():
            return ""
        position = self.start()
        line, col = position if position else ("", "")
        return "{}:{}:{}:".format(self.file(), line, col)

    def makeError(self, msg):  # type: (Text) -> Any
        """Build a ``raise_type`` value from ``msg`` with location prefixes.

        When ``item`` is not a ruamel ``CommentedBase`` the message is used
        as is.  Otherwise every line of ``msg`` that does not already start
        with a location prefix gets this object's lead prepended.
        """
        if not isinstance(self.item, ruamel.yaml.comments.CommentedBase):
            return self.raise_type(msg)
        lead = self.makeLead()
        errs = [
            m if bool(lineno_re.match(m)) else "{} {}".format(lead, m)
            for m in msg.splitlines()
        ]
        return self.raise_type("\n".join(errs))