forked from ionelmc/python-hunter
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_tracer.pyx
More file actions
121 lines (102 loc) · 4.25 KB
/
Copy path_tracer.pyx
File metadata and controls
121 lines (102 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# cython: linetrace=True, language_level=3
import threading
from cpython cimport pystate
from cpython.ref cimport Py_INCREF
from cpython.ref cimport Py_CLEAR
from cpython.pystate cimport PyThreadState_Get
from ._event cimport Event
from ._predicates cimport When
from ._predicates cimport fast_When_call
from ._predicates cimport And
from ._predicates cimport fast_And_call
from ._predicates cimport Or
from ._predicates cimport fast_Or_call
from ._predicates cimport Not
from ._predicates cimport fast_Not_call
# Event names, positioned so that the integer ``kind`` handed to ``trace_func``
# can be turned into a name with ``kind_names[kind]``; ``Tracer.__call__`` does
# the reverse lookup with ``kind_names.index(kind)``. ``trace_func`` also
# special-cases index 0 ("call") and index 3 ("return") for its depth
# bookkeeping, so the order of the entries must not change.
# NOTE(review): the order presumably mirrors CPython's ``PyTrace_*`` event
# codes - confirm against the C API before touching it.
cdef tuple kind_names = ("call", "exception", "line", "return", "c_call", "c_exception", "c_return")
cdef int trace_func(Tracer self, FrameType frame, int kind, PyObject *arg) except -1:
    # C-level trace callback; ``Tracer`` registers it through ``PyEval_SetTrace``.
    #
    # ``kind`` is the integer event code and is used as an index into
    # ``kind_names`` to obtain the event name given to the handler.
    # ``arg`` may be NULL, in which case the event carries ``None`` instead.
    # Falls off the end on success (C return value 0); a raised exception is
    # reported to the caller as -1 (see the ``except -1`` clause).
    cdef Event event

    # Make the frame's own trace slot point at this tracer: drop the reference
    # the slot held before and take a new one on ``self`` for the slot.
    if frame.f_trace is not <PyObject*> self:
        Py_CLEAR(frame.f_trace)
        Py_INCREF(self)
        frame.f_trace = <PyObject*> self

    callback = self.handler
    if callback is not None:
        # Translate the raw C argument into a Python object for the event.
        if arg is NULL:
            payload = None
        else:
            payload = <object>arg
        event = Event(frame, kind_names[kind], payload, self)

        # The known predicate types go through their typed fast-call helpers;
        # any other handler is invoked as an ordinary callable.
        if type(callback) is When:
            fast_When_call(<When>callback, event)
        elif type(callback) is And:
            fast_And_call(<And>callback, event)
        elif type(callback) is Or:
            fast_Or_call(<Or>callback, event)
        elif type(callback) is Not:
            fast_Not_call(<Not>callback, event)
        else:
            callback(event)

    # Bookkeeping runs after the handler, so the handler observes the counters
    # as they were before this event. 3 is "return" and 0 is "call" in
    # ``kind_names``.
    if kind == 3:
        self.depth -= 1
    elif kind == 0:
        self.depth += 1
        self.calls += 1
cdef class Tracer:
    """
    Trace object that forwards trace events to a handler.

    Tracing is started with :meth:`trace` and ended with :meth:`stop`. The
    object is also a context manager: leaving the ``with`` block calls
    :meth:`stop`.
    """

    def __cinit__(self, threading_support=None):
        # A fresh tracer is stopped: no handler, and nothing saved to restore.
        self.handler = None
        self.previous = None
        self._previousfunc = NULL
        self._threading_previous = None
        # ``None`` and any truthy value both turn on the ``threading.settrace``
        # hook-up done by ``trace``/``stop``; only a falsy non-None value
        # disables it.
        self.threading_support = threading_support
        # Counters that ``trace_func`` updates on "call"/"return" events.
        self.depth = 1
        self.calls = 0

    def __dealloc__(self):
        # If the current thread still has this object installed as its trace
        # object, uninstall it before the object goes away.
        cdef PyThreadState *tstate = PyThreadState_Get()
        if tstate.c_traceobj is <PyObject *>self:
            self.stop()

    def __repr__(self):
        # The handler and previous-tracer parts are each a (label, text) pair;
        # both halves are empty/placeholder when the attribute is None.
        if self.handler is None:
            handler_label = '<stopped>'
            handler_text = ''
        else:
            handler_label = 'handler='
            handler_text = repr(self.handler)

        if self.previous is None:
            previous_label = ''
            previous_text = ''
        else:
            previous_label = ', previous='
            previous_text = repr(self.previous)

        return '<hunter._tracer.Tracer at 0x%x: threading_support=%s, %s%s%s%s>' % (
            id(self),
            self.threading_support,
            handler_label,
            handler_text,
            previous_label,
            previous_text,
        )

    def __call__(self, frame, kind, arg):
        """
        Python-level entry point with the ``settrace`` function signature.

        The event name ``kind`` is mapped back to its index in ``kind_names``
        and the event is handed to ``trace_func``. On a ``"call"`` event the
        C-level ``trace_func`` is additionally installed via
        ``PyEval_SetTrace``.

        .. note::

            This always returns ``self`` (drills down) - as opposed to only
            drilling down when ``predicate(event)`` is True - because the
            predicate might match further inside.
        """
        trace_func(self, frame, kind_names.index(kind), <PyObject *> arg)
        if kind == "call":
            PyEval_SetTrace(<pystate.Py_tracefunc> trace_func, <PyObject *> self)
        return self

    def trace(self, predicate):
        """
        Start tracing and send every event to ``predicate``.

        Whatever trace function/object the current thread state holds is
        remembered so that :meth:`stop` can put it back. Unless
        ``threading_support`` is falsy and not ``None``, the tracer is also
        registered with ``threading.settrace`` after saving the value of
        ``threading._trace_hook`` (``None`` if that attribute is missing).

        Returns ``self``.
        """
        cdef PyThreadState *tstate = PyThreadState_Get()
        self.handler = predicate

        if self.threading_support is None or self.threading_support:
            self._threading_previous = getattr(threading, '_trace_hook', None)
            threading.settrace(self)

        # Remember what was installed before this tracer takes over.
        if tstate.c_traceobj is not NULL:
            self.previous = <object>(tstate.c_traceobj)
            self._previousfunc = tstate.c_tracefunc
        else:
            self.previous = None
            self._previousfunc = NULL

        PyEval_SetTrace(<pystate.Py_tracefunc> trace_func, <PyObject *> self)
        return self

    def stop(self):
        """
        Stop tracing and reinstate what :meth:`trace` saved.

        Does nothing when there is no handler (the tracer is already stopped).
        """
        if self.handler is None:
            return

        # Put back the saved trace function/object, or clear tracing entirely
        # when nothing was saved.
        if self.previous is None:
            PyEval_SetTrace(NULL, NULL)
        else:
            PyEval_SetTrace(self._previousfunc, <PyObject *> self.previous)

        self.handler = None
        self.previous = None
        self._previousfunc = NULL

        if self.threading_support is None or self.threading_support:
            threading.settrace(self._threading_previous)
            self._threading_previous = None

    def __enter__(self):
        """Return the tracer itself; entering does not start tracing."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop tracing on exit; the exception details are not inspected."""
        self.stop()