From 2864c275847387131502377ddeaaada108df2bbd Mon Sep 17 00:00:00 2001 From: maurycy <5383+maurycy@users.noreply.github.com> Date: Tue, 12 May 2026 12:15:40 +0200 Subject: [PATCH 1/4] aggregating --- Lib/profiling/sampling/collector.py | 2 + Lib/profiling/sampling/gecko_collector.py | 2 + Lib/profiling/sampling/pstats_collector.py | 2 + Lib/profiling/sampling/sample.py | 38 ++++++++++++++++++- Lib/profiling/sampling/stack_collector.py | 2 + .../test_sampling_profiler/test_profiler.py | 8 +++- 6 files changed, 50 insertions(+), 4 deletions(-) diff --git a/Lib/profiling/sampling/collector.py b/Lib/profiling/sampling/collector.py index 81ec6344ebdea4..8e0f0c44c4f8f3 100644 --- a/Lib/profiling/sampling/collector.py +++ b/Lib/profiling/sampling/collector.py @@ -143,6 +143,8 @@ def iter_async_frames(awaited_info_list): class Collector(ABC): + aggregating = False + @abstractmethod def collect(self, stack_frames, timestamps_us=None): """Collect profiling data from stack frames. diff --git a/Lib/profiling/sampling/gecko_collector.py b/Lib/profiling/sampling/gecko_collector.py index 8986194268b3ce..54392af9500008 100644 --- a/Lib/profiling/sampling/gecko_collector.py +++ b/Lib/profiling/sampling/gecko_collector.py @@ -63,6 +63,8 @@ class GeckoCollector(Collector): + aggregating = True + def __init__(self, sample_interval_usec, *, skip_idle=False, opcodes=False): self.sample_interval_usec = sample_interval_usec self.skip_idle = skip_idle diff --git a/Lib/profiling/sampling/pstats_collector.py b/Lib/profiling/sampling/pstats_collector.py index 50500296c15acc..43b1daf2a119d4 100644 --- a/Lib/profiling/sampling/pstats_collector.py +++ b/Lib/profiling/sampling/pstats_collector.py @@ -8,6 +8,8 @@ class PstatsCollector(Collector): + aggregating = True + def __init__(self, sample_interval_usec, *, skip_idle=False): self.result = collections.defaultdict( lambda: dict(total_rec_calls=0, direct_calls=0, cumulative_calls=0) diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index 5bbe2483581333..8c1102b966fa67 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -109,6 +109,26 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): last_sample_time = start_time realtime_update_interval = 1.0 # Update every second last_realtime_update = start_time + aggregating = getattr(collector, 'aggregating', False) + prev_stack = None + pending_count = 0 + pending_timestamps = [] if aggregating else None + consecutive_identical = 0 + + def flush_pending(): + nonlocal pending_count, pending_timestamps + if pending_count == 0: + return + count = pending_count + pending_count = 0 + if aggregating: + ts = pending_timestamps + pending_timestamps = [] + collector.collect(prev_stack, timestamps_us=ts) + else: + for _ in range(count): + collector.collect(prev_stack) + try: while duration_sec is None or running_time_sec < duration_sec: # Check if live collector wants to stop @@ -116,6 +136,7 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): break current_time = time.perf_counter() + current_time_us = int(current_time * 1_000_000) if next_time > current_time: sleep_time = (next_time - current_time) * 0.9 if sleep_time > 0.0001: @@ -125,13 +146,22 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): stack_frames = self._get_stack_trace( async_aware=async_aware ) - collector.collect(stack_frames) + if stack_frames == prev_stack: + consecutive_identical += 1 + else: + flush_pending() + prev_stack = stack_frames + pending_count += 1 + if aggregating: + pending_timestamps.append(current_time_us) except ProcessLookupError as e: running_time_sec = current_time - start_time break except (RuntimeError, UnicodeDecodeError, MemoryError, OSError): + flush_pending() collector.collect_failed_sample() errors += 1 + prev_stack = None except Exception as e: if not _is_process_running(self.pid): break @@ -163,6 +193,8 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): interrupted = True running_time_sec = time.perf_counter() - start_time print("Interrupted by user.") + finally: + flush_pending() # Clear real-time stats line if it was being displayed if self.realtime_stats and len(self.sample_intervals) > 0: @@ -178,7 +210,9 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): if not is_live_mode: s = "" if num_samples == 1 else "s" print(f"Captured {num_samples:n} sample{s} in {fmt(running_time_sec, 2)} seconds") - print(f"Sample rate: {fmt(sample_rate, 2)} samples/sec") + comparable_samples = max(1, num_samples - errors - 1) + print(f"Sample rate: {fmt(sample_rate, 2)} samples/sec " + f"(consecutive identical: {consecutive_identical:n}/{comparable_samples:n})") print(f"Error rate: {fmt(error_rate, 2)}") # Print unwinder stats if stats collection is enabled diff --git a/Lib/profiling/sampling/stack_collector.py b/Lib/profiling/sampling/stack_collector.py index 60df026ed76a6c..42281dc6454c83 100644 --- a/Lib/profiling/sampling/stack_collector.py +++ b/Lib/profiling/sampling/stack_collector.py @@ -16,6 +16,8 @@ class StackTraceCollector(Collector): + aggregating = True + def __init__(self, sample_interval_usec, *, skip_idle=False): self.sample_interval_usec = sample_interval_usec self.skip_idle = skip_idle diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py index 68bc59a5414a05..4cc97c44597cfb 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py @@ -198,8 +198,12 @@ def test_sample_profiler_sample_method_timing(self): self.assertIn("samples", result) # Verify collector was called multiple times - self.assertGreaterEqual(mock_collector.collect.call_count, 5) - self.assertLessEqual(mock_collector.collect.call_count, 11) + total_weight = sum( + len(c.kwargs.get("timestamps_us") or [None]) + for c in mock_collector.collect.call_args_list + ) + self.assertGreaterEqual(total_weight, 5) + self.assertLessEqual(total_weight, 11) def test_sample_profiler_error_handling(self): """Test that the sample method handles errors gracefully.""" From c921985daa28f4eb14e6c44b028b989ab0319b8c Mon Sep 17 00:00:00 2001 From: maurycy <5383+maurycy@users.noreply.github.com> Date: Tue, 12 May 2026 13:03:57 +0200 Subject: [PATCH 2/4] blurb --- .../Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst diff --git a/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst b/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst new file mode 100644 index 00000000000000..25344e5a90f022 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-05-12-13-03-45.gh-issue-149718.SaM1NJ.rst @@ -0,0 +1,4 @@ +Coalesce consecutive identical stack frames in Tachyon, so aggregating +collectors (pstats, collapsed, flamegraph, gecko) receive one collect. +Improves sample rate 3x, error rate and missed rate drop by 70%. Patch by +Maurycy Pawłowski-Wieroński. From 6832ac0c6890ef955fbc4a7d0c779ca802aead8f Mon Sep 17 00:00:00 2001 From: maurycy <5383+maurycy@users.noreply.github.com> Date: Tue, 12 May 2026 13:15:52 +0200 Subject: [PATCH 3/4] fix heatmap --- Lib/profiling/sampling/heatmap_collector.py | 7 ++++--- Lib/test/test_profiling/test_heatmap.py | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/Lib/profiling/sampling/heatmap_collector.py b/Lib/profiling/sampling/heatmap_collector.py index 5c36d78f5535e7..6e650ec08f410b 100644 --- a/Lib/profiling/sampling/heatmap_collector.py +++ b/Lib/profiling/sampling/heatmap_collector.py @@ -452,7 +452,8 @@ def process_frames(self, frames, thread_id, weight=1): next_lineno = extract_lineno(next_frame[1]) self._record_call_relationship( (filename, lineno, funcname), - (next_frame[0], next_lineno, next_frame[2]) + (next_frame[0], next_lineno, next_frame[2]), + weight=weight, ) def _is_valid_frame(self, filename, lineno): @@ -561,7 +562,7 @@ def _get_bytecode_data_for_line(self, filename, lineno): result.sort(key=lambda x: (-x['samples'], x['opcode'])) return result - def _record_call_relationship(self, callee_frame, caller_frame): + def _record_call_relationship(self, callee_frame, caller_frame, weight=1): """Record caller/callee relationship between adjacent frames.""" callee_filename, callee_lineno, callee_funcname = callee_frame caller_filename, caller_lineno, caller_funcname = caller_frame @@ -587,7 +588,7 @@ def _record_call_relationship(self, callee_frame, caller_frame): # Count this call edge for path analysis edge_key = (caller_key, callee_key) - self.edge_samples[edge_key] += 1 + self.edge_samples[edge_key] += weight def export(self, output_path): """Export heatmap data as HTML files in a directory. diff --git a/Lib/test/test_profiling/test_heatmap.py b/Lib/test/test_profiling/test_heatmap.py index b2acb1cf577341..ee27fdd3fa3053 100644 --- a/Lib/test/test_profiling/test_heatmap.py +++ b/Lib/test/test_profiling/test_heatmap.py @@ -345,6 +345,21 @@ def test_process_frames_tracks_edge_samples(self): # Check that edge count is tracked self.assertGreater(len(collector.edge_samples), 0) + def test_process_frames_weight_applies_to_identical_samples(self): + collector = HeatmapCollector(sample_interval_usec=100) + + frames = [ + ('callee.py', (5, 5, -1, -1), 'callee', None), + ('caller.py', (10, 10, -1, -1), 'caller', None), + ] + + collector.process_frames(frames, thread_id=1, weight=5) + + edge_key = (('caller.py', 10), ('callee.py', 5)) + self.assertEqual(collector.edge_samples[edge_key], 5) + self.assertEqual(collector.line_samples[('callee.py', 5)], 5) + self.assertEqual(collector.line_samples[('caller.py', 10)], 5) + def test_process_frames_handles_empty_frames(self): """Test that process_frames handles empty frame list.""" collector = HeatmapCollector(sample_interval_usec=100) From 276cc10bd54cec8f3bffec7e97a9208eeb52bb83 Mon Sep 17 00:00:00 2001 From: maurycy <5383+maurycy@users.noreply.github.com> Date: Tue, 12 May 2026 13:17:39 +0200 Subject: [PATCH 4/4] no need to show consecutive_identical --- Lib/profiling/sampling/sample.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index 8c1102b966fa67..d58b83ce615100 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -113,7 +113,6 @@ def sample(self, collector, duration_sec=None, *, async_aware=False): prev_stack = None pending_count = 0 pending_timestamps = [] if aggregating else None - consecutive_identical = 0 def flush_pending(): nonlocal pending_count, pending_timestamps @@ -146,9 +145,7 @@ def flush_pending(): stack_frames = self._get_stack_trace( async_aware=async_aware ) - if stack_frames == prev_stack: - consecutive_identical += 1 - else: + if stack_frames != prev_stack: flush_pending() prev_stack = stack_frames pending_count += 1 @@ -210,9 +207,7 @@ def flush_pending(): if not is_live_mode: s = "" if num_samples == 1 else "s" print(f"Captured {num_samples:n} sample{s} in {fmt(running_time_sec, 2)} seconds") - comparable_samples = max(1, num_samples - errors - 1) - print(f"Sample rate: {fmt(sample_rate, 2)} samples/sec " - f"(consecutive identical: {consecutive_identical:n}/{comparable_samples:n})") + print(f"Sample rate: {fmt(sample_rate, 2)} samples/sec") print(f"Error rate: {fmt(error_rate, 2)}") # Print unwinder stats if stats collection is enabled