Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/schema_salad/sourceline.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 import os | |
| 2 import re | |
| 3 from typing import ( | |
| 4 Any, | |
| 5 AnyStr, | |
| 6 Callable, | |
| 7 Dict, | |
| 8 List, | |
| 9 MutableMapping, | |
| 10 MutableSequence, | |
| 11 Optional, | |
| 12 Tuple, | |
| 13 Union, | |
| 14 ) | |
| 15 | |
| 16 import ruamel.yaml | |
| 17 from ruamel.yaml.comments import CommentedBase, CommentedMap, CommentedSeq | |
| 18 | |
| 19 lineno_re = re.compile("^(.*?:[0-9]+:[0-9]+: )(( *)(.*))") | |
| 20 | |
| 21 | |
| 22 def _add_lc_filename(r: ruamel.yaml.comments.CommentedBase, source: AnyStr) -> None: | |
| 23 if isinstance(r, ruamel.yaml.comments.CommentedBase): | |
| 24 r.lc.filename = source | |
| 25 if isinstance(r, MutableSequence): | |
| 26 for d in r: | |
| 27 _add_lc_filename(d, source) | |
| 28 elif isinstance(r, MutableMapping): | |
| 29 for d in r.values(): | |
| 30 _add_lc_filename(d, source) | |
| 31 | |
| 32 | |
| 33 def relname(source: str) -> str: | |
| 34 if source.startswith("file://"): | |
| 35 source = source[7:] | |
| 36 source = os.path.relpath(source) | |
| 37 return source | |
| 38 | |
| 39 | |
| 40 def add_lc_filename(r: ruamel.yaml.comments.CommentedBase, source: str) -> None: | |
| 41 _add_lc_filename(r, relname(source)) | |
| 42 | |
| 43 | |
| 44 def reflow_all(text: str, maxline: Optional[int] = None) -> str: | |
| 45 if maxline is None: | |
| 46 maxline = int(os.environ.get("COLUMNS", "100")) | |
| 47 maxno = 0 | |
| 48 for line in text.splitlines(): | |
| 49 g = lineno_re.match(line) | |
| 50 if not g: | |
| 51 continue | |
| 52 group = g.group(1) | |
| 53 assert group is not None # nosec | |
| 54 maxno = max(maxno, len(group)) | |
| 55 maxno_text = maxline - maxno | |
| 56 msg = [] # type: List[str] | |
| 57 for line in text.splitlines(): | |
| 58 g = lineno_re.match(line) | |
| 59 if not g: | |
| 60 msg.append(line) | |
| 61 continue | |
| 62 pre = g.group(1) | |
| 63 assert pre is not None # nosec | |
| 64 group2 = g.group(2) | |
| 65 assert group2 is not None # nosec | |
| 66 reflowed = reflow(group2, maxno_text, g.group(3)).splitlines() | |
| 67 msg.extend([pre.ljust(maxno, " ") + r for r in reflowed]) | |
| 68 return "\n".join(msg) | |
| 69 | |
| 70 | |
| 71 def reflow(text: str, maxline: int, shift: Optional[str] = "") -> str: | |
| 72 if maxline < 20: | |
| 73 maxline = 20 | |
| 74 if len(text) > maxline: | |
| 75 sp = text.rfind(" ", 0, maxline) | |
| 76 if sp < 1: | |
| 77 sp = text.find(" ", sp + 1) | |
| 78 if sp == -1: | |
| 79 sp = len(text) | |
| 80 if sp < len(text): | |
| 81 return "{}\n{}{}".format( | |
| 82 text[0:sp], shift, reflow(text[sp + 1 :], maxline, shift) | |
| 83 ) | |
| 84 return text | |
| 85 | |
| 86 | |
| 87 def indent(v: str, nolead: bool = False, shift: str = " ", bullet: str = " ") -> str: | |
| 88 if nolead: | |
| 89 return v.splitlines()[0] + "\n".join( | |
| 90 [shift + line for line in v.splitlines()[1:]] | |
| 91 ) | |
| 92 else: | |
| 93 | |
| 94 def lineno(i: int, line: str) -> str: | |
| 95 r = lineno_re.match(line) | |
| 96 if r is not None: | |
| 97 group1 = r.group(1) | |
| 98 group2 = r.group(2) | |
| 99 assert group1 is not None # nosec | |
| 100 assert group2 is not None # nosec | |
| 101 return group1 + (bullet if i == 0 else shift) + group2 | |
| 102 else: | |
| 103 return (bullet if i == 0 else shift) + line | |
| 104 | |
| 105 return "\n".join([lineno(i, line) for i, line in enumerate(v.splitlines())]) | |
| 106 | |
| 107 | |
| 108 def bullets(textlist: List[str], bul: str) -> str: | |
| 109 if len(textlist) == 1: | |
| 110 return textlist[0] | |
| 111 else: | |
| 112 return "\n".join(indent(t, bullet=bul) for t in textlist) | |
| 113 | |
| 114 | |
| 115 def strip_duplicated_lineno(text: str) -> str: | |
| 116 """Same as `strip_dup_lineno` but without reflow""" | |
| 117 pre = None # type: Optional[str] | |
| 118 msg = [] | |
| 119 for line in text.splitlines(): | |
| 120 g = lineno_re.match(line) | |
| 121 if not g: | |
| 122 msg.append(line) | |
| 123 continue | |
| 124 elif g.group(1) != pre: | |
| 125 msg.append(line) | |
| 126 pre = g.group(1) | |
| 127 else: | |
| 128 group1 = g.group(1) | |
| 129 group2 = g.group(2) | |
| 130 assert group1 is not None # nosec | |
| 131 assert group2 is not None # nosec | |
| 132 msg.append(" " * len(group1) + group2) | |
| 133 return "\n".join(msg) | |
| 134 | |
| 135 | |
| 136 def strip_dup_lineno(text: str, maxline: Optional[int] = None) -> str: | |
| 137 if maxline is None: | |
| 138 maxline = int(os.environ.get("COLUMNS", "100")) | |
| 139 pre = None # type: Optional[str] | |
| 140 msg = [] | |
| 141 maxno = 0 | |
| 142 for line in text.splitlines(): | |
| 143 g = lineno_re.match(line) | |
| 144 if not g: | |
| 145 continue | |
| 146 group1 = g.group(1) | |
| 147 assert group1 is not None # nosec | |
| 148 maxno = max(maxno, len(group1)) | |
| 149 | |
| 150 for line in text.splitlines(): | |
| 151 g = lineno_re.match(line) | |
| 152 if not g: | |
| 153 msg.append(line) | |
| 154 continue | |
| 155 if g.group(1) != pre: | |
| 156 group3 = g.group(3) | |
| 157 assert group3 is not None # nosec | |
| 158 shift = maxno + len(group3) | |
| 159 group2 = g.group(2) | |
| 160 assert group2 is not None # nosec | |
| 161 g2 = reflow(group2, maxline - shift, " " * shift) | |
| 162 pre = g.group(1) | |
| 163 assert pre is not None # nosec | |
| 164 msg.append(pre + " " * (maxno - len(pre)) + g2) | |
| 165 else: | |
| 166 group2 = g.group(2) | |
| 167 assert group2 is not None # nosec | |
| 168 group3 = g.group(3) | |
| 169 assert group3 is not None # nosec | |
| 170 g2 = reflow(group2, maxline - maxno, " " * (maxno + len(group3))) | |
| 171 msg.append(" " * maxno + g2) | |
| 172 return "\n".join(msg) | |
| 173 | |
| 174 | |
| 175 def cmap( | |
| 176 d: Union[int, float, str, Dict[str, Any], List[Any], None], | |
| 177 lc: Optional[List[int]] = None, | |
| 178 fn: Optional[str] = None, | |
| 179 ) -> Union[int, float, str, CommentedMap, CommentedSeq, None]: | |
| 180 if lc is None: | |
| 181 lc = [0, 0, 0, 0] | |
| 182 if fn is None: | |
| 183 fn = "test" | |
| 184 | |
| 185 if isinstance(d, CommentedMap): | |
| 186 fn = d.lc.filename if hasattr(d.lc, "filename") else fn | |
| 187 for k, v in d.items(): | |
| 188 if d.lc.data is not None and k in d.lc.data: | |
| 189 d[k] = cmap(v, lc=d.lc.data[k], fn=fn) | |
| 190 else: | |
| 191 d[k] = cmap(v, lc, fn=fn) | |
| 192 return d | |
| 193 if isinstance(d, CommentedSeq): | |
| 194 fn = d.lc.filename if hasattr(d.lc, "filename") else fn | |
| 195 for k2, v2 in enumerate(d): | |
| 196 if d.lc.data is not None and k2 in d.lc.data: | |
| 197 d[k2] = cmap(v2, lc=d.lc.data[k2], fn=fn) | |
| 198 else: | |
| 199 d[k2] = cmap(v2, lc, fn=fn) | |
| 200 return d | |
| 201 if isinstance(d, MutableMapping): | |
| 202 cm = CommentedMap() | |
| 203 for k in sorted(d.keys()): | |
| 204 v = d[k] | |
| 205 if isinstance(v, CommentedBase): | |
| 206 uselc = [v.lc.line, v.lc.col, v.lc.line, v.lc.col] | |
| 207 vfn = v.lc.filename if hasattr(v.lc, "filename") else fn | |
| 208 else: | |
| 209 uselc = lc | |
| 210 vfn = fn | |
| 211 cm[k] = cmap(v, lc=uselc, fn=vfn) | |
| 212 cm.lc.add_kv_line_col(k, uselc) | |
| 213 cm.lc.filename = fn | |
| 214 return cm | |
| 215 if isinstance(d, MutableSequence): | |
| 216 cs = CommentedSeq() | |
| 217 for k3, v3 in enumerate(d): | |
| 218 if isinstance(v3, CommentedBase): | |
| 219 uselc = [v3.lc.line, v3.lc.col, v3.lc.line, v3.lc.col] | |
| 220 vfn = v3.lc.filename if hasattr(v3.lc, "filename") else fn | |
| 221 else: | |
| 222 uselc = lc | |
| 223 vfn = fn | |
| 224 cs.append(cmap(v3, lc=uselc, fn=vfn)) | |
| 225 cs.lc.add_kv_line_col(k3, uselc) | |
| 226 cs.lc.filename = fn | |
| 227 return cs | |
| 228 else: | |
| 229 return d | |
| 230 | |
| 231 | |
| 232 class SourceLine: | |
| 233 def __init__( | |
| 234 self, | |
| 235 item: Any, | |
| 236 key: Optional[Any] = None, | |
| 237 raise_type: Callable[[str], Any] = str, | |
| 238 include_traceback: bool = False, | |
| 239 ) -> None: | |
| 240 self.item = item | |
| 241 self.key = key | |
| 242 self.raise_type = raise_type | |
| 243 self.include_traceback = include_traceback | |
| 244 | |
| 245 def __enter__(self) -> "SourceLine": | |
| 246 return self | |
| 247 | |
| 248 def __exit__( | |
| 249 self, | |
| 250 exc_type: Any, | |
| 251 exc_value: Any, | |
| 252 tb: Any, | |
| 253 ) -> None: | |
| 254 if not exc_value: | |
| 255 return | |
| 256 raise self.makeError(str(exc_value)) from exc_value | |
| 257 | |
| 258 def file(self) -> Optional[str]: | |
| 259 if hasattr(self.item, "lc") and hasattr(self.item.lc, "filename"): | |
| 260 return str(self.item.lc.filename) | |
| 261 else: | |
| 262 return None | |
| 263 | |
| 264 def start(self) -> Optional[Tuple[int, int]]: | |
| 265 if self.file() is None: | |
| 266 return None | |
| 267 elif ( | |
| 268 self.key is None | |
| 269 or self.item.lc.data is None | |
| 270 or self.key not in self.item.lc.data | |
| 271 ): | |
| 272 return ((self.item.lc.line or 0) + 1, (self.item.lc.col or 0) + 1) | |
| 273 else: | |
| 274 return ( | |
| 275 (self.item.lc.data[self.key][0] or 0) + 1, | |
| 276 (self.item.lc.data[self.key][1] or 0) + 1, | |
| 277 ) | |
| 278 | |
| 279 def end(self) -> Optional[Tuple[int, int]]: | |
| 280 return None | |
| 281 | |
| 282 def makeLead(self) -> str: | |
| 283 if self.file(): | |
| 284 lcol = self.start() | |
| 285 line, col = lcol if lcol else ("", "") | |
| 286 return f"{self.file()}:{line}:{col}:" | |
| 287 else: | |
| 288 return "" | |
| 289 | |
| 290 def makeError(self, msg: str) -> Any: | |
| 291 if not isinstance(self.item, ruamel.yaml.comments.CommentedBase): | |
| 292 return self.raise_type(msg) | |
| 293 errs = [] | |
| 294 lead = self.makeLead() | |
| 295 for m in msg.splitlines(): | |
| 296 if bool(lineno_re.match(m)): | |
| 297 errs.append(m) | |
| 298 else: | |
| 299 errs.append(f"{lead} {m}") | |
| 300 return self.raise_type("\n".join(errs)) |
