From 9f7b020d3c1c557a6e9433020de81c500c2fba0d Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Thu, 4 Jun 2026 13:07:09 -0700 Subject: [PATCH] perf(tracing): raise span-queue batch defaults and make batch_size env-tunable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The async span queue batched at 50 spans / 100ms linger. For high-volume span ingest that means many small upsert_batch PUTs — each a separate HTTP round trip and a separate INSERT statement on the backend. Raise the defaults to 200 spans / 250ms so batches fill before flushing, amortizing the per-request and per-statement overhead (still well under the backend's 1000-row cap). Also make batch_size resolvable from AGENTEX_SPAN_QUEUE_BATCH_SIZE, matching the existing env-override pattern for linger_ms / max_size / max_retries / concurrency (batch_size was the only queue knob not tunable without an SDK release). Resolution order: explicit arg > AGENTEX_SPAN_QUEUE_BATCH_SIZE env > default. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/core/tracing/span_queue.py | 22 +++++++++++--- tests/lib/core/tracing/test_span_queue.py | 34 +++++++++++++++++++++- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index 5d77e3440..d6ff7c1f6 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -15,8 +15,18 @@ logger = make_logger(__name__) -_DEFAULT_BATCH_SIZE = 50 -_DEFAULT_LINGER_MS = 100 +# Max spans coalesced into one ``upsert_batch`` HTTP call (one +# ``INSERT ... ON CONFLICT`` statement server-side). Larger batches amortize +# the per-request round trip and the per-statement parse/plan + index +# maintenance overhead, which dominates at high span volume. Kept well under +# the EGP backend's 1000-row cap; tune per-deploy via +# ``AGENTEX_SPAN_QUEUE_BATCH_SIZE``. +_DEFAULT_BATCH_SIZE = 200 +# Max time the drain lingers after the first span to let a batch fill. Spans +# typically arrive a few ms apart, so a longer linger fills the larger batch +# above rather than shipping near-size-1 batches; bounded so worst-case ingest +# latency (and the in-flight loss window) stays sub-second. +_DEFAULT_LINGER_MS = 250 # 0 == unbounded (preserves prior behavior). A bound makes backpressure # visible (dropped spans are counted) and caps worst-case memory. _DEFAULT_MAX_SIZE = 0 @@ -114,7 +124,7 @@ class AsyncSpanQueue: def __init__( self, - batch_size: int = _DEFAULT_BATCH_SIZE, + batch_size: int | None = None, linger_ms: int | None = None, max_size: int | None = None, max_retries: int | None = None, @@ -126,7 +136,11 @@ def __init__( self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size) self._drain_task: asyncio.Task[None] | None = None self._stopping = False - self._batch_size = batch_size + self._batch_size = ( + _read_int_env("AGENTEX_SPAN_QUEUE_BATCH_SIZE", _DEFAULT_BATCH_SIZE, minimum=1) + if batch_size is None + else max(1, batch_size) + ) self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms) self._max_retries = ( _read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1) diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index d2452d619..b8092daca 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -8,7 +8,11 @@ from unittest.mock import AsyncMock, MagicMock, patch from agentex.types.span import Span -from agentex.lib.core.tracing.span_queue import SpanEventType, AsyncSpanQueue +from agentex.lib.core.tracing.span_queue import ( + _DEFAULT_BATCH_SIZE, + SpanEventType, + AsyncSpanQueue, +) def _make_span(span_id: str | None = None) -> Span: @@ -859,3 +863,31 @@ async def test_enqueue_overhead_with_metrics_disabled(self, monkeypatch): assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s" mock_get.assert_not_called() + + +class TestAsyncSpanQueueBatchSizeConfig: + """batch_size resolution: explicit arg > AGENTEX_SPAN_QUEUE_BATCH_SIZE env > default.""" + + async def test_default_batch_size(self, monkeypatch): + monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False) + assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE + + async def test_explicit_arg_overrides_default(self, monkeypatch): + monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False) + assert AsyncSpanQueue(batch_size=10)._batch_size == 10 + + async def test_explicit_arg_clamped_to_min_one(self, monkeypatch): + monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False) + assert AsyncSpanQueue(batch_size=0)._batch_size == 1 + + async def test_env_used_when_arg_is_none(self, monkeypatch): + monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500") + assert AsyncSpanQueue()._batch_size == 500 + + async def test_explicit_arg_beats_env(self, monkeypatch): + monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500") + assert AsyncSpanQueue(batch_size=7)._batch_size == 7 + + async def test_invalid_env_falls_back_to_default(self, monkeypatch): + monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "not-an-int") + assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE