Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/agentex/lib/core/tracing/span_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,18 @@

logger = make_logger(__name__)

_DEFAULT_BATCH_SIZE = 50
_DEFAULT_LINGER_MS = 100
# Max spans coalesced into one ``upsert_batch`` HTTP call (one
# ``INSERT ... ON CONFLICT`` statement server-side). Larger batches amortize
# the per-request round trip and the per-statement parse/plan + index
# maintenance overhead, which dominates at high span volume. Kept well under
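# the EGP backend's 1000-row cap; tune per-deploy via
# ``AGENTEX_SPAN_QUEUE_BATCH_SIZE``. NOTE: ``__init__`` passes only
# ``minimum=1`` when resolving this value, so keep overrides at or below
# that cap.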
_DEFAULT_BATCH_SIZE = 200
# Max time the drain lingers after the first span to let a batch fill. Spans
# typically arrive a few ms apart, so a longer linger fills the larger batch
# above rather than shipping near-size-1 batches; bounded so worst-case ingest
# latency (and the in-flight loss window) stays sub-second.
_DEFAULT_LINGER_MS = 250
# 0 == unbounded (preserves prior behavior). A bound makes backpressure
# visible (dropped spans are counted) and caps worst-case memory.
_DEFAULT_MAX_SIZE = 0
Expand Down Expand Up @@ -114,7 +124,7 @@ class AsyncSpanQueue:

def __init__(
self,
batch_size: int = _DEFAULT_BATCH_SIZE,
batch_size: int | None = None,
linger_ms: int | None = None,
max_size: int | None = None,
max_retries: int | None = None,
Expand All @@ -126,7 +136,11 @@ def __init__(
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size)
self._drain_task: asyncio.Task[None] | None = None
self._stopping = False
self._batch_size = batch_size
self._batch_size = (
_read_int_env("AGENTEX_SPAN_QUEUE_BATCH_SIZE", _DEFAULT_BATCH_SIZE, minimum=1)
if batch_size is None
else max(1, batch_size)
)
self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms)
self._max_retries = (
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1)
Expand Down
34 changes: 33 additions & 1 deletion tests/lib/core/tracing/test_span_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from unittest.mock import AsyncMock, MagicMock, patch

from agentex.types.span import Span
from agentex.lib.core.tracing.span_queue import SpanEventType, AsyncSpanQueue
from agentex.lib.core.tracing.span_queue import (
_DEFAULT_BATCH_SIZE,
SpanEventType,
AsyncSpanQueue,
)


def _make_span(span_id: str | None = None) -> Span:
Expand Down Expand Up @@ -859,3 +863,31 @@ async def test_enqueue_overhead_with_metrics_disabled(self, monkeypatch):

assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s"
mock_get.assert_not_called()


class TestAsyncSpanQueueBatchSizeConfig:
    """Check how ``AsyncSpanQueue`` resolves its batch size.

    Precedence under test: an explicit ``batch_size`` argument wins over the
    ``AGENTEX_SPAN_QUEUE_BATCH_SIZE`` environment variable, which in turn wins
    over ``_DEFAULT_BATCH_SIZE``.
    """

    async def test_default_batch_size(self, monkeypatch):
        # Neither an argument nor an env var is supplied: expect the default.
        monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
        queue = AsyncSpanQueue()
        assert queue._batch_size == _DEFAULT_BATCH_SIZE

    async def test_explicit_arg_overrides_default(self, monkeypatch):
        # With the env var cleared, the constructor argument is used as given.
        monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
        queue = AsyncSpanQueue(batch_size=10)
        assert queue._batch_size == 10

    async def test_explicit_arg_clamped_to_min_one(self, monkeypatch):
        # An explicit 0 is raised to the minimum batch size of 1.
        monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
        queue = AsyncSpanQueue(batch_size=0)
        assert queue._batch_size == 1

    async def test_env_used_when_arg_is_none(self, monkeypatch):
        # No argument: the value comes from the environment variable.
        monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500")
        queue = AsyncSpanQueue()
        assert queue._batch_size == 500

    async def test_explicit_arg_beats_env(self, monkeypatch):
        # Both are present: the constructor argument takes precedence.
        monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500")
        queue = AsyncSpanQueue(batch_size=7)
        assert queue._batch_size == 7

    async def test_invalid_env_falls_back_to_default(self, monkeypatch):
        # A non-integer env value must not be used; the default applies.
        monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "not-an-int")
        queue = AsyncSpanQueue()
        assert queue._batch_size == _DEFAULT_BATCH_SIZE
Loading