Skip to content

Commit 8593692

Browse files
committed
Refactor _to_dataframe_bqstorage_stream
1 parent 3b258d9 commit 8593692

1 file changed

Lines changed: 36 additions & 26 deletions

File tree

bigquery/google/cloud/bigquery/table.py

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,6 +1368,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
13681368

13691369
return pandas.concat(frames)
13701370

def _to_dataframe_bqstorage_stream(
    self, bqstorage_client, dtypes, columns, session, stream
):
    """Download one BQ Storage read stream and return it as a DataFrame.

    Args:
        bqstorage_client: BigQuery Storage API client used to read rows.
        dtypes (dict): Column-name-to-dtype mapping forwarded to
            ``page.to_dataframe``.
        columns (Sequence[str]): Column names from the manually-parsed
            schema, used to fix column order and to label an empty frame.
        session: The BQ Storage read session the stream belongs to.
        stream: The stream within *session* to download.

    Returns:
        pandas.DataFrame: Rows from this stream, with columns ordered per
        *columns*; an empty DataFrame when the stream had no blocks; or
        ``None`` when ``self._to_dataframe_finished`` was set by the
        coordinating thread before the download completed.
    """
    stream_position = bigquery_storage_v1beta1.types.StreamPosition(
        stream=stream
    )
    row_iterator = bqstorage_client.read_rows(stream_position).rows(session)

    page_frames = []
    for page in row_iterator.pages:
        # Another thread signalled that enough data has been collected;
        # abandon this stream immediately.
        if self._to_dataframe_finished:
            return
        page_frames.append(page.to_dataframe(dtypes=dtypes))

    # pandas.concat raises on an empty sequence, so streams that happened
    # to contain no blocks are handled explicitly.
    if not page_frames:
        return pandas.DataFrame(columns=columns)

    # page.to_dataframe() does not preserve column order; restore it from
    # the manually-parsed schema before returning.
    combined = pandas.concat(page_frames)
    return combined[columns]
13711392
def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
13721393
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
13731394
import concurrent.futures
@@ -1403,37 +1424,27 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
14031424
if not session.streams:
14041425
return pandas.DataFrame(columns=columns)
14051426

1406-
# Use finished to notify worker threads when to quit. See:
1407-
# https://stackoverflow.com/a/29237343/101923
1408-
finished = False
1409-
1410-
def get_dataframe(stream):
1411-
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
1412-
rowstream = bqstorage_client.read_rows(position).rows(session)
1413-
1414-
frames = []
1415-
for page in rowstream.pages:
1416-
if finished:
1417-
return
1418-
frames.append(page.to_dataframe(dtypes=dtypes))
1419-
1420-
# Avoid errors on unlucky streams with no blocks. pandas.concat
1421-
# will fail on an empty list.
1422-
if not frames:
1423-
return pandas.DataFrame(columns=columns)
1424-
1425-
# page.to_dataframe() does not preserve column order. Rearrange at
1426-
# the end using manually-parsed schema.
1427-
return pandas.concat(frames)[columns]
1427+
# Use _to_dataframe_finished to notify worker threads when to quit.
1428+
# See: https://stackoverflow.com/a/29237343/101923
1429+
self._to_dataframe_finished = False
14281430

14291431
def get_frames(pool):
14301432
frames = []
14311433

14321434
# Manually submit jobs and wait for download to complete rather
14331435
# than using pool.map because pool.map continues running in the
14341436
# background even if there is an exception on the main thread.
1437+
# See: https://github.com/googleapis/google-cloud-python/pull/7698
14351438
not_done = [
1436-
pool.submit(get_dataframe, stream) for stream in session.streams
1439+
pool.submit(
1440+
self._to_dataframe_bqstorage_stream,
1441+
bqstorage_client,
1442+
dtypes,
1443+
columns,
1444+
session,
1445+
stream,
1446+
)
1447+
for stream in session.streams
14371448
]
14381449

14391450
while not_done:
@@ -1450,10 +1461,9 @@ def get_frames(pool):
14501461
# No need for a lock because reading/replacing a variable is
14511462
# defined to be an atomic operation in the Python language
14521463
# definition (enforced by the global interpreter lock).
1453-
finished = True
1464+
self._to_dataframe_finished = True
14541465

1455-
# Use [columns] to ensure column order matches manually-parsed schema.
1456-
return pandas.concat(frames)[columns]
1466+
return pandas.concat(frames)
14571467

14581468
def _get_progress_bar(self, progress_bar_type):
14591469
"""Construct a tqdm progress bar object, if tqdm is installed."""

0 commit comments

Comments
 (0)