@@ -173,6 +173,22 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
173173 return arrow_batches
174174
175175
176+ def _avro_blocks_w_nonresumable_internal_error (avro_blocks ):
177+ for block in avro_blocks :
178+ yield block
179+ raise google .api_core .exceptions .InternalServerError (
180+ "INTERNAL: Got a nonresumable error."
181+ )
182+
183+
184+ def _avro_blocks_w_resumable_internal_error (avro_blocks ):
185+ for block in avro_blocks :
186+ yield block
187+ raise google .api_core .exceptions .InternalServerError (
188+ "INTERNAL: Received unexpected EOS on DATA frame from server."
189+ )
190+
191+
176192def _avro_blocks_w_unavailable (avro_blocks ):
177193 for block in avro_blocks :
178194 yield block
@@ -363,6 +379,29 @@ def test_rows_w_timeout(class_under_test, mock_client):
363379 mock_client .read_rows .assert_not_called ()
364380
365381
382+ def test_rows_w_nonresumable_internal_error (class_under_test , mock_client ):
383+ bq_columns = [{"name" : "int_col" , "type" : "int64" }]
384+ avro_schema = _bq_to_avro_schema (bq_columns )
385+ read_session = _generate_avro_read_session (avro_schema )
386+ bq_blocks = [[{"int_col" : 1024 }, {"int_col" : 512 }], [{"int_col" : 256 }]]
387+ avro_blocks = _avro_blocks_w_nonresumable_internal_error (
388+ _bq_to_avro_blocks (bq_blocks , avro_schema )
389+ )
390+
391+ stream_position = bigquery_storage_v1beta1 .types .StreamPosition (
392+ stream = {"name" : "test" }
393+ )
394+
395+ reader = class_under_test (avro_blocks , mock_client , stream_position , {})
396+
397+ with pytest .raises (
398+ google .api_core .exceptions .InternalServerError , match = "nonresumable error"
399+ ):
400+ list (reader .rows (read_session ))
401+
402+ mock_client .read_rows .assert_not_called ()
403+
404+
366405def test_rows_w_reconnect (class_under_test , mock_client ):
367406 bq_columns = [{"name" : "int_col" , "type" : "int64" }]
368407 avro_schema = _bq_to_avro_schema (bq_columns )
@@ -374,13 +413,17 @@ def test_rows_w_reconnect(class_under_test, mock_client):
374413 avro_blocks_1 = _avro_blocks_w_unavailable (
375414 _bq_to_avro_blocks (bq_blocks_1 , avro_schema )
376415 )
377- bq_blocks_2 = [[{"int_col" : 567 }, {"int_col" : 789 }], [{"int_col" : 890 }]]
378- avro_blocks_2 = _bq_to_avro_blocks (bq_blocks_2 , avro_schema )
416+ bq_blocks_2 = [[{"int_col" : 1024 }, {"int_col" : 512 }], [{"int_col" : 256 }]]
417+ avro_blocks_2 = _avro_blocks_w_resumable_internal_error (
418+ _bq_to_avro_blocks (bq_blocks_2 , avro_schema )
419+ )
420+ bq_blocks_3 = [[{"int_col" : 567 }, {"int_col" : 789 }], [{"int_col" : 890 }]]
421+ avro_blocks_3 = _bq_to_avro_blocks (bq_blocks_3 , avro_schema )
379422
380- for block in avro_blocks_2 :
423+ for block in avro_blocks_3 :
381424 block .status .estimated_row_count = 7
382425
383- mock_client .read_rows .return_value = avro_blocks_2
426+ mock_client .read_rows .side_effect = ( avro_blocks_2 , avro_blocks_3 )
384427 stream_position = bigquery_storage_v1beta1 .types .StreamPosition (
385428 stream = {"name" : "test" }
386429 )
@@ -397,17 +440,24 @@ def test_rows_w_reconnect(class_under_test, mock_client):
397440 itertools .chain (
398441 itertools .chain .from_iterable (bq_blocks_1 ),
399442 itertools .chain .from_iterable (bq_blocks_2 ),
443+ itertools .chain .from_iterable (bq_blocks_3 ),
400444 )
401445 )
402446
403447 assert tuple (got ) == expected
404448 assert got .total_rows == 7
405- mock_client .read_rows .assert_called_once_with (
449+ mock_client .read_rows .assert_any_call (
406450 bigquery_storage_v1beta1 .types .StreamPosition (
407451 stream = {"name" : "test" }, offset = 4
408452 ),
409453 metadata = {"test-key" : "test-value" },
410454 )
455+ mock_client .read_rows .assert_called_with (
456+ bigquery_storage_v1beta1 .types .StreamPosition (
457+ stream = {"name" : "test" }, offset = 7
458+ ),
459+ metadata = {"test-key" : "test-value" },
460+ )
411461
412462
413463def test_rows_w_reconnect_by_page (class_under_test , mock_client ):
0 commit comments