From bdf843ad188d594458b76f980024fac1ff6c37ea Mon Sep 17 00:00:00 2001 From: Rob von Behren Date: Thu, 7 May 2026 07:50:59 -0700 Subject: [PATCH 1/6] cp --- src/runloop_api_client/_base_client.py | 193 ++++++++++++++++-- src/runloop_api_client/_client.py | 24 ++- src/runloop_api_client/_constants.py | 2 +- src/runloop_api_client/lib/wait_for_status.py | 99 +++++++++ .../resources/devboxes/devboxes.py | 134 ++++-------- .../resources/devboxes/executions.py | 73 +++---- uv.lock | 2 +- 7 files changed, 361 insertions(+), 166 deletions(-) create mode 100644 src/runloop_api_client/lib/wait_for_status.py diff --git a/src/runloop_api_client/_base_client.py b/src/runloop_api_client/_base_client.py index 410e78aab..9be7adf02 100644 --- a/src/runloop_api_client/_base_client.py +++ b/src/runloop_api_client/_base_client.py @@ -10,6 +10,7 @@ import logging import platform import warnings +import threading import email.utils from types import TracebackType from random import random @@ -90,6 +91,14 @@ log: logging.Logger = logging.getLogger(__name__) +# Shared HTTP connection pool state. One pool per client type (sync/async), +# refcounted so the pool is closed when the last SDK client releases it. +_pool_lock = threading.Lock() +_shared_sync_client: httpx.Client | None = None +_shared_sync_refcount: int = 0 +_shared_async_client: httpx.AsyncClient | None = None +_shared_async_refcount: int = 0 + # TODO: make base page type vars covariant SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]") AsyncPageT = TypeVar("AsyncPageT", bound="BaseAsyncPage[Any]") @@ -816,6 +825,7 @@ def __init__(self, **kwargs: Any) -> None: kwargs.setdefault("timeout", DEFAULT_TIMEOUT) kwargs.setdefault("limits", DEFAULT_CONNECTION_LIMITS) kwargs.setdefault("follow_redirects", True) + kwargs.setdefault("http2", True) super().__init__(**kwargs) @@ -845,6 +855,7 @@ def __del__(self) -> None: class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]): _client: httpx.Client _default_stream_cls: type[Stream[Any]] | None = None + _uses_shared_pool: bool def __init__( self, @@ -857,6 +868,7 @@ def __init__( custom_headers: Mapping[str, str] | None = None, custom_query: Mapping[str, object] | None = None, _strict_response_validation: bool, + shared_http_pool: bool = True, ) -> None: if not is_given(timeout): # if the user passed in a custom http client with a non-default @@ -886,11 +898,28 @@ def __init__( custom_headers=custom_headers, _strict_response_validation=_strict_response_validation, ) - self._client = http_client or SyncHttpxClientWrapper( - base_url=base_url, - # cast to a valid type because mypy doesn't understand our type narrowing - timeout=cast(Timeout, timeout), - ) + + if http_client is not None: + self._client = http_client + self._uses_shared_pool = False + elif shared_http_pool: + global _shared_sync_client, _shared_sync_refcount + with _pool_lock: + if _shared_sync_client is None or _shared_sync_client.is_closed: + _shared_sync_client = SyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + ) + _shared_sync_refcount = 0 + _shared_sync_refcount += 1 + self._client = _shared_sync_client + self._uses_shared_pool = True + else: + self._client = SyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + ) + self._uses_shared_pool = False def is_closed(self) -> bool: return self._client.is_closed @@ -900,11 +929,40 @@ def close(self) -> None: The client will *not* be usable after this. """ - # If an error is thrown while constructing a client, self._client - # may not be present - if hasattr(self, "_client"): + if not hasattr(self, "_client"): + return + if self._uses_shared_pool: + self._release_shared_pool() + else: self._client.close() + def _release_shared_pool(self) -> None: + global _shared_sync_client, _shared_sync_refcount + with _pool_lock: + if not self._uses_shared_pool: + return + self._uses_shared_pool = False + _shared_sync_refcount -= 1 + if _shared_sync_refcount <= 0: + client = _shared_sync_client + _shared_sync_client = None + _shared_sync_refcount = 0 + else: + client = None + if client is not None: + try: + client.close() + except Exception: + pass + + def __del__(self) -> None: + if not getattr(self, "_uses_shared_pool", False): + return + try: + self._release_shared_pool() + except Exception: + pass + def __enter__(self: _T) -> _T: return self @@ -1018,6 +1076,7 @@ def request( max_retries=max_retries, options=input_options, response=None, + error=err, ) continue @@ -1032,6 +1091,7 @@ def request( max_retries=max_retries, options=input_options, response=None, + error=err, ) continue @@ -1083,7 +1143,13 @@ def request( ) def _sleep_for_retry( - self, *, retries_taken: int, max_retries: int, options: FinalRequestOptions, response: httpx.Response | None + self, + *, + retries_taken: int, + max_retries: int, + options: FinalRequestOptions, + response: httpx.Response | None, + error: BaseException | None = None, ) -> None: remaining_retries = max_retries - retries_taken if remaining_retries == 1: @@ -1092,7 +1158,23 @@ def _sleep_for_retry( log.debug("%i retries left", remaining_retries) timeout = self._calculate_retry_timeout(remaining_retries, options, response.headers if response else None) - log.info("Retrying request to %s in %f seconds", options.url, timeout) + if response is not None: + log.info( + "Retrying request to %s in %f seconds (status %d)", + options.url, + timeout, + response.status_code, + ) + elif error is not None: + log.info( + "Retrying request to %s in %f seconds (%s: %s)", + options.url, + timeout, + type(error).__name__, + error, + ) + else: + log.info("Retrying request to %s in %f seconds", options.url, timeout) time.sleep(timeout) @@ -1428,6 +1510,7 @@ def __del__(self) -> None: class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]): _client: httpx.AsyncClient _default_stream_cls: type[AsyncStream[Any]] | None = None + _uses_shared_pool: bool def __init__( self, @@ -1440,6 +1523,7 @@ def __init__( http_client: httpx.AsyncClient | None = None, custom_headers: Mapping[str, str] | None = None, custom_query: Mapping[str, object] | None = None, + shared_http_pool: bool = True, ) -> None: if not is_given(timeout): # if the user passed in a custom http client with a non-default @@ -1469,11 +1553,28 @@ def __init__( custom_headers=custom_headers, _strict_response_validation=_strict_response_validation, ) - self._client = http_client or AsyncHttpxClientWrapper( - base_url=base_url, - # cast to a valid type because mypy doesn't understand our type narrowing - timeout=cast(Timeout, timeout), - ) + + if http_client is not None: + self._client = http_client + self._uses_shared_pool = False + elif shared_http_pool: + global _shared_async_client, _shared_async_refcount + with _pool_lock: + if _shared_async_client is None or _shared_async_client.is_closed: + _shared_async_client = AsyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + ) + _shared_async_refcount = 0 + _shared_async_refcount += 1 + self._client = _shared_async_client + self._uses_shared_pool = True + else: + self._client = AsyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + ) + self._uses_shared_pool = False def is_closed(self) -> bool: return self._client.is_closed @@ -1483,7 +1584,39 @@ async def close(self) -> None: The client will *not* be usable after this. """ - await self._client.aclose() + if self._uses_shared_pool: + self._release_shared_pool() + else: + await self._client.aclose() + + def _release_shared_pool(self) -> None: + global _shared_async_client, _shared_async_refcount + should_close = False + client = None + with _pool_lock: + if not self._uses_shared_pool: + return + self._uses_shared_pool = False + _shared_async_refcount -= 1 + if _shared_async_refcount <= 0: + client = _shared_async_client + _shared_async_client = None + _shared_async_refcount = 0 + should_close = True + if should_close and client is not None: + try: + loop = asyncio.get_running_loop() + loop.create_task(client.aclose()) + except Exception: + pass + + def __del__(self) -> None: + if not getattr(self, "_uses_shared_pool", False): + return + try: + self._release_shared_pool() + except Exception: + pass async def __aenter__(self: _T) -> _T: return self @@ -1603,6 +1736,7 @@ async def request( max_retries=max_retries, options=input_options, response=None, + error=err, ) continue @@ -1617,6 +1751,7 @@ async def request( max_retries=max_retries, options=input_options, response=None, + error=err, ) continue @@ -1668,7 +1803,13 @@ async def request( ) async def _sleep_for_retry( - self, *, retries_taken: int, max_retries: int, options: FinalRequestOptions, response: httpx.Response | None + self, + *, + retries_taken: int, + max_retries: int, + options: FinalRequestOptions, + response: httpx.Response | None, + error: BaseException | None = None, ) -> None: remaining_retries = max_retries - retries_taken if remaining_retries == 1: @@ -1677,7 +1818,23 @@ async def _sleep_for_retry( log.debug("%i retries left", remaining_retries) timeout = self._calculate_retry_timeout(remaining_retries, options, response.headers if response else None) - log.info("Retrying request to %s in %f seconds", options.url, timeout) + if response is not None: + log.info( + "Retrying request to %s in %f seconds (status %d)", + options.url, + timeout, + response.status_code, + ) + elif error is not None: + log.info( + "Retrying request to %s in %f seconds (%s: %s)", + options.url, + timeout, + type(error).__name__, + error, + ) + else: + log.info("Retrying request to %s in %f seconds", options.url, timeout) await anyio.sleep(timeout) diff --git a/src/runloop_api_client/_client.py b/src/runloop_api_client/_client.py index 61db3a474..e6ce4f2c0 100644 --- a/src/runloop_api_client/_client.py +++ b/src/runloop_api_client/_client.py @@ -84,6 +84,10 @@ def __init__( # We provide a `DefaultHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`. # See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details. http_client: httpx.Client | None = None, + # Share a single httpx connection pool across all Runloop client instances. + # Enables HTTP/2 multiplexing and avoids ConnectTimeout storms under high concurrency. + # Set to False to create a private connection pool (old behavior). + shared_http_pool: bool = True, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. @@ -120,6 +124,7 @@ def __init__( custom_headers=default_headers, custom_query=default_query, _strict_response_validation=_strict_response_validation, + shared_http_pool=shared_http_pool, ) self._idempotency_header = "x-request-id" @@ -277,12 +282,17 @@ def copy( elif set_default_query is not None: params = set_default_query - http_client = http_client or self._client + if http_client is not None: + shared_http_pool = False + else: + shared_http_pool = self._uses_shared_pool + return self.__class__( bearer_token=bearer_token or self.bearer_token, base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, + shared_http_pool=shared_http_pool, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, @@ -344,6 +354,10 @@ def __init__( # We provide a `DefaultAsyncHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`. # See the [httpx documentation](https://www.python-httpx.org/api/#asyncclient) for more details. http_client: httpx.AsyncClient | None = None, + # Share a single httpx connection pool across all AsyncRunloop client instances. + # Enables HTTP/2 multiplexing and avoids ConnectTimeout storms under high concurrency. + # Set to False to create a private connection pool (old behavior). + shared_http_pool: bool = True, # Enable or disable schema validation for data returned by the API. # When enabled an error APIResponseValidationError is raised # if the API responds with invalid data for the expected schema. @@ -380,6 +394,7 @@ def __init__( custom_headers=default_headers, custom_query=default_query, _strict_response_validation=_strict_response_validation, + shared_http_pool=shared_http_pool, ) self._idempotency_header = "x-request-id" @@ -537,12 +552,17 @@ def copy( elif set_default_query is not None: params = set_default_query - http_client = http_client or self._client + if http_client is not None: + shared_http_pool = False + else: + shared_http_pool = self._uses_shared_pool + return self.__class__( bearer_token=bearer_token or self.bearer_token, base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, + shared_http_pool=shared_http_pool, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, diff --git a/src/runloop_api_client/_constants.py b/src/runloop_api_client/_constants.py index d6361c8ad..88f944ce2 100644 --- a/src/runloop_api_client/_constants.py +++ b/src/runloop_api_client/_constants.py @@ -8,7 +8,7 @@ # default timeout is 30 seconds DEFAULT_TIMEOUT = httpx.Timeout(timeout=30, connect=5.0) DEFAULT_MAX_RETRIES = 5 -DEFAULT_CONNECTION_LIMITS = httpx.Limits(max_connections=100, max_keepalive_connections=20) +DEFAULT_CONNECTION_LIMITS = httpx.Limits(max_connections=20, max_keepalive_connections=10) INITIAL_RETRY_DELAY = 1.0 MAX_RETRY_DELAY = 60.0 diff --git a/src/runloop_api_client/lib/wait_for_status.py b/src/runloop_api_client/lib/wait_for_status.py new file mode 100644 index 000000000..73df2bf95 --- /dev/null +++ b/src/runloop_api_client/lib/wait_for_status.py @@ -0,0 +1,99 @@ +"""Helpers for polling wait_for_status long-poll endpoints. + +Each function wraps a server-side long-poll POST with a client-side retry +loop. On each iteration the remaining timeout is forwarded to the server +so the server can long-poll for up to that duration. 408 responses and +client-side timeouts are converted to a caller-supplied placeholder so the +loop can continue. No client-side sleep between iterations — the +server-side long-poll *is* the wait. +""" + +from __future__ import annotations + +import time +from typing import List, Type, TypeVar, Callable, Optional, Awaitable + +from .polling import PollingConfig, PollingTimeout +from .._exceptions import APIStatusError, APITimeoutError + +T = TypeVar("T") + + +def wait_for_status( + post_fn: Callable[..., T], + path: str, + statuses: List[str], + cast_to: Type[T], + placeholder: Callable[[], T], + is_terminal: Callable[[T], bool], + polling_config: Optional[PollingConfig] = None, +) -> T: + """Sync long-poll for a status change, retrying until *is_terminal* or timeout.""" + config = polling_config or PollingConfig() + timeout = config.interval_seconds * config.max_attempts + if config.timeout_seconds is not None and config.timeout_seconds > 0: + timeout = min(config.timeout_seconds, timeout) + + start_time = time.time() + last_result: T | None = None + + while True: + remaining = timeout - (time.time() - start_time) + if remaining <= 0: + raise PollingTimeout(f"Exceeded timeout of {timeout} seconds", last_result) + + try: + last_result = post_fn( + path, + body={"statuses": statuses, "timeout_seconds": remaining}, + cast_to=cast_to, + options={"max_retries": 0}, + ) + except (APITimeoutError, APIStatusError) as error: + if isinstance(error, APITimeoutError) or error.response.status_code == 408: + last_result = placeholder() + else: + raise + + if is_terminal(last_result): + return last_result + + +async def async_wait_for_status( + post_fn: Callable[..., Awaitable[T]], + path: str, + statuses: List[str], + cast_to: Type[T], + placeholder: Callable[[], T], + is_terminal: Callable[[T], bool], + polling_config: Optional[PollingConfig] = None, +) -> T: + """Async long-poll for a status change, retrying until *is_terminal* or timeout.""" + config = polling_config or PollingConfig() + timeout = config.interval_seconds * config.max_attempts + if config.timeout_seconds is not None and config.timeout_seconds > 0: + timeout = min(config.timeout_seconds, timeout) + + start_time = time.time() + last_result: T | None = None + + while True: + remaining = timeout - (time.time() - start_time) + if remaining <= 0: + raise PollingTimeout(f"Exceeded timeout of {timeout} seconds", last_result) + + try: + last_result = await post_fn( + path, + body={"statuses": statuses, "timeout_seconds": remaining}, + cast_to=cast_to, + options={"max_retries": 0}, + ) + except (APITimeoutError, APIStatusError) as error: + if isinstance(error, APITimeoutError) or error.response.status_code == 408: + last_result = placeholder() + else: + raise + + if is_terminal(last_result): + return last_result diff --git a/src/runloop_api_client/resources/devboxes/devboxes.py b/src/runloop_api_client/resources/devboxes/devboxes.py index 83459959b..888369e98 100644 --- a/src/runloop_api_client/resources/devboxes/devboxes.py +++ b/src/runloop_api_client/resources/devboxes/devboxes.py @@ -72,7 +72,7 @@ AsyncDiskSnapshotsCursorIDPage, ) from ..._exceptions import RunloopError, APIStatusError, APITimeoutError -from ...lib.polling import PollingConfig, poll_until, retry_server_poll_until as sync_retry_server_poll_until +from ...lib.polling import PollingConfig, poll_until from ..._base_client import AsyncPaginator, make_request_options from .disk_snapshots import ( DiskSnapshotsResource, @@ -82,9 +82,10 @@ DiskSnapshotsResourceWithStreamingResponse, AsyncDiskSnapshotsResourceWithStreamingResponse, ) -from ...lib.polling_async import async_poll_until, async_retry_server_poll_until +from ...lib.polling_async import async_poll_until from ...types.devbox_view import DevboxView from ...types.tunnel_view import TunnelView +from ...lib.wait_for_status import wait_for_status, async_wait_for_status from ...types.shared_params.mount import Mount from ...types.devbox_snapshot_view import DevboxSnapshotView from ...types.shared.launch_parameters import LaunchParameters as SharedLaunchParameters @@ -383,11 +384,7 @@ def await_running( Args: id: The ID of the devbox to wait for - config: Optional polling configuration - extra_headers: Send extra headers - extra_query: Add additional query parameters to the request - extra_body: Add additional JSON properties to the request - timeout: Override the client-level default timeout for this request, in seconds + polling_config: Optional polling configuration Returns: The devbox in running state @@ -397,31 +394,18 @@ def await_running( RunloopError: If devbox enters a non-running terminal state """ - def wait_for_devbox_status(remaining_timeout_seconds: float) -> DevboxView: - try: - return self._post( - f"/v1/devboxes/{id}/wait_for_status", - body={"statuses": ["running", "failure", "shutdown"], "timeout_seconds": remaining_timeout_seconds}, - cast_to=DevboxView, - options={"max_retries": 0}, - ) - except (APITimeoutError, APIStatusError) as error: - if isinstance(error, APITimeoutError) or error.response.status_code == 408: - return placeholder_devbox_view(id) - raise - def is_done_booting(devbox: DevboxView) -> bool: return devbox.status not in DEVBOX_BOOTING_STATES - config = polling_config - if not config: - config = PollingConfig() - - timeout = config.interval_seconds * config.max_attempts - if config.timeout_seconds is not None and config.timeout_seconds > 0: - timeout = min(config.timeout_seconds, timeout) - - devbox = sync_retry_server_poll_until(wait_for_devbox_status, is_done_booting, timeout) + devbox = wait_for_status( + self._post, + f"/v1/devboxes/{id}/wait_for_status", + ["running", "failure", "shutdown"], + DevboxView, + lambda: placeholder_devbox_view(id), + is_done_booting, + polling_config, + ) if devbox.status != "running": raise RunloopError(f"Devbox entered non-running terminal state: {devbox.status}") @@ -448,25 +432,18 @@ def await_suspended( RunloopError: If the devbox enters a non-suspended terminal state. """ - def wait_for_devbox_status() -> DevboxView: - return self._post( - f"/v1/devboxes/{id}/wait_for_status", - body={"statuses": list(DEVBOX_TERMINAL_STATES)}, - cast_to=DevboxView, - options={"max_retries": 0}, - ) - - def handle_timeout_error(error: Exception) -> DevboxView: - if isinstance(error, APITimeoutError) or ( - isinstance(error, APIStatusError) and error.response.status_code == 408 - ): - return placeholder_devbox_view(id) - raise error - def is_terminal_state(devbox: DevboxView) -> bool: return devbox.status in DEVBOX_TERMINAL_STATES - devbox = poll_until(wait_for_devbox_status, is_terminal_state, polling_config, handle_timeout_error) + devbox = wait_for_status( + self._post, + f"/v1/devboxes/{id}/wait_for_status", + list(DEVBOX_TERMINAL_STATES), + DevboxView, + lambda: placeholder_devbox_view(id), + is_terminal_state, + polling_config, + ) if devbox.status != "suspended": raise RunloopError(f"Devbox entered non-suspended terminal state: {devbox.status}") @@ -2045,9 +2022,6 @@ async def await_running( Args: id: The ID of the devbox to wait for polling_config: Optional polling configuration - extra_headers: Send extra headers - extra_query: Add additional query parameters to the request - extra_body: Add additional JSON properties to the request Returns: The devbox in running state @@ -2057,41 +2031,18 @@ async def await_running( RunloopError: If devbox enters a non-running terminal state """ - async def wait_for_devbox_status(remaining_timeout_seconds: float) -> DevboxView: - # This wait_for_status endpoint polls the devbox status for 10 seconds until it reaches either running or failure. - # If it's neither, it will throw an error. - try: - return await self._post( - f"/v1/devboxes/{id}/wait_for_status", - body={"statuses": ["running", "failure", "shutdown"], "timeout_seconds": remaining_timeout_seconds}, - cast_to=DevboxView, - options={"max_retries": 0}, - ) - except (APITimeoutError, APIStatusError) as error: - # Handle timeout errors by returning current devbox state to continue polling - if isinstance(error, APITimeoutError) or error.response.status_code == 408: - # Return a placeholder result to continue polling - return placeholder_devbox_view(id) - - # Re-raise other errors to stop polling - raise - def is_done_booting(devbox: DevboxView) -> bool: return devbox.status not in DEVBOX_BOOTING_STATES - # calculate the timeout to use. The PollingConfig doesn't - # match the semantics for server-side polling well, so we - # instead convert interval*attempts to a total time, and take - # the minimum total. - config = polling_config - if not config: - config = PollingConfig() # use defaults - - timeout = config.interval_seconds * config.max_attempts - if config.timeout_seconds is not None and config.timeout_seconds > 0: - timeout = min(config.timeout_seconds, timeout) - - devbox = await async_retry_server_poll_until(wait_for_devbox_status, is_done_booting, timeout) + devbox = await async_wait_for_status( + self._post, + f"/v1/devboxes/{id}/wait_for_status", + ["running", "failure", "shutdown"], + DevboxView, + lambda: placeholder_devbox_view(id), + is_done_booting, + polling_config, + ) if devbox.status != "running": raise RunloopError(f"Devbox entered non-running terminal state: {devbox.status}") @@ -2118,23 +2069,18 @@ async def await_suspended( RunloopError: If the devbox enters a non-suspended terminal state. """ - async def wait_for_devbox_status() -> DevboxView: - try: - return await self._post( - f"/v1/devboxes/{id}/wait_for_status", - body={"statuses": list(DEVBOX_TERMINAL_STATES)}, - cast_to=DevboxView, - options={"max_retries": 0}, - ) - except (APITimeoutError, APIStatusError) as error: - if isinstance(error, APITimeoutError) or error.response.status_code == 408: - return placeholder_devbox_view(id) - raise - def is_terminal_state(devbox: DevboxView) -> bool: return devbox.status in DEVBOX_TERMINAL_STATES - devbox = await async_poll_until(wait_for_devbox_status, is_terminal_state, polling_config) + devbox = await async_wait_for_status( + self._post, + f"/v1/devboxes/{id}/wait_for_status", + list(DEVBOX_TERMINAL_STATES), + DevboxView, + lambda: placeholder_devbox_view(id), + is_terminal_state, + polling_config, + ) if devbox.status != "suspended": raise RunloopError(f"Devbox entered non-suspended terminal state: {devbox.status}") diff --git a/src/runloop_api_client/resources/devboxes/executions.py b/src/runloop_api_client/resources/devboxes/executions.py index ff7638798..e5bfd7ed8 100755 --- a/src/runloop_api_client/resources/devboxes/executions.py +++ b/src/runloop_api_client/resources/devboxes/executions.py @@ -20,8 +20,7 @@ ) from ..._constants import DEFAULT_TIMEOUT, RAW_RESPONSE_HEADER from ..._streaming import Stream, AsyncStream, ReconnectingStream, AsyncReconnectingStream -from ..._exceptions import APIStatusError, APITimeoutError -from ...lib.polling import PollingConfig, poll_until +from ...lib.polling import PollingConfig from ..._base_client import make_request_options from ...types.devboxes import ( execution_kill_params, @@ -32,7 +31,7 @@ execution_stream_stderr_updates_params, execution_stream_stdout_updates_params, ) -from ...lib.polling_async import async_poll_until +from ...lib.wait_for_status import wait_for_status, async_wait_for_status from ...types.devbox_send_std_in_result import DevboxSendStdInResult from ...types.devbox_execution_detail_view import DevboxExecutionDetailView from ...types.devboxes.execution_update_chunk import ExecutionUpdateChunk @@ -129,12 +128,8 @@ def await_completed( Args: execution_id: The ID of the execution to wait for - id: The ID of the devbox - config: Optional polling configuration - extra_headers: Send extra headers - extra_query: Add additional query parameters to the request - extra_body: Add additional JSON properties to the request - timeout: Override the client-level default timeout for this request, in seconds + devbox_id: The ID of the devbox + polling_config: Optional polling configuration Returns: The completed execution @@ -143,29 +138,18 @@ def await_completed( PollingTimeout: If polling times out before execution completes """ - def wait_for_execution_status() -> DevboxAsyncExecutionDetailView: - # This wait_for_status endpoint polls the execution status for 60 seconds until it reaches either completed. - return self._post( - f"/v1/devboxes/{devbox_id}/executions/{execution_id}/wait_for_status", - body={"statuses": ["completed"]}, - cast_to=DevboxAsyncExecutionDetailView, - ) - - def handle_timeout_error(error: Exception) -> DevboxAsyncExecutionDetailView: - # Handle timeout errors by returning current execution state to continue polling - if isinstance(error, APITimeoutError) or ( - isinstance(error, APIStatusError) and error.response.status_code == 408 - ): - # Return a placeholder result to continue polling - return placeholder_execution_detail_view(devbox_id, execution_id) - else: - # Re-raise other errors to stop polling - raise error - def is_done(execution: DevboxAsyncExecutionDetailView) -> bool: return execution.status == "completed" - return poll_until(wait_for_execution_status, is_done, polling_config, handle_timeout_error) + return wait_for_status( + self._post, + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/wait_for_status", + ["completed"], + DevboxAsyncExecutionDetailView, + lambda: placeholder_execution_detail_view(devbox_id, execution_id), + is_done, + polling_config, + ) def execute_async( self, @@ -675,12 +659,8 @@ async def await_completed( Args: execution_id: The ID of the execution to wait for - id: The ID of the devbox + devbox_id: The ID of the devbox polling_config: Optional polling configuration - extra_headers: Send extra headers - extra_query: Add additional query parameters to the request - extra_body: Add additional JSON properties to the request - timeout: Override the client-level default timeout for this request, in seconds Returns: The completed execution @@ -689,25 +669,18 @@ async def await_completed( PollingTimeout: If polling times out before execution completes """ - async def wait_for_execution_status() -> DevboxAsyncExecutionDetailView: - try: - return await self._post( - f"/v1/devboxes/{devbox_id}/executions/{execution_id}/wait_for_status", - body={"statuses": ["completed"]}, - cast_to=DevboxAsyncExecutionDetailView, - ) - except (APITimeoutError, APIStatusError) as error: - # Handle timeout errors by returning placeholder to continue polling - if isinstance(error, APITimeoutError) or error.response.status_code == 408: - return placeholder_execution_detail_view(devbox_id, execution_id) - - # Re-raise other errors to stop polling - raise - def is_done(execution: DevboxAsyncExecutionDetailView) -> bool: return execution.status == "completed" - return await async_poll_until(wait_for_execution_status, is_done, polling_config) + return await async_wait_for_status( + self._post, + f"/v1/devboxes/{devbox_id}/executions/{execution_id}/wait_for_status", + ["completed"], + DevboxAsyncExecutionDetailView, + lambda: placeholder_execution_detail_view(devbox_id, execution_id), + is_done, + polling_config, + ) async def execute_async( self, diff --git a/uv.lock b/uv.lock index 88dc754a1..a35165b2c 100644 --- a/uv.lock +++ b/uv.lock @@ -2422,7 +2422,7 @@ wheels = [ [[package]] name = "runloop-api-client" -version = "1.20.0" +version = "1.20.2" source = { editable = "." } dependencies = [ { name = "anyio" }, From 2728bdc1e1c22623b8277af2b0cec819a5ee5fa1 Mon Sep 17 00:00:00 2001 From: Rob von Behren Date: Thu, 7 May 2026 11:41:16 -0700 Subject: [PATCH 2/6] Fix shared pool bugs and add tests - Add hasattr guard to AsyncAPIClient.close() matching sync version - Track per-instance _closed flag so is_closed() works correctly for shared pool clients (underlying transport stays open for others) - Add shared_http_pool param to copy()/with_options() to match __init__ - Add 19 tests covering sharing, refcounting, copy propagation Co-Authored-By: Claude Opus 4.6 --- src/runloop_api_client/_base_client.py | 14 +- src/runloop_api_client/_client.py | 18 +- tests/test_shared_pool.py | 259 +++++++++++++++++++++++++ 3 files changed, 283 insertions(+), 8 deletions(-) create mode 100644 tests/test_shared_pool.py diff --git a/src/runloop_api_client/_base_client.py b/src/runloop_api_client/_base_client.py index 9be7adf02..6986415cf 100644 --- a/src/runloop_api_client/_base_client.py +++ b/src/runloop_api_client/_base_client.py @@ -856,6 +856,7 @@ class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]): _client: httpx.Client _default_stream_cls: type[Stream[Any]] | None = None _uses_shared_pool: bool + _closed: bool def __init__( self, @@ -899,6 +900,8 @@ def __init__( _strict_response_validation=_strict_response_validation, ) + self._closed = False + if http_client is not None: self._client = http_client self._uses_shared_pool = False @@ -922,7 +925,7 @@ def __init__( self._uses_shared_pool = False def is_closed(self) -> bool: - return self._client.is_closed + return self._closed or self._client.is_closed def close(self) -> None: """Close the underlying HTTPX client. @@ -931,6 +934,7 @@ def close(self) -> None: """ if not hasattr(self, "_client"): return + self._closed = True if self._uses_shared_pool: self._release_shared_pool() else: @@ -1511,6 +1515,7 @@ class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]): _client: httpx.AsyncClient _default_stream_cls: type[AsyncStream[Any]] | None = None _uses_shared_pool: bool + _closed: bool def __init__( self, @@ -1554,6 +1559,8 @@ def __init__( _strict_response_validation=_strict_response_validation, ) + self._closed = False + if http_client is not None: self._client = http_client self._uses_shared_pool = False @@ -1577,13 +1584,16 @@ def __init__( self._uses_shared_pool = False def is_closed(self) -> bool: - return self._client.is_closed + return self._closed or self._client.is_closed async def close(self) -> None: """Close the underlying HTTPX client. The client will *not* be usable after this. """ + if not hasattr(self, "_client"): + return + self._closed = True if self._uses_shared_pool: self._release_shared_pool() else: diff --git a/src/runloop_api_client/_client.py b/src/runloop_api_client/_client.py index e6ce4f2c0..1867f9997 100644 --- a/src/runloop_api_client/_client.py +++ b/src/runloop_api_client/_client.py @@ -254,6 +254,7 @@ def copy( base_url: str | httpx.URL | None = None, timeout: float | Timeout | None | NotGiven = not_given, http_client: httpx.Client | None = None, + shared_http_pool: bool | None = None, max_retries: int | NotGiven = not_given, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, @@ -283,16 +284,18 @@ def copy( params = set_default_query if http_client is not None: - shared_http_pool = False + resolved_shared = False + elif shared_http_pool is not None: + resolved_shared = shared_http_pool else: - shared_http_pool = self._uses_shared_pool + resolved_shared = self._uses_shared_pool return self.__class__( bearer_token=bearer_token or self.bearer_token, base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, - shared_http_pool=shared_http_pool, + shared_http_pool=resolved_shared, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, @@ -524,6 +527,7 @@ def copy( base_url: str | httpx.URL | None = None, timeout: float | Timeout | None | NotGiven = not_given, http_client: httpx.AsyncClient | None = None, + shared_http_pool: bool | None = None, max_retries: int | NotGiven = not_given, default_headers: Mapping[str, str] | None = None, set_default_headers: Mapping[str, str] | None = None, @@ -553,16 +557,18 @@ def copy( params = set_default_query if http_client is not None: - shared_http_pool = False + resolved_shared = False + elif shared_http_pool is not None: + resolved_shared = shared_http_pool else: - shared_http_pool = self._uses_shared_pool + resolved_shared = self._uses_shared_pool return self.__class__( bearer_token=bearer_token or self.bearer_token, base_url=base_url or self.base_url, timeout=self.timeout if isinstance(timeout, NotGiven) else timeout, http_client=http_client, - shared_http_pool=shared_http_pool, + shared_http_pool=resolved_shared, max_retries=max_retries if is_given(max_retries) else self.max_retries, default_headers=headers, default_query=params, diff --git a/tests/test_shared_pool.py b/tests/test_shared_pool.py new file mode 100644 index 000000000..40da1dc07 --- /dev/null +++ b/tests/test_shared_pool.py @@ -0,0 +1,259 @@ +"""Tests for shared HTTP connection pool behavior. + +Verifies that SDK clients share (or don't share) the underlying httpx +transport, and that refcounting correctly manages the pool lifecycle. +""" + +from __future__ import annotations + +import os + +import httpx +import pytest + +import runloop_api_client._base_client as _base_mod +from runloop_api_client import Runloop, AsyncRunloop + +base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010") +bearer_token = "My Bearer Token" + + +@pytest.fixture(autouse=True) +def _reset_shared_pool(): + """Reset module-level shared pool state before and after each test.""" + _clear_pool_state() + yield + _clear_pool_state() + + +def _clear_pool_state(): + with _base_mod._pool_lock: + if _base_mod._shared_sync_client is not None and not _base_mod._shared_sync_client.is_closed: + try: + _base_mod._shared_sync_client.close() + except Exception: + pass + _base_mod._shared_sync_client = None + _base_mod._shared_sync_refcount = 0 + + if _base_mod._shared_async_client is not None and not _base_mod._shared_async_client.is_closed: + pass # async close is best-effort in sync context + _base_mod._shared_async_client = None + _base_mod._shared_async_refcount = 0 + + +def _make_client(**kwargs) -> Runloop: + kwargs.setdefault("base_url", base_url) + kwargs.setdefault("bearer_token", bearer_token) + return Runloop(**kwargs) + + +def _make_async_client(**kwargs) -> AsyncRunloop: + kwargs.setdefault("base_url", base_url) + kwargs.setdefault("bearer_token", bearer_token) + return AsyncRunloop(**kwargs) + + +# --------------------------------------------------------------------------- +# Sync: sharing behavior +# --------------------------------------------------------------------------- + + +class TestSyncSharedPool: + def test_shared_pool_uses_same_client(self): + c1 = _make_client(shared_http_pool=True) + c2 = _make_client(shared_http_pool=True) + + assert c1._client is c2._client + assert c1._uses_shared_pool is True + assert c2._uses_shared_pool is True + + c1.close() + c2.close() + + def test_private_pool_uses_different_clients(self): + c1 = _make_client(shared_http_pool=False) + c2 = _make_client(shared_http_pool=False) + + assert c1._client is not c2._client + assert c1._uses_shared_pool is False + assert c2._uses_shared_pool is False + + c1.close() + c2.close() + + def test_custom_http_client_bypasses_sharing(self): + custom = httpx.Client() + c1 = _make_client(http_client=custom, shared_http_pool=True) + + assert c1._client is custom + assert c1._uses_shared_pool is False + + c1.close() + custom.close() + + def test_default_is_shared(self): + c1 = _make_client() + assert c1._uses_shared_pool is True + c1.close() + + +class TestSyncRefcounting: + def test_close_one_keeps_pool_alive(self): + c1 = _make_client(shared_http_pool=True) + c2 = _make_client(shared_http_pool=True) + pool = c1._client + + c1.close() + assert not pool.is_closed + assert _base_mod._shared_sync_refcount == 1 + + c2.close() + assert pool.is_closed + assert _base_mod._shared_sync_client is None + assert _base_mod._shared_sync_refcount == 0 + + def test_double_close_is_safe(self): + c1 = _make_client(shared_http_pool=True) + c1.close() + c1.close() # should not raise or go negative + assert _base_mod._shared_sync_refcount == 0 + + def test_three_clients_refcount(self): + c1 = _make_client(shared_http_pool=True) + c2 = _make_client(shared_http_pool=True) + c3 = _make_client(shared_http_pool=True) + pool = c1._client + + assert _base_mod._shared_sync_refcount == 3 + + c1.close() + assert _base_mod._shared_sync_refcount == 2 + assert not pool.is_closed + + c2.close() + assert _base_mod._shared_sync_refcount == 1 + assert not pool.is_closed + + c3.close() + assert _base_mod._shared_sync_refcount == 0 + assert pool.is_closed + + def test_pool_recreated_after_full_release(self): + c1 = _make_client(shared_http_pool=True) + pool1 = c1._client + c1.close() + + c2 = _make_client(shared_http_pool=True) + pool2 = c2._client + assert pool2 is not pool1 + assert not pool2.is_closed + + c2.close() + + +class TestSyncCopy: + def test_copy_inherits_shared_pool(self): + c1 = _make_client(shared_http_pool=True) + c2 = c1.copy() + + assert c2._uses_shared_pool is True + assert c2._client is c1._client + assert _base_mod._shared_sync_refcount == 2 + + c1.close() + c2.close() + + def test_copy_with_custom_client_disables_sharing(self): + c1 = _make_client(shared_http_pool=True) + custom = httpx.Client() + c2 = c1.copy(http_client=custom) + + assert c2._uses_shared_pool is False + assert c2._client is custom + + c1.close() + c2.close() + custom.close() + + def test_copy_of_non_shared_stays_non_shared(self): + c1 = _make_client(shared_http_pool=False) + c2 = c1.copy() + + assert c2._uses_shared_pool is False + assert c2._client is not c1._client + + c1.close() + c2.close() + + +# --------------------------------------------------------------------------- +# Async: sharing behavior +# --------------------------------------------------------------------------- + + +class TestAsyncSharedPool: + def test_shared_pool_uses_same_client(self): + c1 = _make_async_client(shared_http_pool=True) + c2 = _make_async_client(shared_http_pool=True) + + assert c1._client is c2._client + assert c1._uses_shared_pool is True + assert c2._uses_shared_pool is True + + def test_private_pool_uses_different_clients(self): + c1 = _make_async_client(shared_http_pool=False) + c2 = _make_async_client(shared_http_pool=False) + + assert c1._client is not c2._client + assert c1._uses_shared_pool is False + + def test_custom_http_client_bypasses_sharing(self): + custom = httpx.AsyncClient() + c1 = _make_async_client(http_client=custom, shared_http_pool=True) + + assert c1._client is custom + assert c1._uses_shared_pool is False + + def test_default_is_shared(self): + c1 = _make_async_client() + assert c1._uses_shared_pool is True + + +class TestAsyncRefcounting: + def test_close_one_keeps_pool_alive(self): + c1 = _make_async_client(shared_http_pool=True) + c2 = _make_async_client(shared_http_pool=True) + + # Release c1's ref synchronously (the release_shared_pool is sync) + c1._release_shared_pool() + assert _base_mod._shared_async_refcount == 1 + assert _base_mod._shared_async_client is not None + + c2._release_shared_pool() + assert _base_mod._shared_async_refcount == 0 + assert _base_mod._shared_async_client is None + + def test_double_release_is_safe(self): + c1 = _make_async_client(shared_http_pool=True) + c1._release_shared_pool() + c1._release_shared_pool() # should not raise or go negative + assert _base_mod._shared_async_refcount == 0 + + +class TestAsyncCopy: + def test_copy_inherits_shared_pool(self): + c1 = _make_async_client(shared_http_pool=True) + c2 = c1.copy() + + assert c2._uses_shared_pool is True + assert c2._client is c1._client + assert _base_mod._shared_async_refcount == 2 + + def test_copy_with_custom_client_disables_sharing(self): + c1 = _make_async_client(shared_http_pool=True) + custom = httpx.AsyncClient() + c2 = c1.copy(http_client=custom) + + assert c2._uses_shared_pool is False + assert c2._client is custom From 01b73fad4ae3bf8d212679227b40abdcc4d1bd37 Mon Sep 17 00:00:00 2001 From: Rob von Behren Date: Thu, 7 May 2026 12:24:53 -0700 Subject: [PATCH 3/6] Fix pyright lint errors in shared pool tests Co-Authored-By: Claude Opus 4.6 --- tests/test_shared_pool.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_shared_pool.py b/tests/test_shared_pool.py index 40da1dc07..33c8e7907 100644 --- a/tests/test_shared_pool.py +++ b/tests/test_shared_pool.py @@ -7,6 +7,7 @@ from __future__ import annotations import os +from typing import Any, Iterator import httpx import pytest @@ -19,14 +20,13 @@ @pytest.fixture(autouse=True) -def _reset_shared_pool(): - """Reset module-level shared pool state before and after each test.""" +def _reset_shared_pool() -> Iterator[None]: # pyright: ignore[reportUnusedFunction] _clear_pool_state() yield _clear_pool_state() -def _clear_pool_state(): +def _clear_pool_state() -> None: with _base_mod._pool_lock: if _base_mod._shared_sync_client is not None and not _base_mod._shared_sync_client.is_closed: try: @@ -42,13 +42,13 @@ def _clear_pool_state(): _base_mod._shared_async_refcount = 0 -def _make_client(**kwargs) -> Runloop: +def _make_client(**kwargs: Any) -> Runloop: kwargs.setdefault("base_url", base_url) kwargs.setdefault("bearer_token", bearer_token) return Runloop(**kwargs) -def _make_async_client(**kwargs) -> AsyncRunloop: +def _make_async_client(**kwargs: Any) -> AsyncRunloop: kwargs.setdefault("base_url", base_url) kwargs.setdefault("bearer_token", bearer_token) return AsyncRunloop(**kwargs) From 8c24dd0be403d160c0680cfee5a9b3942c0e3a3a Mon Sep 17 00:00:00 2001 From: Rob von Behren Date: Thu, 7 May 2026 12:33:56 -0700 Subject: [PATCH 4/6] Key async pool by event loop with WeakKeyDictionary Address review feedback: async shared pool is now keyed by the running event loop using weakref.WeakKeyDictionary, so clients in different asyncio.run() calls get separate pools. close() properly awaits aclose() on the last release. __del__ uses a sync-only release path. Async tests converted to async def so they run inside an event loop and can exercise per-loop pool sharing correctly. Co-Authored-By: Claude Opus 4.6 --- src/runloop_api_client/_base_client.py | 102 ++++++++++++++++++------- tests/test_shared_pool.py | 75 +++++++++++++----- 2 files changed, 128 insertions(+), 49 deletions(-) diff --git a/src/runloop_api_client/_base_client.py b/src/runloop_api_client/_base_client.py index 6986415cf..0672a0196 100644 --- a/src/runloop_api_client/_base_client.py +++ b/src/runloop_api_client/_base_client.py @@ -8,6 +8,7 @@ import asyncio import inspect import logging +import weakref import platform import warnings import threading @@ -93,11 +94,22 @@ # Shared HTTP connection pool state. One pool per client type (sync/async), # refcounted so the pool is closed when the last SDK client releases it. +# The async pool is keyed by event loop because httpx.AsyncClient binds to +# the loop that created it and cannot be reused across asyncio.run() calls. _pool_lock = threading.Lock() _shared_sync_client: httpx.Client | None = None _shared_sync_refcount: int = 0 -_shared_async_client: httpx.AsyncClient | None = None -_shared_async_refcount: int = 0 + + +class _AsyncPoolState: + __slots__ = ("client", "refcount") + + def __init__(self, client: httpx.AsyncClient) -> None: + self.client = client + self.refcount = 1 + + +_async_pools: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, _AsyncPoolState] = weakref.WeakKeyDictionary() # TODO: make base page type vars covariant SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]") @@ -1516,6 +1528,7 @@ class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]): _default_stream_cls: type[AsyncStream[Any]] | None = None _uses_shared_pool: bool _closed: bool + _pool_loop: asyncio.AbstractEventLoop | None def __init__( self, @@ -1560,22 +1573,13 @@ def __init__( ) self._closed = False + self._pool_loop = None if http_client is not None: self._client = http_client self._uses_shared_pool = False elif shared_http_pool: - global _shared_async_client, _shared_async_refcount - with _pool_lock: - if _shared_async_client is None or _shared_async_client.is_closed: - _shared_async_client = AsyncHttpxClientWrapper( - base_url=base_url, - timeout=cast(Timeout, timeout), - ) - _shared_async_refcount = 0 - _shared_async_refcount += 1 - self._client = _shared_async_client - self._uses_shared_pool = True + self._acquire_shared_pool(base_url, cast(Timeout, timeout)) else: self._client = AsyncHttpxClientWrapper( base_url=base_url, @@ -1583,6 +1587,25 @@ def __init__( ) self._uses_shared_pool = False + def _acquire_shared_pool(self, base_url: str | URL, timeout: Timeout) -> None: + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + with _pool_lock: + pool_state = _async_pools.get(loop) if loop is not None else None + if pool_state is None or pool_state.client.is_closed: + pool_state = _AsyncPoolState( + AsyncHttpxClientWrapper(base_url=base_url, timeout=timeout), + ) + if loop is not None: + _async_pools[loop] = pool_state + else: + pool_state.refcount += 1 + self._client = pool_state.client + self._pool_loop = loop + self._uses_shared_pool = True + def is_closed(self) -> bool: return self._closed or self._client.is_closed @@ -1595,28 +1618,51 @@ async def close(self) -> None: return self._closed = True if self._uses_shared_pool: - self._release_shared_pool() + await self._release_shared_pool() else: await self._client.aclose() - def _release_shared_pool(self) -> None: - global _shared_async_client, _shared_async_refcount - should_close = False - client = None + async def _release_shared_pool(self) -> None: + client: httpx.AsyncClient | None = None with _pool_lock: if not self._uses_shared_pool: return self._uses_shared_pool = False - _shared_async_refcount -= 1 - if _shared_async_refcount <= 0: - client = _shared_async_client - _shared_async_client = None - _shared_async_refcount = 0 - should_close = True - if should_close and client is not None: + loop = self._pool_loop + if loop is None: + return + pool = _async_pools.get(loop) + if pool is not None: + pool.refcount -= 1 + if pool.refcount <= 0: + client = pool.client + del _async_pools[loop] + if client is not None: try: - loop = asyncio.get_running_loop() - loop.create_task(client.aclose()) + await client.aclose() + except Exception: + pass + + def _release_shared_pool_sync(self) -> None: + """Best-effort synchronous release for use in __del__.""" + client: httpx.AsyncClient | None = None + with _pool_lock: + if not self._uses_shared_pool: + return + self._uses_shared_pool = False + loop = self._pool_loop + if loop is None: + return + pool = _async_pools.get(loop) + if pool is not None: + pool.refcount -= 1 + if pool.refcount <= 0: + client = pool.client + del _async_pools[loop] + if client is not None: + try: + running_loop = asyncio.get_running_loop() + running_loop.create_task(client.aclose()) except Exception: pass @@ -1624,7 +1670,7 @@ def __del__(self) -> None: if not getattr(self, "_uses_shared_pool", False): return try: - self._release_shared_pool() + self._release_shared_pool_sync() except Exception: pass diff --git a/tests/test_shared_pool.py b/tests/test_shared_pool.py index 33c8e7907..850f2840b 100644 --- a/tests/test_shared_pool.py +++ b/tests/test_shared_pool.py @@ -7,6 +7,7 @@ from __future__ import annotations import os +import asyncio from typing import Any, Iterator import httpx @@ -36,10 +37,7 @@ def _clear_pool_state() -> None: _base_mod._shared_sync_client = None _base_mod._shared_sync_refcount = 0 - if _base_mod._shared_async_client is not None and not _base_mod._shared_async_client.is_closed: - pass # async close is best-effort in sync context - _base_mod._shared_async_client = None - _base_mod._shared_async_refcount = 0 + _base_mod._async_pools.clear() def _make_client(**kwargs: Any) -> Runloop: @@ -193,7 +191,7 @@ def test_copy_of_non_shared_stays_non_shared(self): class TestAsyncSharedPool: - def test_shared_pool_uses_same_client(self): + async def test_shared_pool_uses_same_client(self): c1 = _make_async_client(shared_http_pool=True) c2 = _make_async_client(shared_http_pool=True) @@ -215,45 +213,80 @@ def test_custom_http_client_bypasses_sharing(self): assert c1._client is custom assert c1._uses_shared_pool is False - def test_default_is_shared(self): + async def test_default_is_shared(self): c1 = _make_async_client() assert c1._uses_shared_pool is True +def _async_pool_refcount(loop: asyncio.AbstractEventLoop | None) -> int: + pool = _base_mod._async_pools.get(loop) if loop is not None else None + return pool.refcount if pool is not None else 0 + + class TestAsyncRefcounting: - def test_close_one_keeps_pool_alive(self): + async def test_close_one_keeps_pool_alive(self): c1 = _make_async_client(shared_http_pool=True) c2 = _make_async_client(shared_http_pool=True) + loop = c1._pool_loop - # Release c1's ref synchronously (the release_shared_pool is sync) - c1._release_shared_pool() - assert _base_mod._shared_async_refcount == 1 - assert _base_mod._shared_async_client is not None + c1._release_shared_pool_sync() + assert _async_pool_refcount(loop) == 1 - c2._release_shared_pool() - assert _base_mod._shared_async_refcount == 0 - assert _base_mod._shared_async_client is None + c2._release_shared_pool_sync() + assert _async_pool_refcount(loop) == 0 - def test_double_release_is_safe(self): + async def test_double_release_is_safe(self): c1 = _make_async_client(shared_http_pool=True) - c1._release_shared_pool() - c1._release_shared_pool() # should not raise or go negative - assert _base_mod._shared_async_refcount == 0 + c1._release_shared_pool_sync() + c1._release_shared_pool_sync() # should not raise or go negative + assert _async_pool_refcount(c1._pool_loop) == 0 class TestAsyncCopy: - def test_copy_inherits_shared_pool(self): + async def test_copy_inherits_shared_pool(self): c1 = _make_async_client(shared_http_pool=True) c2 = c1.copy() + loop = c1._pool_loop assert c2._uses_shared_pool is True assert c2._client is c1._client - assert _base_mod._shared_async_refcount == 2 + assert _async_pool_refcount(loop) == 2 - def test_copy_with_custom_client_disables_sharing(self): + async def test_copy_with_custom_client_disables_sharing(self): c1 = _make_async_client(shared_http_pool=True) custom = httpx.AsyncClient() c2 = c1.copy(http_client=custom) assert c2._uses_shared_pool is False assert c2._client is custom + + +class TestAsyncCrossLoop: + def test_separate_loops_get_separate_pools(self): + """Clients created in different asyncio.run() calls must not share a pool.""" + + async def create_client() -> int: + c = _make_async_client(shared_http_pool=True) + client_id = id(c._client) + await c.close() + return client_id + + id1 = asyncio.run(create_client()) + id2 = asyncio.run(create_client()) + + assert id1 != id2, "each loop should get its own httpx.AsyncClient" + + def test_same_loop_shares_pool(self): + """Clients created in the same asyncio.run() must share a pool.""" + + async def create_two() -> tuple[int, int]: + c1 = _make_async_client(shared_http_pool=True) + c2 = _make_async_client(shared_http_pool=True) + id1 = id(c1._client) + id2 = id(c2._client) + await c1.close() + await c2.close() + return id1, id2 + + id1, id2 = asyncio.run(create_two()) + assert id1 == id2 From 000479abd394c84930d4b0a87636e027a9c67407 Mon Sep 17 00:00:00 2001 From: Rob von Behren Date: Thu, 7 May 2026 13:47:56 -0700 Subject: [PATCH 5/6] Share transports instead of clients to fix pool bugs Fixes three bugs in the shared HTTP pool: (1) async clients created without a running event loop leaked connections, (2) __del__ could drop clients without closing them, (3) sharing httpx.Client shared cookie jars across SDK instances. Now shares only the transport (connection pool) via refcounted wrappers, giving each SDK instance its own httpx client with isolated mutable state. Co-Authored-By: Claude Opus 4.6 --- src/runloop_api_client/_base_client.py | 245 ++++++++++++------------- tests/test_client.py | 6 +- tests/test_shared_pool.py | 167 ++++++++++------- 3 files changed, 218 insertions(+), 200 deletions(-) diff --git a/src/runloop_api_client/_base_client.py b/src/runloop_api_client/_base_client.py index 0672a0196..88e0bbb3b 100644 --- a/src/runloop_api_client/_base_client.py +++ b/src/runloop_api_client/_base_client.py @@ -92,24 +92,87 @@ log: logging.Logger = logging.getLogger(__name__) -# Shared HTTP connection pool state. One pool per client type (sync/async), -# refcounted so the pool is closed when the last SDK client releases it. -# The async pool is keyed by event loop because httpx.AsyncClient binds to -# the loop that created it and cannot be reused across asyncio.run() calls. +# Shared HTTP transport state. We share transports (connection pools) rather +# than full httpx clients so each SDK instance keeps its own cookie jar and +# mutable client state. Refcounted wrappers close the real transport only +# when the last user releases it. +# The async transport is keyed by event loop because connections bind to the +# loop that created them and cannot be reused across asyncio.run() calls. _pool_lock = threading.Lock() -_shared_sync_client: httpx.Client | None = None -_shared_sync_refcount: int = 0 -class _AsyncPoolState: - __slots__ = ("client", "refcount") +class _SharedTransport(httpx.BaseTransport): + """Refcounted wrapper: delegates to a real transport, closes it when refcount hits 0.""" - def __init__(self, client: httpx.AsyncClient) -> None: - self.client = client - self.refcount = 1 + def __init__(self, transport: httpx.BaseTransport) -> None: + self._transport = transport + self._refcount = 1 + self._lock = threading.Lock() + + @property + def refcount(self) -> int: + return self._refcount + + def acquire(self) -> bool: + with self._lock: + if self._refcount <= 0: + return False + self._refcount += 1 + return True + + @override + def handle_request(self, request: httpx.Request) -> httpx.Response: + return self._transport.handle_request(request) + + @override + def close(self) -> None: + should_close = False + with self._lock: + self._refcount -= 1 + if self._refcount <= 0: + should_close = True + if should_close: + self._transport.close() -_async_pools: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, _AsyncPoolState] = weakref.WeakKeyDictionary() +class _SharedAsyncTransport(httpx.AsyncBaseTransport): + """Async refcounted wrapper: delegates to a real async transport.""" + + def __init__(self, transport: httpx.AsyncBaseTransport) -> None: + self._transport = transport + self._refcount = 1 + self._lock = threading.Lock() + + @property + def refcount(self) -> int: + return self._refcount + + def acquire(self) -> bool: + with self._lock: + if self._refcount <= 0: + return False + self._refcount += 1 + return True + + @override + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + return await self._transport.handle_async_request(request) + + @override + async def aclose(self) -> None: + should_close = False + with self._lock: + self._refcount -= 1 + if self._refcount <= 0: + should_close = True + if should_close: + await self._transport.aclose() + + +_shared_sync_transport: _SharedTransport | None = None +_shared_async_transports: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, _SharedAsyncTransport] = ( + weakref.WeakKeyDictionary() +) # TODO: make base page type vars covariant SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]") @@ -918,16 +981,17 @@ def __init__( self._client = http_client self._uses_shared_pool = False elif shared_http_pool: - global _shared_sync_client, _shared_sync_refcount + global _shared_sync_transport with _pool_lock: - if _shared_sync_client is None or _shared_sync_client.is_closed: - _shared_sync_client = SyncHttpxClientWrapper( - base_url=base_url, - timeout=cast(Timeout, timeout), + if _shared_sync_transport is None or not _shared_sync_transport.acquire(): + _shared_sync_transport = _SharedTransport( + httpx.HTTPTransport(limits=DEFAULT_CONNECTION_LIMITS, http2=True), ) - _shared_sync_refcount = 0 - _shared_sync_refcount += 1 - self._client = _shared_sync_client + self._client = SyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + transport=_shared_sync_transport, + ) self._uses_shared_pool = True else: self._client = SyncHttpxClientWrapper( @@ -946,38 +1010,10 @@ def close(self) -> None: """ if not hasattr(self, "_client"): return - self._closed = True - if self._uses_shared_pool: - self._release_shared_pool() - else: - self._client.close() - - def _release_shared_pool(self) -> None: - global _shared_sync_client, _shared_sync_refcount - with _pool_lock: - if not self._uses_shared_pool: - return - self._uses_shared_pool = False - _shared_sync_refcount -= 1 - if _shared_sync_refcount <= 0: - client = _shared_sync_client - _shared_sync_client = None - _shared_sync_refcount = 0 - else: - client = None - if client is not None: - try: - client.close() - except Exception: - pass - - def __del__(self) -> None: - if not getattr(self, "_uses_shared_pool", False): + if self._closed: return - try: - self._release_shared_pool() - except Exception: - pass + self._closed = True + self._client.close() def __enter__(self: _T) -> _T: return self @@ -1528,7 +1564,6 @@ class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]): _default_stream_cls: type[AsyncStream[Any]] | None = None _uses_shared_pool: bool _closed: bool - _pool_loop: asyncio.AbstractEventLoop | None def __init__( self, @@ -1573,13 +1608,37 @@ def __init__( ) self._closed = False - self._pool_loop = None if http_client is not None: self._client = http_client self._uses_shared_pool = False elif shared_http_pool: - self._acquire_shared_pool(base_url, cast(Timeout, timeout)) + try: + loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop() + except RuntimeError: + loop = None + if loop is not None: + with _pool_lock: + existing = _shared_async_transports.get(loop) + if existing is not None and existing.acquire(): + transport: _SharedAsyncTransport = existing + else: + transport = _SharedAsyncTransport( + httpx.AsyncHTTPTransport(limits=DEFAULT_CONNECTION_LIMITS, http2=True), + ) + _shared_async_transports[loop] = transport + self._client = AsyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + transport=transport, + ) + self._uses_shared_pool = True + else: + self._client = AsyncHttpxClientWrapper( + base_url=base_url, + timeout=cast(Timeout, timeout), + ) + self._uses_shared_pool = False else: self._client = AsyncHttpxClientWrapper( base_url=base_url, @@ -1587,25 +1646,6 @@ def __init__( ) self._uses_shared_pool = False - def _acquire_shared_pool(self, base_url: str | URL, timeout: Timeout) -> None: - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - with _pool_lock: - pool_state = _async_pools.get(loop) if loop is not None else None - if pool_state is None or pool_state.client.is_closed: - pool_state = _AsyncPoolState( - AsyncHttpxClientWrapper(base_url=base_url, timeout=timeout), - ) - if loop is not None: - _async_pools[loop] = pool_state - else: - pool_state.refcount += 1 - self._client = pool_state.client - self._pool_loop = loop - self._uses_shared_pool = True - def is_closed(self) -> bool: return self._closed or self._client.is_closed @@ -1616,63 +1656,10 @@ async def close(self) -> None: """ if not hasattr(self, "_client"): return - self._closed = True - if self._uses_shared_pool: - await self._release_shared_pool() - else: - await self._client.aclose() - - async def _release_shared_pool(self) -> None: - client: httpx.AsyncClient | None = None - with _pool_lock: - if not self._uses_shared_pool: - return - self._uses_shared_pool = False - loop = self._pool_loop - if loop is None: - return - pool = _async_pools.get(loop) - if pool is not None: - pool.refcount -= 1 - if pool.refcount <= 0: - client = pool.client - del _async_pools[loop] - if client is not None: - try: - await client.aclose() - except Exception: - pass - - def _release_shared_pool_sync(self) -> None: - """Best-effort synchronous release for use in __del__.""" - client: httpx.AsyncClient | None = None - with _pool_lock: - if not self._uses_shared_pool: - return - self._uses_shared_pool = False - loop = self._pool_loop - if loop is None: - return - pool = _async_pools.get(loop) - if pool is not None: - pool.refcount -= 1 - if pool.refcount <= 0: - client = pool.client - del _async_pools[loop] - if client is not None: - try: - running_loop = asyncio.get_running_loop() - running_loop.create_task(client.aclose()) - except Exception: - pass - - def __del__(self) -> None: - if not getattr(self, "_uses_shared_pool", False): + if self._closed: return - try: - self._release_shared_pool_sync() - except Exception: - pass + self._closed = True + await self._client.aclose() async def __aenter__(self: _T) -> _T: return self diff --git a/tests/test_client.py b/tests/test_client.py index 408c7cedd..7728bf5bb 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -32,7 +32,9 @@ DefaultHttpxClient, DefaultAsyncHttpxClient, get_platform, + _SharedTransport, make_request_options, + _SharedAsyncTransport, ) from .utils import update_env @@ -105,7 +107,9 @@ async def _make_async_iterator(iterable: Iterable[T], counter: Optional[Counter] def _get_open_connections(client: Runloop | AsyncRunloop) -> int: transport = client._client._transport - assert isinstance(transport, httpx.HTTPTransport) or isinstance(transport, httpx.AsyncHTTPTransport) + if isinstance(transport, (_SharedTransport, _SharedAsyncTransport)): + transport = transport._transport + assert isinstance(transport, (httpx.HTTPTransport, httpx.AsyncHTTPTransport)) pool = transport._pool return len(pool._requests) diff --git a/tests/test_shared_pool.py b/tests/test_shared_pool.py index 850f2840b..fca8b8967 100644 --- a/tests/test_shared_pool.py +++ b/tests/test_shared_pool.py @@ -1,7 +1,7 @@ -"""Tests for shared HTTP connection pool behavior. +"""Tests for shared HTTP transport pool behavior. Verifies that SDK clients share (or don't share) the underlying httpx -transport, and that refcounting correctly manages the pool lifecycle. +transport, and that refcounting correctly manages the transport lifecycle. """ from __future__ import annotations @@ -29,15 +29,14 @@ def _reset_shared_pool() -> Iterator[None]: # pyright: ignore[reportUnusedFunct def _clear_pool_state() -> None: with _base_mod._pool_lock: - if _base_mod._shared_sync_client is not None and not _base_mod._shared_sync_client.is_closed: - try: - _base_mod._shared_sync_client.close() - except Exception: - pass - _base_mod._shared_sync_client = None - _base_mod._shared_sync_refcount = 0 - - _base_mod._async_pools.clear() + old_sync = _base_mod._shared_sync_transport + _base_mod._shared_sync_transport = None + _base_mod._shared_async_transports.clear() + if old_sync is not None: + try: + old_sync._transport.close() + except Exception: + pass def _make_client(**kwargs: Any) -> Runloop: @@ -52,28 +51,33 @@ def _make_async_client(**kwargs: Any) -> AsyncRunloop: return AsyncRunloop(**kwargs) +def _get_transport(client: Runloop | AsyncRunloop) -> Any: + return client._client._transport # type: ignore[union-attr] + + # --------------------------------------------------------------------------- # Sync: sharing behavior # --------------------------------------------------------------------------- class TestSyncSharedPool: - def test_shared_pool_uses_same_client(self): + def test_shared_pool_uses_same_transport(self): c1 = _make_client(shared_http_pool=True) c2 = _make_client(shared_http_pool=True) - assert c1._client is c2._client + assert _get_transport(c1) is _get_transport(c2) + assert c1._client is not c2._client assert c1._uses_shared_pool is True assert c2._uses_shared_pool is True c1.close() c2.close() - def test_private_pool_uses_different_clients(self): + def test_private_pool_uses_different_transports(self): c1 = _make_client(shared_http_pool=False) c2 = _make_client(shared_http_pool=False) - assert c1._client is not c2._client + assert _get_transport(c1) is not _get_transport(c2) assert c1._uses_shared_pool is False assert c2._uses_shared_pool is False @@ -95,57 +99,66 @@ def test_default_is_shared(self): assert c1._uses_shared_pool is True c1.close() + def test_cookie_isolation(self): + c1 = _make_client(shared_http_pool=True) + c2 = _make_client(shared_http_pool=True) + + c1._client.cookies.set("session", "secret-123") + assert "session" not in c2._client.cookies + + c1.close() + c2.close() + class TestSyncRefcounting: - def test_close_one_keeps_pool_alive(self): + def test_close_one_keeps_transport_alive(self): c1 = _make_client(shared_http_pool=True) c2 = _make_client(shared_http_pool=True) - pool = c1._client + transport = _get_transport(c1) + + assert transport.refcount == 2 c1.close() - assert not pool.is_closed - assert _base_mod._shared_sync_refcount == 1 + assert transport.refcount == 1 + assert not c2.is_closed() c2.close() - assert pool.is_closed - assert _base_mod._shared_sync_client is None - assert _base_mod._shared_sync_refcount == 0 + assert transport.refcount == 0 def test_double_close_is_safe(self): c1 = _make_client(shared_http_pool=True) + transport = _get_transport(c1) + c1.close() - c1.close() # should not raise or go negative - assert _base_mod._shared_sync_refcount == 0 + c1.close() # should not raise or double-decrement + assert transport.refcount == 0 def test_three_clients_refcount(self): c1 = _make_client(shared_http_pool=True) c2 = _make_client(shared_http_pool=True) c3 = _make_client(shared_http_pool=True) - pool = c1._client + transport = _get_transport(c1) - assert _base_mod._shared_sync_refcount == 3 + assert transport.refcount == 3 c1.close() - assert _base_mod._shared_sync_refcount == 2 - assert not pool.is_closed + assert transport.refcount == 2 c2.close() - assert _base_mod._shared_sync_refcount == 1 - assert not pool.is_closed + assert transport.refcount == 1 c3.close() - assert _base_mod._shared_sync_refcount == 0 - assert pool.is_closed + assert transport.refcount == 0 - def test_pool_recreated_after_full_release(self): + def test_transport_recreated_after_full_release(self): c1 = _make_client(shared_http_pool=True) - pool1 = c1._client + t1 = _get_transport(c1) c1.close() c2 = _make_client(shared_http_pool=True) - pool2 = c2._client - assert pool2 is not pool1 - assert not pool2.is_closed + t2 = _get_transport(c2) + assert t2 is not t1 + assert t2.refcount == 1 c2.close() @@ -154,10 +167,11 @@ class TestSyncCopy: def test_copy_inherits_shared_pool(self): c1 = _make_client(shared_http_pool=True) c2 = c1.copy() + transport = _get_transport(c1) assert c2._uses_shared_pool is True - assert c2._client is c1._client - assert _base_mod._shared_sync_refcount == 2 + assert _get_transport(c2) is transport + assert transport.refcount == 2 c1.close() c2.close() @@ -179,7 +193,7 @@ def test_copy_of_non_shared_stays_non_shared(self): c2 = c1.copy() assert c2._uses_shared_pool is False - assert c2._client is not c1._client + assert _get_transport(c2) is not _get_transport(c1) c1.close() c2.close() @@ -191,19 +205,20 @@ def test_copy_of_non_shared_stays_non_shared(self): class TestAsyncSharedPool: - async def test_shared_pool_uses_same_client(self): + async def test_shared_pool_uses_same_transport(self): c1 = _make_async_client(shared_http_pool=True) c2 = _make_async_client(shared_http_pool=True) - assert c1._client is c2._client + assert _get_transport(c1) is _get_transport(c2) + assert c1._client is not c2._client assert c1._uses_shared_pool is True assert c2._uses_shared_pool is True - def test_private_pool_uses_different_clients(self): + def test_private_pool_uses_different_transports(self): c1 = _make_async_client(shared_http_pool=False) c2 = _make_async_client(shared_http_pool=False) - assert c1._client is not c2._client + assert _get_transport(c1) is not _get_transport(c2) assert c1._uses_shared_pool is False def test_custom_http_client_bypasses_sharing(self): @@ -217,40 +232,52 @@ async def test_default_is_shared(self): c1 = _make_async_client() assert c1._uses_shared_pool is True - -def _async_pool_refcount(loop: asyncio.AbstractEventLoop | None) -> int: - pool = _base_mod._async_pools.get(loop) if loop is not None else None - return pool.refcount if pool is not None else 0 + def test_no_loop_creates_private_client(self): + c1 = _make_async_client(shared_http_pool=True) + assert c1._uses_shared_pool is False class TestAsyncRefcounting: - async def test_close_one_keeps_pool_alive(self): + async def test_close_one_keeps_transport_alive(self): c1 = _make_async_client(shared_http_pool=True) c2 = _make_async_client(shared_http_pool=True) - loop = c1._pool_loop + transport = _get_transport(c1) - c1._release_shared_pool_sync() - assert _async_pool_refcount(loop) == 1 + assert transport.refcount == 2 - c2._release_shared_pool_sync() - assert _async_pool_refcount(loop) == 0 + await c1.close() + assert transport.refcount == 1 + assert not c2.is_closed() - async def test_double_release_is_safe(self): + await c2.close() + assert transport.refcount == 0 + + async def test_double_close_is_safe(self): + c1 = _make_async_client(shared_http_pool=True) + transport = _get_transport(c1) + + await c1.close() + await c1.close() # should not raise or double-decrement + assert transport.refcount == 0 + + def test_no_loop_client_closes_properly(self): + """Client created without a running loop should close without leaking.""" c1 = _make_async_client(shared_http_pool=True) - c1._release_shared_pool_sync() - c1._release_shared_pool_sync() # should not raise or go negative - assert _async_pool_refcount(c1._pool_loop) == 0 + assert c1._uses_shared_pool is False + + asyncio.run(c1.close()) + assert c1.is_closed() class TestAsyncCopy: async def test_copy_inherits_shared_pool(self): c1 = _make_async_client(shared_http_pool=True) c2 = c1.copy() - loop = c1._pool_loop + transport = _get_transport(c1) assert c2._uses_shared_pool is True - assert c2._client is c1._client - assert _async_pool_refcount(loop) == 2 + assert _get_transport(c2) is transport + assert transport.refcount == 2 async def test_copy_with_custom_client_disables_sharing(self): c1 = _make_async_client(shared_http_pool=True) @@ -262,28 +289,28 @@ async def test_copy_with_custom_client_disables_sharing(self): class TestAsyncCrossLoop: - def test_separate_loops_get_separate_pools(self): - """Clients created in different asyncio.run() calls must not share a pool.""" + def test_separate_loops_get_separate_transports(self): + """Clients created in different asyncio.run() calls must not share a transport.""" async def create_client() -> int: c = _make_async_client(shared_http_pool=True) - client_id = id(c._client) + transport_id = id(_get_transport(c)) await c.close() - return client_id + return transport_id id1 = asyncio.run(create_client()) id2 = asyncio.run(create_client()) - assert id1 != id2, "each loop should get its own httpx.AsyncClient" + assert id1 != id2, "each loop should get its own transport" - def test_same_loop_shares_pool(self): - """Clients created in the same asyncio.run() must share a pool.""" + def test_same_loop_shares_transport(self): + """Clients created in the same asyncio.run() must share a transport.""" async def create_two() -> tuple[int, int]: c1 = _make_async_client(shared_http_pool=True) c2 = _make_async_client(shared_http_pool=True) - id1 = id(c1._client) - id2 = id(c2._client) + id1 = id(_get_transport(c1)) + id2 = id(_get_transport(c2)) await c1.close() await c2.close() return id1, id2 From 8e56ddaa93f3f0080bdc973a52256d3ec126fb5e Mon Sep 17 00:00:00 2001 From: Rob von Behren Date: Thu, 7 May 2026 16:06:01 -0700 Subject: [PATCH 6/6] Fix cross-loop test: compare transport identity, not id() After close(), Python can reuse the freed memory address for the next transport, making id() values match across separate asyncio.run() calls. Keep the first transport reference alive and use `is not` instead. Co-Authored-By: Claude Opus 4.6 --- tests/test_shared_pool.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_shared_pool.py b/tests/test_shared_pool.py index fca8b8967..4220f8ba9 100644 --- a/tests/test_shared_pool.py +++ b/tests/test_shared_pool.py @@ -292,16 +292,16 @@ class TestAsyncCrossLoop: def test_separate_loops_get_separate_transports(self): """Clients created in different asyncio.run() calls must not share a transport.""" - async def create_client() -> int: + async def create_client() -> Any: c = _make_async_client(shared_http_pool=True) - transport_id = id(_get_transport(c)) + transport = _get_transport(c) await c.close() - return transport_id + return transport - id1 = asyncio.run(create_client()) - id2 = asyncio.run(create_client()) + t1 = asyncio.run(create_client()) + t2 = asyncio.run(create_client()) - assert id1 != id2, "each loop should get its own transport" + assert t1 is not t2, "each loop should get its own transport" def test_same_loop_shares_transport(self): """Clients created in the same asyncio.run() must share a transport."""