From d6791fac9ff1374323a14a29065e41cda4a90870 Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Thu, 4 Jun 2026 10:00:19 -0700 Subject: [PATCH 1/3] perf(tracing): skip span-start upsert by default (end-only ingest) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tracing writes each span twice — once on start (no end_time) and once on end — so the start row is only ever overwritten by the end write moments later. Persisting it doubles span-ingest write volume and, on the SGP backend, costs a non-HOT UPDATE (tsvector/GIN recompute + index churn) plus a dead tuple per span. Skip the span-start upsert by default so each span is persisted once, on end (a single INSERT). Set AGENTEX_TRACING_SKIP_SPAN_START=0/false/no/off to restore the start write when in-flight or never-ending spans must be visible. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../processors/sgp_tracing_processor.py | 31 +++++ .../processors/test_sgp_tracing_processor.py | 106 +++++++++++++++++- 2 files changed, 131 insertions(+), 6 deletions(-) diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index ced4c5d2c..9fc58bee9 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os import asyncio import weakref from typing import cast, override @@ -22,6 +23,28 @@ logger = make_logger(__name__) +_SKIP_SPAN_START_ENV = "AGENTEX_TRACING_SKIP_SPAN_START" + + +def _skip_span_start_enabled() -> bool: + """Whether to skip the span-start upsert and write each span only on end. + + Tracing writes each span twice — once on start (no ``end_time``) and once + on end. The start row is only ever overwritten by the end write moments + later, so persisting it doubles span-ingest write volume and, on the SGP + backend, costs a non-HOT UPDATE (tsvector/GIN recompute + index churn) plus + a dead tuple per span. Skipping the start makes the end write a single + INSERT. + + Default ON. Set ``AGENTEX_TRACING_SKIP_SPAN_START`` to + ``0``/``false``/``no``/``off`` to restore the start write — e.g. if you + need in-flight spans visible before they complete, or spans that never end + (process crash) to still be persisted. + """ + raw = os.environ.get(_SKIP_SPAN_START_ENV, "1").strip().lower() + return raw not in ("0", "false", "no", "off") + + def _get_span_type(span: Span) -> str: """Read span_type from span.data['__span_type__'], defaulting to STANDALONE.""" if isinstance(span.data, dict): @@ -78,6 +101,10 @@ def __init__(self, config: SGPTracingProcessorConfig): @override def on_span_start(self, span: Span) -> None: + # End-only ingest: by default the start write is skipped (see + # _skip_span_start_enabled) so each span is persisted once, on end. + if _skip_span_start_enabled(): + return sgp_span = _build_sgp_span(span, self.env_vars) sgp_span.flush(blocking=False) @@ -150,6 +177,10 @@ async def on_span_end(self, span: Span) -> None: @override async def on_spans_start(self, spans: list[Span]) -> None: + # End-only ingest: by default the start write is skipped (see + # _skip_span_start_enabled) so each span is persisted once, on end. + if _skip_span_start_enabled(): + return if not spans: return diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 41efcea5a..dc8bab127 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -5,6 +5,8 @@ from datetime import UTC, datetime from unittest.mock import AsyncMock, MagicMock, patch +import pytest + from agentex.types.span import Span from agentex.lib.types.tracing import SGPTracingProcessorConfig @@ -65,8 +67,9 @@ def test_processor_holds_no_per_span_state(self): processor, _ = self._make_processor() assert not hasattr(processor, "_spans") - def test_span_lifecycle_produces_two_flushes(self): - """Each span produces one flush on start and one on end.""" + def test_span_lifecycle_produces_two_flushes(self, monkeypatch): + """With start writes enabled, each span produces one flush on start and one on end.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _ = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()) as mock_cs: @@ -105,6 +108,38 @@ def capture_create_span(**kwargs): assert captured_spans[0].start_time is not None assert captured_spans[0].end_time is not None + def test_span_start_skipped_by_default(self, monkeypatch): + """Default (end-only): on_span_start is a no-op; only on_span_end writes.""" + monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False) + processor, _ = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()) as mock_cs: + span = _make_span() + processor.on_span_start(span) + assert mock_cs.call_count == 0 # start skipped — nothing built or flushed + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + + assert mock_cs.call_count == 1 # only the end write + + def test_span_start_emitted_when_skip_disabled(self, monkeypatch): + """With skip disabled, on_span_start builds and flushes a span.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") + processor, _ = self._make_processor() + + captured: list[MagicMock] = [] + + def capture(**kwargs): + sgp_span = _make_mock_sgp_span() + captured.append(sgp_span) + return sgp_span + + with patch(f"{MODULE}.create_span", side_effect=capture): + processor.on_span_start(_make_span()) + + assert len(captured) == 1 + assert captured[0].flush.called + # --------------------------------------------------------------------------- # Async processor tests @@ -141,8 +176,9 @@ def test_processor_holds_no_per_span_state(self): processor, _, _ = self._make_processor() assert not hasattr(processor, "_spans") - async def test_span_lifecycle_produces_two_upserts(self): - """Each span produces one upsert_batch call on start and one on end.""" + async def test_span_lifecycle_produces_two_upserts(self, monkeypatch): + """With start writes enabled, each span produces one upsert on start and one on end.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _, mock_client = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): @@ -153,6 +189,31 @@ async def test_span_lifecycle_produces_two_upserts(self): assert mock_client.spans.upsert_batch.call_count == 2 + async def test_spans_start_skipped_by_default(self, monkeypatch): + """Default (end-only): on_spans_start makes no upsert; on_spans_end does.""" + monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False) + processor, _, mock_client = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + spans = [_make_span() for _ in range(3)] + await processor.on_spans_start(spans) + assert mock_client.spans.upsert_batch.call_count == 0 # start skipped + for s in spans: + s.end_time = datetime.now(UTC) + await processor.on_spans_end(spans) + + assert mock_client.spans.upsert_batch.call_count == 1 # only the end write + + async def test_spans_start_emitted_when_skip_disabled(self, monkeypatch): + """With skip disabled, on_spans_start makes one upsert_batch call.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") + processor, _, mock_client = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + await processor.on_spans_start([_make_span()]) + + assert mock_client.spans.upsert_batch.call_count == 1 + async def test_span_end_without_prior_start_still_upserts(self): """Cross-pod Temporal case: END activity lands on a pod that never saw START. @@ -171,8 +232,9 @@ async def test_span_end_without_prior_start_still_upserts(self): items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == 1 - async def test_sgp_span_input_and_output_propagated_on_end(self): + async def test_sgp_span_input_and_output_propagated_on_end(self, monkeypatch): """on_span_end should send the span's current input and output via upsert_batch.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _, mock_client = self._make_processor() captured: list[MagicMock] = [] @@ -207,8 +269,9 @@ def capture_create_span(**kwargs): assert end_call_kwargs["input"]["messages"][-1]["role"] == "assistant" assert end_call_kwargs["output"] == {"response": "hi"} - async def test_on_spans_start_sends_single_upsert_for_batch(self): + async def test_on_spans_start_sends_single_upsert_for_batch(self, monkeypatch): """Given N spans at once, on_spans_start should make ONE upsert_batch HTTP call.""" + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") processor, _, mock_client = self._make_processor() n = 10 @@ -224,6 +287,7 @@ async def test_on_spans_start_sends_single_upsert_for_batch(self): async def test_on_spans_start_records_export_success_metrics(self, monkeypatch): monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1") + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0") import agentex.lib.core.observability.tracing_metrics_recording as recording recording._metrics_enabled = None @@ -400,3 +464,33 @@ async def test_on_spans_end_sends_single_upsert_for_batch(self): ) items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == n + + +# --------------------------------------------------------------------------- +# AGENTEX_TRACING_SKIP_SPAN_START env parsing +# --------------------------------------------------------------------------- + + +class TestSkipSpanStartEnv: + @staticmethod + def _fn(): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + _skip_span_start_enabled, + ) + + return _skip_span_start_enabled + + def test_default_is_skip_enabled(self, monkeypatch): + """Unset → skip span-start (end-only ingest is the default).""" + monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False) + assert self._fn()() is True + + @pytest.mark.parametrize("val", ["0", "false", "no", "off", "FALSE", "Off", " no "]) + def test_falsy_values_restore_span_start(self, monkeypatch, val): + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val) + assert self._fn()() is False + + @pytest.mark.parametrize("val", ["1", "true", "yes", "on", "anything"]) + def test_other_values_keep_skip_enabled(self, monkeypatch, val): + monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val) + assert self._fn()() is True From 82220744ea18f5da35fb324364cfd50f474b2ba7 Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Thu, 4 Jun 2026 10:16:17 -0700 Subject: [PATCH 2/3] observability: log resolved span-start mode at processor init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The end-only skip is governed by AGENTEX_TRACING_SKIP_SPAN_START (default ON) but was silent — an operator could only infer it from the absence of start-export metrics. Emit a one-time INFO at processor init stating whether span-start upsert is enabled or skipped, so the deployment's tracing mode is visible in logs. Off the hot path (once per construction). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../core/tracing/processors/sgp_tracing_processor.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index 9fc58bee9..627b34d7b 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -98,6 +98,11 @@ def __init__(self, config: SGPTracingProcessorConfig): disabled=disabled, ) self.env_vars = EnvironmentVariables.refresh() + logger.info( + "SGP tracing span-start upsert %s (%s)", + "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + _SKIP_SPAN_START_ENV, + ) @override def on_span_start(self, span: Span) -> None: @@ -134,6 +139,11 @@ def __init__(self, config: SGPTracingProcessorConfig): asyncio.AbstractEventLoop, AsyncSGPClient ] = weakref.WeakKeyDictionary() self.env_vars = EnvironmentVariables.refresh() + logger.info( + "SGP tracing span-start upsert %s (%s)", + "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + _SKIP_SPAN_START_ENV, + ) def _build_client(self) -> AsyncSGPClient: import httpx From 700151439ed385c7e2f0d23b7f51bc6439590911 Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Thu, 4 Jun 2026 13:07:09 -0700 Subject: [PATCH 3/3] perf(tracing): raise span-queue batch defaults and make batch_size env-tunable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The async span queue batched at 50 spans / 100ms linger. For high-volume span ingest that means many small upsert_batch PUTs — each a separate HTTP round trip and a separate INSERT statement on the backend. Raise the defaults to 200 spans / 250ms so batches fill before flushing, amortizing the per-request and per-statement overhead (still well under the backend's 1000-row cap). Also make batch_size resolvable from AGENTEX_SPAN_QUEUE_BATCH_SIZE, matching the existing env-override pattern for linger_ms / max_size / max_retries / concurrency (batch_size was the only queue knob not tunable without an SDK release). Resolution order: explicit arg > AGENTEX_SPAN_QUEUE_BATCH_SIZE env > default. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/core/tracing/span_queue.py | 22 +++++++++++--- tests/lib/core/tracing/test_span_queue.py | 34 +++++++++++++++++++++- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index 5d77e3440..d6ff7c1f6 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -15,8 +15,18 @@ logger = make_logger(__name__) -_DEFAULT_BATCH_SIZE = 50 -_DEFAULT_LINGER_MS = 100 +# Max spans coalesced into one ``upsert_batch`` HTTP call (one +# ``INSERT ... ON CONFLICT`` statement server-side). Larger batches amortize +# the per-request round trip and the per-statement parse/plan + index +# maintenance overhead, which dominates at high span volume. Kept well under +# the EGP backend's 1000-row cap; tune per-deploy via +# ``AGENTEX_SPAN_QUEUE_BATCH_SIZE``. +_DEFAULT_BATCH_SIZE = 200 +# Max time the drain lingers after the first span to let a batch fill. Spans +# typically arrive a few ms apart, so a longer linger fills the larger batch +# above rather than shipping near-size-1 batches; bounded so worst-case ingest +# latency (and the in-flight loss window) stays sub-second. +_DEFAULT_LINGER_MS = 250 # 0 == unbounded (preserves prior behavior). A bound makes backpressure # visible (dropped spans are counted) and caps worst-case memory. _DEFAULT_MAX_SIZE = 0 @@ -114,7 +124,7 @@ class AsyncSpanQueue: def __init__( self, - batch_size: int = _DEFAULT_BATCH_SIZE, + batch_size: int | None = None, linger_ms: int | None = None, max_size: int | None = None, max_retries: int | None = None, @@ -126,7 +136,11 @@ def __init__( self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size) self._drain_task: asyncio.Task[None] | None = None self._stopping = False - self._batch_size = batch_size + self._batch_size = ( + _read_int_env("AGENTEX_SPAN_QUEUE_BATCH_SIZE", _DEFAULT_BATCH_SIZE, minimum=1) + if batch_size is None + else max(1, batch_size) + ) self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms) self._max_retries = ( _read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1) diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index d2452d619..b8092daca 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -8,7 +8,11 @@ from unittest.mock import AsyncMock, MagicMock, patch from agentex.types.span import Span -from agentex.lib.core.tracing.span_queue import SpanEventType, AsyncSpanQueue +from agentex.lib.core.tracing.span_queue import ( + _DEFAULT_BATCH_SIZE, + SpanEventType, + AsyncSpanQueue, +) def _make_span(span_id: str | None = None) -> Span: @@ -859,3 +863,31 @@ async def test_enqueue_overhead_with_metrics_disabled(self, monkeypatch): assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s" mock_get.assert_not_called() + + +class TestAsyncSpanQueueBatchSizeConfig: + """batch_size resolution: explicit arg > AGENTEX_SPAN_QUEUE_BATCH_SIZE env > default.""" + + async def test_default_batch_size(self, monkeypatch): + monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False) + assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE + + async def test_explicit_arg_overrides_default(self, monkeypatch): + monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False) + assert AsyncSpanQueue(batch_size=10)._batch_size == 10 + + async def test_explicit_arg_clamped_to_min_one(self, monkeypatch): + monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False) + assert AsyncSpanQueue(batch_size=0)._batch_size == 1 + + async def test_env_used_when_arg_is_none(self, monkeypatch): + monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500") + assert AsyncSpanQueue()._batch_size == 500 + + async def test_explicit_arg_beats_env(self, monkeypatch): + monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500") + assert AsyncSpanQueue(batch_size=7)._batch_size == 7 + + async def test_invalid_env_falls_back_to_default(self, monkeypatch): + monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "not-an-int") + assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE