Skip to content

Commit d7790fb

Browse files
jsondaicopybara-github
authored and committed
chore: GenAI Client(evals) - Add client-side rate limiter to GenAI Eval SDK
PiperOrigin-RevId: 896691319
1 parent 0653ff9 commit d7790fb

File tree

4 files changed

+97
-4
lines changed

4 files changed

+97
-4
lines changed

vertexai/_genai/_evals_common.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1530,6 +1530,7 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
15301530
dataset_schema: Optional[Literal["GEMINI", "FLATTEN", "OPENAI"]] = None,
15311531
dest: Optional[str] = None,
15321532
location: Optional[str] = None,
1533+
evaluation_service_qps: Optional[float] = None,
15331534
**kwargs,
15341535
) -> types.EvaluationResult:
15351536
"""Evaluates a dataset using the provided metrics.
@@ -1542,6 +1543,9 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
15421543
dest: The destination to save the evaluation results.
15431544
location: The location to use for the evaluation. If not specified, the
15441545
location configured in the client will be used.
1546+
evaluation_service_qps: The rate limit (queries per second) for calls
1547+
to the evaluation service. Defaults to 10. Increase this value if
1548+
your project has a higher EvaluateInstances API quota.
15451549
**kwargs: Extra arguments to pass to evaluation, such as `agent_info`.
15461550
15471551
Returns:
@@ -1617,7 +1621,8 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
16171621
logger.info("Running Metric Computation...")
16181622
t1 = time.perf_counter()
16191623
evaluation_result = _evals_metric_handlers.compute_metrics_and_aggregate(
1620-
evaluation_run_config
1624+
evaluation_run_config,
1625+
evaluation_service_qps=evaluation_service_qps,
16211626
)
16221627
t2 = time.perf_counter()
16231628
logger.info("Evaluation took: %f seconds", t2 - t1)

vertexai/_genai/_evals_metric_handlers.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
from . import _evals_common
3333
from . import _evals_constant
34+
from . import _evals_utils
3435
from . import evals
3536
from . import types
3637

@@ -1498,10 +1499,29 @@ class EvaluationRunConfig(_common.BaseModel):
14981499
"""The number of response candidates for the evaluation run."""
14991500

15001501

1502+
def _rate_limited_get_metric_result(
    rate_limiter: _evals_utils.RateLimiter,
    handler: MetricHandler[Any],
    eval_case: types.EvalCase,
    response_index: int,
) -> types.EvalCaseMetricResult:
    """Throttles on the shared rate limiter, then delegates to the handler.

    Args:
      rate_limiter: Limiter that paces evaluation-service requests.
      handler: Metric handler that performs the actual metric computation.
      eval_case: The evaluation case to score.
      response_index: Index of the response candidate being scored.

    Returns:
      The metric result produced by the handler.
    """
    # Block until the limiter grants this thread a time slot, then issue
    # the underlying request.
    rate_limiter.sleep_and_advance()
    metric_result = handler.get_metric_result(eval_case, response_index)
    return metric_result
1511+
1512+
15011513
def compute_metrics_and_aggregate(
15021514
evaluation_run_config: EvaluationRunConfig,
1515+
evaluation_service_qps: Optional[float] = None,
15031516
) -> types.EvaluationResult:
1504-
"""Computes metrics and aggregates them for a given evaluation run config."""
1517+
"""Computes metrics and aggregates them for a given evaluation run config.
1518+
1519+
Args:
1520+
evaluation_run_config: The configuration for the evaluation run.
1521+
evaluation_service_qps: Optional QPS limit for the evaluation service.
1522+
Defaults to _DEFAULT_EVAL_SERVICE_QPS (10). Users with higher
1523+
quotas can increase this value.
1524+
"""
15051525
metric_handlers = []
15061526
all_futures = []
15071527
results_by_case_response_metric: collections.defaultdict[
@@ -1511,6 +1531,12 @@ def compute_metrics_and_aggregate(
15111531
execution_errors = []
15121532
case_indices_with_errors = set()
15131533

1534+
if evaluation_service_qps is not None and evaluation_service_qps <= 0:
1535+
raise ValueError("evaluation_service_qps must be a positive number.")
1536+
qps = evaluation_service_qps or _evals_utils._DEFAULT_EVAL_SERVICE_QPS
1537+
rate_limiter = _evals_utils.RateLimiter(rate=qps)
1538+
logger.info("Rate limiting evaluation service requests to %.1f QPS.", qps)
1539+
15141540
for eval_metric in evaluation_run_config.metrics:
15151541
metric_handlers.append(
15161542
get_handler_for_metric(evaluation_run_config.evals_module, eval_metric)
@@ -1553,7 +1579,9 @@ def compute_metrics_and_aggregate(
15531579
for response_index in range(actual_num_candidates_for_case):
15541580
try:
15551581
future = executor.submit(
1556-
metric_handler_instance.get_metric_result,
1582+
_rate_limited_get_metric_result,
1583+
rate_limiter,
1584+
metric_handler_instance,
15571585
eval_case,
15581586
response_index,
15591587
)

vertexai/_genai/_evals_utils.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
"""Utility functions for evals."""
1616

1717
import abc
18+
import json
1819
import logging
1920
import os
20-
import json
21+
import threading
22+
import time
2123
from typing import Any, Optional, Union
2224

2325
from google.genai._api_client import BaseApiClient
@@ -36,6 +38,53 @@
3638

3739
GCS_PREFIX = "gs://"
3840
BQ_PREFIX = "bq://"
41+
_DEFAULT_EVAL_SERVICE_QPS = 10
42+
43+
44+
class RateLimiter:
45+
"""Helper class for rate-limiting requests to Vertex AI to improve QoS.
46+
47+
Implements a token bucket algorithm to limit the rate at which API calls
48+
can occur. Designed for cases where the batch size is always 1 for traffic
49+
shaping and rate limiting.
50+
51+
Attributes:
52+
seconds_per_event: The time interval (in seconds) between events to
53+
maintain the desired rate.
54+
last: The timestamp of the last event.
55+
_lock: A lock to ensure thread safety.
56+
"""
57+
58+
def __init__(self, rate: float):
59+
"""Initializes the rate limiter.
60+
61+
Args:
62+
rate: The number of queries allowed per second.
63+
64+
Raises:
65+
ValueError: If the rate is not positive.
66+
"""
67+
if not rate or rate <= 0:
68+
raise ValueError("Rate must be a positive number")
69+
self.seconds_per_event = 1.0 / rate
70+
self._next_allowed = time.monotonic()
71+
self._lock = threading.Lock()
72+
73+
def sleep_and_advance(self) -> None:
74+
"""Blocks the current thread until the next event can be admitted.
75+
76+
The lock is held only long enough to reserve a time slot. The
77+
actual sleep happens outside the lock so that multiple threads
78+
can be sleeping concurrently with staggered wake-up times.
79+
"""
80+
with self._lock:
81+
now = time.monotonic()
82+
wait_until = max(now, self._next_allowed)
83+
delay = wait_until - now
84+
self._next_allowed = wait_until + self.seconds_per_event
85+
86+
if delay > 0:
87+
time.sleep(delay)
3988

4089

4190
class EvalDatasetLoader:

vertexai/_genai/types/common.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15396,6 +15396,12 @@ class EvaluateMethodConfig(_common.BaseModel):
1539615396
dest: Optional[str] = Field(
1539715397
default=None, description="""The destination path for the evaluation results."""
1539815398
)
15399+
evaluation_service_qps: Optional[float] = Field(
15400+
default=None,
15401+
description="""The rate limit (queries per second) for calls to the
15402+
evaluation service. Defaults to 10. Increase this value if your
15403+
project has a higher EvaluateInstances API quota.""",
15404+
)
1539915405

1540015406

1540115407
class EvaluateMethodConfigDict(TypedDict, total=False):
@@ -15412,6 +15418,11 @@ class EvaluateMethodConfigDict(TypedDict, total=False):
1541215418
dest: Optional[str]
1541315419
"""The destination path for the evaluation results."""
1541415420

15421+
evaluation_service_qps: Optional[float]
15422+
"""The rate limit (queries per second) for calls to the
15423+
evaluation service. Defaults to 10. Increase this value if your
15424+
project has a higher EvaluateInstances API quota."""
15425+
1541515426

1541615427
EvaluateMethodConfigOrDict = Union[EvaluateMethodConfig, EvaluateMethodConfigDict]
1541715428

0 commit comments

Comments
 (0)