|
21 | 21 | import datetime |
22 | 22 | import json |
23 | 23 | import operator |
| 24 | +import queue |
24 | 25 | import warnings |
25 | 26 |
|
26 | 27 | import six |
|
56 | 57 | ) |
57 | 58 | _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"' |
58 | 59 | _MARKER = object() |
| 60 | +_PROGRESS_INTERVAL = 1.0 # Time between tqdm updates, in seconds. |
59 | 61 |
|
60 | 62 |
|
61 | 63 | def _reference_getter(table): |
@@ -1254,6 +1256,16 @@ def __repr__(self): |
1254 | 1256 | return "Row({}, {})".format(self._xxx_values, f2i) |
1255 | 1257 |
|
1256 | 1258 |
|
| 1259 | +class _FakeQueue(object): |
| 1260 | + """A fake Queue class that does nothing. |
| 1261 | +
|
| 1262 | + This is used when there is no progress bar to send updates to. |
| 1263 | + """ |
| 1264 | + |
| 1265 | + def put(self, item): |
| 1266 | + """Don't actually do anything with the item.""" |
| 1267 | + |
| 1268 | + |
1257 | 1269 | class RowIterator(HTTPIterator): |
1258 | 1270 | """A class for iterating through HTTP/JSON API row list responses. |
1259 | 1271 |
|
@@ -1367,7 +1379,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None): |
1367 | 1379 |
|
1368 | 1380 | return pandas.concat(frames) |
1369 | 1381 |
|
1370 | | - def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): |
| 1382 | + def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None): |
1371 | 1383 | """Use (faster, but billable) BQ Storage API to construct DataFrame.""" |
1372 | 1384 | import concurrent.futures |
1373 | 1385 | from google.cloud import bigquery_storage_v1beta1 |
@@ -1402,16 +1414,57 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): |
1402 | 1414 | if not session.streams: |
1403 | 1415 | return pandas.DataFrame(columns=columns) |
1404 | 1416 |
|
| 1417 | + exit_early = False |
| 1418 | + progress_queue = _FakeQueue() |
| 1419 | + if progress_bar is not None: |
| 1420 | + progress_queue = queue.Queue() |
| 1421 | + |
def get_dataframe(stream):
    """Download one read stream page by page and return it as a DataFrame.

    This is submitted to the thread pool once per stream in the session.
    The row count of each downloaded page is pushed to ``progress_queue``
    so the main thread can advance the progress bar.

    Args:
        stream: One of the streams in ``session.streams``.

    Returns:
        pandas.DataFrame: The rows of ``stream`` with columns ordered as in
        ``columns``. An empty frame with those columns is returned when the
        download is cancelled via ``exit_early`` or when the stream yields
        no pages.
    """
    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
    rowstream = bqstorage_client.read_rows(position).rows(session)

    frames = []
    for page in rowstream.pages:
        # The enclosing function flips ``exit_early`` when it is failing;
        # stop downloading instead of finishing work nobody will read.
        if exit_early:
            return pandas.DataFrame(columns=columns)
        frames.append(page.to_dataframe(dtypes=dtypes))
        progress_queue.put(page.num_items)

    # pandas.concat() raises ValueError on an empty list, which would happen
    # for a stream that produced no pages. Return an empty frame instead.
    if not frames:
        return pandas.DataFrame(columns=columns)

    # page.to_dataframe() does not preserve column order. Rearrange at
    # the end using manually-parsed schema.
    return pandas.concat(frames)[columns]
| 1436 | + |
def update_progress_bar():
    """Drain every pending row count from the queue into the progress bar.

    Never blocks: returns as soon as ``progress_queue`` reports it is empty.
    """
    while True:
        try:
            row_count = progress_queue.get_nowait()
        except queue.Empty:
            # Nothing left to report for now.
            return
        progress_bar.update(row_count)
| 1444 | + |
def frames_with_progress_bar(pool):
    """Download all streams on ``pool`` while refreshing the progress bar.

    Each stream is submitted as its own task. The calling thread wakes up
    at least every ``_PROGRESS_INTERVAL`` seconds to collect finished
    results and to push queued row counts to the progress bar, so the bar
    is only ever updated from the calling thread.

    Args:
        pool: The executor that runs ``get_dataframe`` for each stream.

    Returns:
        list: One DataFrame per stream, in completion order.
    """
    pending = [pool.submit(get_dataframe, stream) for stream in session.streams]
    collected = []

    while pending:
        finished, pending = concurrent.futures.wait(
            pending, timeout=_PROGRESS_INTERVAL
        )
        for future in finished:
            # result() re-raises any exception from the worker thread.
            collected.append(future.result())
        update_progress_bar()

    return collected
1409 | 1456 |
|
1410 | 1457 | with concurrent.futures.ThreadPoolExecutor() as pool: |
1411 | | - frames = pool.map(get_dataframe, session.streams) |
| 1458 | + try: |
| 1459 | + if progress_bar is None: |
| 1460 | + frames = pool.map(get_dataframe, session.streams) |
| 1461 | + else: |
| 1462 | + frames = frames_with_progress_bar(pool) |
| 1463 | + except: |
| 1464 | + exit_early = True |
| 1465 | + raise |
1412 | 1466 |
|
1413 | | - # rowstream.to_dataframe() does not preserve column order. Rearrange at |
1414 | | - # the end using manually-parsed schema. |
| 1467 | + # Use [columns] to ensure column order matches manually-parsed schema. |
1415 | 1468 | return pandas.concat(frames)[columns] |
1416 | 1469 |
|
1417 | 1470 | def _get_progress_bar(self, progress_bar_type): |
@@ -1508,7 +1561,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non |
1508 | 1561 |
|
1509 | 1562 | if bqstorage_client is not None: |
1510 | 1563 | try: |
1511 | | - return self._to_dataframe_bqstorage(bqstorage_client, dtypes) |
| 1564 | + return self._to_dataframe_bqstorage(bqstorage_client, dtypes, progress_bar=progress_bar) |
1512 | 1565 | except google.api_core.exceptions.Forbidden: |
1513 | 1566 | # Don't hide errors such as insufficient permissions to create |
1514 | 1567 | # a read session, or the API is not enabled. Both of those are |
|
0 commit comments