Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,89 @@ async def main():
finally:
_cleanup_sockets(client_socket, server_socket)

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_async_deep_awaited_by_chain_is_bounded(self):
# A very deep awaited_by chain in the target (which a corrupted or
# concurrently-mutated remote process can also present as a cycle) must
# not drive unbounded C recursion in the debugger. get_async_stack_trace
# should raise instead of overflowing the stack.
depth = 2000
port = find_unused_port()
script = textwrap.dedent(
f"""\
import asyncio
import socket
import time

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))

async def main():
started = asyncio.Event()

async def leaf():
while not started.is_set():
await asyncio.sleep(0)
# The whole awaited_by chain is built and this is now the
# running task at the bottom of it. Signal here, then
# busy-spin, so the observer inspects while the chain is
# fully present and rooted at a running task.
sock.sendall(b"ready")
end = time.time() + 10_000
while time.time() < end:
pass

leaf_t = asyncio.ensure_future(leaf())

async def waiter(child):
await child

cur = leaf_t
tasks = [cur]
for _ in range({depth}):
cur = asyncio.ensure_future(waiter(cur))
tasks.append(cur)

for _ in range(5):
await asyncio.sleep(0)

started.set()
try:
await leaf_t
finally:
for t in tasks:
t.cancel()

asyncio.run(main())
"""
)

with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)

server_socket = _create_server_socket(port)
script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
with _managed_subprocess([sys.executable, script_name]) as p:
client_socket, _ = server_socket.accept()
server_socket.close()
server_socket = None

_wait_for_signal(client_socket, b"ready")

unwinder = RemoteUnwinder(p.pid)
with self.assertRaises(RuntimeError) as cm:
unwinder.get_async_stack_trace()
self.assertIn("too deep or cyclic", str(cm.exception))
finally:
_cleanup_sockets(client_socket, server_socket)

@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
:meth:`!RemoteUnwinder.get_async_stack_trace` no longer crashes when the
target process presents a deeply nested or concurrently-mutated ``awaited_by``
graph; a :exc:`RuntimeError` is now raised instead.
1 change: 1 addition & 0 deletions Modules/_remote_debugging/_remote_debugging.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ typedef enum _WIN32_THREADSTATE {
#define MAX_STACK_CHUNK_SIZE (16 * 1024 * 1024) /* 16 MB max for stack chunks */
#define MAX_LONG_DIGITS 64 /* Allows values up to ~2^1920 */
#define MAX_SET_TABLE_SIZE (1 << 20) /* 1 million entries max for set iteration */
#define MAX_TASK_AWAITED_BY_DEPTH 1000 /* Bound the awaited_by graph walk so a cycle terminates */

#ifndef MAX
#define MAX(a, b) ((a) > (b) ? (a) : (b))
Expand Down
120 changes: 102 additions & 18 deletions Modules/_remote_debugging/asyncio.c
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,24 @@ parse_task(
* TASK AWAITED_BY PROCESSING
* ============================================================================ */

// Forward declaration for mutual recursion
static int process_waiter_task(RemoteUnwinderObject *unwinder, uintptr_t key_addr, void *context);
// The awaited_by graph is walked with an explicit, heap-allocated work-stack
// rather than C recursion. A deeply nested -- or, in a corrupted or
// concurrently-mutated remote process, cyclic -- chain would otherwise drive
// unbounded recursion and overflow the debugger's own C stack: each level holds
// a SIZEOF_TASK_OBJ buffer (see process_task_awaited_by), so even a few hundred
// levels can exhaust a 1 MB stack. MAX_TASK_AWAITED_BY_DEPTH bounds the walk so
// a cycle terminates.
typedef struct {
uintptr_t addr;
int depth;
} awaited_by_entry_t;

typedef struct {
awaited_by_entry_t *items;
Py_ssize_t size;
Py_ssize_t capacity;
int current_depth;
} awaited_by_stack_t;

// Processor function for parsing tasks in sets
static int
Expand Down Expand Up @@ -658,30 +674,89 @@ process_single_task_node(
return -1;
}

int
process_task_and_waiters(
static int
awaited_by_stack_push(
RemoteUnwinderObject *unwinder,
uintptr_t task_addr,
PyObject *result
awaited_by_stack_t *stack,
uintptr_t addr,
int depth
) {
// First, add this task to the result
if (process_single_task_node(unwinder, task_addr, NULL, result) < 0) {
return -1;
if (stack->size >= stack->capacity) {
Py_ssize_t new_capacity = stack->capacity ? stack->capacity * 2 : 16;
awaited_by_entry_t *new_items = PyMem_Realloc(
stack->items, (size_t)new_capacity * sizeof(awaited_by_entry_t));
if (new_items == NULL) {
PyErr_NoMemory();
set_exception_cause(unwinder, PyExc_MemoryError,
"Failed to grow awaited_by work-stack");
return -1;
}
stack->items = new_items;
stack->capacity = new_capacity;
}

// Now find all tasks that are waiting for this task and process them
return process_task_awaited_by(unwinder, task_addr, process_waiter_task, result);
stack->items[stack->size].addr = addr;
stack->items[stack->size].depth = depth;
stack->size++;
return 0;
}

// Processor function for task waiters
// set_entry_processor_func: enqueue a task waiting on the one currently being
// expanded, one level deeper. The depth bound makes a cyclic or corrupted
// awaited_by graph terminate instead of looping forever.
static int
process_waiter_task(
push_awaited_by_waiter(
RemoteUnwinderObject *unwinder,
uintptr_t key_addr,
void *context
) {
PyObject *result = (PyObject *)context;
return process_task_and_waiters(unwinder, key_addr, result);
awaited_by_stack_t *stack = (awaited_by_stack_t *)context;
if (stack->current_depth >= MAX_TASK_AWAITED_BY_DEPTH) {
PyErr_SetString(PyExc_RuntimeError,
"Task awaited_by chain is too deep or cyclic "
"(corrupted remote memory)");
set_exception_cause(unwinder, PyExc_RuntimeError,
"Task awaited_by depth limit exceeded");
return -1;
}
return awaited_by_stack_push(unwinder, stack, key_addr,
stack->current_depth + 1);
}

// Drain the work-stack: append each task node to result, then enqueue the
// tasks waiting on it. Depth-first, with no C recursion over the graph.
static int
drain_awaited_by_stack(
RemoteUnwinderObject *unwinder,
PyObject *result,
awaited_by_stack_t *stack
) {
while (stack->size > 0) {
awaited_by_entry_t entry = stack->items[--stack->size];
if (process_single_task_node(unwinder, entry.addr, NULL, result) < 0) {
return -1;
}
stack->current_depth = entry.depth;
if (process_task_awaited_by(unwinder, entry.addr,
push_awaited_by_waiter, stack) < 0) {
return -1;
}
}
return 0;
}

int
process_task_and_waiters(
RemoteUnwinderObject *unwinder,
uintptr_t task_addr,
PyObject *result
) {
awaited_by_stack_t stack = {0};
int result_code = -1;
if (awaited_by_stack_push(unwinder, &stack, task_addr, 0) == 0) {
result_code = drain_awaited_by_stack(unwinder, result, &stack);
}
PyMem_Free(stack.items);
return result_code;
}

/* ============================================================================
Expand Down Expand Up @@ -977,8 +1052,17 @@ process_running_task_chain(
return -1;
}

// Now find all tasks that are waiting for this task and process them
if (process_task_awaited_by(unwinder, running_task_addr, process_waiter_task, result) < 0) {
// Now find all tasks that are waiting for this task and process them with
// the same iterative, heap-stacked walk as process_task_and_waiters (the
// running task itself is already recorded via the frame chain above).
awaited_by_stack_t stack = {0};
int waiters_code = process_task_awaited_by(unwinder, running_task_addr,
push_awaited_by_waiter, &stack);
if (waiters_code == 0) {
waiters_code = drain_awaited_by_stack(unwinder, result, &stack);
}
PyMem_Free(stack.items);
if (waiters_code < 0) {
return -1;
}

Expand Down
Loading