Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit b17dbbf

Browse files
committed
feat(storage): Add AsyncConnection class and unit tests
1 parent 4e91c54 commit b17dbbf

File tree

3 files changed

+611
-3
lines changed

3 files changed

+611
-3
lines changed
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Create/interact with Google Cloud Storage connections in asynchronous manner."""
16+
17+
import json
18+
import collections
19+
import functools
20+
import types
21+
from urllib.parse import urlencode
22+
23+
import google.api_core.exceptions
24+
from google.cloud import _http
25+
from google.cloud.storage import _http as storage_http
26+
from google.cloud.storage import _helpers
27+
from google.api_core.client_info import ClientInfo
28+
from google.cloud.storage._opentelemetry_tracing import create_trace_span
29+
from google.cloud.storage import __version__
30+
from google.cloud.storage._http import AGENT_VERSION
31+
32+
33+
class AsyncConnection:
34+
"""Class for asynchronous connection using google.auth.aio.
35+
36+
This class handles the creation of API requests, header management,
37+
user agent configuration, and error handling for the Async Storage Client.
38+
39+
Args:
40+
client: The client that owns this connection.
41+
client_info: Information about the client library.
42+
api_endpoint: The API endpoint to use.
43+
"""
44+
45+
def __init__(self, client, client_info=None, api_endpoint=None):
46+
self._client = client
47+
48+
if client_info is None:
49+
client_info = ClientInfo()
50+
51+
self._client_info = client_info
52+
if self._client_info.user_agent is None:
53+
self._client_info.user_agent = AGENT_VERSION
54+
else:
55+
self._client_info.user_agent = f"{self._client_info.user_agent} {AGENT_VERSION}"
56+
self._client_info.client_library_version = __version__
57+
self._extra_headers = {}
58+
59+
self.API_BASE_URL = api_endpoint or storage_http.Connection.DEFAULT_API_ENDPOINT
60+
self.API_VERSION = storage_http.Connection.API_VERSION
61+
self.API_URL_TEMPLATE = storage_http.Connection.API_URL_TEMPLATE
62+
63+
@property
64+
def extra_headers(self):
65+
"""Returns extra headers to send with every request."""
66+
return self._extra_headers
67+
68+
@extra_headers.setter
69+
def extra_headers(self, value):
70+
"""Set the extra header property."""
71+
self._extra_headers = value
72+
73+
@property
74+
def async_http(self):
75+
"""Returns the AsyncAuthorizedSession from the client.
76+
77+
Returns:
78+
google.auth.aio.transport.sessions.AsyncAuthorizedSession: The async session.
79+
"""
80+
return self._client.async_http
81+
82+
@property
83+
def user_agent(self):
84+
"""Returns user_agent for async HTTP transport.
85+
86+
Returns:
87+
str: The user agent string.
88+
"""
89+
return self._client_info.to_user_agent()
90+
91+
@user_agent.setter
92+
def user_agent(self, value):
93+
"""Setter for user_agent in connection."""
94+
self._client_info.user_agent = value
95+
96+
async def _make_request(
97+
self,
98+
method,
99+
url,
100+
data=None,
101+
content_type=None,
102+
headers=None,
103+
target_object=None,
104+
timeout=_http._DEFAULT_TIMEOUT,
105+
extra_api_info=None,
106+
):
107+
"""A low level method to send a request to the API.
108+
109+
Args:
110+
method (str): The HTTP method (e.g., 'GET', 'POST').
111+
url (str): The specific API URL.
112+
data (Optional[Union[str, bytes, dict]]): The body of the request.
113+
content_type (Optional[str]): The Content-Type header.
114+
headers (Optional[dict]): Additional headers for the request.
115+
target_object (Optional[object]): (Unused in async impl) Reference to the target object.
116+
timeout (Optional[float]): The timeout in seconds.
117+
extra_api_info (Optional[str]): Extra info for the User-Agent / Client-Info.
118+
119+
Returns:
120+
google.auth.aio.transport.Response: The HTTP response object.
121+
"""
122+
headers = headers.copy() if headers else {}
123+
headers.update(self.extra_headers)
124+
headers["Accept-Encoding"] = "gzip"
125+
126+
if content_type:
127+
headers["Content-Type"] = content_type
128+
129+
if extra_api_info:
130+
headers[_http.CLIENT_INFO_HEADER] = f"{self.user_agent} {extra_api_info}"
131+
else:
132+
headers[_http.CLIENT_INFO_HEADER] = self.user_agent
133+
headers["User-Agent"] = self.user_agent
134+
135+
return await self._do_request(
136+
method, url, headers, data, target_object, timeout=timeout
137+
)
138+
139+
async def _do_request(
140+
self, method, url, headers, data, target_object, timeout=_http._DEFAULT_TIMEOUT
141+
):
142+
"""Low-level helper: perform the actual API request.
143+
144+
Args:
145+
method (str): HTTP method.
146+
url (str): API URL.
147+
headers (dict): HTTP headers.
148+
data (Optional[bytes]): Request body.
149+
target_object: Unused in this implementation, kept for compatibility.
150+
timeout (float): Request timeout.
151+
152+
Returns:
153+
google.auth.aio.transport.Response: The response object.
154+
"""
155+
return await self.async_http.request(
156+
method=method,
157+
url=url,
158+
headers=headers,
159+
data=data,
160+
timeout=timeout,
161+
)
162+
163+
async def api_request(self, *args, **kwargs):
164+
"""Perform an API request with retry and tracing support.
165+
166+
Args:
167+
*args: Positional arguments passed to _perform_api_request.
168+
**kwargs: Keyword arguments passed to _perform_api_request.
169+
Can include 'retry' (an AsyncRetry object).
170+
171+
Returns:
172+
Union[dict, bytes]: The parsed JSON response or raw bytes.
173+
"""
174+
retry = kwargs.pop("retry", None)
175+
invocation_id = _helpers._get_invocation_id()
176+
kwargs["extra_api_info"] = invocation_id
177+
span_attributes = {
178+
"gccl-invocation-id": invocation_id,
179+
}
180+
181+
call = functools.partial(self._perform_api_request, *args, **kwargs)
182+
183+
with create_trace_span(
184+
name="Storage.AsyncConnection.api_request",
185+
attributes=span_attributes,
186+
client=self._client,
187+
api_request=kwargs,
188+
retry=retry,
189+
):
190+
if retry:
191+
# Ensure the retry policy checks its conditions
192+
try:
193+
retry = retry.get_retry_policy_if_conditions_met(**kwargs)
194+
except AttributeError:
195+
pass
196+
if retry:
197+
call = retry(call)
198+
return await call()
199+
200+
def build_api_url(
201+
self, path, query_params=None, api_base_url=None, api_version=None
202+
):
203+
"""Construct an API URL.
204+
205+
Args:
206+
path (str): The API path (e.g. '/b/bucket-name').
207+
query_params (Optional[Union[dict, list]]): Query parameters.
208+
api_base_url (Optional[str]): Base URL override.
209+
api_version (Optional[str]): API version override.
210+
211+
Returns:
212+
str: The fully constructed URL.
213+
"""
214+
url = self.API_URL_TEMPLATE.format(
215+
api_base_url=(api_base_url or self.API_BASE_URL),
216+
api_version=(api_version or self.API_VERSION),
217+
path=path,
218+
)
219+
220+
query_params = query_params or {}
221+
222+
if isinstance(query_params, collections.abc.Mapping):
223+
query_params = query_params.copy()
224+
else:
225+
query_params_dict = collections.defaultdict(list)
226+
for key, value in query_params:
227+
query_params_dict[key].append(value)
228+
query_params = query_params_dict
229+
230+
query_params.setdefault("prettyPrint", "false")
231+
232+
url += "?" + urlencode(query_params, doseq=True)
233+
234+
return url
235+
236+
async def _perform_api_request(
237+
self,
238+
method,
239+
path,
240+
query_params=None,
241+
data=None,
242+
content_type=None,
243+
headers=None,
244+
api_base_url=None,
245+
api_version=None,
246+
expect_json=True,
247+
_target_object=None,
248+
timeout=_http._DEFAULT_TIMEOUT,
249+
extra_api_info=None,
250+
):
251+
"""Internal helper to prepare the URL/Body and execute the request.
252+
253+
This method handles JSON serialization of the body, URL construction,
254+
and converts HTTP errors into google.api_core.exceptions.
255+
256+
Args:
257+
method (str): HTTP method.
258+
path (str): URL path.
259+
query_params (Optional[dict]): Query params.
260+
data (Optional[Union[dict, bytes]]): Request body.
261+
content_type (Optional[str]): Content-Type header.
262+
headers (Optional[dict]): HTTP headers.
263+
api_base_url (Optional[str]): Base URL override.
264+
api_version (Optional[str]): API version override.
265+
expect_json (bool): If True, parses response as JSON. Defaults to True.
266+
_target_object: Internal use (unused here).
267+
timeout (float): Request timeout.
268+
extra_api_info (Optional[str]): Extra client info.
269+
270+
Returns:
271+
Union[dict, bytes]: Parsed JSON or raw bytes.
272+
273+
Raises:
274+
google.api_core.exceptions.GoogleAPICallError: If the API returns an error.
275+
"""
276+
url = self.build_api_url(
277+
path=path,
278+
query_params=query_params,
279+
api_base_url=api_base_url,
280+
api_version=api_version,
281+
)
282+
283+
if data and isinstance(data, dict):
284+
data = json.dumps(data)
285+
content_type = "application/json"
286+
287+
response = await self._make_request(
288+
method=method,
289+
url=url,
290+
data=data,
291+
content_type=content_type,
292+
headers=headers,
293+
target_object=_target_object,
294+
timeout=timeout,
295+
extra_api_info=extra_api_info,
296+
)
297+
298+
# Handle API Errors
299+
if not (200 <= response.status_code < 300):
300+
content = await response.read()
301+
payload = {}
302+
if content:
303+
try:
304+
payload = json.loads(content.decode("utf-8"))
305+
except (ValueError, UnicodeDecodeError):
306+
payload = {"error": {"message": content.decode("utf-8", errors="replace")}}
307+
raise google.api_core.exceptions.format_http_response_error(response, method, url, payload)
308+
309+
# Handle Success
310+
payload = await response.read()
311+
if expect_json:
312+
if not payload:
313+
return {}
314+
return json.loads(payload)
315+
else:
316+
return payload

google/cloud/storage/_http.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from google.cloud.storage import _helpers
2121
from google.cloud.storage._opentelemetry_tracing import create_trace_span
2222

23+
AGENT_VERSION = f"gcloud-python/{__version__}"
24+
2325

2426
class Connection(_http.JSONConnection):
2527
"""A connection to Google Cloud Storage via the JSON REST API.
@@ -54,9 +56,8 @@ def __init__(self, client, client_info=None, api_endpoint=None):
5456
# TODO: When metrics all use gccl, this should be removed #9552
5557
if self._client_info.user_agent is None: # pragma: no branch
5658
self._client_info.user_agent = ""
57-
agent_version = f"gcloud-python/{__version__}"
58-
if agent_version not in self._client_info.user_agent:
59-
self._client_info.user_agent += f" {agent_version} "
59+
if AGENT_VERSION not in self._client_info.user_agent:
60+
self._client_info.user_agent += f" {AGENT_VERSION} "
6061

6162
API_VERSION = _helpers._API_VERSION
6263
"""The version of the API, used in building the API call's URL."""

0 commit comments

Comments
 (0)