Skip to content

Commit 8b59a92

Browse files
authored
Add option to use BQ Storage API with to_dataframe (googleapis#6854)
* Add option to use BQ Storage API with to_dataframe. This is a faster method to read a dataframe from a table using the (alpha) BigQuery Storage API. Supply a BQ Storage API client to to_dataframe() to use the faster method. Currently it cannot read data from (small) anonymous query results tables, which is why the system test writes the query results to a destination table. * Remove thread prefix (not present in Python 3.5)
1 parent a48f30d commit 8b59a92

File tree

4 files changed

+306
-15
lines changed

4 files changed

+306
-15
lines changed

bigquery/google/cloud/bigquery/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,10 @@ def list_rows(
17171717
max_results=max_results,
17181718
page_size=page_size,
17191719
extra_params=params,
1720+
table=table,
1721+
# Pass in selected_fields separately from schema so that full
1722+
# tables can be fetched without a column filter.
1723+
selected_fields=selected_fields,
17201724
)
17211725
return row_iterator
17221726

bigquery/google/cloud/bigquery/table.py

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import copy
2020
import datetime
21+
import json
2122
import operator
2223
import warnings
2324

@@ -1242,6 +1243,17 @@ class RowIterator(HTTPIterator):
12421243
page_size (int, optional): The number of items to return per page.
12431244
extra_params (Dict[str, object]):
12441245
Extra query string parameters for the API call.
1246+
table (Union[ \
1247+
:class:`~google.cloud.bigquery.table.Table`, \
1248+
:class:`~google.cloud.bigquery.table.TableReference`, \
1249+
]):
1250+
Optional. The table which these rows belong to, or a reference to
1251+
it. Used to call the BigQuery Storage API to fetch rows.
1252+
selected_fields (Sequence[ \
1253+
google.cloud.bigquery.schema.SchemaField, \
1254+
]):
1255+
Optional. A subset of columns to select from this table.
1256+
12451257
"""
12461258

12471259
def __init__(
@@ -1254,6 +1266,8 @@ def __init__(
12541266
max_results=None,
12551267
page_size=None,
12561268
extra_params=None,
1269+
table=None,
1270+
selected_fields=None,
12571271
):
12581272
super(RowIterator, self).__init__(
12591273
client,
@@ -1271,6 +1285,9 @@ def __init__(
12711285
self._field_to_index = _helpers._field_to_index_mapping(schema)
12721286
self._total_rows = None
12731287
self._page_size = page_size
1288+
self._table = table
1289+
self._selected_fields = selected_fields
1290+
self._project = client.project
12741291

12751292
def _get_next_page_response(self):
12761293
"""Requests the next page from the path provided.
@@ -1296,9 +1313,81 @@ def total_rows(self):
12961313
"""int: The total number of rows in the table."""
12971314
return self._total_rows
12981315

1299-
def to_dataframe(self):
1316+
def _to_dataframe_tabledata_list(self):
    """Use (slower, but free) tabledata.list to construct a DataFrame."""
    headers = [schema_field.name for schema_field in self.schema]
    # Feed a generator to pandas so we never hold the whole rowset in
    # memory at once.
    value_rows = (current_row.values() for current_row in iter(self))
    return pandas.DataFrame(value_rows, columns=headers)
1322+
1323+
def _to_dataframe_bqstorage(self, bqstorage_client):
1324+
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
1325+
import concurrent.futures
1326+
from google.cloud import bigquery_storage_v1beta1
1327+
1328+
if "$" in self._table.table_id:
1329+
raise ValueError(
1330+
"Reading from a specific partition is not currently supported."
1331+
)
1332+
if "@" in self._table.table_id:
1333+
raise ValueError(
1334+
"Reading from a specific snapshot is not currently supported."
1335+
)
1336+
1337+
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
1338+
if self._selected_fields is not None:
1339+
for field in self._selected_fields:
1340+
read_options.selected_fields.append(field.name)
1341+
1342+
session = bqstorage_client.create_read_session(
1343+
self._table.to_bqstorage(),
1344+
"projects/{}".format(self._project),
1345+
read_options=read_options,
1346+
)
1347+
1348+
# We need to parse the schema manually so that we can rearrange the
1349+
# columns.
1350+
schema = json.loads(session.avro_schema.schema)
1351+
columns = [field["name"] for field in schema["fields"]]
1352+
1353+
# Avoid reading rows from an empty table. pandas.concat will fail on an
1354+
# empty list.
1355+
if not session.streams:
1356+
return pandas.DataFrame(columns=columns)
1357+
1358+
def get_dataframe(stream):
1359+
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
1360+
rowstream = bqstorage_client.read_rows(position)
1361+
return rowstream.to_dataframe(session)
1362+
1363+
with concurrent.futures.ThreadPoolExecutor() as pool:
1364+
frames = pool.map(get_dataframe, session.streams)
1365+
1366+
# rowstream.to_dataframe() does not preserve column order. Rearrange at
1367+
# the end using manually-parsed schema.
1368+
return pandas.concat(frames)[columns]
1369+
1370+
def to_dataframe(self, bqstorage_client=None):
    """Create a pandas DataFrame from the query results.

    Args:
        bqstorage_client ( \
            google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
        ):
            Optional. A BigQuery Storage API client. If supplied, use the
            faster BigQuery Storage API to fetch rows from BigQuery. This
            API is a billable API.

            This method requires the ``fastavro`` and
            ``google-cloud-bigquery-storage`` libraries.

            Reading from a specific partition or snapshot is not
            currently supported by this method.

            **Caution**: There is a known issue reading small anonymous
            query result tables with the BQ Storage API. Write your query
            results to a destination table to work around this issue.

    Returns:
        pandas.DataFrame:
            A :class:`~pandas.DataFrame` populated with row data and column
            headers from the query results.

    Raises:
        ValueError: If the :mod:`pandas` library cannot be imported.
    """
    if pandas is None:
        raise ValueError(_NO_PANDAS_ERROR)

    # Use the free tabledata.list path unless the caller explicitly
    # supplied a (billable) BQ Storage API client.
    if bqstorage_client is None:
        return self._to_dataframe_tabledata_list()
    return self._to_dataframe_bqstorage(bqstorage_client)
13201408

13211409

13221410
class _EmptyRowIterator(object):

bigquery/tests/system.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
import six
2929
import pytest
3030

31+
try:
32+
from google.cloud import bigquery_storage_v1beta1
33+
except ImportError: # pragma: NO COVER
34+
bigquery_storage_v1beta1 = None
3135
try:
3236
import pandas
3337
except ImportError: # pragma: NO COVER
@@ -1496,7 +1500,7 @@ def test_query_iter(self):
14961500
def test_query_results_to_dataframe(self):
14971501
QUERY = """
14981502
SELECT id, author, time_ts, dead
1499-
from `bigquery-public-data.hacker_news.comments`
1503+
FROM `bigquery-public-data.hacker_news.comments`
15001504
LIMIT 10
15011505
"""
15021506

@@ -1518,6 +1522,53 @@ def test_query_results_to_dataframe(self):
15181522
if not row[col] is None:
15191523
self.assertIsInstance(row[col], exp_datatypes[col])
15201524

1525+
@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
    bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_query_results_to_dataframe_w_bqstorage(self):
    """Query results written to a table round-trip through the BQ Storage API."""
    dataset = self.temp_dataset(_make_dataset_id("bqstorage_to_dataframe_"))
    destination = dataset.table("query_results")

    query = """
        SELECT id, author, time_ts, dead
        FROM `bigquery-public-data.hacker_news.comments`
        LIMIT 10
    """

    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=Config.CLIENT._credentials
    )
    # There is a known issue reading small anonymous query result tables
    # with the BQ Storage API. Writing to a destination table works around
    # this issue.
    job_config = bigquery.QueryJobConfig(
        destination=destination, write_disposition="WRITE_TRUNCATE"
    )
    query_job = Config.CLIENT.query(query, job_config=job_config)
    df = query_job.result().to_dataframe(bqstorage_client)

    self.assertIsInstance(df, pandas.DataFrame)
    self.assertEqual(len(df), 10)  # verify the number of rows
    expected_columns = ["id", "author", "time_ts", "dead"]
    self.assertEqual(list(df), expected_columns)
    expected_types = {
        "id": int,
        "author": six.text_type,
        "time_ts": pandas.Timestamp,
        "dead": bool,
    }
    for _, row in df.iterrows():
        for column in expected_columns:
            # All the schema fields are nullable, so None is acceptable.
            if row[column] is not None:
                self.assertIsInstance(row[column], expected_types[column])
1571+
15211572
def test_insert_rows_nested_nested(self):
15221573
# See #2951
15231574
SF = bigquery.SchemaField

0 commit comments

Comments
 (0)