Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions tests/unit/vertexai/genai/test_evals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7044,3 +7044,53 @@ def test_create_evaluation_set_with_agent_data(
candidate_response = candidate_responses[0]
assert candidate_response["candidate"] == "test-candidate"
assert candidate_response["agent_data"] == agent_data


class TestRateLimiter:
    """Tests for the RateLimiter class in _evals_utils."""

    def test_rate_limiter_init(self):
        """Tests that RateLimiter initializes correctly."""
        limiter = _evals_utils.RateLimiter(rate=10.0)
        assert limiter.seconds_per_event == pytest.approx(0.1)

    def test_rate_limiter_invalid_rate(self):
        """Tests that RateLimiter raises ValueError for non-positive rate."""
        with pytest.raises(ValueError, match="Rate must be a positive number"):
            _evals_utils.RateLimiter(rate=0)
        with pytest.raises(ValueError, match="Rate must be a positive number"):
            _evals_utils.RateLimiter(rate=-1)

    @mock.patch("time.sleep", return_value=None)
    @mock.patch("time.monotonic")
    def test_rate_limiter_sleep_and_advance(self, mock_monotonic, mock_sleep):
        """Tests that sleep_and_advance properly throttles calls."""
        # With rate=10 (0.1s interval):
        # - __init__ at t=0: _next_allowed = 0.0
        # - first call at t=0: no delay, _next_allowed = 0.1
        # - second call at t=0.01: delay = 0.1 - 0.01 = 0.09
        mock_monotonic.side_effect = [
            0.0,  # __init__: time.monotonic()
            0.0,  # first sleep_and_advance: now
            0.01,  # second sleep_and_advance: now
        ]
        limiter = _evals_utils.RateLimiter(rate=10.0)
        limiter.sleep_and_advance()  # First call - should not sleep
        limiter.sleep_and_advance()  # Second call - should sleep
        assert mock_sleep.call_count == 1
        # The mocked clock is fully deterministic, so the delay is exact.
        sleep_delay = mock_sleep.call_args[0][0]
        assert sleep_delay == pytest.approx(0.09)

    @mock.patch("time.sleep", return_value=None)
    @mock.patch("time.monotonic")
    def test_rate_limiter_no_sleep_when_enough_time_passed(
        self, mock_monotonic, mock_sleep
    ):
        """Tests that no sleep occurs once the allowed slot has passed.

        Uses a mocked clock instead of real sleeping so the assertion is
        deterministic: a call arriving long after the reserved slot must
        never invoke time.sleep.
        """
        # __init__ at t=0 sets _next_allowed = 0.0. The call arrives at
        # t=10.0, far beyond the 0.1s interval, so no delay is required.
        mock_monotonic.side_effect = [0.0, 10.0]
        limiter = _evals_utils.RateLimiter(rate=10.0)
        limiter.sleep_and_advance()
        mock_sleep.assert_not_called()
7 changes: 6 additions & 1 deletion vertexai/_genai/_evals_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,7 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
dataset_schema: Optional[Literal["GEMINI", "FLATTEN", "OPENAI"]] = None,
dest: Optional[str] = None,
location: Optional[str] = None,
evaluation_service_qps: Optional[float] = None,
**kwargs,
) -> types.EvaluationResult:
"""Evaluates a dataset using the provided metrics.
Expand All @@ -1550,6 +1551,9 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
dest: The destination to save the evaluation results.
location: The location to use for the evaluation. If not specified, the
location configured in the client will be used.
evaluation_service_qps: The rate limit (queries per second) for calls
to the evaluation service. Defaults to 10. Increase this value if
your project has a higher EvaluateInstances API quota.
**kwargs: Extra arguments to pass to evaluation, such as `agent_info`.

Returns:
Expand Down Expand Up @@ -1625,7 +1629,8 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
logger.info("Running Metric Computation...")
t1 = time.perf_counter()
evaluation_result = _evals_metric_handlers.compute_metrics_and_aggregate(
evaluation_run_config
evaluation_run_config,
evaluation_service_qps=evaluation_service_qps,
)
t2 = time.perf_counter()
logger.info("Evaluation took: %f seconds", t2 - t1)
Expand Down
32 changes: 30 additions & 2 deletions vertexai/_genai/_evals_metric_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from . import _evals_common
from . import _evals_constant
from . import _evals_utils
from . import evals
from . import types

Expand Down Expand Up @@ -1498,10 +1499,29 @@ class EvaluationRunConfig(_common.BaseModel):
"""The number of response candidates for the evaluation run."""


def _rate_limited_get_metric_result(
    rate_limiter: _evals_utils.RateLimiter,
    handler: MetricHandler[Any],
    eval_case: types.EvalCase,
    response_index: int,
) -> types.EvalCaseMetricResult:
    """Throttles, then delegates the metric computation to the handler.

    Args:
        rate_limiter: Shared limiter pacing calls to the evaluation service.
        handler: Metric handler that performs the actual computation.
        eval_case: The evaluation case to score.
        response_index: Index of the response candidate being scored.

    Returns:
        The metric result produced by the handler.
    """
    # Reserve a time slot (blocking if necessary) before issuing the request.
    rate_limiter.sleep_and_advance()
    result = handler.get_metric_result(eval_case, response_index)
    return result


def compute_metrics_and_aggregate(
evaluation_run_config: EvaluationRunConfig,
evaluation_service_qps: Optional[float] = None,
) -> types.EvaluationResult:
"""Computes metrics and aggregates them for a given evaluation run config."""
"""Computes metrics and aggregates them for a given evaluation run config.

Args:
evaluation_run_config: The configuration for the evaluation run.
evaluation_service_qps: Optional QPS limit for the evaluation service.
Defaults to _DEFAULT_EVAL_SERVICE_QPS (10). Users with higher
quotas can increase this value.
"""
metric_handlers = []
all_futures = []
results_by_case_response_metric: collections.defaultdict[
Expand All @@ -1511,6 +1531,12 @@ def compute_metrics_and_aggregate(
execution_errors = []
case_indices_with_errors = set()

if evaluation_service_qps is not None and evaluation_service_qps <= 0:
raise ValueError("evaluation_service_qps must be a positive number.")
qps = evaluation_service_qps or _evals_utils._DEFAULT_EVAL_SERVICE_QPS
rate_limiter = _evals_utils.RateLimiter(rate=qps)
logger.info("Rate limiting evaluation service requests to %.1f QPS.", qps)

for eval_metric in evaluation_run_config.metrics:
metric_handlers.append(
get_handler_for_metric(evaluation_run_config.evals_module, eval_metric)
Expand Down Expand Up @@ -1553,7 +1579,9 @@ def compute_metrics_and_aggregate(
for response_index in range(actual_num_candidates_for_case):
try:
future = executor.submit(
metric_handler_instance.get_metric_result,
_rate_limited_get_metric_result,
rate_limiter,
metric_handler_instance,
eval_case,
response_index,
)
Expand Down
50 changes: 49 additions & 1 deletion vertexai/_genai/_evals_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import json
import logging
import os
import threading
import time
from typing import Any, Optional, Union

Expand All @@ -38,12 +39,59 @@

GCS_PREFIX = "gs://"
BQ_PREFIX = "bq://"
_DEFAULT_EVAL_SERVICE_QPS = 10


class RateLimiter:
    """Helper class for rate-limiting requests to Vertex AI to improve QoS.

    Paces events at a fixed minimum interval of ``1 / rate`` seconds, so
    consecutive calls to ``sleep_and_advance`` are evenly spaced even when
    invoked from many threads. Designed for cases where the batch size is
    always 1 for traffic shaping and rate limiting.

    Attributes:
        seconds_per_event: The time interval (in seconds) between events to
            maintain the desired rate.
        _next_allowed: Monotonic timestamp at which the next event may be
            admitted.
        _lock: A lock that serializes time-slot reservation across threads.
    """

    def __init__(self, rate: float) -> None:
        """Initializes the rate limiter.

        Args:
            rate: The number of queries allowed per second.

        Raises:
            ValueError: If the rate is not positive.
        """
        # `not rate` rejects None/0 before the numeric comparison can raise.
        if not rate or rate <= 0:
            raise ValueError("Rate must be a positive number")
        self.seconds_per_event = 1.0 / rate
        self._next_allowed = time.monotonic()
        self._lock = threading.Lock()

    def sleep_and_advance(self) -> None:
        """Blocks the current thread until the next event can be admitted.

        The lock is held only long enough to reserve a time slot. The
        actual sleep happens outside the lock so that multiple threads
        can be sleeping concurrently with staggered wake-up times.
        """
        with self._lock:
            now = time.monotonic()
            # If we are past the reserved slot, admit immediately (no
            # catch-up bursts: the schedule restarts from `now`).
            wait_until = max(now, self._next_allowed)
            delay = wait_until - now
            self._next_allowed = wait_until + self.seconds_per_event

        if delay > 0:
            time.sleep(delay)


class EvalDatasetLoader:
"""A loader for datasets from various sources, using a shared client."""

def __init__(self, api_client: BaseApiClient):
def __init__(self, api_client: BaseApiClient) -> None:
self.api_client = api_client
self.gcs_utils = _gcs_utils.GcsUtils(self.api_client)
self.bigquery_utils = _bigquery_utils.BigQueryUtils(self.api_client)
Expand Down
11 changes: 11 additions & 0 deletions vertexai/_genai/types/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15453,6 +15453,12 @@ class EvaluateMethodConfig(_common.BaseModel):
dest: Optional[str] = Field(
default=None, description="""The destination path for the evaluation results."""
)
evaluation_service_qps: Optional[float] = Field(
default=None,
description="""The rate limit (queries per second) for calls to the
evaluation service. Defaults to 10. Increase this value if your
project has a higher EvaluateInstances API quota.""",
)


class EvaluateMethodConfigDict(TypedDict, total=False):
Expand All @@ -15469,6 +15475,11 @@ class EvaluateMethodConfigDict(TypedDict, total=False):
dest: Optional[str]
"""The destination path for the evaluation results."""

evaluation_service_qps: Optional[float]
"""The rate limit (queries per second) for calls to the
evaluation service. Defaults to 10. Increase this value if your
project has a higher EvaluateInstances API quota."""


EvaluateMethodConfigOrDict = Union[EvaluateMethodConfig, EvaluateMethodConfigDict]

Expand Down
Loading