Skip to content

Commit 0d96dbd

Browse files
committed
WIP: Add progress bar when BQ Storage API is used.
1 parent 2776aed commit 0d96dbd

1 file changed

Lines changed: 60 additions & 7 deletions

File tree

bigquery/google/cloud/bigquery/table.py

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import datetime
2222
import json
2323
import operator
24+
import queue
2425
import warnings
2526

2627
import six
@@ -56,6 +57,7 @@
5657
)
5758
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
5859
_MARKER = object()
60+
_PROGRESS_INTERVAL = 1.0 # Time between tqdm updates, in seconds.
5961

6062

6163
def _reference_getter(table):
@@ -1254,6 +1256,16 @@ def __repr__(self):
12541256
return "Row({}, {})".format(self._xxx_values, f2i)
12551257

12561258

1259+
class _FakeQueue(object):
1260+
"""A fake Queue class that does nothing.
1261+
1262+
This is used when there is no progress bar to send updates to.
1263+
"""
1264+
1265+
def put(self, item):
1266+
"""Don't actually do anything with the item."""
1267+
1268+
12571269
class RowIterator(HTTPIterator):
12581270
"""A class for iterating through HTTP/JSON API row list responses.
12591271
@@ -1367,7 +1379,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
13671379

13681380
return pandas.concat(frames)
13691381

1370-
def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
1382+
def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
13711383
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
13721384
import concurrent.futures
13731385
from google.cloud import bigquery_storage_v1beta1
@@ -1402,16 +1414,57 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
14021414
if not session.streams:
14031415
return pandas.DataFrame(columns=columns)
14041416

1417+
exit_early = False
1418+
progress_queue = _FakeQueue()
1419+
if progress_bar is not None:
1420+
progress_queue = queue.Queue()
1421+
14051422
def get_dataframe(stream):
14061423
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
1407-
rowstream = bqstorage_client.read_rows(position)
1408-
return rowstream.to_dataframe(session, dtypes=dtypes)
1424+
rowstream = bqstorage_client.read_rows(position).rows(session)
1425+
1426+
frames = []
1427+
for page in rowstream.pages:
1428+
if exit_early:
1429+
return pandas.DataFrame(columns=columns)
1430+
frames.append(page.to_dataframe(dtypes=dtypes))
1431+
progress_queue.put(page.num_items)
1432+
1433+
# page.to_dataframe() does not preserve column order. Rearrange at
1434+
# the end using manually-parsed schema.
1435+
return pandas.concat(frames)[columns]
1436+
1437+
def update_progress_bar():
1438+
try:
1439+
while True:
1440+
next_update = progress_queue.get_nowait()
1441+
progress_bar.update(next_update)
1442+
except queue.Empty:
1443+
pass
1444+
1445+
def frames_with_progress_bar(pool):
1446+
# Write progress to main thread.
1447+
frames = []
1448+
not_done = [pool.submit(get_dataframe, stream) for stream in session.streams]
1449+
1450+
while not_done:
1451+
done, not_done = concurrent.futures.wait(not_done, timeout=_PROGRESS_INTERVAL)
1452+
frames.extend([future.result() for future in done])
1453+
update_progress_bar()
1454+
1455+
return frames
14091456

14101457
with concurrent.futures.ThreadPoolExecutor() as pool:
1411-
frames = pool.map(get_dataframe, session.streams)
1458+
try:
1459+
if progress_bar is None:
1460+
frames = pool.map(get_dataframe, session.streams)
1461+
else:
1462+
frames = frames_with_progress_bar(pool)
1463+
except:
1464+
exit_early = True
1465+
raise
14121466

1413-
# rowstream.to_dataframe() does not preserve column order. Rearrange at
1414-
# the end using manually-parsed schema.
1467+
# Use [columns] to ensure column order matches manually-parsed schema.
14151468
return pandas.concat(frames)[columns]
14161469

14171470
def _get_progress_bar(self, progress_bar_type):
@@ -1508,7 +1561,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
15081561

15091562
if bqstorage_client is not None:
15101563
try:
1511-
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
1564+
return self._to_dataframe_bqstorage(bqstorage_client, dtypes, progress_bar=progress_bar)
15121565
except google.api_core.exceptions.Forbidden:
15131566
# Don't hide errors such as insufficient permissions to create
15141567
# a read session, or the API is not enabled. Both of those are

0 commit comments

Comments
 (0)