Skip to content

Commit c71d5f8

Browse files
tswast and plamut
authored and committed
refactor(bigquery): to_dataframe uses faster to_arrow + to_pandas when pyarrow is available (#10027)
* fix(bigquery): to_dataframe uses 2x faster to_arrow + to_pandas when pyarrow is available * fix: skip to_arrow tests when pyarrow is missing * test: update test to work around numpy array encoding of nested arrays * test: add test for tabledata.list with no rows * test: boost test coverage * chore: fix lint
1 parent 121394c commit c71d5f8

File tree

3 files changed

+286
-62
lines changed

3 files changed

+286
-62
lines changed

bigquery/google/cloud/bigquery/table.py

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,6 +1519,17 @@ def to_arrow(
15191519
if pyarrow is None:
15201520
raise ValueError(_NO_PYARROW_ERROR)
15211521

1522+
if (
1523+
bqstorage_client or create_bqstorage_client
1524+
) and self.max_results is not None:
1525+
warnings.warn(
1526+
"Cannot use bqstorage_client if max_results is set, "
1527+
"reverting to fetching data with the tabledata.list endpoint.",
1528+
stacklevel=2,
1529+
)
1530+
create_bqstorage_client = False
1531+
bqstorage_client = None
1532+
15221533
owns_bqstorage_client = False
15231534
if not bqstorage_client and create_bqstorage_client:
15241535
owns_bqstorage_client = True
@@ -1707,33 +1718,39 @@ def to_dataframe(
17071718
create_bqstorage_client = False
17081719
bqstorage_client = None
17091720

1710-
owns_bqstorage_client = False
1711-
if not bqstorage_client and create_bqstorage_client:
1712-
owns_bqstorage_client = True
1713-
bqstorage_client = self.client._create_bqstorage_client()
1714-
1715-
try:
1716-
progress_bar = self._get_progress_bar(progress_bar_type)
1721+
if pyarrow is not None:
1722+
# If pyarrow is available, calling to_arrow, then converting to a
1723+
# pandas dataframe is about 2x faster. This is because pandas.concat is
1724+
# rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
1725+
# usually no-copy.
1726+
record_batch = self.to_arrow(
1727+
progress_bar_type=progress_bar_type,
1728+
bqstorage_client=bqstorage_client,
1729+
create_bqstorage_client=create_bqstorage_client,
1730+
)
1731+
df = record_batch.to_pandas()
1732+
for column in dtypes:
1733+
df[column] = pandas.Series(df[column], dtype=dtypes[column])
1734+
return df
17171735

1718-
frames = []
1719-
for frame in self.to_dataframe_iterable(
1720-
bqstorage_client=bqstorage_client, dtypes=dtypes
1721-
):
1722-
frames.append(frame)
1736+
# The bqstorage_client is only used if pyarrow is available, so the
1737+
# rest of this method only needs to account for tabledata.list.
1738+
progress_bar = self._get_progress_bar(progress_bar_type)
17231739

1724-
if progress_bar is not None:
1725-
# In some cases, the number of total rows is not populated
1726-
# until the first page of rows is fetched. Update the
1727-
# progress bar's total to keep an accurate count.
1728-
progress_bar.total = progress_bar.total or self.total_rows
1729-
progress_bar.update(len(frame))
1740+
frames = []
1741+
for frame in self.to_dataframe_iterable(dtypes=dtypes):
1742+
frames.append(frame)
17301743

17311744
if progress_bar is not None:
1732-
# Indicate that the download has finished.
1733-
progress_bar.close()
1734-
finally:
1735-
if owns_bqstorage_client:
1736-
bqstorage_client.transport.channel.close()
1745+
# In some cases, the number of total rows is not populated
1746+
# until the first page of rows is fetched. Update the
1747+
# progress bar's total to keep an accurate count.
1748+
progress_bar.total = progress_bar.total or self.total_rows
1749+
progress_bar.update(len(frame))
1750+
1751+
if progress_bar is not None:
1752+
# Indicate that the download has finished.
1753+
progress_bar.close()
17371754

17381755
# Avoid concatting an empty list.
17391756
if not frames:

bigquery/tests/system.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2372,7 +2372,12 @@ def test_nested_table_to_dataframe(self):
23722372
row = df.iloc[0]
23732373
# verify the row content
23742374
self.assertEqual(row["string_col"], "Some value")
2375-
self.assertEqual(row["record_col"], record)
2375+
expected_keys = tuple(sorted(record.keys()))
2376+
row_keys = tuple(sorted(row["record_col"].keys()))
2377+
self.assertEqual(row_keys, expected_keys)
2378+
# Can't compare numpy arrays, which pyarrow encodes the embedded
2379+
# repeated column to, so convert to list.
2380+
self.assertEqual(list(row["record_col"]["nested_repeated"]), [0, 1, 2])
23762381
# verify that nested data can be accessed with indices/keys
23772382
self.assertEqual(row["record_col"]["nested_repeated"][0], 0)
23782383
self.assertEqual(

0 commit comments

Comments (0)