Skip to content

Commit 71112b0

Browse files
committed
Add worker queue for progress bar to prevent lost tqdm updates.
The worker queue runs in a background thread, so it's more likely to be able to keep up with the other workers that are adding to the worker queue.
1 parent 6e3037c commit 71112b0

File tree

2 files changed

+56
-9
lines changed

2 files changed

+56
-9
lines changed

bigquery/google/cloud/bigquery/table.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import datetime
2323
import json
2424
import operator
25+
import threading
26+
import time
2527
import warnings
2628

2729
import six
@@ -69,6 +71,11 @@
6971
_MARKER = object()
7072
_PROGRESS_INTERVAL = 0.2 # Time between download status updates, in seconds.
7173

74+
# Send multiple updates from the worker threads, so there are at least a few
75+
# waiting next time the progress bar is updated.
76+
_PROGRESS_UPDATES_PER_INTERVAL = 3
77+
_PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL
78+
7279

7380
def _reference_getter(table):
7481
"""A :class:`~google.cloud.bigquery.table.TableReference` pointing to
@@ -1275,7 +1282,7 @@ def __repr__(self):
12751282
return "Row({}, {})".format(self._xxx_values, f2i)
12761283

12771284

1278-
class _FakeQueue(object):
1285+
class _NoopProgressBarQueue(object):
12791286
"""A fake Queue class that does nothing.
12801287
12811288
This is used when there is no progress bar to send updates to.
@@ -1403,7 +1410,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
14031410
return pandas.concat(frames)
14041411

14051412
def _to_dataframe_bqstorage_stream(
1406-
self, bqstorage_client, dtypes, columns, session, stream, progress_queue
1413+
self, bqstorage_client, dtypes, columns, session, stream, worker_queue
14071414
):
14081415
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
14091416
rowstream = bqstorage_client.read_rows(position).rows(session)
@@ -1415,7 +1422,7 @@ def _to_dataframe_bqstorage_stream(
14151422
frames.append(page.to_dataframe(dtypes=dtypes))
14161423

14171424
try:
1418-
progress_queue.put_nowait(page.num_items)
1425+
worker_queue.put_nowait(page.num_items)
14191426
except queue.Full:
14201427
# It's okay if we miss a few progress updates. Don't slow
14211428
# down parsing for that.
@@ -1430,6 +1437,30 @@ def _to_dataframe_bqstorage_stream(
14301437
# the end using manually-parsed schema.
14311438
return pandas.concat(frames)[columns]
14321439

1440+
def _process_worker_updates(self, worker_queue, progress_queue):
1441+
last_update_time = time.time()
1442+
current_update = 0
1443+
1444+
# Sum all updates in a constant loop.
1445+
while True:
1446+
try:
1447+
current_update += worker_queue.get(timeout=_PROGRESS_INTERVAL)
1448+
1449+
# Time to send to the progress bar queue?
1450+
current_time = time.time()
1451+
elapsed_time = current_time - last_update_time
1452+
if elapsed_time > _PROGRESS_WORKER_INTERVAL:
1453+
progress_queue.put(current_update)
1454+
last_update_time = current_time
1455+
current_update = 0
1456+
1457+
except queue.Empty:
1458+
# Keep going, unless there probably aren't going to be any
1459+
# additional updates.
1460+
if self._to_dataframe_finished:
1461+
progress_queue.put(current_update)
1462+
return
1463+
14331464
def _process_progress_updates(self, progress_queue, progress_bar):
14341465
if progress_bar is None:
14351466
return
@@ -1486,9 +1517,16 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
14861517
self._to_dataframe_finished = False
14871518

14881519
# Create a queue to track progress updates across threads.
1489-
progress_queue = _FakeQueue()
1520+
worker_queue = _NoopProgressBarQueue()
1521+
progress_queue = None
1522+
progress_thread = None
14901523
if progress_bar is not None:
1524+
worker_queue = queue.Queue()
14911525
progress_queue = queue.Queue()
1526+
progress_thread = threading.Thread(
1527+
target=self._process_worker_updates, args=(worker_queue, progress_queue)
1528+
)
1529+
progress_thread.start()
14921530

14931531
def get_frames(pool):
14941532
frames = []
@@ -1505,7 +1543,7 @@ def get_frames(pool):
15051543
columns,
15061544
session,
15071545
stream,
1508-
progress_queue,
1546+
worker_queue,
15091547
)
15101548
for stream in session.streams
15111549
]
@@ -1531,6 +1569,12 @@ def get_frames(pool):
15311569
# definition (enforced by the global interpreter lock).
15321570
self._to_dataframe_finished = True
15331571

1572+
# Shutdown all background threads, now that they should know to
1573+
# exit early.
1574+
pool.shutdown(wait=True)
1575+
if progress_thread is not None:
1576+
progress_thread.join()
1577+
15341578
# Update the progress bar one last time to close it.
15351579
self._process_progress_updates(progress_queue, progress_bar)
15361580
return pandas.concat(frames)

bigquery/tests/unit/test_table.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1857,7 +1857,7 @@ def blocking_to_dataframe(*args, **kwargs):
18571857
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
18581858

18591859
# Test that full queue errors are ignored.
1860-
mock_queue = mock.create_autospec(mut._FakeQueue)
1860+
mock_queue = mock.create_autospec(mut._NoopProgressBarQueue)
18611861
mock_queue().put_nowait.side_effect = queue.Full
18621862

18631863
schema = [
@@ -1875,7 +1875,7 @@ def blocking_to_dataframe(*args, **kwargs):
18751875
selected_fields=schema,
18761876
)
18771877

1878-
with mock.patch.object(mut, "_FakeQueue", mock_queue), mock.patch(
1878+
with mock.patch.object(mut, "_NoopProgressBarQueue", mock_queue), mock.patch(
18791879
"concurrent.futures.wait", wraps=concurrent.futures.wait
18801880
) as mock_wait:
18811881
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
@@ -1953,8 +1953,11 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
19531953
# Make sure that this test updated the progress bar once per page from
19541954
# each stream.
19551955
total_pages = len(streams) * len(mock_pages)
1956-
self.assertEqual(tqdm_mock().update.call_count, total_pages)
1957-
tqdm_mock().update.assert_called_with(len(page_items))
1956+
expected_total_rows = total_pages * len(page_items)
1957+
actual_total_rows = sum(
1958+
[args[0] for args, kwargs in tqdm_mock().update.call_args_list]
1959+
)
1960+
self.assertEqual(actual_total_rows, expected_total_rows)
19581961
tqdm_mock().close.assert_called_once()
19591962

19601963
@unittest.skipIf(pandas is None, "Requires `pandas`")

0 commit comments

Comments
 (0)