Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
681 changes: 681 additions & 0 deletions Lib/test/audit-tests.py

Large diffs are not rendered by default.

34 changes: 33 additions & 1 deletion Lib/test/test_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def run_test_in_subprocess(self, *args):
with subprocess.Popen(
[sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
encoding="utf-8",
errors="backslashreplace",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as p:
Expand Down Expand Up @@ -57,6 +58,7 @@ def test_block_add_hook(self):
def test_block_add_hook_baseexception(self):
self.do_test("test_block_add_hook_baseexception")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_marshal(self):
import_helper.import_module("marshal")

Expand All @@ -67,18 +69,33 @@ def test_pickle(self):

self.do_test("test_pickle")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_monkeypatch(self):
self.do_test("test_monkeypatch")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_open(self):
self.do_test("test_open", os_helper.TESTFN)

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_cantrace(self):
self.do_test("test_cantrace")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_mmap(self):
self.do_test("test_mmap")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_ctypes_call_function(self):
import_helper.import_module("ctypes")
self.do_test("test_ctypes_call_function")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_posixsubprocess(self):
import_helper.import_module("_posixsubprocess")
self.do_test("test_posixsubprocess")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_excepthook(self):
returncode, events, stderr = self.run_python("test_excepthook")
if not returncode:
Expand All @@ -100,6 +117,7 @@ def test_unraisablehook(self):
"RuntimeError('nonfatal-error') Exception ignored for audit hook test",
)

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_winreg(self):
import_helper.import_module("winreg")
returncode, events, stderr = self.run_python("test_winreg")
Expand All @@ -125,8 +143,9 @@ def test_socket(self):
self.assertEqual(events[0][0], "socket.gethostname")
self.assertEqual(events[1][0], "socket.__new__")
self.assertEqual(events[2][0], "socket.bind")
self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
self.assertEndsWith(events[2][2], "('127.0.0.1', 8080)")

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_gc(self):
returncode, events, stderr = self.run_python("test_gc")
if returncode:
Expand Down Expand Up @@ -156,6 +175,7 @@ def test_http(self):
self.assertIn('HTTP', events[1][2])


@unittest.expectedFailure # TODO: RUSTPYTHON
def test_sqlite3(self):
sqlite3 = import_helper.import_module("sqlite3")
returncode, events, stderr = self.run_python("test_sqlite3")
Expand Down Expand Up @@ -200,6 +220,7 @@ def test_sys_getframemodulename(self):
self.assertEqual(actual, expected)


@unittest.expectedFailure # TODO: RUSTPYTHON
def test_threading(self):
returncode, events, stderr = self.run_python("test_threading")
if returncode:
Expand All @@ -218,6 +239,7 @@ def test_threading(self):
self.assertEqual(actual, expected)


@unittest.expectedFailure # TODO: RUSTPYTHON
def test_wmi_exec_query(self):
import_helper.import_module("_wmi")
returncode, events, stderr = self.run_python("test_wmi_exec_query")
Expand All @@ -231,6 +253,7 @@ def test_wmi_exec_query(self):

self.assertEqual(actual, expected)

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_syslog(self):
syslog = import_helper.import_module("syslog")

Expand Down Expand Up @@ -292,6 +315,7 @@ def test_sys_monitoring_register_callback(self):

self.assertEqual(actual, expected)

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_winapi_createnamedpipe(self):
winapi = import_helper.import_module("_winapi")

Expand All @@ -313,6 +337,14 @@ def test_assert_unicode(self):
if returncode:
self.fail(stderr)

@support.support_remote_exec_only
@support.cpython_only
def test_sys_remote_exec(self):
returncode, events, stderr = self.run_python("test_sys_remote_exec")
self.assertTrue(any(["sys.remote_exec" in event for event in events]))
self.assertTrue(any(["cpython.remote_debugger_script" in event for event in events]))
if returncode:
self.fail(stderr)

if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions Lib/test/test_bdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ def test_until_in_caller_frame(self):
with TracerRun(self) as tracer:
tracer.runcall(tfunc_main)

@unittest.skipIf(hasattr(__import__("sys"), "addaudithook"), "TODO: RUSTPYTHON; Currently no conditional tracing toggle")
@patch_list(sys.meta_path)
def test_skip(self):
# Check that tracing is skipped over the import statement in
Expand Down
151 changes: 137 additions & 14 deletions Lib/test/test_sys_setprofile.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ def callback(self, frame, event, arg):
if (event == "call"
or event == "return"
or event == "exception"):
self.add_event(event, frame)
self.add_event(event, frame, arg)

def add_event(self, event, frame=None):
def add_event(self, event, frame=None, arg=None):
"""Add an event to the log."""
if frame is None:
frame = sys._getframe(1)
Expand All @@ -43,7 +43,7 @@ def add_event(self, event, frame=None):
frameno = len(self.frames)
self.frames.append(frame)

self.events.append((frameno, event, ident(frame)))
self.events.append((frameno, event, ident(frame), arg))

def get_events(self):
"""Remove calls to add_event()."""
Expand Down Expand Up @@ -89,11 +89,16 @@ def trace_pass(self, frame):


class TestCaseBase(unittest.TestCase):
def check_events(self, callable, expected):
def check_events(self, callable, expected, check_args=False):
events = capture_events(callable, self.new_watcher())
if events != expected:
self.fail("Expected events:\n%s\nReceived events:\n%s"
% (pprint.pformat(expected), pprint.pformat(events)))
if check_args:
if events != expected:
self.fail("Expected events:\n%s\nReceived events:\n%s"
% (pprint.pformat(expected), pprint.pformat(events)))
else:
if [(frameno, event, ident) for frameno, event, ident, arg in events] != expected:
self.fail("Expected events:\n%s\nReceived events:\n%s"
% (pprint.pformat(expected), pprint.pformat(events)))


class ProfileHookTestCase(TestCaseBase):
Expand All @@ -119,7 +124,7 @@ def f(p):
def test_caught_exception(self):
def f(p):
try: 1/0
except: pass
except ZeroDivisionError: pass
f_ident = ident(f)
self.check_events(f, [(1, 'call', f_ident),
(1, 'return', f_ident),
Expand All @@ -128,7 +133,7 @@ def f(p):
def test_caught_nested_exception(self):
def f(p):
try: 1/0
except: pass
except ZeroDivisionError: pass
f_ident = ident(f)
self.check_events(f, [(1, 'call', f_ident),
(1, 'return', f_ident),
Expand All @@ -151,9 +156,9 @@ def f(p):
def g(p):
try:
f(p)
except:
except ZeroDivisionError:
try: f(p)
except: pass
except ZeroDivisionError: pass
f_ident = ident(f)
g_ident = ident(g)
self.check_events(g, [(1, 'call', g_ident),
Expand All @@ -164,6 +169,7 @@ def g(p):
(1, 'return', g_ident),
])

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_exception_propagation(self):
def f(p):
1/0
Expand All @@ -182,7 +188,7 @@ def g(p):
def test_raise_twice(self):
def f(p):
try: 1/0
except: 1/0
except ZeroDivisionError: 1/0
f_ident = ident(f)
self.check_events(f, [(1, 'call', f_ident),
(1, 'return', f_ident),
Expand All @@ -191,7 +197,7 @@ def f(p):
def test_raise_reraise(self):
def f(p):
try: 1/0
except: raise
except ZeroDivisionError: raise
f_ident = ident(f)
self.check_events(f, [(1, 'call', f_ident),
(1, 'return', f_ident),
Expand Down Expand Up @@ -255,6 +261,24 @@ def g(p):
(1, 'return', g_ident),
])

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_unfinished_generator(self):
def f():
for i in range(2):
yield i
def g(p):
next(f())

f_ident = ident(f)
g_ident = ident(g)
self.check_events(g, [(1, 'call', g_ident, None),
(2, 'call', f_ident, None),
(2, 'return', f_ident, 0),
(2, 'call', f_ident, None),
(2, 'return', f_ident, None),
(1, 'return', g_ident, None),
], check_args=True)

def test_stop_iteration(self):
def f():
for i in range(2):
Expand Down Expand Up @@ -300,7 +324,7 @@ def f(p):
def test_caught_exception(self):
def f(p):
try: 1/0
except: pass
except ZeroDivisionError: pass
f_ident = ident(f)
self.check_events(f, [(1, 'call', f_ident),
(1, 'return', f_ident),
Expand Down Expand Up @@ -415,5 +439,104 @@ def show_events(callable):
pprint.pprint(capture_events(callable))


class TestEdgeCases(unittest.TestCase):

def setUp(self):
self.addCleanup(sys.setprofile, sys.getprofile())
sys.setprofile(None)

def test_reentrancy(self):
def foo(*args):
...

def bar(*args):
...

class A:
def __call__(self, *args):
pass

def __del__(self):
sys.setprofile(bar)

sys.setprofile(A())
sys.setprofile(foo)
self.assertEqual(sys.getprofile(), bar)

def test_same_object(self):
def foo(*args):
...

sys.setprofile(foo)
del foo
sys.setprofile(sys.getprofile())

def test_profile_after_trace_opcodes(self):
def f():
...

sys._getframe().f_trace_opcodes = True
prev_trace = sys.gettrace()
sys.settrace(lambda *args: None)
f()
sys.settrace(prev_trace)
sys.setprofile(lambda *args: None)
f()

@unittest.expectedFailure # TODO: RUSTPYTHON
def test_method_with_c_function(self):
# gh-122029
# When we have a PyMethodObject whose im_func is a C function, we
# should record both the call and the return. f = classmethod(repr)
# is just a way to create a PyMethodObject with a C function.
class A:
f = classmethod(repr)
events = []
sys.setprofile(lambda frame, event, args: events.append(event))
A().f()
sys.setprofile(None)
# The last c_call is the call to sys.setprofile
self.assertEqual(events, ['c_call', 'c_return', 'c_call'])

class B:
f = classmethod(max)
events = []
sys.setprofile(lambda frame, event, args: events.append(event))
# Not important, we only want to trigger INSTRUMENTED_CALL_KW
B().f(1, key=lambda x: 0)
sys.setprofile(None)
# The last c_call is the call to sys.setprofile
self.assertEqual(
events,
['c_call',
'call', 'return',
'call', 'return',
'c_return',
'c_call'
]
)

# Test CALL_FUNCTION_EX
events = []
sys.setprofile(lambda frame, event, args: events.append(event))
# Not important, we only want to trigger INSTRUMENTED_CALL_KW
args = (1,)
m = B().f
m(*args, key=lambda x: 0)
sys.setprofile(None)
# The last c_call is the call to sys.setprofile
# INSTRUMENTED_CALL_FUNCTION_EX has different behavior than the other
# instrumented call bytecodes, it does not unpack the callable before
# calling it. This is probably not ideal because it's not consistent,
# but at least we get a consistent call stack (no unmatched c_call).
self.assertEqual(
events,
['call', 'return',
'call', 'return',
'c_call'
]
)


if __name__ == "__main__":
unittest.main()
Loading
Loading