fix(bigquery-storage): replay in-flight AppendRowsStream requests on transient errors #17388

Open
64johnlee wants to merge 2 commits into googleapis:main from 64johnlee:fix/bigquery-storage-writer-transient-retry

Summary

Fixes #14275

When the gRPC connection drops with ServiceUnavailable or Unknown (e.g. "Connection reset by peer"), AppendRowsStream._renew_connection() was creating a fresh _Connection but immediately failing all pending futures with the transport error. Callers had no way to distinguish requests that reached the server from those that didn't, and were forced to implement their own reconnect-and-replay logic at risk of data loss or duplicates.

Changes

  • _queue stores (request, future) pairs instead of futures alone, so in-flight requests are always available for replay.
  • _shutdown() returns pending pairs on transient errors instead of calling future.set_exception() immediately. On non-transient errors the existing behaviour (fail all futures) is unchanged.
  • _renew_connection() calls _reopen_with_pending() on the new connection when pending pairs exist. The original AppendRowsFuture objects are reused — callers that already hold references transparently receive their results once the server acknowledges the replayed requests.
  • _reopen_with_pending() handles its own failure by draining the queue and failing futures directly (bypassing close()) to avoid an infinite retry loop.
  • _STREAM_RESUMPTION_EXCEPTIONS constant added (mirrors reader.py) to centralise the set of retryable errors.
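
The queue and replay changes listed above can be sketched roughly as follows. This is a minimal toy model, not the code in this PR: `ToyConnection`, `TransientError`, `FatalError`, `RESUMPTION_EXCEPTIONS` and `renew` are hypothetical names, and `concurrent.futures.Future` stands in for `AppendRowsFuture`.

```python
import queue
from concurrent.futures import Future


class TransientError(Exception):
    """Hypothetical stand-in for ServiceUnavailable / Unknown."""


class FatalError(Exception):
    """Hypothetical stand-in for a non-retryable error."""


# Stand-in for the _STREAM_RESUMPTION_EXCEPTIONS constant described above.
RESUMPTION_EXCEPTIONS = (TransientError,)


class ToyConnection:
    """Toy connection whose queue holds (request, future) pairs."""

    def __init__(self):
        self._queue = queue.Queue()

    def send(self, request):
        future = Future()
        # Keep the request next to its future so it can be replayed later.
        self._queue.put((request, future))
        return future

    def shutdown(self, reason):
        # Drain everything that is still in flight.
        pending = []
        while True:
            try:
                pending.append(self._queue.get_nowait())
            except queue.Empty:
                break
        if isinstance(reason, RESUMPTION_EXCEPTIONS):
            # Transient: hand the pairs back instead of failing them.
            return pending
        # Non-transient: fail every future, as before.
        for _, future in pending:
            future.set_exception(reason)
        return []

    def reopen_with_pending(self, pending):
        # Re-queue the original futures; a real connection would also
        # resend each request over the new stream.
        for pair in pending:
            self._queue.put(pair)

    def on_response(self, response):
        _, future = self._queue.get_nowait()
        future.set_result(response)


def renew(old, reason):
    """Shut down the old connection and replay pending pairs on a new one."""
    pending = old.shutdown(reason)
    fresh = ToyConnection()
    if pending:
        fresh.reopen_with_pending(pending)
    return fresh


conn = ToyConnection()
fut = conn.send("row-batch-1")
conn = renew(conn, TransientError("Connection reset by peer"))
conn.on_response("ack-1")  # the caller's original future now resolves
```

In this sketch the caller keeps the same `fut` object across the reconnect, which is the property the PR relies on to avoid an API change.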

Behaviour

| Scenario | Before | After |
| --- | --- | --- |
| Transient drop (503/Unknown) with 1+ in-flight requests | Futures fail immediately; caller must retry | Futures resolve transparently after reconnect |
| Non-transient error | Futures fail with error | Unchanged |
| Reconnect also fails | N/A | Futures fail with descriptive Unknown error; no retry loop |
| Intentional close() | Futures fail with StreamClosedError | Unchanged |

Testing

  • Updated test_close, test__on_response_exception, test__on_response_result to use (request, future) queue format.
  • Updated test__renew_connection to set _shutdown.return_value = [] (non-transient case).
  • Added test__renew_connection_replays_on_transient_error to verify pending pairs are forwarded to _reopen_with_pending.
  • All 23 existing writer unit tests pass.
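
A replay test of the kind described above might look like the sketch below. It is an assumption about the test's shape, not the test added in this PR: `ToyStream` is a hypothetical stand-in for the writer stream, and only `unittest.mock` from the standard library is used.

```python
from unittest import mock


class ToyStream:
    """Hypothetical stand-in for the writer stream, not the real class."""

    def __init__(self, connection_factory):
        self._connection_factory = connection_factory
        self._connection = connection_factory()

    def renew_connection(self, reason):
        # Collect whatever the old connection hands back, then replay it.
        pending = self._connection._shutdown(reason)
        self._connection = self._connection_factory()
        if pending:
            self._connection._reopen_with_pending(pending)
        return self._connection


def test_renew_replays_pending_pairs():
    old_conn, new_conn = mock.Mock(), mock.Mock()
    pairs = [("req-1", mock.sentinel.future_1), ("req-2", mock.sentinel.future_2)]
    old_conn._shutdown.return_value = pairs  # transient case
    stream = ToyStream(mock.Mock(side_effect=[old_conn, new_conn]))

    stream.renew_connection(reason=ConnectionResetError("reset"))

    old_conn._shutdown.assert_called_once()
    new_conn._reopen_with_pending.assert_called_once_with(pairs)


def test_renew_skips_replay_when_nothing_pending():
    old_conn, new_conn = mock.Mock(), mock.Mock()
    old_conn._shutdown.return_value = []  # non-transient case
    stream = ToyStream(mock.Mock(side_effect=[old_conn, new_conn]))

    stream.renew_connection(reason=ValueError("fatal"))

    new_conn._reopen_with_pending.assert_not_called()


test_renew_replays_pending_pairs()
test_renew_skips_replay_when_nothing_pending()
```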

🤖 Generated with Claude Code

…transient errors

When the gRPC connection is dropped with ServiceUnavailable or Unknown
(e.g. "Connection reset by peer"), AppendRowsStream._renew_connection()
creates a new _Connection but was immediately failing all pending futures.
Callers had no way to know which requests reached the server, forcing
manual reconnection logic and risking data loss.

This change buffers (request, future) pairs in the connection queue
instead of futures alone. On transient errors, _shutdown() returns
the in-flight pairs instead of failing them, and _renew_connection()
replays them on the fresh connection via _reopen_with_pending(). The
original AppendRowsFuture objects are reused, so callers transparently
receive results without any API change.

The second-level failure case (reconnect also fails) is handled by
draining the queue and failing futures directly to avoid an infinite
retry loop.

Fixes googleapis#14275

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@64johnlee 64johnlee requested review from a team as code owners June 6, 2026 13:19
@64johnlee 64johnlee requested review from TrevorBergeron and removed request for a team June 6, 2026 13:19

google-cla Bot commented Jun 6, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements transient error handling and automatic stream reconnection for the BigQuery Storage writer. When transient errors occur, in-flight requests are collected and replayed on a fresh connection. To support this, the internal queue now stores tuples of requests and futures. Feedback on the changes highlights three critical issues in the new _reopen_with_pending method: potential thread-safety and AttributeError issues when accessing RPC and consumer attributes outside the lock, hanging futures for remaining pending requests if the reconnection fails, and a potential NameError from using itertools without ensuring it is imported. A comprehensive code suggestion was provided to address these concerns.

Comment on lines +573 to +620
        with self._thread_lock:
            # Inject the existing future so _on_response resolves it.
            self._queue.put((initial_user_request, initial_future))

            merged = self._make_initial_request(initial_user_request)
            self._rpc = bidi.BidiRpc(
                self._client.append_rows,
                initial_request=merged,
                metadata=tuple(
                    itertools.chain(
                        self._metadata,
                        (
                            (
                                "x-goog-request-params",
                                f"write_stream={self._stream_name}",
                            ),
                        ),
                    )
                ),
            )
            self._rpc.add_done_callback(self._on_rpc_done)

            self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
            self._consumer.start()

        start_time = time.monotonic()
        try:
            while not self._rpc.is_active and self._consumer.is_active:
                time.sleep(_WRITE_OPEN_INTERVAL)
                if timeout is not None and time.monotonic() - start_time > timeout:
                    break
        except AttributeError:
            pass

        if not self._consumer.is_active:
            # Connection failed — drain the queue and fail futures directly
            # rather than going through close() to avoid triggering another
            # reconnect attempt (which would cause an infinite retry loop).
            exc = exceptions.Unknown(
                "There was a problem reopening the stream after a transient error. "
                "Try turning on DEBUG level logs to see the error."
            )
            with self._thread_lock:
                self._closed = True
                while not self._queue.empty():
                    _, future = self._queue.get_nowait()
                    future.set_exception(exc)
            return

high

There are three important issues in _reopen_with_pending that should be addressed:

  1. Thread Safety & Potential AttributeError: Accessing self._rpc and self._consumer outside the lock block can lead to an AttributeError if another thread calls _shutdown and sets them to None. Using local variables rpc and consumer resolved within the lock block makes the wait loop and status check completely thread-safe and eliminates the need for the try-except AttributeError block.
  2. Hanging Futures on Reopen Failure: If the connection fails to reopen, only the first pending request (which was added to self._queue) is failed. The remaining pending[1:] requests are never added to self._queue and are never failed, leaving them hanging indefinitely. We should iterate over all pending futures and fail them, guarding with not future.done() to avoid raising InvalidStateError on already-resolved futures.
  3. Avoid itertools Dependency: We can construct the metadata tuple using standard tuple concatenation instead of itertools.chain, which avoids a potential NameError if itertools is not imported at the module level.
        with self._thread_lock:
            # Inject the existing future so _on_response resolves it.
            self._queue.put((initial_user_request, initial_future))

            merged = self._make_initial_request(initial_user_request)
            metadata = tuple(self._metadata) + (
                (
                    "x-goog-request-params",
                    f"write_stream={self._stream_name}",
                ),
            )
            rpc = bidi.BidiRpc(
                self._client.append_rows,
                initial_request=merged,
                metadata=metadata,
            )
            rpc.add_done_callback(self._on_rpc_done)

            consumer = bidi.BackgroundConsumer(rpc, self._on_response)
            consumer.start()

            self._rpc = rpc
            self._consumer = consumer

        start_time = time.monotonic()
        while not rpc.is_active and consumer.is_active:
            time.sleep(_WRITE_OPEN_INTERVAL)
            if timeout is not None and time.monotonic() - start_time > timeout:
                break

        if not consumer.is_active:
            # Connection failed — drain the queue and fail futures directly
            # rather than going through close() to avoid triggering another
            # reconnect attempt (which would cause an infinite retry loop).
            exc = exceptions.Unknown(
                "There was a problem reopening the stream after a transient error. "
                "Try turning on DEBUG level logs to see the error."
            )
            with self._thread_lock:
                self._closed = True
                while not self._queue.empty():
                    _, future = self._queue.get_nowait()
                    if not future.done():
                        future.set_exception(exc)
                for _, future in pending:
                    if not future.done():
                        future.set_exception(exc)
            return
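
Two of the points in the suggestion above can be checked in isolation. The sketch below is illustrative only: it uses `concurrent.futures.Future` as a stand-in (the writer's `AppendRowsFuture` may behave differently when set twice), and the metadata values and stream name are hypothetical.

```python
import itertools
from concurrent.futures import Future, InvalidStateError

# Point 3: tuple concatenation builds the same metadata as itertools.chain.
metadata = [("x-goog-api-client", "example/0.0")]  # hypothetical value
extra = (
    (
        "x-goog-request-params",
        "write_stream=projects/p/datasets/d/tables/t/streams/s",  # hypothetical
    ),
)
chained = tuple(itertools.chain(metadata, extra))
concatenated = tuple(metadata) + extra
assert chained == concatenated

# Point 2: fail every pending future, skipping ones that already resolved.
resolved, unresolved = Future(), Future()
resolved.set_result("ack")
exc = RuntimeError("reopen failed")

for future in (resolved, unresolved):
    if not future.done():
        future.set_exception(exc)

# Without the guard, a stdlib Future that is already done rejects the call
# (Python 3.8+ raises InvalidStateError).
try:
    resolved.set_exception(exc)
    raised_without_guard = False
except InvalidStateError:
    raised_without_guard = True
```

With the guard in place the already-acknowledged future keeps its result, and only the still-pending one receives the reopen error.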

Cover the success path (futures resolved after reconnection) and the
failure path (consumer never goes active -> futures failed immediately
without triggering an infinite retry loop).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Development

Successfully merging this pull request may close these issues.

Google Big Query Write append_rows does not respect retry configuration