2121import datetime
2222import json
2323import operator
24+ import queue
2425import warnings
2526
2627import six
@@ -1255,6 +1256,16 @@ def __repr__(self):
12551256 return "Row({}, {})" .format (self ._xxx_values , f2i )
12561257
12571258
1259+ class _FakeQueue (object ):
1260+ """A fake Queue class that does nothing.
1261+
1262+ This is used when there is no progress bar to send updates to.
1263+ """
1264+
1265+ def put_nowait (self , item ):
1266+ """Don't actually do anything with the item."""
1267+
1268+
12581269class RowIterator (HTTPIterator ):
12591270 """A class for iterating through HTTP/JSON API row list responses.
12601271
@@ -1368,7 +1379,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
13681379
13691380 return pandas .concat (frames )
13701381
1371- def _to_dataframe_bqstorage (self , bqstorage_client , dtypes ):
1382+ def _to_dataframe_bqstorage (self , bqstorage_client , dtypes , progress_bar = None ):
13721383 """Use (faster, but billable) BQ Storage API to construct DataFrame."""
13731384 import concurrent .futures
13741385 from google .cloud import bigquery_storage_v1beta1
@@ -1407,6 +1418,11 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
14071418 # https://stackoverflow.com/a/29237343/101923
14081419 finished = False
14091420
1421+ # Create a queue to track progress updates across threads.
1422+ progress_queue = _FakeQueue ()
1423+ if progress_bar is not None :
1424+ progress_queue = queue .Queue ()
1425+
14101426 def get_dataframe (stream ):
14111427 position = bigquery_storage_v1beta1 .types .StreamPosition (stream = stream )
14121428 rowstream = bqstorage_client .read_rows (position ).rows (session )
@@ -1417,6 +1433,13 @@ def get_dataframe(stream):
14171433 return
14181434 frames .append (page .to_dataframe (dtypes = dtypes ))
14191435
1436+ try :
1437+ progress_queue .put_nowait (page .num_items )
1438+ except queue .Full :
1439+ # It's okay if we miss a few progress updates. Don't slow
1440+ # down parsing for that.
1441+ pass
1442+
14201443 # Avoid errors on unlucky streams with no blocks. pandas.concat
14211444 # will fail on an empty list.
14221445 if not frames :
@@ -1441,9 +1464,31 @@ def get_frames(pool):
14411464 not_done , timeout = _PROGRESS_INTERVAL
14421465 )
14431466 frames .extend ([future .result () for future in done ])
1467+
1468+ # The progress bar needs to update on the main thread to avoid
1469+ # contention over stdout / stderr.
1470+ if progress_bar is not None :
1471+ update_progress_bar ()
1472+
14441473 return frames
14451474
def update_progress_bar():
    """Flush queued progress updates to the progress bar.

    Reads ``progress_queue``, ``progress_bar`` and ``finished`` from the
    enclosing scope. Every update currently waiting in ``progress_queue`` is
    passed to ``progress_bar.update``; if ``finished`` is set, the progress
    bar is then closed.
    """
    # Output all updates since the last interval. Drain the queue *before*
    # looking at ``finished`` so that updates queued just before completion
    # are still displayed rather than dropped when the bar is closed.
    while True:
        try:
            # Keep the try body minimal: only the non-blocking read is
            # expected to raise queue.Empty.
            next_update = progress_queue.get_nowait()
        except queue.Empty:
            break
        progress_bar.update(next_update)

    if finished:
        progress_bar.close()
1487+
14461488 with concurrent .futures .ThreadPoolExecutor () as pool :
1489+ if progress_bar is not None :
1490+ pool .submit (update_progress_bar )
1491+
14471492 try :
14481493 frames = get_frames (pool )
14491494 finally :
@@ -1549,7 +1594,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
15491594
15501595 if bqstorage_client is not None :
15511596 try :
1552- return self ._to_dataframe_bqstorage (bqstorage_client , dtypes )
1597+ return self ._to_dataframe_bqstorage (bqstorage_client , dtypes , progress_bar = progress_bar )
15531598 except google .api_core .exceptions .Forbidden :
15541599 # Don't hide errors such as insufficient permissions to create
15551600 # a read session, or the API is not enabled. Both of those are
0 commit comments