comparison env/lib/python3.7/site-packages/boltons/excutils.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # -*- coding: utf-8 -*-
2
3 import sys
4 import traceback
5 import linecache
6 from collections import namedtuple
7
8 # TODO: last arg or first arg? (last arg makes it harder to *args
9 # into, but makes it more readable in the default exception
10 # __repr__ output)
11 # TODO: Multiexception wrapper
12
13
14 __all__ = ['ExceptionCauseMixin']
15
16
17 class ExceptionCauseMixin(Exception):
18 """
19 A mixin class for wrapping an exception in another exception, or
20 otherwise indicating an exception was caused by another exception.
21
22 This is most useful in concurrent or failure-intolerant scenarios,
23 where just because one operation failed, doesn't mean the remainder
24 should be aborted, or that it's the appropriate time to raise
25 exceptions.
26
27 This is still a work in progress, but an example use case at the
28 bottom of this module.
29
30 NOTE: when inheriting, you will probably want to put the
31 ExceptionCauseMixin first. Builtin exceptions are not good about
32 calling super()
33 """
34
35 cause = None
36
37 def __new__(cls, *args, **kw):
38 cause = None
39 if args and isinstance(args[0], Exception):
40 cause, args = args[0], args[1:]
41 ret = super(ExceptionCauseMixin, cls).__new__(cls, *args, **kw)
42 ret.cause = cause
43 if cause is None:
44 return ret
45 root_cause = getattr(cause, 'root_cause', None)
46 if root_cause is None:
47 ret.root_cause = cause
48 else:
49 ret.root_cause = root_cause
50
51 full_trace = getattr(cause, 'full_trace', None)
52 if full_trace is not None:
53 ret.full_trace = list(full_trace)
54 ret._tb = list(cause._tb)
55 ret._stack = list(cause._stack)
56 return ret
57
58 try:
59 exc_type, exc_value, exc_tb = sys.exc_info()
60 if exc_type is None and exc_value is None:
61 return ret
62 if cause is exc_value or root_cause is exc_value:
63 # handles when cause is the current exception or when
64 # there are multiple wraps while handling the original
65 # exception, but a cause was never provided
66 ret._tb = _extract_from_tb(exc_tb)
67 ret._stack = _extract_from_frame(exc_tb.tb_frame)
68 ret.full_trace = ret._stack[:-1] + ret._tb
69 finally:
70 del exc_tb
71 return ret
72
73 def get_str(self):
74 """
75 Get formatted the formatted traceback and exception
76 message. This function exists separately from __str__()
77 because __str__() is somewhat specialized for the built-in
78 traceback module's particular usage.
79 """
80 ret = []
81 trace_str = self._get_trace_str()
82 if trace_str:
83 ret.extend(['Traceback (most recent call last):\n', trace_str])
84 ret.append(self._get_exc_str())
85 return ''.join(ret)
86
87 def _get_message(self):
88 args = getattr(self, 'args', [])
89 if self.cause:
90 args = args[1:]
91 if args and args[0]:
92 return args[0]
93 return ''
94
95 def _get_trace_str(self):
96 if not self.cause:
97 return super(ExceptionCauseMixin, self).__repr__()
98 if self.full_trace:
99 return ''.join(traceback.format_list(self.full_trace))
100 return ''
101
102 def _get_exc_str(self, incl_name=True):
103 cause_str = _format_exc(self.root_cause)
104 message = self._get_message()
105 ret = []
106 if incl_name:
107 ret = [self.__class__.__name__, ': ']
108 if message:
109 ret.extend([message, ' (caused by ', cause_str, ')'])
110 else:
111 ret.extend([' caused by ', cause_str])
112 return ''.join(ret)
113
114 def __str__(self):
115 if not self.cause:
116 return super(ExceptionCauseMixin, self).__str__()
117 trace_str = self._get_trace_str()
118 ret = []
119 if trace_str:
120 message = self._get_message()
121 if message:
122 ret.extend([message, ' --- '])
123 ret.extend(['Wrapped traceback (most recent call last):\n',
124 trace_str,
125 self._get_exc_str(incl_name=True)])
126 return ''.join(ret)
127 else:
128 return self._get_exc_str(incl_name=False)
129
130
131 def _format_exc(exc, message=None):
132 if message is None:
133 message = exc
134 exc_str = traceback._format_final_exc_line(exc.__class__.__name__, message)
135 return exc_str.rstrip()
136
137
138 _BaseTBItem = namedtuple('_BaseTBItem', 'filename, lineno, name, line')
139
140
141 class _TBItem(_BaseTBItem):
142 def __repr__(self):
143 ret = super(_TBItem, self).__repr__()
144 ret += ' <%r>' % self.frame_id
145 return ret
146
147
148 class _DeferredLine(object):
149 def __init__(self, filename, lineno, module_globals=None):
150 self.filename = filename
151 self.lineno = lineno
152 module_globals = module_globals or {}
153 self.module_globals = dict([(k, v) for k, v in module_globals.items()
154 if k in ('__name__', '__loader__')])
155
156 def __eq__(self, other):
157 return (self.lineno, self.filename) == (other.lineno, other.filename)
158
159 def __ne__(self, other):
160 return (self.lineno, self.filename) != (other.lineno, other.filename)
161
162 def __str__(self):
163 if hasattr(self, '_line'):
164 return self._line
165 linecache.checkcache(self.filename)
166 line = linecache.getline(self.filename,
167 self.lineno,
168 self.module_globals)
169 if line:
170 line = line.strip()
171 else:
172 line = None
173 self._line = line
174 return line
175
176 def __repr__(self):
177 return repr(str(self))
178
179 def __len__(self):
180 return len(str(self))
181
182 def strip(self):
183 return str(self).strip()
184
185
186 def _extract_from_frame(f=None, limit=None):
187 ret = []
188 if f is None:
189 f = sys._getframe(1) # cross-impl yadayada
190 if limit is None:
191 limit = getattr(sys, 'tracebacklimit', 1000)
192 n = 0
193 while f is not None and n < limit:
194 filename = f.f_code.co_filename
195 lineno = f.f_lineno
196 name = f.f_code.co_name
197 line = _DeferredLine(filename, lineno, f.f_globals)
198 item = _TBItem(filename, lineno, name, line)
199 item.frame_id = id(f)
200 ret.append(item)
201 f = f.f_back
202 n += 1
203 ret.reverse()
204 return ret
205
206
207 def _extract_from_tb(tb, limit=None):
208 ret = []
209 if limit is None:
210 limit = getattr(sys, 'tracebacklimit', 1000)
211 n = 0
212 while tb is not None and n < limit:
213 filename = tb.tb_frame.f_code.co_filename
214 lineno = tb.tb_lineno
215 name = tb.tb_frame.f_code.co_name
216 line = _DeferredLine(filename, lineno, tb.tb_frame.f_globals)
217 item = _TBItem(filename, lineno, name, line)
218 item.frame_id = id(tb.tb_frame)
219 ret.append(item)
220 tb = tb.tb_next
221 n += 1
222 return ret
223
224
225 # An Example/Prototest:
226
227
228 class MathError(ExceptionCauseMixin, ValueError):
229 pass
230
231
232 def whoops_math():
233 return 1/0
234
235
236 def math_lol(n=0):
237 if n < 3:
238 return math_lol(n=n+1)
239 try:
240 return whoops_math()
241 except ZeroDivisionError as zde:
242 exc = MathError(zde, 'ya done messed up')
243 raise exc
244
245 def main():
246 try:
247 math_lol()
248 except ValueError as me:
249 exc = MathError(me, 'hi')
250 raise exc
251
252
253 if __name__ == '__main__':
254 try:
255 main()
256 except Exception:
257 import pdb;pdb.post_mortem()
258 raise