Skip to content

Commit d893035

Browse files
committed
fix(bigquerystorage): resume reader connection on EOS internal error
It's infeasible for the backend to change the status of `EOS on DATA` internal errors, so instead we check the error message to see if it's an error that is resumable. We don't want to try to resume on *all* internal errors, so inspecting the message is the best we can do.
1 parent 8bb4068 commit d893035

File tree

2 files changed

+73
-5
lines changed

2 files changed

+73
-5
lines changed

bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@
4343

4444
_STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,)
4545

46+
# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
47+
# Unfortunately, this condition is surfaced to the caller as an internal error
48+
# by gRPC. We don't want to resume on all internal errors, so instead we look
49+
# for error messages that we know are caused by problems where it is safe to
50+
# reconnect.
51+
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
52+
# See: https://issuetracker.google.com/143292803
53+
"unexpected EOS on DATA frame",
54+
)
55+
4656
_FASTAVRO_REQUIRED = (
4757
"fastavro is required to parse ReadRowResponse messages with Avro bytes."
4858
)
@@ -131,6 +141,14 @@ def __iter__(self):
131141
yield message
132142

133143
return # Made it through the whole stream.
144+
except google.api_core.exceptions.InternalServerError as exc:
145+
resumable_error = False
146+
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES:
147+
resumable_error = (
148+
resumable_error or resumable_message in exc.message
149+
)
150+
if not resumable_error:
151+
raise
134152
except _STREAM_RESUMPTION_EXCEPTIONS:
135153
# Transient error, so reconnect to the stream.
136154
pass

bigquery_storage/tests/unit/test_reader.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,22 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
173173
return arrow_batches
174174

175175

176+
def _avro_blocks_w_nonresumable_internal_error(avro_blocks):
177+
for block in avro_blocks:
178+
yield block
179+
raise google.api_core.exceptions.InternalServerError(
180+
"INTERNAL: Got a nonresumable error."
181+
)
182+
183+
184+
def _avro_blocks_w_resumable_internal_error(avro_blocks):
185+
for block in avro_blocks:
186+
yield block
187+
raise google.api_core.exceptions.InternalServerError(
188+
"INTERNAL: Received unexpected EOS on DATA frame from server."
189+
)
190+
191+
176192
def _avro_blocks_w_unavailable(avro_blocks):
177193
for block in avro_blocks:
178194
yield block
@@ -363,6 +379,29 @@ def test_rows_w_timeout(class_under_test, mock_client):
363379
mock_client.read_rows.assert_not_called()
364380

365381

382+
def test_rows_w_nonresumable_internal_error(class_under_test, mock_client):
383+
bq_columns = [{"name": "int_col", "type": "int64"}]
384+
avro_schema = _bq_to_avro_schema(bq_columns)
385+
read_session = _generate_avro_read_session(avro_schema)
386+
bq_blocks = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
387+
avro_blocks = _avro_blocks_w_nonresumable_internal_error(
388+
_bq_to_avro_blocks(bq_blocks, avro_schema)
389+
)
390+
391+
stream_position = bigquery_storage_v1beta1.types.StreamPosition(
392+
stream={"name": "test"}
393+
)
394+
395+
reader = class_under_test(avro_blocks, mock_client, stream_position, {})
396+
397+
with pytest.raises(
398+
google.api_core.exceptions.InternalServerError, match="nonresumable error"
399+
):
400+
list(reader.rows(read_session))
401+
402+
mock_client.read_rows.assert_not_called()
403+
404+
366405
def test_rows_w_reconnect(class_under_test, mock_client):
367406
bq_columns = [{"name": "int_col", "type": "int64"}]
368407
avro_schema = _bq_to_avro_schema(bq_columns)
@@ -374,13 +413,17 @@ def test_rows_w_reconnect(class_under_test, mock_client):
374413
avro_blocks_1 = _avro_blocks_w_unavailable(
375414
_bq_to_avro_blocks(bq_blocks_1, avro_schema)
376415
)
377-
bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
378-
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
416+
bq_blocks_2 = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
417+
avro_blocks_2 = _avro_blocks_w_resumable_internal_error(
418+
_bq_to_avro_blocks(bq_blocks_2, avro_schema)
419+
)
420+
bq_blocks_3 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
421+
avro_blocks_3 = _bq_to_avro_blocks(bq_blocks_3, avro_schema)
379422

380-
for block in avro_blocks_2:
423+
for block in avro_blocks_3:
381424
block.status.estimated_row_count = 7
382425

383-
mock_client.read_rows.return_value = avro_blocks_2
426+
mock_client.read_rows.side_effect = (avro_blocks_2, avro_blocks_3)
384427
stream_position = bigquery_storage_v1beta1.types.StreamPosition(
385428
stream={"name": "test"}
386429
)
@@ -397,17 +440,24 @@ def test_rows_w_reconnect(class_under_test, mock_client):
397440
itertools.chain(
398441
itertools.chain.from_iterable(bq_blocks_1),
399442
itertools.chain.from_iterable(bq_blocks_2),
443+
itertools.chain.from_iterable(bq_blocks_3),
400444
)
401445
)
402446

403447
assert tuple(got) == expected
404448
assert got.total_rows == 7
405-
mock_client.read_rows.assert_called_once_with(
449+
mock_client.read_rows.assert_any_call(
406450
bigquery_storage_v1beta1.types.StreamPosition(
407451
stream={"name": "test"}, offset=4
408452
),
409453
metadata={"test-key": "test-value"},
410454
)
455+
mock_client.read_rows.assert_called_with(
456+
bigquery_storage_v1beta1.types.StreamPosition(
457+
stream={"name": "test"}, offset=7
458+
),
459+
metadata={"test-key": "test-value"},
460+
)
411461

412462

413463
def test_rows_w_reconnect_by_page(class_under_test, mock_client):

0 commit comments

Comments
 (0)