fix(bigquery-storage): replay in-flight AppendRowsStream requests on transient errors #17388
64johnlee wants to merge 2 commits into
…transient errors

When the gRPC connection is dropped with ServiceUnavailable or Unknown (e.g. "Connection reset by peer"), AppendRowsStream._renew_connection() created a new _Connection but immediately failed all pending futures. Callers had no way to know which requests reached the server, forcing manual reconnection logic and risking data loss.

This change buffers (request, future) pairs in the connection queue instead of futures alone. On transient errors, _shutdown() returns the in-flight pairs instead of failing them, and _renew_connection() replays them on the fresh connection via _reopen_with_pending(). The original AppendRowsFuture objects are reused, so callers transparently receive results without any API change.

The second-level failure case (reconnect also fails) is handled by draining the queue and failing futures directly to avoid an infinite retry loop.

Fixes googleapis#14275

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
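The buffering-and-replay idea in this commit message can be sketched in isolation. This is only a minimal model under stated assumptions: `FakeConnection`, `drain`, `replay`, and `acknowledge_next` are hypothetical names invented for illustration, and `concurrent.futures.Future` stands in for `AppendRowsFuture`. It is not the library's implementation.

```python
import queue
from concurrent.futures import Future


class FakeConnection:
    """Hypothetical stand-in for a connection; not the real _Connection."""

    def __init__(self):
        # Holds (request, future) pairs so requests stay available for replay.
        self.pending = queue.Queue()

    def send(self, request, future=None):
        # Reuse the caller's future when replaying; create a new one otherwise.
        future = future if future is not None else Future()
        self.pending.put((request, future))
        return future

    def acknowledge_next(self):
        # Simulate a server response resolving the oldest in-flight request.
        request, future = self.pending.get_nowait()
        future.set_result(f"ack:{request}")

    def drain(self):
        # On a transient error, hand back in-flight pairs instead of failing them.
        pairs = []
        while not self.pending.empty():
            pairs.append(self.pending.get_nowait())
        return pairs


def replay(pairs, new_connection):
    # Resend each request on the fresh connection, keeping the original future.
    for request, future in pairs:
        new_connection.send(request, future)


old = FakeConnection()
f1 = old.send("rows-1")
f2 = old.send("rows-2")

in_flight = old.drain()   # the connection dropped with a transient error
fresh = FakeConnection()
replay(in_flight, fresh)
fresh.acknowledge_next()
fresh.acknowledge_next()
```

Because the same future objects travel with their requests, code that already holds `f1` and `f2` sees the results without knowing a reconnect happened.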
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Code Review
This pull request implements transient error handling and automatic stream reconnection for the BigQuery Storage writer. When transient errors occur, in-flight requests are collected and replayed on a fresh connection. To support this, the internal queue now stores tuples of requests and futures. Feedback on the changes highlights three critical issues in the new _reopen_with_pending method: potential thread-safety and AttributeError issues when accessing RPC and consumer attributes outside the lock, hanging futures for remaining pending requests if the reconnection fails, and a potential NameError from using itertools without ensuring it is imported. A comprehensive code suggestion was provided to address these concerns.
    with self._thread_lock:
        # Inject the existing future so _on_response resolves it.
        self._queue.put((initial_user_request, initial_future))

        merged = self._make_initial_request(initial_user_request)
        self._rpc = bidi.BidiRpc(
            self._client.append_rows,
            initial_request=merged,
            metadata=tuple(
                itertools.chain(
                    self._metadata,
                    (
                        (
                            "x-goog-request-params",
                            f"write_stream={self._stream_name}",
                        ),
                    ),
                )
            ),
        )
        self._rpc.add_done_callback(self._on_rpc_done)

        self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
        self._consumer.start()

    start_time = time.monotonic()
    try:
        while not self._rpc.is_active and self._consumer.is_active:
            time.sleep(_WRITE_OPEN_INTERVAL)
            if timeout is not None and time.monotonic() - start_time > timeout:
                break
    except AttributeError:
        pass

    if not self._consumer.is_active:
        # Connection failed: drain the queue and fail futures directly
        # rather than going through close() to avoid triggering another
        # reconnect attempt (which would cause an infinite retry loop).
        exc = exceptions.Unknown(
            "There was a problem reopening the stream after a transient error. "
            "Try turning on DEBUG level logs to see the error."
        )
        with self._thread_lock:
            self._closed = True
            while not self._queue.empty():
                _, future = self._queue.get_nowait()
                future.set_exception(exc)
        return
There are three important issues in `_reopen_with_pending` that should be addressed:
- Thread Safety & Potential `AttributeError`: Accessing `self._rpc` and `self._consumer` outside the lock block can lead to an `AttributeError` if another thread calls `_shutdown` and sets them to `None`. Using local variables `rpc` and `consumer` resolved within the lock block makes the wait loop and status check completely thread-safe and eliminates the need for the `try-except AttributeError` block.
- Hanging Futures on Reopen Failure: If the connection fails to reopen, only the first pending request (which was added to `self._queue`) is failed. The remaining `pending[1:]` requests are never added to `self._queue` and are never failed, leaving them hanging indefinitely. We should iterate over all `pending` futures and fail them, guarding with `not future.done()` to avoid raising `InvalidStateError` on already-resolved futures.
- Avoid `itertools` Dependency: We can construct the metadata tuple using standard tuple concatenation instead of `itertools.chain`, which avoids a potential `NameError` if `itertools` is not imported at the module level.
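The `not future.done()` guard from the second point can be shown on its own. The sketch below assumes `concurrent.futures.Future` as a stand-in (the real `AppendRowsFuture` may behave differently), and `fail_pending` is a hypothetical helper name, not something in the PR.

```python
from concurrent.futures import Future, InvalidStateError


def fail_pending(pairs, exc):
    """Fail every unresolved future in (request, future) pairs; return the count."""
    failed = 0
    for _, future in pairs:
        # Skip futures that already have a result or an exception.
        if not future.done():
            future.set_exception(exc)
            failed += 1
    return failed


resolved = Future()
resolved.set_result("ok")
waiting = Future()
pairs = [("req-1", resolved), ("req-2", waiting)]

failed_count = fail_pending(pairs, RuntimeError("reopen failed"))

# Without the guard, setting an exception on a finished future raises
# InvalidStateError (Python 3.8+ for concurrent.futures.Future).
try:
    resolved.set_exception(RuntimeError("too late"))
    guard_needed = False
except InvalidStateError:
    guard_needed = True
```

Iterating over the full list of pairs, rather than only what happens to be in the queue, is what keeps later requests from being left unresolved.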
    with self._thread_lock:
        # Inject the existing future so _on_response resolves it.
        self._queue.put((initial_user_request, initial_future))
        merged = self._make_initial_request(initial_user_request)
        metadata = tuple(self._metadata) + (
            (
                "x-goog-request-params",
                f"write_stream={self._stream_name}",
            ),
        )
        rpc = bidi.BidiRpc(
            self._client.append_rows,
            initial_request=merged,
            metadata=metadata,
        )
        rpc.add_done_callback(self._on_rpc_done)
        consumer = bidi.BackgroundConsumer(rpc, self._on_response)
        consumer.start()
        self._rpc = rpc
        self._consumer = consumer
    start_time = time.monotonic()
    while not rpc.is_active and consumer.is_active:
        time.sleep(_WRITE_OPEN_INTERVAL)
        if timeout is not None and time.monotonic() - start_time > timeout:
            break
    if not consumer.is_active:
        # Connection failed: drain the queue and fail futures directly
        # rather than going through close() to avoid triggering another
        # reconnect attempt (which would cause an infinite retry loop).
        exc = exceptions.Unknown(
            "There was a problem reopening the stream after a transient error. "
            "Try turning on DEBUG level logs to see the error."
        )
        with self._thread_lock:
            self._closed = True
            while not self._queue.empty():
                _, future = self._queue.get_nowait()
                if not future.done():
                    future.set_exception(exc)
            for _, future in pending:
                if not future.done():
                    future.set_exception(exc)
        return

Cover the success path (futures resolved after reconnection) and the failure path (consumer never goes active -> futures failed immediately without triggering an infinite retry loop).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
Fixes #14275
When the gRPC connection drops with `ServiceUnavailable` or `Unknown` (e.g. `"Connection reset by peer"`), `AppendRowsStream._renew_connection()` was creating a fresh `_Connection` but immediately failing all pending futures with the transport error. Callers had no way to distinguish requests that reached the server from those that didn't, and were forced to implement their own reconnect-and-replay logic at risk of data loss or duplicates.
Changes
- `_queue` stores `(request, future)` pairs instead of futures alone, so in-flight requests are always available for replay.
- `_shutdown()` returns pending pairs on transient errors instead of calling `future.set_exception()` immediately. On non-transient errors the existing behaviour (fail all futures) is unchanged.
- `_renew_connection()` calls `_reopen_with_pending()` on the new connection when pending pairs exist. The original `AppendRowsFuture` objects are reused, so callers that already hold references transparently receive their results once the server acknowledges the replayed requests.
- `_reopen_with_pending()` handles its own failure by draining the queue and failing futures directly (bypassing `close()`) to avoid an infinite retry loop.
- `_STREAM_RESUMPTION_EXCEPTIONS` constant added (mirrors `reader.py`) to centralise the set of retryable errors.
Behaviour
`Unknown` error; no retry loop
`close()`
`StreamClosedError`
Testing
- `test_close`, `test__on_response_exception`, and `test__on_response_result` now use the `(request, future)` queue format.
- `test__renew_connection` sets `_shutdown.return_value = []` (non-transient case).
- `test__renew_connection_replays_on_transient_error` verifies that pending pairs are forwarded to `_reopen_with_pending`.

🤖 Generated with Claude Code
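For readers who want to see the shape of the replay test described under Testing, here is a rough sketch using `unittest.mock`. `SketchStream` is a hypothetical stand-in that only mirrors the control flow summarised in this description. The real `AppendRowsStream` signatures are not shown in this PR text and are assumed here.

```python
from unittest import mock


class SketchStream:
    """Hypothetical stand-in; method signatures are assumptions."""

    def _shutdown(self, reason=None):
        raise NotImplementedError  # replaced by a mock in the checks below

    def _reopen_with_pending(self, pending):
        raise NotImplementedError  # replaced by a mock in the checks below

    def _renew_connection(self, reason=None):
        # Transient errors yield in-flight pairs; other errors yield nothing.
        pending = self._shutdown(reason=reason)
        if pending:
            self._reopen_with_pending(pending)


def check_replay_on_transient_error():
    stream = SketchStream()
    pairs = [(mock.sentinel.request, mock.sentinel.future)]
    with mock.patch.object(stream, "_shutdown", return_value=pairs), \
            mock.patch.object(stream, "_reopen_with_pending") as reopen:
        stream._renew_connection(reason=ConnectionResetError("reset by peer"))
    # The pending pairs must be forwarded unchanged for replay.
    reopen.assert_called_once_with(pairs)
    return True


def check_no_replay_when_nothing_pending():
    stream = SketchStream()
    with mock.patch.object(stream, "_shutdown", return_value=[]), \
            mock.patch.object(stream, "_reopen_with_pending") as reopen:
        stream._renew_connection(reason=ValueError("non-transient"))
    # An empty list models the non-transient case: nothing is replayed.
    reopen.assert_not_called()
    return True
```

Both checks only exercise the forwarding decision; they say nothing about how the real stream talks to the server.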