Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix a bug in the binary collector in Tachyon that caused unbounded memory
growth while profiling a thread that stayed asleep. Patch by Maurycy
Pawłowski-Wieroński.
16 changes: 3 additions & 13 deletions Modules/_remote_debugging/binary_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ static_assert(FILE_FOOTER_SIZE == 32,
/* Maximum stack depth we'll buffer for delta encoding */
#define MAX_STACK_DEPTH 256

/* Initial capacity for RLE pending buffer */
#define INITIAL_RLE_CAPACITY 64

/* Initial capacities for dynamic arrays - sized to reduce reallocations */
#define INITIAL_STRING_CAPACITY 4096
#define INITIAL_FRAME_CAPACITY 4096
Expand Down Expand Up @@ -226,12 +223,6 @@ typedef struct {
uint8_t opcode;
} FrameKey;

/* Pending RLE sample - buffered for run-length encoding */
typedef struct {
uint64_t timestamp_delta;
uint8_t status;
} PendingRLESample;

/* Thread entry - tracks per-thread state for delta encoding */
typedef struct {
uint64_t thread_id;
Expand All @@ -244,10 +235,9 @@ typedef struct {
size_t prev_stack_capacity;

/* RLE pending buffer - samples waiting to be written as a repeat group */
PendingRLESample *pending_rle;
size_t pending_rle_count;
size_t pending_rle_capacity;
int has_pending_rle; /* Flag: do we have buffered repeats? */
uint8_t *pending_rle;
size_t pending_rle_bytes;
size_t pending_rle_samples;
} ThreadEntry;

/* Main binary writer structure */
Expand Down
93 changes: 36 additions & 57 deletions Modules/_remote_debugging/binary_io_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
/* Frame buffer: depth varint (max 2 bytes for 256) + 256 frames * 5 bytes/varint + margin */
#define MAX_FRAME_BUFFER_SIZE ((MAX_STACK_DEPTH * MAX_VARINT_SIZE_U32) + MAX_VARINT_SIZE_U32 + 16)

/* RLE pending buffer (per thread): one entry is a u64 delta varint + status byte */
#define MAX_RLE_BUF_SIZE (16 * 1024)
#define MAX_RLE_ENTRY_SIZE (MAX_VARINT_SIZE + sizeof(uint8_t))

/* Helper macro: convert PyLong to int32, using default_val if conversion fails */
#define PYLONG_TO_INT32_OR_DEFAULT(obj, var, default_val) \
do { \
Expand Down Expand Up @@ -134,15 +138,6 @@ writer_write_varint_u32(BinaryWriter *writer, uint32_t value)
return writer_write_bytes(writer, buf, len);
}

/* Encode and write a varint u64 - returns 0 on success, -1 on error */
static inline int
writer_write_varint_u64(BinaryWriter *writer, uint64_t value)
{
uint8_t buf[MAX_VARINT_SIZE];
size_t len = encode_varint_u64(buf, value);
return writer_write_bytes(writer, buf, len);
}


/* ============================================================================
* UTILITY FUNCTIONS
Expand Down Expand Up @@ -511,21 +506,13 @@ writer_get_or_create_thread_entry(BinaryWriter *writer, uint64_t thread_id,
entry->interpreter_id = interpreter_id;
entry->prev_timestamp = writer->start_time_us;
entry->prev_stack_capacity = MAX_STACK_DEPTH;
entry->pending_rle_capacity = INITIAL_RLE_CAPACITY;

entry->prev_stack = PyMem_Calloc(entry->prev_stack_capacity, sizeof(uint32_t));
if (!entry->prev_stack) {
PyErr_NoMemory();
return NULL;
}

entry->pending_rle = PyMem_Malloc(entry->pending_rle_capacity * sizeof(PendingRLESample));
if (!entry->pending_rle) {
PyMem_Free(entry->prev_stack);
PyErr_NoMemory();
return NULL;
}

writer->thread_count++;
if (is_new) {
*is_new = 1;
Expand Down Expand Up @@ -626,14 +613,9 @@ write_sample_header(BinaryWriter *writer, ThreadEntry *entry, uint8_t encoding)
static int
flush_pending_rle(BinaryWriter *writer, ThreadEntry *entry)
{
if (!entry->has_pending_rle || entry->pending_rle_count == 0) {
if (entry->pending_rle_samples == 0) {
return 0;
}
if (entry->pending_rle_count > UINT32_MAX - writer->total_samples) {
PyErr_SetString(PyExc_OverflowError,
"too many samples for binary format");
return -1;
}

/* Write RLE record:
* [thread_id: 8] [interpreter_id: 4] [STACK_REPEAT: 1] [count: varint]
Expand All @@ -644,27 +626,21 @@ flush_pending_rle(BinaryWriter *writer, ThreadEntry *entry)
return -1;
}

if (writer_write_varint_u32(writer, (uint32_t)entry->pending_rle_count) < 0) {
if (writer_write_varint_u32(writer, (uint32_t)entry->pending_rle_samples) < 0) {
return -1;
}

for (size_t i = 0; i < entry->pending_rle_count; i++) {
if (writer_write_varint_u64(writer, entry->pending_rle[i].timestamp_delta) < 0) {
return -1;
}
if (writer_write_bytes(writer, &entry->pending_rle[i].status, 1) < 0) {
return -1;
}
writer->total_samples++;
if (writer_write_bytes(writer, entry->pending_rle, entry->pending_rle_bytes) < 0) {
return -1;
}

writer->stats.repeat_records++;
writer->stats.repeat_samples += entry->pending_rle_count;
writer->stats.repeat_samples += entry->pending_rle_samples;
/* Each RLE sample saves writing the entire stack */
writer->stats.frames_saved += entry->pending_rle_count * entry->prev_stack_depth;
writer->stats.frames_saved += entry->pending_rle_samples * entry->prev_stack_depth;

entry->pending_rle_count = 0;
entry->has_pending_rle = 0;
entry->pending_rle_bytes = 0;
entry->pending_rle_samples = 0;

return 0;
}
Expand All @@ -678,12 +654,6 @@ write_sample_with_encoding(BinaryWriter *writer, ThreadEntry *entry,
const uint32_t *frame_indices, size_t stack_depth,
size_t shared_count, size_t pop_count, size_t push_count)
{
if (writer->total_samples == UINT32_MAX) {
PyErr_SetString(PyExc_OverflowError,
"too many samples for binary format");
return -1;
}

/* Header: thread_id(8) + interpreter_id(4) + encoding(1) + delta(varint) + status(1) */
uint8_t header_buf[SAMPLE_HEADER_MAX_SIZE];
memcpy(header_buf + SMP_OFF_THREAD_ID, &entry->thread_id, SMP_SIZE_THREAD_ID);
Expand Down Expand Up @@ -752,7 +722,6 @@ write_sample_with_encoding(BinaryWriter *writer, ThreadEntry *entry,
}

writer->stats.total_frames_written += frames_written;
writer->total_samples++;
return 0;
}

Expand Down Expand Up @@ -955,6 +924,12 @@ static int
process_thread_sample(BinaryWriter *writer, PyObject *thread_info,
uint32_t interpreter_id, uint64_t timestamp_us)
{
if (writer->total_samples >= UINT32_MAX) {
PyErr_SetString(PyExc_OverflowError,
"too many samples for binary format");
return -1;
}

PyObject *thread_id_obj = PyStructSequence_GET_ITEM(thread_info, 0);
PyObject *status_obj = PyStructSequence_GET_ITEM(thread_info, 1);
PyObject *frame_list = PyStructSequence_GET_ITEM(thread_info, 2);
Expand Down Expand Up @@ -1002,20 +977,25 @@ process_thread_sample(BinaryWriter *writer, PyObject *thread_info,
* STACK_REPEAT against an empty curr_stack (depth 0). Buffering
* it here is correct; the RLE flush path emits it as a normal
* STACK_REPEAT record. */
if (GROW_ARRAY(entry->pending_rle, entry->pending_rle_count,
entry->pending_rle_capacity, PendingRLESample) < 0) {
if (entry->pending_rle == NULL) {
entry->pending_rle = PyMem_Malloc(MAX_RLE_BUF_SIZE);
if (!entry->pending_rle) {
PyErr_NoMemory();
return -1;
}
}
if (entry->pending_rle_bytes + MAX_RLE_ENTRY_SIZE > MAX_RLE_BUF_SIZE
&& flush_pending_rle(writer, entry) < 0) {
return -1;
}
entry->pending_rle[entry->pending_rle_count].timestamp_delta = delta;
entry->pending_rle[entry->pending_rle_count].status = status;
entry->pending_rle_count++;
entry->has_pending_rle = 1;
entry->pending_rle_bytes += encode_varint_u64(
entry->pending_rle + entry->pending_rle_bytes, delta);
entry->pending_rle[entry->pending_rle_bytes++] = status;
entry->pending_rle_samples++;
} else {
/* Stack changed - flush any pending RLE first */
if (entry->has_pending_rle) {
if (flush_pending_rle(writer, entry) < 0) {
return -1;
}
if (flush_pending_rle(writer, entry) < 0) {
return -1;
}

if (write_sample_with_encoding(writer, entry, delta, status, encoding,
Expand All @@ -1028,6 +1008,7 @@ process_thread_sample(BinaryWriter *writer, PyObject *thread_info,
entry->prev_stack_depth = curr_depth;
}

writer->total_samples++;
return 0;
}

Expand Down Expand Up @@ -1075,10 +1056,8 @@ int
binary_writer_finalize(BinaryWriter *writer)
{
for (size_t i = 0; i < writer->thread_count; i++) {
if (writer->thread_entries[i].has_pending_rle) {
if (flush_pending_rle(writer, &writer->thread_entries[i]) < 0) {
return -1;
}
if (flush_pending_rle(writer, &writer->thread_entries[i]) < 0) {
return -1;
}
}

Expand Down
Loading