Skip to content

Commit 8049e8a

Browse files
authored
Fallback to BQ API when there are problems reading from BQ Storage. (googleapis#7633)
The tabledata.list API works for more kinds of tables, including small anonymous query results tables. By falling back to this API, we enable a developer to always specify a `bqstorage_client`, even when they aren't writing their query results to a destination table and don't know how large their query results will be.
1 parent 682903b commit 8049e8a

3 files changed

Lines changed: 94 additions & 33 deletions

File tree

bigquery/google/cloud/bigquery/table.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
except ImportError: # pragma: NO COVER
3636
tqdm = None
3737

38+
import google.api_core.exceptions
3839
from google.api_core.page_iterator import HTTPIterator
3940

4041
import google.cloud._helpers
@@ -1437,7 +1438,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
14371438
bqstorage_client ( \
14381439
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
14391440
):
1440-
**Alpha Feature** Optional. A BigQuery Storage API client. If
1441+
**Beta Feature** Optional. A BigQuery Storage API client. If
14411442
supplied, use the faster BigQuery Storage API to fetch rows
14421443
from BigQuery. This API is a billable API.
14431444
@@ -1448,8 +1449,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
14481449
currently supported by this method.
14491450
14501451
**Caution**: There is a known issue reading small anonymous
1451-
query result tables with the BQ Storage API. Write your query
1452-
results to a destination table to work around this issue.
1452+
query result tables with the BQ Storage API. When a problem
1453+
is encountered reading a table, the tabledata.list method
1454+
from the BigQuery API is used, instead.
14531455
dtypes ( \
14541456
Map[str, Union[str, pandas.Series.dtype]] \
14551457
):
@@ -1496,9 +1498,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
14961498
progress_bar = self._get_progress_bar(progress_bar_type)
14971499

14981500
if bqstorage_client is not None:
1499-
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
1500-
else:
1501-
return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)
1501+
try:
1502+
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
1503+
except google.api_core.exceptions.GoogleAPICallError:
1504+
# There is a known issue with reading from small anonymous
1505+
# query results tables, so some errors are expected. Rather
1506+
# than throw those errors, try reading the DataFrame again, but
1507+
# with the tabledata.list API.
1508+
pass
1509+
1510+
return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)
15021511

15031512

15041513
class _EmptyRowIterator(object):

bigquery/tests/system.py

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,35 +1540,42 @@ def test_query_results_to_dataframe_w_bqstorage(self):
15401540
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
15411541
credentials=Config.CLIENT._credentials
15421542
)
1543-
df = (
1544-
Config.CLIENT.query(
1545-
query,
1546-
# There is a known issue reading small anonymous query result
1547-
# tables with the BQ Storage API. Writing to a destination
1548-
# table works around this issue.
1549-
job_config=bigquery.QueryJobConfig(
1550-
destination=dest_ref, write_disposition="WRITE_TRUNCATE"
1551-
),
1552-
)
1553-
.result()
1554-
.to_dataframe(bqstorage_client)
1543+
1544+
job_configs = (
1545+
# There is a known issue reading small anonymous query result
1546+
# tables with the BQ Storage API. Writing to a destination
1547+
# table works around this issue.
1548+
bigquery.QueryJobConfig(
1549+
destination=dest_ref, write_disposition="WRITE_TRUNCATE"
1550+
),
1551+
# Check that the client is able to work around the issue with
1552+
# reading small anonymous query result tables by falling back to
1553+
# the tabledata.list API.
1554+
None,
15551555
)
15561556

1557-
self.assertIsInstance(df, pandas.DataFrame)
1558-
self.assertEqual(len(df), 10) # verify the number of rows
1559-
column_names = ["id", "author", "time_ts", "dead"]
1560-
self.assertEqual(list(df), column_names)
1561-
exp_datatypes = {
1562-
"id": int,
1563-
"author": six.text_type,
1564-
"time_ts": pandas.Timestamp,
1565-
"dead": bool,
1566-
}
1567-
for index, row in df.iterrows():
1568-
for col in column_names:
1569-
# all the schema fields are nullable, so None is acceptable
1570-
if not row[col] is None:
1571-
self.assertIsInstance(row[col], exp_datatypes[col])
1557+
for job_config in job_configs:
1558+
df = (
1559+
Config.CLIENT.query(query, job_config=job_config)
1560+
.result()
1561+
.to_dataframe(bqstorage_client)
1562+
)
1563+
1564+
self.assertIsInstance(df, pandas.DataFrame)
1565+
self.assertEqual(len(df), 10) # verify the number of rows
1566+
column_names = ["id", "author", "time_ts", "dead"]
1567+
self.assertEqual(list(df), column_names)
1568+
exp_datatypes = {
1569+
"id": int,
1570+
"author": six.text_type,
1571+
"time_ts": pandas.Timestamp,
1572+
"dead": bool,
1573+
}
1574+
for index, row in df.iterrows():
1575+
for col in column_names:
1576+
# all the schema fields are nullable, so None is acceptable
1577+
if not row[col] is None:
1578+
self.assertIsInstance(row[col], exp_datatypes[col])
15721579

15731580
def test_insert_rows_nested_nested(self):
15741581
# See #2951

bigquery/tests/unit/test_table.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import pytest
2222
import six
2323

24+
import google.api_core.exceptions
25+
2426
try:
2527
from google.cloud import bigquery_storage_v1beta1
2628
except ImportError: # pragma: NO COVER
2729
bigquery_storage_v1beta1 = None
30+
2831
try:
2932
import pandas
3033
except (ImportError, AttributeError): # pragma: NO COVER
@@ -1748,6 +1751,48 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
17481751
self.assertEqual(list(got), column_names)
17491752
self.assertEqual(len(got.index), 2)
17501753

1754+
@unittest.skipIf(pandas is None, "Requires `pandas`")
1755+
@unittest.skipIf(
1756+
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
1757+
)
1758+
def test_to_dataframe_w_bqstorage_fallback_to_tabledata_list(self):
1759+
from google.cloud.bigquery import schema
1760+
from google.cloud.bigquery import table as mut
1761+
1762+
bqstorage_client = mock.create_autospec(
1763+
bigquery_storage_v1beta1.BigQueryStorageClient
1764+
)
1765+
bqstorage_client.create_read_session.side_effect = google.api_core.exceptions.InternalServerError(
1766+
"can't read with bqstorage_client"
1767+
)
1768+
iterator_schema = [
1769+
schema.SchemaField("name", "STRING", mode="REQUIRED"),
1770+
schema.SchemaField("age", "INTEGER", mode="REQUIRED"),
1771+
]
1772+
rows = [
1773+
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
1774+
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
1775+
{"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
1776+
{"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
1777+
]
1778+
path = "/foo"
1779+
api_request = mock.Mock(return_value={"rows": rows})
1780+
row_iterator = mut.RowIterator(
1781+
_mock_client(),
1782+
api_request,
1783+
path,
1784+
iterator_schema,
1785+
table=mut.Table("proj.dset.tbl"),
1786+
)
1787+
1788+
df = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
1789+
1790+
self.assertIsInstance(df, pandas.DataFrame)
1791+
self.assertEqual(len(df), 4) # verify the number of rows
1792+
self.assertEqual(list(df), ["name", "age"]) # verify the column names
1793+
self.assertEqual(df.name.dtype.name, "object")
1794+
self.assertEqual(df.age.dtype.name, "int64")
1795+
17511796
@unittest.skipIf(
17521797
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
17531798
)

0 commit comments

Comments
 (0)