2222import datetime
2323import json
2424import operator
25+ import threading
26+ import time
2527import warnings
2628
2729import six
6971_MARKER = object ()
7072_PROGRESS_INTERVAL = 0.2 # Time between download status updates, in seconds.
7173
74+ # Send multiple updates from the worker threads, so there are at least a few
75+ # waiting next time the prgrogess bar is updated.
76+ _PROGRESS_UPDATES_PER_INTERVAL = 3
77+ _PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL
78+
7279
7380def _reference_getter (table ):
7481 """A :class:`~google.cloud.bigquery.table.TableReference` pointing to
@@ -1275,7 +1282,7 @@ def __repr__(self):
12751282 return "Row({}, {})" .format (self ._xxx_values , f2i )
12761283
12771284
1278- class _FakeQueue (object ):
1285+ class _NoopProgressBarQueue (object ):
12791286 """A fake Queue class that does nothing.
12801287
12811288 This is used when there is no progress bar to send updates to.
@@ -1403,7 +1410,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
14031410 return pandas .concat (frames )
14041411
14051412 def _to_dataframe_bqstorage_stream (
1406- self , bqstorage_client , dtypes , columns , session , stream , progress_queue
1413+ self , bqstorage_client , dtypes , columns , session , stream , worker_queue
14071414 ):
14081415 position = bigquery_storage_v1beta1 .types .StreamPosition (stream = stream )
14091416 rowstream = bqstorage_client .read_rows (position ).rows (session )
@@ -1415,7 +1422,7 @@ def _to_dataframe_bqstorage_stream(
14151422 frames .append (page .to_dataframe (dtypes = dtypes ))
14161423
14171424 try :
1418- progress_queue .put_nowait (page .num_items )
1425+ worker_queue .put_nowait (page .num_items )
14191426 except queue .Full :
14201427 # It's okay if we miss a few progress updates. Don't slow
14211428 # down parsing for that.
@@ -1430,6 +1437,30 @@ def _to_dataframe_bqstorage_stream(
14301437 # the end using manually-parsed schema.
14311438 return pandas .concat (frames )[columns ]
14321439
1440+ def _process_worker_updates (self , worker_queue , progress_queue ):
1441+ last_update_time = time .time ()
1442+ current_update = 0
1443+
1444+ # Sum all updates in a contant loop.
1445+ while True :
1446+ try :
1447+ current_update += worker_queue .get (timeout = _PROGRESS_INTERVAL )
1448+
1449+ # Time to send to the progress bar queue?
1450+ current_time = time .time ()
1451+ elapsed_time = current_time - last_update_time
1452+ if elapsed_time > _PROGRESS_WORKER_INTERVAL :
1453+ progress_queue .put (current_update )
1454+ last_update_time = current_time
1455+ current_update = 0
1456+
1457+ except queue .Empty :
1458+ # Keep going, unless there probably aren't going to be any
1459+ # additional updates.
1460+ if self ._to_dataframe_finished :
1461+ progress_queue .put (current_update )
1462+ return
1463+
14331464 def _process_progress_updates (self , progress_queue , progress_bar ):
14341465 if progress_bar is None :
14351466 return
@@ -1486,9 +1517,16 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
14861517 self ._to_dataframe_finished = False
14871518
14881519 # Create a queue to track progress updates across threads.
1489- progress_queue = _FakeQueue ()
1520+ worker_queue = _NoopProgressBarQueue ()
1521+ progress_queue = None
1522+ progress_thread = None
14901523 if progress_bar is not None :
1524+ worker_queue = queue .Queue ()
14911525 progress_queue = queue .Queue ()
1526+ progress_thread = threading .Thread (
1527+ target = self ._process_worker_updates , args = (worker_queue , progress_queue )
1528+ )
1529+ progress_thread .start ()
14921530
14931531 def get_frames (pool ):
14941532 frames = []
@@ -1505,7 +1543,7 @@ def get_frames(pool):
15051543 columns ,
15061544 session ,
15071545 stream ,
1508- progress_queue ,
1546+ worker_queue ,
15091547 )
15101548 for stream in session .streams
15111549 ]
@@ -1531,6 +1569,12 @@ def get_frames(pool):
15311569 # definition (enforced by the global interpreter lock).
15321570 self ._to_dataframe_finished = True
15331571
1572+ # Shutdown all background threads, now that they should know to
1573+ # exit early.
1574+ pool .shutdown (wait = True )
1575+ if progress_thread is not None :
1576+ progress_thread .join ()
1577+
15341578 # Update the progress bar one last time to close it.
15351579 self ._process_progress_updates (progress_queue , progress_bar )
15361580 return pandas .concat (frames )
0 commit comments