diff --git a/tests/unit/vertexai/genai/test_evals.py b/tests/unit/vertexai/genai/test_evals.py index 88da6bf61e..79ce058d76 100644 --- a/tests/unit/vertexai/genai/test_evals.py +++ b/tests/unit/vertexai/genai/test_evals.py @@ -7044,3 +7044,53 @@ def test_create_evaluation_set_with_agent_data( candidate_response = candidate_responses[0] assert candidate_response["candidate"] == "test-candidate" assert candidate_response["agent_data"] == agent_data + + +class TestRateLimiter: + """Tests for the RateLimiter class in _evals_utils.""" + + def test_rate_limiter_init(self): + """Tests that RateLimiter initializes correctly.""" + limiter = _evals_utils.RateLimiter(rate=10.0) + assert limiter.seconds_per_event == pytest.approx(0.1) + + def test_rate_limiter_invalid_rate(self): + """Tests that RateLimiter raises ValueError for non-positive rate.""" + with pytest.raises(ValueError, match="Rate must be a positive number"): + _evals_utils.RateLimiter(rate=0) + with pytest.raises(ValueError, match="Rate must be a positive number"): + _evals_utils.RateLimiter(rate=-1) + + @mock.patch("time.sleep", return_value=None) + @mock.patch("time.monotonic") + def test_rate_limiter_sleep_and_advance(self, mock_monotonic, mock_sleep): + """Tests that sleep_and_advance properly throttles calls.""" + # With rate=10 (0.1s interval): + # - __init__ at t=0: _next_allowed = 0.0 + # - first call at t=0: no delay, _next_allowed = 0.1 + # - second call at t=0.01: delay = 0.1 - 0.01 = 0.09 + mock_monotonic.side_effect = [ + 0.0, # __init__: time.monotonic() + 0.0, # first sleep_and_advance: now + 0.01, # second sleep_and_advance: now + ] + limiter = _evals_utils.RateLimiter(rate=10.0) + limiter.sleep_and_advance() # First call - should not sleep + limiter.sleep_and_advance() # Second call - should sleep + assert mock_sleep.call_count == 1 + # Verify sleep was called with approximately the right delay + sleep_delay = mock_sleep.call_args[0][0] + assert 0.08 < sleep_delay <= 0.1 + + def 
test_rate_limiter_no_sleep_when_enough_time_passed(self): + """Tests that no sleep occurs when enough time has passed.""" + import time as real_time + + limiter = _evals_utils.RateLimiter(rate=1000.0) # Very high rate + # With rate=1000, interval is 0.001s - should not sleep + start = real_time.time() + for _ in range(5): + limiter.sleep_and_advance() + elapsed = real_time.time() - start + # 5 calls at 1000 QPS should take ~0.005s, certainly under 1s + assert elapsed < 1.0 diff --git a/vertexai/_genai/_evals_common.py b/vertexai/_genai/_evals_common.py index 69de2c5b2b..c01c7866ae 100644 --- a/vertexai/_genai/_evals_common.py +++ b/vertexai/_genai/_evals_common.py @@ -1538,6 +1538,7 @@ def _execute_evaluation( # type: ignore[no-untyped-def] dataset_schema: Optional[Literal["GEMINI", "FLATTEN", "OPENAI"]] = None, dest: Optional[str] = None, location: Optional[str] = None, + evaluation_service_qps: Optional[float] = None, **kwargs, ) -> types.EvaluationResult: """Evaluates a dataset using the provided metrics. @@ -1550,6 +1551,9 @@ def _execute_evaluation( # type: ignore[no-untyped-def] dest: The destination to save the evaluation results. location: The location to use for the evaluation. If not specified, the location configured in the client will be used. + evaluation_service_qps: The rate limit (queries per second) for calls + to the evaluation service. Defaults to 10. Increase this value if + your project has a higher EvaluateInstances API quota. **kwargs: Extra arguments to pass to evaluation, such as `agent_info`. 
Returns: @@ -1625,7 +1629,8 @@ def _execute_evaluation( # type: ignore[no-untyped-def] logger.info("Running Metric Computation...") t1 = time.perf_counter() evaluation_result = _evals_metric_handlers.compute_metrics_and_aggregate( - evaluation_run_config + evaluation_run_config, + evaluation_service_qps=evaluation_service_qps, ) t2 = time.perf_counter() logger.info("Evaluation took: %f seconds", t2 - t1) diff --git a/vertexai/_genai/_evals_metric_handlers.py b/vertexai/_genai/_evals_metric_handlers.py index 9d72bafc86..37a31adeea 100644 --- a/vertexai/_genai/_evals_metric_handlers.py +++ b/vertexai/_genai/_evals_metric_handlers.py @@ -31,6 +31,7 @@ from . import _evals_common from . import _evals_constant +from . import _evals_utils from . import evals from . import types @@ -1498,10 +1499,29 @@ class EvaluationRunConfig(_common.BaseModel): """The number of response candidates for the evaluation run.""" +def _rate_limited_get_metric_result( + rate_limiter: _evals_utils.RateLimiter, + handler: MetricHandler[Any], + eval_case: types.EvalCase, + response_index: int, +) -> types.EvalCaseMetricResult: + """Wraps a handler's get_metric_result with rate limiting.""" + rate_limiter.sleep_and_advance() + return handler.get_metric_result(eval_case, response_index) + + def compute_metrics_and_aggregate( evaluation_run_config: EvaluationRunConfig, + evaluation_service_qps: Optional[float] = None, ) -> types.EvaluationResult: - """Computes metrics and aggregates them for a given evaluation run config.""" + """Computes metrics and aggregates them for a given evaluation run config. + + Args: + evaluation_run_config: The configuration for the evaluation run. + evaluation_service_qps: Optional QPS limit for the evaluation service. + Defaults to _DEFAULT_EVAL_SERVICE_QPS (10). Users with higher + quotas can increase this value. 
+ """ metric_handlers = [] all_futures = [] results_by_case_response_metric: collections.defaultdict[ @@ -1511,6 +1531,12 @@ def compute_metrics_and_aggregate( execution_errors = [] case_indices_with_errors = set() + if evaluation_service_qps is not None and evaluation_service_qps <= 0: + raise ValueError("evaluation_service_qps must be a positive number.") + qps = evaluation_service_qps or _evals_utils._DEFAULT_EVAL_SERVICE_QPS + rate_limiter = _evals_utils.RateLimiter(rate=qps) + logger.info("Rate limiting evaluation service requests to %.1f QPS.", qps) + for eval_metric in evaluation_run_config.metrics: metric_handlers.append( get_handler_for_metric(evaluation_run_config.evals_module, eval_metric) @@ -1553,7 +1579,9 @@ def compute_metrics_and_aggregate( for response_index in range(actual_num_candidates_for_case): try: future = executor.submit( - metric_handler_instance.get_metric_result, + _rate_limited_get_metric_result, + rate_limiter, + metric_handler_instance, eval_case, response_index, ) diff --git a/vertexai/_genai/_evals_utils.py b/vertexai/_genai/_evals_utils.py index 5d356017fc..4f2dc743fa 100644 --- a/vertexai/_genai/_evals_utils.py +++ b/vertexai/_genai/_evals_utils.py @@ -19,6 +19,7 @@ import json import logging import os +import threading import time from typing import Any, Optional, Union @@ -38,12 +39,59 @@ GCS_PREFIX = "gs://" BQ_PREFIX = "bq://" +_DEFAULT_EVAL_SERVICE_QPS = 10 + + +class RateLimiter: + """Helper class for rate-limiting requests to Vertex AI to improve QoS. + + Enforces a minimum interval between successive events by tracking the + next allowed start time on a monotonic clock. Designed for cases where + the batch size is always 1 for traffic shaping and rate limiting. + + Attributes: + seconds_per_event: The time interval (in seconds) between events to + maintain the desired rate. + _next_allowed: The earliest monotonic time at which the next event may be admitted. + _lock: A lock to ensure thread safety. + """ + + def __init__(self, rate: float) -> None: + """Initializes the rate limiter. 
+ + Args: + rate: The number of queries allowed per second. + + Raises: + ValueError: If the rate is not positive. + """ + if not rate or rate <= 0: + raise ValueError("Rate must be a positive number") + self.seconds_per_event = 1.0 / rate + self._next_allowed = time.monotonic() + self._lock = threading.Lock() + + def sleep_and_advance(self) -> None: + """Blocks the current thread until the next event can be admitted. + + The lock is held only long enough to reserve a time slot. The + actual sleep happens outside the lock so that multiple threads + can be sleeping concurrently with staggered wake-up times. + """ + with self._lock: + now = time.monotonic() + wait_until = max(now, self._next_allowed) + delay = wait_until - now + self._next_allowed = wait_until + self.seconds_per_event + + if delay > 0: + time.sleep(delay) class EvalDatasetLoader: """A loader for datasets from various sources, using a shared client.""" - def __init__(self, api_client: BaseApiClient): + def __init__(self, api_client: BaseApiClient) -> None: self.api_client = api_client self.gcs_utils = _gcs_utils.GcsUtils(self.api_client) self.bigquery_utils = _bigquery_utils.BigQueryUtils(self.api_client) diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index 5c88a4f48e..b67be65045 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -15453,6 +15453,12 @@ class EvaluateMethodConfig(_common.BaseModel): dest: Optional[str] = Field( default=None, description="""The destination path for the evaluation results.""" ) + evaluation_service_qps: Optional[float] = Field( + default=None, + description="""The rate limit (queries per second) for calls to the + evaluation service. Defaults to 10. 
Increase this value if your + project has a higher EvaluateInstances API quota.""", + ) class EvaluateMethodConfigDict(TypedDict, total=False): @@ -15469,6 +15475,11 @@ class EvaluateMethodConfigDict(TypedDict, total=False): dest: Optional[str] """The destination path for the evaluation results.""" + evaluation_service_qps: Optional[float] + """The rate limit (queries per second) for calls to the + evaluation service. Defaults to 10. Increase this value if your + project has a higher EvaluateInstances API quota.""" + EvaluateMethodConfigOrDict = Union[EvaluateMethodConfig, EvaluateMethodConfigDict]