comparison env/lib/python3.7/site-packages/schema_salad/sourceline.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
comparison
equal deleted inserted replaced
4:79f47841a781 5:9b1c78e6ba9c
1 from __future__ import absolute_import
2
3 import itertools
4 import os
5 import re
6 import traceback
7 from typing import (
8 Any,
9 AnyStr,
10 Dict,
11 List,
12 MutableMapping,
13 MutableSequence,
14 Optional,
15 Pattern,
16 Tuple,
17 Type,
18 Union,
19 )
20
21 import six
22 from future.utils import raise_from
23 from typing_extensions import Text # pylint: disable=unused-import
24
25 import ruamel.yaml
26 from ruamel.yaml.comments import CommentedBase, CommentedMap, CommentedSeq
27
28 # move to a regular typing import when Python 3.3-3.6 is no longer supported
29
30
31 lineno_re = re.compile(u"^(.*?:[0-9]+:[0-9]+: )(( *)(.*))")
32
33
34 def regex_chunk(lines, regex):
35 # type: (List[str], Pattern[str]) -> List[List[str]]
36 lst = list(itertools.dropwhile(lambda x: not regex.match(x), lines))
37 arr = []
38 while lst:
39 ret = [lst[0]] + list(
40 itertools.takewhile(lambda x: not regex.match(x), lst[1:])
41 )
42 arr.append(ret)
43 lst = list(itertools.dropwhile(lambda x: not regex.match(x), lst[1:]))
44 return arr
45
46
47 def chunk_messages(message): # type: (str) -> List[Tuple[int, str]]
48 file_regex = re.compile(r"^(.+:\d+:\d+:)(\s+)(.+)$")
49 item_regex = re.compile(r"^\s*\*\s+")
50 arr = []
51 for chun in regex_chunk(message.splitlines(), file_regex):
52 fst = chun[0]
53 mat = file_regex.match(fst)
54 if mat:
55 place = mat.group(1)
56 indent = len(mat.group(2))
57
58 lst = [mat.group(3)] + chun[1:]
59 if [x for x in lst if item_regex.match(x)]:
60 for item in regex_chunk(lst, item_regex):
61 msg = re.sub(item_regex, "", "\n".join(item))
62 arr.append((indent, place + " " + re.sub(r"[\n\s]+", " ", msg)))
63 else:
64 msg = re.sub(item_regex, "", "\n".join(lst))
65 arr.append((indent, place + " " + re.sub(r"[\n\s]+", " ", msg)))
66 return arr
67
68
69 def reformat_yaml_exception_message(message): # type: (str) -> str
70 line_regex = re.compile(r'^\s+in "(.+)", line (\d+), column (\d+)$')
71 fname_regex = re.compile(r"^file://" + re.escape(os.getcwd()) + "/")
72 msgs = message.splitlines()
73 ret = []
74
75 if len(msgs) == 3:
76 msgs = msgs[1:]
77 nblanks = 0
78 elif len(msgs) == 4:
79 c_msg = msgs[0]
80 match = line_regex.match(msgs[1])
81 if match:
82 c_file, c_line, c_column = match.groups()
83 c_file = re.sub(fname_regex, "", c_file)
84 ret.append("{}:{}:{}: {}".format(c_file, c_line, c_column, c_msg))
85
86 msgs = msgs[2:]
87 nblanks = 2
88
89 p_msg = msgs[0]
90 match = line_regex.match(msgs[1])
91 if match:
92 p_file, p_line, p_column = match.groups()
93 p_file = re.sub(fname_regex, "", p_file)
94 ret.append(
95 "{}:{}:{}:{} {}".format(p_file, p_line, p_column, " " * nblanks, p_msg)
96 )
97 return "\n".join(ret)
98
99
100 def _add_lc_filename(
101 r, source
102 ): # type: (ruamel.yaml.comments.CommentedBase, AnyStr) -> None
103 if isinstance(r, ruamel.yaml.comments.CommentedBase):
104 r.lc.filename = source
105 if isinstance(r, MutableSequence):
106 for d in r:
107 _add_lc_filename(d, source)
108 elif isinstance(r, MutableMapping):
109 for d in six.itervalues(r):
110 _add_lc_filename(d, source)
111
112
113 def relname(source): # type: (Text) -> Text
114 if source.startswith("file://"):
115 source = source[7:]
116 source = os.path.relpath(source)
117 return source
118
119
120 def add_lc_filename(
121 r, source
122 ): # type: (ruamel.yaml.comments.CommentedBase, Text) -> None
123 _add_lc_filename(r, relname(source))
124
125
126 def reflow_all(text, maxline=None): # type: (Text, Optional[int]) -> Text
127 if maxline is None:
128 maxline = int(os.environ.get("COLUMNS", "100"))
129 maxno = 0
130 for l in text.splitlines():
131 g = lineno_re.match(l)
132 if not g:
133 continue
134 maxno = max(maxno, len(g.group(1)))
135 maxno_text = maxline - maxno
136 msg = []
137 for l in text.splitlines():
138 g = lineno_re.match(l)
139 if not g:
140 msg.append(l)
141 continue
142 pre = g.group(1)
143 reflowed = reflow(g.group(2), maxno_text, g.group(3)).splitlines()
144 msg.extend([pre.ljust(maxno, " ") + r for r in reflowed])
145 return "\n".join(msg)
146
147
148 def reflow(text, maxline, shift=""): # type: (Text, int, Text) -> Text
149 if maxline < 20:
150 maxline = 20
151 if len(text) > maxline:
152 sp = text.rfind(" ", 0, maxline)
153 if sp < 1:
154 sp = text.find(" ", sp + 1)
155 if sp == -1:
156 sp = len(text)
157 if sp < len(text):
158 return "{}\n{}{}".format(
159 text[0:sp], shift, reflow(text[sp + 1 :], maxline, shift)
160 )
161 return text
162
163
164 def indent(
165 v, nolead=False, shift=u" ", bullet=u" "
166 ): # type: (Text, bool, Text, Text) -> Text
167 if nolead:
168 return v.splitlines()[0] + u"\n".join([shift + l for l in v.splitlines()[1:]])
169 else:
170
171 def lineno(i, l): # type: (int, Text) -> Text
172 r = lineno_re.match(l)
173 if r is not None:
174 return r.group(1) + (bullet if i == 0 else shift) + r.group(2)
175 else:
176 return (bullet if i == 0 else shift) + l
177
178 return u"\n".join([lineno(i, l) for i, l in enumerate(v.splitlines())])
179
180
181 def bullets(textlist, bul): # type: (List[Text], Text) -> Text
182 if len(textlist) == 1:
183 return textlist[0]
184 else:
185 return "\n".join(indent(t, bullet=bul) for t in textlist)
186
187
188 def strip_duplicated_lineno(text): # type: (Text) -> Text
189 """Same as `strip_dup_lineno` but without reflow"""
190 pre = None
191 msg = []
192 for l in text.splitlines():
193 g = lineno_re.match(l)
194 if not g:
195 msg.append(l)
196 continue
197 elif g.group(1) != pre:
198 msg.append(l)
199 pre = g.group(1)
200 else:
201 msg.append(" " * len(g.group(1)) + g.group(2))
202 return "\n".join(msg)
203
204
205 def strip_dup_lineno(text, maxline=None): # type: (Text, Optional[int]) -> Text
206 if maxline is None:
207 maxline = int(os.environ.get("COLUMNS", "100"))
208 pre = None
209 msg = []
210 maxno = 0
211 for l in text.splitlines():
212 g = lineno_re.match(l)
213 if not g:
214 continue
215 maxno = max(maxno, len(g.group(1)))
216
217 for l in text.splitlines():
218 g = lineno_re.match(l)
219 if not g:
220 msg.append(l)
221 continue
222 if g.group(1) != pre:
223 shift = maxno + len(g.group(3))
224 g2 = reflow(g.group(2), maxline - shift, " " * shift)
225 pre = g.group(1)
226 msg.append(pre + " " * (maxno - len(g.group(1))) + g2)
227 else:
228 g2 = reflow(g.group(2), maxline - maxno, " " * (maxno + len(g.group(3))))
229 msg.append(" " * maxno + g2)
230 return "\n".join(msg)
231
232
233 def cmap(
234 d, # type: Union[int, float, str, Text, Dict[Text, Any], List[Dict[Text, Any]]]
235 lc=None, # type: Optional[List[int]]
236 fn=None, # type: Optional[Text]
237 ): # type: (...) -> Union[int, float, str, Text, CommentedMap, CommentedSeq]
238 if lc is None:
239 lc = [0, 0, 0, 0]
240 if fn is None:
241 fn = "test"
242
243 if isinstance(d, CommentedMap):
244 fn = d.lc.filename if hasattr(d.lc, "filename") else fn
245 for k, v in six.iteritems(d):
246 if d.lc.data is not None and k in d.lc.data:
247 d[k] = cmap(v, lc=d.lc.data[k], fn=fn)
248 else:
249 d[k] = cmap(v, lc, fn=fn)
250 return d
251 if isinstance(d, CommentedSeq):
252 fn = d.lc.filename if hasattr(d.lc, "filename") else fn
253 for k2, v2 in enumerate(d):
254 if d.lc.data is not None and k2 in d.lc.data:
255 d[k2] = cmap(v2, lc=d.lc.data[k2], fn=fn)
256 else:
257 d[k2] = cmap(v2, lc, fn=fn)
258 return d
259 if isinstance(d, MutableMapping):
260 cm = CommentedMap()
261 for k in sorted(d.keys()):
262 v = d[k]
263 if isinstance(v, CommentedBase):
264 uselc = [v.lc.line, v.lc.col, v.lc.line, v.lc.col]
265 vfn = v.lc.filename if hasattr(v.lc, "filename") else fn
266 else:
267 uselc = lc
268 vfn = fn
269 cm[k] = cmap(v, lc=uselc, fn=vfn)
270 cm.lc.add_kv_line_col(k, uselc)
271 cm.lc.filename = fn
272 return cm
273 if isinstance(d, MutableSequence):
274 cs = CommentedSeq()
275 for k3, v3 in enumerate(d):
276 if isinstance(v3, CommentedBase):
277 uselc = [v3.lc.line, v3.lc.col, v3.lc.line, v3.lc.col]
278 vfn = v3.lc.filename if hasattr(v3.lc, "filename") else fn
279 else:
280 uselc = lc
281 vfn = fn
282 cs.append(cmap(v3, lc=uselc, fn=vfn))
283 cs.lc.add_kv_line_col(k3, uselc)
284 cs.lc.filename = fn
285 return cs
286 else:
287 return d
288
289
290 class SourceLine(object):
291 def __init__(
292 self,
293 item, # type: Any
294 key=None, # type: Optional[Any]
295 raise_type=six.text_type, # type: Union[Type[six.text_type], Type[Exception]]
296 include_traceback=False, # type: bool
297 ): # type: (...) -> None
298 self.item = item
299 self.key = key
300 self.raise_type = raise_type
301 self.include_traceback = include_traceback
302
303 def __enter__(self): # type: () -> SourceLine
304 return self
305
306 def __exit__(
307 self,
308 exc_type, # type: Any
309 exc_value, # type: Any
310 tb, # type: Any
311 ): # type: (...) -> None
312 if not exc_value:
313 return
314 if self.include_traceback and six.PY2:
315 # Python2 doesn't actually have chained exceptions, so
316 # fake it by injecting the backtrace into the message.
317 raise_from(
318 self.makeError(
319 "\n".join(traceback.format_exception(exc_type, exc_value, tb))
320 ),
321 exc_value,
322 )
323 else:
324 raise_from(self.makeError(six.text_type(exc_value)), exc_value)
325
326 def file(self): # type: () -> Optional[Text]
327 if hasattr(self.item, "lc") and hasattr(self.item.lc, "filename"):
328 return Text(self.item.lc.filename)
329 else:
330 return None
331
332 def start(self): # type: () -> Optional[Tuple[int, int]]
333 if self.file() is None:
334 return None
335 elif (
336 self.key is None
337 or self.item.lc.data is None
338 or self.key not in self.item.lc.data
339 ):
340 return ((self.item.lc.line or 0) + 1, (self.item.lc.col or 0) + 1)
341 else:
342 return (
343 (self.item.lc.data[self.key][0] or 0) + 1,
344 (self.item.lc.data[self.key][1] or 0) + 1,
345 )
346
347 def end(self): # type: () -> Optional[Tuple[int, int]]
348 return None
349
350 def makeLead(self): # type: () -> Text
351 if self.file():
352 lcol = self.start()
353 line, col = lcol if lcol else ("", "")
354 return "{}:{}:{}:".format(self.file(), line, col)
355 else:
356 return ""
357
358 def makeError(self, msg): # type: (Text) -> Any
359 if not isinstance(self.item, ruamel.yaml.comments.CommentedBase):
360 return self.raise_type(msg)
361 errs = []
362 lead = self.makeLead()
363 for m in msg.splitlines():
364 if bool(lineno_re.match(m)):
365 errs.append(m)
366 else:
367 errs.append("{} {}".format(lead, m))
368 return self.raise_type("\n".join(errs))