Skip to content

Commit 3b6bc4f

Browse files
authored
feat: Add async bidiRpc files in python-storage (#1545)
* Add async bidiRpc files in python-storage these files will be removed once googleapis/python-api-core#836 gets submitted * fix import path for bidi_base
1 parent a7d06c9 commit 3b6bc4f

File tree

3 files changed

+620
-0
lines changed

3 files changed

+620
-0
lines changed
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
# Copyright 2025, Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Asynchronous bi-directional streaming RPC helpers."""
16+
17+
import asyncio
18+
import logging
19+
20+
from google.api_core import exceptions
21+
from google.cloud.storage._experimental.asyncio.bidi_base import BidiRpcBase
22+
23+
_LOGGER = logging.getLogger(__name__)
24+
25+
26+
class _AsyncRequestQueueGenerator:
    """Helper that feeds asynchronous requests from a queue into a gRPC stream.

    This generator takes requests off a given :class:`asyncio.Queue` and
    yields them to gRPC.

    This helper is useful when you have an indeterminate, indefinite, or
    otherwise open-ended set of requests to send through a request-streaming
    (or bidirectional) RPC.

    It is necessary because it lets the user control *when* request proto
    messages are sent, instead of having to supply all of them up front.
    This is achieved via an asynchronous queue (:class:`asyncio.Queue`):
    gRPC awaits until there is a message in the queue.

    Finally, it allows for retrying without swapping queues, because if it
    does pull an item off the queue while the RPC is inactive, it will
    immediately put it back and then exit. This is necessary because
    yielding the item in this case will cause gRPC to discard it. In
    practice, this means that the order of messages is not guaranteed. If
    such a thing is necessary, it would be easy to use a priority queue.

    Example::

        requests = _AsyncRequestQueueGenerator(q)
        call = await stub.StreamingRequest(requests)
        requests.call = call

        async for response in call:
            print(response)
            await q.put(...)

    Args:
        queue (asyncio.Queue): The request queue.
        initial_request (Union[protobuf.Message,
            Callable[[], protobuf.Message]]): The initial request to
            yield. This is done independently of the request queue to allow
            for easily restarting streams that require some initial
            configuration request.
    """

    def __init__(self, queue: asyncio.Queue, initial_request=None):
        self._queue = queue
        self._initial_request = initial_request
        # The gRPC call object; assigned by the caller once the stream is
        # opened (see the class Example).
        self.call = None

    def _is_active(self):
        """Return True if the call is not set or not completed."""
        return self.call is None or not self.call.done()

    async def __aiter__(self):
        # Yield the initial request (or the result of its factory) before
        # touching the queue, so streams that need an opening configuration
        # message can be restarted easily.
        if self._initial_request is not None:
            if callable(self._initial_request):
                yield self._initial_request()
            else:
                yield self._initial_request

        while True:
            item = await self._queue.get()

            # The consumer explicitly sent "None", indicating that the request
            # should end.
            if item is None:
                _LOGGER.debug("Cleanly exiting request generator.")
                return

            if not self._is_active():
                # We have an item, but the call is closed. We should put the
                # item back on the queue so that the next call can consume it.
                await self._queue.put(item)
                _LOGGER.debug(
                    "Inactive call, replacing item on queue and exiting "
                    "request generator."
                )
                return

            yield item
109+
110+
111+
class AsyncBidiRpc(BidiRpcBase):
    """A helper for consuming an async bi-directional streaming RPC.

    This maps gRPC's built-in interface which uses a request iterator and a
    response iterator into a socket-like :func:`send` and :func:`recv`. This
    is a more useful pattern for long-running or asymmetric streams (streams
    where there is not a direct correlation between the requests and
    responses).

    Example::

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')
        rpc = AsyncBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            metadata=[('name', 'value')]
        )

        await rpc.open()

        while rpc.is_active:
            print(await rpc.recv())
            await rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`.

    Args:
        start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
            Callable[[], protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    """

    def _create_queue(self):
        """Create a queue for requests.

        Returns:
            asyncio.Queue: The asynchronous queue used to buffer outgoing
                requests for the stream.
        """
        return asyncio.Queue()

    async def open(self):
        """Opens the stream.

        Raises:
            ValueError: If the stream is already open.
            google.api_core.exceptions.GoogleAPICallError: If starting the
                RPC fails; done callbacks are notified before re-raising.
        """
        if self.is_active:
            raise ValueError("Can not open an already open stream.")

        request_generator = _AsyncRequestQueueGenerator(
            self._request_queue, initial_request=self._initial_request
        )
        try:
            call = await self._start_rpc(request_generator, metadata=self._rpc_metadata)
        except exceptions.GoogleAPICallError as exc:
            # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
            # available from the ``response`` property on the mapped exception.
            self._on_call_done(exc.response)
            raise

        # Give the generator a handle on the call so it can stop yielding
        # once the call completes.
        request_generator.call = call

        # TODO: api_core should expose the future interface for wrapped
        # callables as well.
        if hasattr(call, "_wrapped"):  # pragma: NO COVER
            call._wrapped.add_done_callback(self._on_call_done)
        else:
            call.add_done_callback(self._on_call_done)

        self._request_generator = request_generator
        self.call = call

    async def close(self):
        """Closes the stream.

        Safe to call on a stream that was never opened (no-op in that case).
        """
        if self.call is None:
            return

        # ``None`` is the sentinel that tells the request generator to exit
        # cleanly.
        await self._request_queue.put(None)
        self.call.cancel()
        self._request_generator = None
        self._initial_request = None
        self._callbacks = []
        # Don't set self.call to None. Keep it around so that send/recv can
        # raise the error.

    async def send(self, request):
        """Queue a message to be sent on the stream.

        If the underlying RPC has been closed, this will raise.

        Args:
            request (protobuf.Message): The request to send.

        Raises:
            ValueError: If the RPC has never been opened.
        """
        if self.call is None:
            raise ValueError("Can not send() on an RPC that has never been opened.")

        # Don't use self.is_active(), as ResumableBidiRpc will overload it
        # to mean something semantically different.
        if not self.call.done():
            await self._request_queue.put(request)
        else:
            # calling read should cause the call to raise.
            await self.call.read()

    async def recv(self):
        """Wait for a message to be returned from the stream.

        If the underlying RPC has been closed, this will raise.

        Returns:
            protobuf.Message: The received message.

        Raises:
            ValueError: If the RPC has never been opened.
        """
        if self.call is None:
            raise ValueError("Can not recv() on an RPC that has never been opened.")

        return await self.call.read()

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        return self.call is not None and not self.call.done()
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Copyright 2025, Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
5+
# https://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
"""Base class for bi-directional streaming RPC helpers."""
14+
15+
16+
class BidiRpcBase:
    """Common machinery for bi-directional streaming RPC helpers.

    Wraps gRPC's request-iterator/response-iterator interface in a
    socket-like :func:`send` / :func:`recv` pair, which is a better fit for
    long-running or asymmetric streams (where requests and responses do not
    correspond one-to-one).

    No retrying of the stream on errors happens at this level.

    Args:
        start_rpc (Union[grpc.StreamStreamMultiCallable,
            grpc.aio.StreamStreamMultiCallable]): The gRPC method used
            to start the RPC.
        initial_request (Union[protobuf.Message,
            Callable[[], protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    """

    def __init__(self, start_rpc, initial_request=None, metadata=None):
        self._start_rpc = start_rpc
        self._initial_request = initial_request
        self._rpc_metadata = metadata
        # Subclasses choose the concrete queue type (sync vs. asyncio).
        self._request_queue = self._create_queue()
        self._request_generator = None
        self._callbacks = []
        # Stays None until the stream is opened by a subclass.
        self.call = None

    def _create_queue(self):
        """Create a queue for requests."""
        raise NotImplementedError("`_create_queue` is not implemented.")

    def add_done_callback(self, callback):
        """Register a callback to run when the RPC terminates.

        Termination covers both errors and successful completion.

        Args:
            callback (Callable[[grpc.Future], None]): The callback to execute.
                It will be provided with the same gRPC future as the underlying
                stream which will also be a :class:`grpc.aio.Call`.
        """
        self._callbacks.append(callback)

    def _on_call_done(self, future):
        # Invoked when the RPC errors or terminates successfully. Note that
        # grpc's "future" here can also be a grpc.RpcError; per the note in
        # https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
        # a `grpc.RpcError` is also a `grpc.aio.Call`.
        for done_callback in self._callbacks:
            done_callback(future)

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        raise NotImplementedError("`is_active` is not implemented.")

    @property
    def pending_requests(self):
        """int: Returns an estimate of the number of queued requests."""
        return self._request_queue.qsize()

0 commit comments

Comments
 (0)