Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Lib/profiling/sampling/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def iter_async_frames(awaited_info_list):


class Collector(ABC):
aggregating = False

@abstractmethod
def collect(self, stack_frames, timestamps_us=None):
"""Collect profiling data from stack frames.
Expand Down
2 changes: 2 additions & 0 deletions Lib/profiling/sampling/gecko_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@


class GeckoCollector(Collector):
aggregating = True

def __init__(self, sample_interval_usec, *, skip_idle=False, opcodes=False):
self.sample_interval_usec = sample_interval_usec
self.skip_idle = skip_idle
Expand Down
7 changes: 4 additions & 3 deletions Lib/profiling/sampling/heatmap_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ def process_frames(self, frames, thread_id, weight=1):
next_lineno = extract_lineno(next_frame[1])
self._record_call_relationship(
(filename, lineno, funcname),
(next_frame[0], next_lineno, next_frame[2])
(next_frame[0], next_lineno, next_frame[2]),
weight=weight,
)

def _is_valid_frame(self, filename, lineno):
Expand Down Expand Up @@ -561,7 +562,7 @@ def _get_bytecode_data_for_line(self, filename, lineno):
result.sort(key=lambda x: (-x['samples'], x['opcode']))
return result

def _record_call_relationship(self, callee_frame, caller_frame):
def _record_call_relationship(self, callee_frame, caller_frame, weight=1):
"""Record caller/callee relationship between adjacent frames."""
callee_filename, callee_lineno, callee_funcname = callee_frame
caller_filename, caller_lineno, caller_funcname = caller_frame
Expand All @@ -587,7 +588,7 @@ def _record_call_relationship(self, callee_frame, caller_frame):

# Count this call edge for path analysis
edge_key = (caller_key, callee_key)
self.edge_samples[edge_key] += 1
self.edge_samples[edge_key] += weight

def export(self, output_path):
"""Export heatmap data as HTML files in a directory.
Expand Down
2 changes: 2 additions & 0 deletions Lib/profiling/sampling/pstats_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@


class PstatsCollector(Collector):
aggregating = True

def __init__(self, sample_interval_usec, *, skip_idle=False):
self.result = collections.defaultdict(
lambda: dict(total_rec_calls=0, direct_calls=0, cumulative_calls=0)
Expand Down
31 changes: 30 additions & 1 deletion Lib/profiling/sampling/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,33 @@ def sample(self, collector, duration_sec=None, *, async_aware=False):
last_sample_time = start_time
realtime_update_interval = 1.0 # Update every second
last_realtime_update = start_time
aggregating = getattr(collector, 'aggregating', False)
prev_stack = None
pending_count = 0
pending_timestamps = [] if aggregating else None

def flush_pending():
# Deliver the buffered run of identical samples (whose stack is prev_stack)
# to the collector, then reset the buffer.
nonlocal pending_count, pending_timestamps
# Nothing has been buffered since the last flush.
if pending_count == 0:
return
count = pending_count
pending_count = 0
if aggregating:
# Aggregating collectors get a single collect() call that carries
# every buffered timestamp.
ts = pending_timestamps
# Rebind to a fresh list so later appends do not mutate the list
# that is handed to the collector.
pending_timestamps = []
collector.collect(prev_stack, timestamps_us=ts)
else:
# Other collectors get one collect() call per buffered sample.
for _ in range(count):
collector.collect(prev_stack)

try:
while duration_sec is None or running_time_sec < duration_sec:
# Check if live collector wants to stop
if hasattr(collector, 'running') and not collector.running:
break

current_time = time.perf_counter()
current_time_us = int(current_time * 1_000_000)
if next_time > current_time:
sleep_time = (next_time - current_time) * 0.9
if sleep_time > 0.0001:
Expand All @@ -125,13 +145,20 @@ def sample(self, collector, duration_sec=None, *, async_aware=False):
stack_frames = self._get_stack_trace(
async_aware=async_aware
)
collector.collect(stack_frames)
if stack_frames != prev_stack:
flush_pending()
prev_stack = stack_frames
pending_count += 1
if aggregating:
pending_timestamps.append(current_time_us)
except ProcessLookupError as e:
running_time_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
flush_pending()
collector.collect_failed_sample()
errors += 1
prev_stack = None
except Exception as e:
if not _is_process_running(self.pid):
break
Expand Down Expand Up @@ -163,6 +190,8 @@ def sample(self, collector, duration_sec=None, *, async_aware=False):
interrupted = True
running_time_sec = time.perf_counter() - start_time
print("Interrupted by user.")
finally:
flush_pending()

# Clear real-time stats line if it was being displayed
if self.realtime_stats and len(self.sample_intervals) > 0:
Expand Down
2 changes: 2 additions & 0 deletions Lib/profiling/sampling/stack_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@


class StackTraceCollector(Collector):
aggregating = True

def __init__(self, sample_interval_usec, *, skip_idle=False):
self.sample_interval_usec = sample_interval_usec
self.skip_idle = skip_idle
Expand Down
15 changes: 15 additions & 0 deletions Lib/test/test_profiling/test_heatmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,21 @@ def test_process_frames_tracks_edge_samples(self):
# Check that edge count is tracked
self.assertGreater(len(collector.edge_samples), 0)

def test_process_frames_weight_applies_to_identical_samples(self):
# Check that the weight passed to process_frames() is applied to both
# the call-edge counts and the per-line sample counts.
collector = HeatmapCollector(sample_interval_usec=100)

# Two-frame stack, listed callee first and caller second.
frames = [
('callee.py', (5, 5, -1, -1), 'callee', None),
('caller.py', (10, 10, -1, -1), 'caller', None),
]

# One call with weight=5 instead of five separate calls.
collector.process_frames(frames, thread_id=1, weight=5)

# Edge keys are (caller, callee) pairs of (filename, lineno).
edge_key = (('caller.py', 10), ('callee.py', 5))
self.assertEqual(collector.edge_samples[edge_key], 5)
self.assertEqual(collector.line_samples[('callee.py', 5)], 5)
self.assertEqual(collector.line_samples[('caller.py', 10)], 5)

def test_process_frames_handles_empty_frames(self):
"""Test that process_frames handles empty frame list."""
collector = HeatmapCollector(sample_interval_usec=100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,12 @@ def test_sample_profiler_sample_method_timing(self):
self.assertIn("samples", result)

# Verify collector was called multiple times
self.assertGreaterEqual(mock_collector.collect.call_count, 5)
self.assertLessEqual(mock_collector.collect.call_count, 11)
total_weight = sum(
len(c.kwargs.get("timestamps_us") or [None])
for c in mock_collector.collect.call_args_list
)
self.assertGreaterEqual(total_weight, 5)
self.assertLessEqual(total_weight, 11)

def test_sample_profiler_error_handling(self):
"""Test that the sample method handles errors gracefully."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Coalesce consecutive identical stack frames in Tachyon, so that aggregating
collectors (pstats, collapsed, flamegraph, gecko) receive a single collect()
call for each run of identical samples. This improves the sample rate 3x and
reduces the error rate and missed rate by 70%. Patch by
Maurycy Pawłowski-Wieroński.
Loading