Skip to content

Commit 9812563

Browse files
committed
WIP: Add progress bar when BQ Storage API is used.
1 parent 6fc4af2 commit 9812563

File tree

1 file changed

+47
-2
lines changed

1 file changed

+47
-2
lines changed

bigquery/google/cloud/bigquery/table.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import datetime
2222
import json
2323
import operator
24+
import queue
2425
import warnings
2526

2627
import six
@@ -1255,6 +1256,16 @@ def __repr__(self):
12551256
return "Row({}, {})".format(self._xxx_values, f2i)
12561257

12571258

1259+
class _FakeQueue(object):
1260+
"""A fake Queue class that does nothing.
1261+
1262+
This is used when there is no progress bar to send updates to.
1263+
"""
1264+
1265+
def put_nowait(self, item):
1266+
"""Don't actually do anything with the item."""
1267+
1268+
12581269
class RowIterator(HTTPIterator):
12591270
"""A class for iterating through HTTP/JSON API row list responses.
12601271
@@ -1368,7 +1379,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
13681379

13691380
return pandas.concat(frames)
13701381

1371-
def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
1382+
def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
13721383
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
13731384
import concurrent.futures
13741385
from google.cloud import bigquery_storage_v1beta1
@@ -1407,6 +1418,11 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
14071418
# https://stackoverflow.com/a/29237343/101923
14081419
finished = False
14091420

1421+
# Create a queue to track progress updates across threads.
1422+
progress_queue = _FakeQueue()
1423+
if progress_bar is not None:
1424+
progress_queue = queue.Queue()
1425+
14101426
def get_dataframe(stream):
14111427
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
14121428
rowstream = bqstorage_client.read_rows(position).rows(session)
@@ -1417,6 +1433,13 @@ def get_dataframe(stream):
14171433
return
14181434
frames.append(page.to_dataframe(dtypes=dtypes))
14191435

1436+
try:
1437+
progress_queue.put_nowait(page.num_items)
1438+
except queue.Full:
1439+
# It's okay if we miss a few progress updates. Don't slow
1440+
# down parsing for that.
1441+
pass
1442+
14201443
# Avoid errors on unlucky streams with no blocks. pandas.concat
14211444
# will fail on an empty list.
14221445
if not frames:
@@ -1441,9 +1464,31 @@ def get_frames(pool):
14411464
not_done, timeout=_PROGRESS_INTERVAL
14421465
)
14431466
frames.extend([future.result() for future in done])
1467+
1468+
# The progress bar needs to update on the main thread to avoid
1469+
# contention over stdout / stderr.
1470+
if progress_bar is not None:
1471+
update_progress_bar()
1472+
14441473
return frames
14451474

1475+
def update_progress_bar():
1476+
if finished:
1477+
progress_bar.close()
1478+
return
1479+
1480+
# Output all updates since the last interval.
1481+
while True:
1482+
try:
1483+
next_update = progress_queue.get_nowait()
1484+
progress_bar.update(next_update)
1485+
except queue.Empty:
1486+
return
1487+
14461488
with concurrent.futures.ThreadPoolExecutor() as pool:
1489+
if progress_bar is not None:
1490+
pool.submit(update_progress_bar)
1491+
14471492
try:
14481493
frames = get_frames(pool)
14491494
finally:
@@ -1549,7 +1594,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
15491594

15501595
if bqstorage_client is not None:
15511596
try:
1552-
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
1597+
return self._to_dataframe_bqstorage(bqstorage_client, dtypes, progress_bar=progress_bar)
15531598
except google.api_core.exceptions.Forbidden:
15541599
# Don't hide errors such as insufficient permissions to create
15551600
# a read session, or the API is not enabled. Both of those are

0 commit comments

Comments
 (0)