comparison env/lib/python3.7/site-packages/galaxy/util/heartbeat.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 import os
2 import sys
3 import threading
4 import time
5 import traceback
6
7 from six import iteritems
8
9
def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects created via the threading
    module, keyed by thread id. Note that not all interpreter threads
    have a Thread object, only the main thread and any created via the
    'threading' module. Threads created via the low level 'thread' module
    will not be in the returned dictionary.

    HACK: This mucks with the internals of the threading module since that
    module does not expose any way to match 'Thread' objects with
    interpreter thread identifiers (though it should).

    NOTE(review): entries copied from ``threading._limbo`` (threads started
    but not yet running) appear to be keyed by the Thread object itself in
    CPython rather than by an integer id, so they never match an id coming
    from ``sys._current_frames()`` -- confirm before relying on them.
    """
    rval = dict()
    # Hold the lock while taking the union of the 'active' and 'limbo'
    # threads; ``with`` guarantees the lock is released even if an update
    # raises, otherwise every later Thread.start() would deadlock.
    with threading._active_limbo_lock:
        rval.update(threading._active)
        rval.update(threading._limbo)
    return rval
30
31
class Heartbeat(threading.Thread):
    """
    Thread that periodically dumps the state of all threads to a file.

    Every ``period`` seconds the stack of every interpreter thread is
    appended to ``fname``; a summary of the threads that do not look idle
    is appended to a sibling ``<name>.nonsleeping<ext>`` file.
    """

    def __init__(self, config, name="Heartbeat Thread", period=20, fname="heartbeat.log"):
        threading.Thread.__init__(self, name=name)
        # Object providing ``server_name``; used to format ``fname`` in run()
        self.config = config
        # Set by shutdown() to end the run() loop
        self.should_stop = False
        # Seconds between dumps; a value <= 0 disables periodic dumping
        self.period = period
        # Log file name; may contain ``{server_name}`` and ``{pid}`` placeholders
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = None
        self.file_nonsleeping = None
        self.pid = None
        # thread id -> number of consecutive heartbeats the thread was seen busy
        self.nonsleeping_heartbeats = {}
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    def run(self):
        self.pid = os.getpid()
        self.fname = self.fname.format(
            server_name=self.config.server_name,
            pid=self.pid
        )
        fname, ext = os.path.splitext(self.fname)
        self.fname_nonsleeping = fname + '.nonsleeping' + ext
        # With dumping disabled, still wake up periodically to check should_stop
        wait = self.period if self.period > 0 else 60
        while not self.should_stop:
            if self.period > 0:
                self.dump()
            self.wait_event.wait(wait)

    def open_logs(self):
        """Open (in append mode) whichever of the two log files is not open yet."""
        # Each file is handled on its own so a half-open pair (e.g. the second
        # open() failed, or one file was closed elsewhere) is repaired.
        if self.file is None or self.file.closed:
            self.file = open(self.fname, "a")
            self.file.write("Heartbeat for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))
        if self.file_nonsleeping is None or self.file_nonsleeping.closed:
            self.file_nonsleeping = open(self.fname_nonsleeping, "a")
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))

    def close_logs(self):
        """Write a 'stopped' trailer to, and close, each log file that is open."""
        if self.file is not None and not self.file.closed:
            self.file.write("Heartbeat for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file.close()
        if self.file_nonsleeping is not None and not self.file_nonsleeping.closed:
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.close()

    @staticmethod
    def _describe_thread(threads, thread_id):
        """Return the repr of the Thread object for ``thread_id``, or a placeholder."""
        if thread_id in threads:
            return repr(threads[thread_id])
        return "<No Thread object>"

    def dump(self):
        """
        Append the stack of every thread to the heartbeat log, then the
        non-sleeping summary to the non-sleeping log. Exceptions raised while
        dumping are written to the heartbeat log instead of propagating.
        """
        self.open_logs()
        try:
            # Print separator with timestamp
            self.file.write("Traceback dump for all threads at %s:\n\n" % time.asctime())
            # Print the thread states
            threads = get_current_thread_object_dict()
            for thread_id, frame in iteritems(sys._current_frames()):
                self.file.write("Thread %s, %s:\n\n" % (thread_id, self._describe_thread(threads, thread_id)))
                traceback.print_stack(frame, file=self.file)
                self.file.write("\n")
            self.file.write("End dump\n\n")
            self.file.flush()
            self.print_nonsleeping(threads)
        except Exception:
            self.file.write("Caught exception attempting to dump thread states:")
            traceback.print_exc(None, self.file)
            self.file.write("\n")

    def shutdown(self):
        """
        Stop the heartbeat thread, wait for it to exit, then close the logs.

        The logs are closed only after join() so that a dump still in progress
        neither writes to a file closed underneath it nor reopens (and leaks)
        the files after shutdown.
        """
        self.should_stop = True
        self.wait_event.set()
        try:
            self.join()
        finally:
            self.close_logs()

    def thread_is_sleeping(self, last_stack_frame):
        """
        Returns True if the given stack-frame represents a known
        sleeper function.

        :param last_stack_frame: innermost entry of a
            ``traceback.extract_stack()`` result, i.e. a
            (filename, line, function-name, text) 4-sequence.
        """
        _filename = last_stack_frame[0]
        _funcname = last_stack_frame[2]
        # The source text may be unavailable (None or ''); treat it as empty
        # so the string checks below cannot raise.
        _text = last_stack_frame[3] or ""
        # Ugly hack to tell if a thread is supposedly sleeping or not.
        # These are the most common sleeping functions found; the patterns
        # match standard-library source lines and are therefore
        # interpreter-version dependent.
        if _funcname == "wait" and _text in ("waiter.acquire()", "_sleep(delay)"):
            return True
        # Python 3 Condition.wait() with a timeout (used by Event.wait(timeout))
        if _funcname == "wait" and _text.startswith("gotit = waiter.acquire("):
            return True
        if _funcname == "accept" and (_text.endswith("_sock.accept()") or _text == "fd, addr = self._accept()"):
            return True
        if _funcname in ("monitor", "__monitor", "app_loop", "check") \
                and _text.startswith("time.sleep(") and _text.endswith(")"):
            return True
        if _funcname == "drain_events" and _text == "sleep(polling_interval)":
            return True
        # Always skip the heartbeat module itself, whether it runs from a
        # source checkout (.../lib/galaxy/util/heartbeat.py) or from an
        # installed package (.../site-packages/galaxy/util/heartbeat.py).
        if "/galaxy/util/heartbeat.py" in _filename:
            return True
        # By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame(self, stack_frames):
        """
        Scans a given backtrace stack frames, returns a single
        quadruple of [filename, line, function-name, text] of
        the single, deepest, most interesting frame.

        Interesting being::

          inside the galaxy source code ("/lib/galaxy"),
          preferably not an egg.

        NOTE(review): frames from an installed ``site-packages/galaxy``
        package do not contain "/lib/galaxy/" and fall through to the
        innermost frame.
        """
        for _filename, _line, _funcname, _text in reversed(stack_frames):
            idx = _filename.find("/lib/galaxy/")
            if idx != -1:
                relative_filename = _filename[idx:]
                return (relative_filename, _line, _funcname, _text)
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping(self, threads_object_dict):
        """
        Append to the non-sleeping log every thread whose innermost frame does
        not look like a known sleeper, with the number of consecutive
        heartbeats it has been busy.

        :param threads_object_dict: mapping of thread id to Thread object as
            returned by get_current_thread_object_dict(); when None a fresh
            mapping is taken.
        """
        self.file_nonsleeping.write("Non-Sleeping threads at %s:\n\n" % time.asctime())
        threads = threads_object_dict
        if threads is None:
            threads = get_current_thread_object_dict()
        frames = sys._current_frames()
        # The thread producing this report is busy by definition; skip it.
        current_id = threading.current_thread().ident
        # Forget counters of threads that have exited, so a new thread that
        # reuses an old id does not inherit a stale count.
        for stale_id in [tid for tid in self.nonsleeping_heartbeats if tid not in frames]:
            del self.nonsleeping_heartbeats[stale_id]
        all_threads_are_sleeping = True
        for thread_id, frame in iteritems(frames):
            tb = traceback.extract_stack(frame)
            if thread_id == current_id or self.thread_is_sleeping(tb[-1]):
                # A sleeping thread resets its consecutive-busy counter
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue

            # Count non-sleeping thread heartbeats
            count = self.nonsleeping_heartbeats.get(thread_id, 0) + 1
            self.nonsleeping_heartbeats[thread_id] = count

            good_frame = self.get_interesting_stack_frame(tb)
            self.file_nonsleeping.write(
                "Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s\n" %
                (thread_id, self._describe_thread(threads, thread_id), count,
                 good_frame[0], good_frame[1], good_frame[2], good_frame[3]))
            all_threads_are_sleeping = False

        if all_threads_are_sleeping:
            self.file_nonsleeping.write("All threads are sleeping.\n")
        self.file_nonsleeping.write("\n")
        self.file_nonsleeping.flush()

    def dump_signal_handler(self, signum, frame):
        """Signal handler that triggers an immediate dump (arguments are ignored)."""
        self.dump()