@@ -1368,6 +1368,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
13681368
13691369 return pandas .concat (frames )
13701370
1371+ def _to_dataframe_bqstorage_stream (
1372+ self , bqstorage_client , dtypes , columns , session , stream
1373+ ):
1374+ position = bigquery_storage_v1beta1 .types .StreamPosition (stream = stream )
1375+ rowstream = bqstorage_client .read_rows (position ).rows (session )
1376+
1377+ frames = []
1378+ for page in rowstream .pages :
1379+ if self ._to_dataframe_finished :
1380+ return
1381+ frames .append (page .to_dataframe (dtypes = dtypes ))
1382+
1383+ # Avoid errors on unlucky streams with no blocks. pandas.concat
1384+ # will fail on an empty list.
1385+ if not frames :
1386+ return pandas .DataFrame (columns = columns )
1387+
1388+ # page.to_dataframe() does not preserve column order. Rearrange at
1389+ # the end using manually-parsed schema.
1390+ return pandas .concat (frames )[columns ]
1391+
13711392 def _to_dataframe_bqstorage (self , bqstorage_client , dtypes ):
13721393 """Use (faster, but billable) BQ Storage API to construct DataFrame."""
13731394 import concurrent .futures
@@ -1403,37 +1424,27 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
14031424 if not session .streams :
14041425 return pandas .DataFrame (columns = columns )
14051426
1406- # Use finished to notify worker threads when to quit. See:
1407- # https://stackoverflow.com/a/29237343/101923
1408- finished = False
1409-
1410- def get_dataframe (stream ):
1411- position = bigquery_storage_v1beta1 .types .StreamPosition (stream = stream )
1412- rowstream = bqstorage_client .read_rows (position ).rows (session )
1413-
1414- frames = []
1415- for page in rowstream .pages :
1416- if finished :
1417- return
1418- frames .append (page .to_dataframe (dtypes = dtypes ))
1419-
1420- # Avoid errors on unlucky streams with no blocks. pandas.concat
1421- # will fail on an empty list.
1422- if not frames :
1423- return pandas .DataFrame (columns = columns )
1424-
1425- # page.to_dataframe() does not preserve column order. Rearrange at
1426- # the end using manually-parsed schema.
1427- return pandas .concat (frames )[columns ]
1427+ # Use _to_dataframe_finished to notify worker threads when to quit.
1428+ # See: https://stackoverflow.com/a/29237343/101923
1429+ self ._to_dataframe_finished = False
14281430
14291431 def get_frames (pool ):
14301432 frames = []
14311433
14321434 # Manually submit jobs and wait for download to complete rather
14331435 # than using pool.map because pool.map continues running in the
14341436 # background even if there is an exception on the main thread.
1437+ # See: https://github.com/googleapis/google-cloud-python/pull/7698
14351438 not_done = [
1436- pool .submit (get_dataframe , stream ) for stream in session .streams
1439+ pool .submit (
1440+ self ._to_dataframe_bqstorage_stream ,
1441+ bqstorage_client ,
1442+ dtypes ,
1443+ columns ,
1444+ session ,
1445+ stream ,
1446+ )
1447+ for stream in session .streams
14371448 ]
14381449
14391450 while not_done :
@@ -1450,10 +1461,9 @@ def get_frames(pool):
14501461 # No need for a lock because reading/replacing a variable is
14511462 # defined to be an atomic operation in the Python language
14521463 # definition (enforced by the global interpreter lock).
1453- finished = True
1464+ self . _to_dataframe_finished = True
14541465
1455- # Use [columns] to ensure column order matches manually-parsed schema.
1456- return pandas .concat (frames )[columns ]
1466+ return pandas .concat (frames )
14571467
14581468 def _get_progress_bar (self , progress_bar_type ):
14591469 """Construct a tqdm progress bar object, if tqdm is installed."""
0 commit comments