-
Notifications
You must be signed in to change notification settings - Fork 1.7k
fix(rab): run async background boundary refresh on detached session #17441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -678,6 +678,7 @@ async def test_async_refresh_manager_session_closed_ignored(self): | |
| ) | ||
|
|
||
| request = mock.Mock() | ||
| request.clone.return_value = request | ||
| rab_manager = mock.Mock() | ||
|
|
||
| manager = ( | ||
|
|
@@ -694,6 +695,91 @@ async def test_async_refresh_manager_session_closed_ignored(self): | |
| credentials._lookup_regional_access_boundary.assert_called_once_with(request) | ||
| rab_manager.process_regional_access_boundary_info.assert_called_once_with(None) | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_start_refresh_async_clones_request_and_unwraps_partial(self): | ||
| import functools | ||
|
|
||
| credentials = mock.AsyncMock() | ||
| credentials._lookup_regional_access_boundary.return_value = { | ||
| "encodedLocations": "0xA30" | ||
| } | ||
|
|
||
| mock_request = mock.Mock() | ||
| mock_cloned_request = mock.Mock() | ||
| mock_request.clone.return_value = mock_cloned_request | ||
| mock_cloned_request.close = mock.AsyncMock() | ||
|
|
||
| # Wrap in a functools.partial to simulate AuthorizedSession.request() timeouts | ||
| partial_request = functools.partial(mock_request, timeout=180) | ||
|
|
||
| rab_manager = mock.Mock() | ||
|
|
||
| manager = ( | ||
| _regional_access_boundary_utils._AsyncRegionalAccessBoundaryRefreshManager() | ||
| ) | ||
| manager.start_refresh(credentials, partial_request, rab_manager) | ||
|
|
||
| await manager._worker_task | ||
|
|
||
| # Verify that actual_request.clone() was called | ||
| mock_request.clone.assert_called_once() | ||
|
|
||
| # Verify that the lookup ran on a re-wrapped partial of the cloned request | ||
| called_arg = credentials._lookup_regional_access_boundary.call_args[0][0] | ||
| assert isinstance(called_arg, functools.partial) | ||
| assert called_arg.func is mock_cloned_request | ||
| assert called_arg.keywords == {"timeout": 180} | ||
|
|
||
| # Verify that the cloned request was closed cleanly in the finally block | ||
| mock_cloned_request.close.assert_called_once() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since |
||
| rab_manager.process_regional_access_boundary_info.assert_called_once_with( | ||
| {"encodedLocations": "0xA30"} | ||
| ) | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_start_refresh_async_mimics_ephemeral_session_closed_bug(self): | ||
| # Specifically mimics the real-world race condition where a fast foreground main call | ||
| # pulls the rug out from under the background worker when using an un-cloned session. | ||
| import asyncio | ||
|
|
||
| manager = ( | ||
| _regional_access_boundary_utils._AsyncRegionalAccessBoundaryRefreshManager() | ||
| ) | ||
|
|
||
| class EphemeralRequest: | ||
| def __init__(self): | ||
| self.closed = False | ||
|
|
||
| async def __call__(self, *args, **kwargs): | ||
| await asyncio.sleep(0.05) | ||
| if self.closed: | ||
| raise RuntimeError("Session is closed") | ||
| return "success" | ||
|
|
||
| ephemeral_req = EphemeralRequest() | ||
|
|
||
| credentials = mock.AsyncMock() | ||
|
|
||
| async def mock_lookup(req): | ||
| return await req() | ||
|
|
||
| credentials._lookup_regional_access_boundary.side_effect = mock_lookup | ||
|
|
||
| rab_manager = mock.Mock() | ||
|
|
||
| # Start the background refresh worker | ||
| manager.start_refresh(credentials, ephemeral_req, rab_manager) | ||
|
|
||
| # Simulate fast foreground primary call (completes in 10ms and closes the session) | ||
| await asyncio.sleep(0.01) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using hardcoded sleeps ( We can make this test 100% deterministic with zero sleeps by using |
||
| ephemeral_req.closed = True | ||
|
|
||
| # Await the background worker task to settle | ||
| await manager._worker_task | ||
|
|
||
| # Verify that the background worker hit the "Session is closed" error and failed open cleanly | ||
| rab_manager.process_regional_access_boundary_info.assert_called_once_with(None) | ||
|
|
||
|
|
||
| def test_get_service_account_rab_endpoint(monkeypatch): | ||
| from google.auth.transport import _mtls_helper | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -169,3 +169,21 @@ async def test_request_call_raises_transport_error_for_closed_session( | |
|
|
||
| exc.match("session is closed.") | ||
| aiohttp_request._closed = False | ||
|
|
||
| async def test_request_clone(self): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new clone test only covers the case where the request doesn't have an active session ( |
||
| request = auth_aiohttp.Request() | ||
| cloned = request.clone() | ||
| assert cloned is not request | ||
| assert isinstance(cloned, auth_aiohttp.Request) | ||
| assert cloned._session is not request._session | ||
| await request.close() | ||
| await cloned.close() | ||
|
|
||
| async def test_request_close(self): | ||
| request = auth_aiohttp.Request() | ||
| assert not getattr(request, "_closed", False) | ||
| await request.close() | ||
| assert request._closed | ||
| # Second call should be idempotent | ||
| await request.close() | ||
| assert request._closed | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't call
.clone()before the background task is safely spawned. Ifasyncio.create_taskfails (for example, if the event loop is shutting down or isn't running on this thread), the background coroutine is discarded without ever starting. Since the coroutine body never runs, thefinallyblock is skipped and the cloned session is permanently leaked.If we move
actual_request.clone()inside thetryblock of the_workercoroutine, we can guarantee that if a clone is created, it will definitely hit thefinallyblock and get closed.