Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit eabbdca

Browse files
authored
feat(storage): Add base async client and async http iterator class (#1696)
1. Add async client implementation. 2. Add AsyncHTTPIterator deriving from google.api_core.page_iterator_async.AsyncIterator as an alternative to google.api_core.page_iterator.HTTPIterator The AsyncHTTPIterator doesn't exists, and hence needs to be implemented.
1 parent 47e4394 commit eabbdca

File tree

5 files changed

+965
-13
lines changed

5 files changed

+965
-13
lines changed
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Asynchronous client for interacting with Google Cloud Storage."""
16+
17+
import functools
18+
19+
from google.cloud.storage._experimental.asyncio.async_helpers import ASYNC_DEFAULT_TIMEOUT
20+
from google.cloud.storage._experimental.asyncio.async_helpers import ASYNC_DEFAULT_RETRY
21+
from google.cloud.storage._experimental.asyncio.async_helpers import AsyncHTTPIterator
22+
from google.cloud.storage._experimental.asyncio.async_helpers import _do_nothing_page_start
23+
from google.cloud.storage._opentelemetry_tracing import create_trace_span
24+
from google.cloud.storage._experimental.asyncio.async_creds import AsyncCredsWrapper
25+
from google.cloud.storage.abstracts.base_client import BaseClient
26+
from google.cloud.storage._experimental.asyncio.async_connection import AsyncConnection
27+
from google.cloud.storage.abstracts import base_client
28+
29+
try:
30+
from google.auth.aio.transport import sessions
31+
AsyncSession = sessions.AsyncAuthorizedSession
32+
_AIO_AVAILABLE = True
33+
except ImportError:
34+
_AIO_AVAILABLE = False
35+
36+
_marker = base_client.marker
37+
38+
39+
class AsyncClient(BaseClient):
40+
"""Asynchronous client to interact with Google Cloud Storage."""
41+
42+
def __init__(
43+
self,
44+
project=_marker,
45+
credentials=None,
46+
_async_http=None,
47+
client_info=None,
48+
client_options=None,
49+
extra_headers={},
50+
*,
51+
api_key=None,
52+
):
53+
if not _AIO_AVAILABLE:
54+
# Python 3.9 or less comes with an older version of google-auth library which doesn't support asyncio
55+
raise ImportError(
56+
"Failed to import 'google.auth.aio', Consider using a newer python version (>=3.10)"
57+
" or newer version of google-auth library to mitigate this issue."
58+
)
59+
60+
if self._use_client_cert:
61+
# google.auth.aio.transports.sessions.AsyncAuthorizedSession currently doesn't support configuring mTLS.
62+
# In future, we can monkey patch the above, and do provide mTLS support, but that is not a priority
63+
# at the moment.
64+
raise ValueError("Async Client currently do not support mTLS")
65+
66+
# We initialize everything as per synchronous client.
67+
super().__init__(
68+
project=project,
69+
credentials=credentials,
70+
client_info=client_info,
71+
client_options=client_options,
72+
extra_headers=extra_headers,
73+
api_key=api_key
74+
)
75+
self.credentials = AsyncCredsWrapper(self._credentials) # self._credential is synchronous.
76+
self._connection = AsyncConnection(self, **self.connection_kw_args) # adapter for async communication
77+
self._async_http_internal = _async_http
78+
self._async_http_passed_by_user = (_async_http is not None)
79+
80+
@property
81+
def async_http(self):
82+
"""Returns the existing asynchronous session, or create one if it does not exists."""
83+
if self._async_http_internal is None:
84+
self._async_http_internal = AsyncSession(credentials=self.credentials)
85+
return self._async_http_internal
86+
87+
async def close(self):
88+
"""Close the session, if it exists"""
89+
if self._async_http_internal is not None and not self._async_http_passed_by_user:
90+
await self._async_http_internal.close()
91+
92+
async def _get_resource(
93+
self,
94+
path,
95+
query_params=None,
96+
headers=None,
97+
timeout=ASYNC_DEFAULT_TIMEOUT,
98+
retry=ASYNC_DEFAULT_RETRY,
99+
_target_object=None,
100+
):
101+
"""See super() class"""
102+
return await self._connection.api_request(
103+
method="GET",
104+
path=path,
105+
query_params=query_params,
106+
headers=headers,
107+
timeout=timeout,
108+
retry=retry,
109+
_target_object=_target_object,
110+
)
111+
112+
def _list_resource(
113+
self,
114+
path,
115+
item_to_value,
116+
page_token=None,
117+
max_results=None,
118+
extra_params=None,
119+
page_start=_do_nothing_page_start,
120+
page_size=None,
121+
timeout=ASYNC_DEFAULT_TIMEOUT,
122+
retry=ASYNC_DEFAULT_RETRY,
123+
):
124+
"""See super() class"""
125+
kwargs = {
126+
"method": "GET",
127+
"path": path,
128+
"timeout": timeout,
129+
}
130+
with create_trace_span(
131+
name="Storage.AsyncClient._list_resource_returns_iterator",
132+
client=self,
133+
api_request=kwargs,
134+
retry=retry,
135+
):
136+
api_request = functools.partial(
137+
self._connection.api_request, timeout=timeout, retry=retry
138+
)
139+
return AsyncHTTPIterator(
140+
client=self,
141+
api_request=api_request,
142+
path=path,
143+
item_to_value=item_to_value,
144+
page_token=page_token,
145+
max_results=max_results,
146+
extra_params=extra_params,
147+
page_start=page_start,
148+
page_size=page_size,
149+
)
150+
151+
async def _patch_resource(
152+
self,
153+
path,
154+
data,
155+
query_params=None,
156+
headers=None,
157+
timeout=ASYNC_DEFAULT_TIMEOUT,
158+
retry=None,
159+
_target_object=None,
160+
):
161+
"""See super() class"""
162+
return await self._connection.api_request(
163+
method="PATCH",
164+
path=path,
165+
data=data,
166+
query_params=query_params,
167+
headers=headers,
168+
timeout=timeout,
169+
retry=retry,
170+
_target_object=_target_object,
171+
)
172+
173+
async def _put_resource(
174+
self,
175+
path,
176+
data,
177+
query_params=None,
178+
headers=None,
179+
timeout=ASYNC_DEFAULT_TIMEOUT,
180+
retry=None,
181+
_target_object=None,
182+
):
183+
"""See super() class"""
184+
return await self._connection.api_request(
185+
method="PUT",
186+
path=path,
187+
data=data,
188+
query_params=query_params,
189+
headers=headers,
190+
timeout=timeout,
191+
retry=retry,
192+
_target_object=_target_object,
193+
)
194+
195+
async def _post_resource(
196+
self,
197+
path,
198+
data,
199+
query_params=None,
200+
headers=None,
201+
timeout=ASYNC_DEFAULT_TIMEOUT,
202+
retry=None,
203+
_target_object=None,
204+
):
205+
"""See super() class"""
206+
return await self._connection.api_request(
207+
method="POST",
208+
path=path,
209+
data=data,
210+
query_params=query_params,
211+
headers=headers,
212+
timeout=timeout,
213+
retry=retry,
214+
_target_object=_target_object,
215+
)
216+
217+
async def _delete_resource(
218+
self,
219+
path,
220+
query_params=None,
221+
headers=None,
222+
timeout=ASYNC_DEFAULT_TIMEOUT,
223+
retry=ASYNC_DEFAULT_RETRY,
224+
_target_object=None,
225+
):
226+
"""See super() class"""
227+
return await self._connection.api_request(
228+
method="DELETE",
229+
path=path,
230+
query_params=query_params,
231+
headers=headers,
232+
timeout=timeout,
233+
retry=retry,
234+
_target_object=_target_object,
235+
)
236+
237+
def bucket(self, bucket_name, user_project=None, generation=None):
238+
"""Factory constructor for bucket object.
239+
240+
.. note::
241+
This will not make an HTTP request; it simply instantiates
242+
a bucket object owned by this client.
243+
244+
:type bucket_name: str
245+
:param bucket_name: The name of the bucket to be instantiated.
246+
247+
:type user_project: str
248+
:param user_project: (Optional) The project ID to be billed for API
249+
requests made via the bucket.
250+
251+
:type generation: int
252+
:param generation: (Optional) If present, selects a specific revision of
253+
this bucket.
254+
255+
:rtype: :class:`google.cloud.storage._experimental.asyncio.bucket.AsyncBucket`
256+
:returns: The bucket object created.
257+
"""
258+
raise NotImplementedError("This AsyncBucket class needs to be implemented.")

0 commit comments

Comments
 (0)