diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 101e754d118f..d50fec487a31 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -22,9 +22,12 @@
 import datetime
 import json
 import operator
+import threading
+import time
 import warnings
 
 import six
+from six.moves import queue
 
 try:
     from google.cloud import bigquery_storage_v1beta1
@@ -66,7 +69,12 @@
 )
 _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
 _MARKER = object()
-_PROGRESS_INTERVAL = 1.0  # Time between download status updates, in seconds.
+_PROGRESS_INTERVAL = 0.2  # Time between download status updates, in seconds.
+
+# Send multiple updates from the worker threads, so there are at least a few
+# waiting next time the progress bar is updated.
+_PROGRESS_UPDATES_PER_INTERVAL = 3
+_PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL
 
 
 def _reference_getter(table):
@@ -1274,6 +1282,16 @@ def __repr__(self):
         return "Row({}, {})".format(self._xxx_values, f2i)
 
 
+class _NoopProgressBarQueue(object):
+    """A fake Queue class that does nothing.
+
+    This is used when there is no progress bar to send updates to.
+    """
+
+    def put_nowait(self, item):
+        """Don't actually do anything with the item."""
+
+
 class RowIterator(HTTPIterator):
     """A class for iterating through HTTP/JSON API row list responses.
 
@@ -1392,7 +1410,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
         return pandas.concat(frames)
 
     def _to_dataframe_bqstorage_stream(
-        self, bqstorage_client, dtypes, columns, session, stream
+        self, bqstorage_client, dtypes, columns, session, stream, worker_queue
     ):
         position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
         rowstream = bqstorage_client.read_rows(position).rows(session)
@@ -1403,6 +1421,13 @@ def _to_dataframe_bqstorage_stream(
                 return
             frames.append(page.to_dataframe(dtypes=dtypes))
 
+            try:
+                worker_queue.put_nowait(page.num_items)
+            except queue.Full:
+                # It's okay if we miss a few progress updates. Don't slow
+                # down parsing for that.
+                pass
+
         # Avoid errors on unlucky streams with no blocks. pandas.concat
         # will fail on an empty list.
         if not frames:
@@ -1412,7 +1437,47 @@
         # the end using manually-parsed schema.
         return pandas.concat(frames)[columns]
 
-    def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
+    def _process_worker_updates(self, worker_queue, progress_queue):
+        last_update_time = time.time()
+        current_update = 0
+
+        # Sum all updates in a continuous loop.
+        while True:
+            try:
+                current_update += worker_queue.get(timeout=_PROGRESS_INTERVAL)
+
+                # Time to send to the progress bar queue?
+                current_time = time.time()
+                elapsed_time = current_time - last_update_time
+                if elapsed_time > _PROGRESS_WORKER_INTERVAL:
+                    progress_queue.put(current_update)
+                    last_update_time = current_time
+                    current_update = 0
+
+            except queue.Empty:
+                # Keep going, unless there probably aren't going to be any
+                # additional updates.
+                if self._to_dataframe_finished:
+                    progress_queue.put(current_update)
+                    return
+
+    def _process_progress_updates(self, progress_queue, progress_bar):
+        if progress_bar is None:
+            return
+
+        # Output all updates since the last interval.
+        while True:
+            try:
+                next_update = progress_queue.get_nowait()
+                progress_bar.update(next_update)
+            except queue.Empty:
+                break
+
+        if self._to_dataframe_finished:
+            progress_bar.close()
+            return
+
+    def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
         """Use (faster, but billable) BQ Storage API to construct DataFrame."""
         if bigquery_storage_v1beta1 is None:
             raise ValueError(_NO_BQSTORAGE_ERROR)
@@ -1451,6 +1516,18 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
         # See: https://stackoverflow.com/a/29237343/101923
         self._to_dataframe_finished = False
 
+        # Create a queue to track progress updates across threads.
+        worker_queue = _NoopProgressBarQueue()
+        progress_queue = None
+        progress_thread = None
+        if progress_bar is not None:
+            worker_queue = queue.Queue()
+            progress_queue = queue.Queue()
+            progress_thread = threading.Thread(
+                target=self._process_worker_updates, args=(worker_queue, progress_queue)
+            )
+            progress_thread.start()
+
         def get_frames(pool):
             frames = []
 
@@ -1466,6 +1543,7 @@ def get_frames(pool):
                     columns,
                     session,
                     stream,
+                    worker_queue,
                 )
                 for stream in session.streams
             ]
@@ -1475,6 +1553,11 @@
                     not_done, timeout=_PROGRESS_INTERVAL
                 )
                 frames.extend([future.result() for future in done])
+
+                # The progress bar needs to update on the main thread to avoid
+                # contention over stdout / stderr.
+                self._process_progress_updates(progress_queue, progress_bar)
+
             return frames
 
         with concurrent.futures.ThreadPoolExecutor() as pool:
@@ -1486,6 +1569,14 @@ def get_frames(pool):
             # definition (enforced by the global interpreter lock).
             self._to_dataframe_finished = True
 
+            # Shut down all background threads, now that they should know to
+            # exit early.
+            pool.shutdown(wait=True)
+            if progress_thread is not None:
+                progress_thread.join()
+
+        # Update the progress bar one last time to close it.
+        self._process_progress_updates(progress_queue, progress_bar)
         return pandas.concat(frames)
 
     def _get_progress_bar(self, progress_bar_type):
@@ -1585,7 +1676,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
 
         if bqstorage_client is not None:
             try:
-                return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
+                return self._to_dataframe_bqstorage(
+                    bqstorage_client, dtypes, progress_bar=progress_bar
+                )
             except google.api_core.exceptions.Forbidden:
                 # Don't hide errors such as insufficient permissions to create
                 # a read session, or the API is not enabled. Both of those are
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index ef397195882f..18ca125e804c 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -22,6 +22,7 @@
 import mock
 import pytest
 import six
+from six.moves import queue
 
 import google.api_core.exceptions
 
@@ -1816,9 +1817,12 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
-        session = bigquery_storage_v1beta1.types.ReadSession(
-            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
-        )
+        streams = [
+            # Use two streams so we can check that frames are read from each stream.
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
+        ]
+        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
         session.avro_schema.schema = json.dumps(
             {
                 "fields": [
@@ -1836,20 +1840,25 @@
 
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
+        page_items = [
+            {"colA": 1, "colB": "abc", "colC": 2.0},
+            {"colA": -1, "colB": "def", "colC": 4.0},
+        ]
 
         def blocking_to_dataframe(*args, **kwargs):
             # Sleep for longer than the waiting interval so that we know we're
             # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame(
-                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
-                columns=["colA", "colB", "colC"],
-            )
+            return pandas.DataFrame(page_items, columns=["colA", "colB", "colC"])
 
         mock_page = mock.create_autospec(reader.ReadRowsPage)
         mock_page.to_dataframe.side_effect = blocking_to_dataframe
-        mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
-        type(mock_rows).pages = mock_pages
+        mock_pages = (mock_page, mock_page, mock_page)
+        type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
+
+        # Test that full queue errors are ignored.
+        mock_queue = mock.create_autospec(mut._NoopProgressBarQueue)
+        mock_queue().put_nowait.side_effect = queue.Full
 
         schema = [
             schema.SchemaField("colA", "IGNORED"),
@@ -1866,17 +1875,100 @@ def blocking_to_dataframe(*args, **kwargs):
             selected_fields=schema,
         )
 
-        with mock.patch(
+        with mock.patch.object(mut, "_NoopProgressBarQueue", mock_queue), mock.patch(
             "concurrent.futures.wait", wraps=concurrent.futures.wait
         ) as mock_wait:
             got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
 
+        # Are the columns in the expected order?
         column_names = ["colA", "colC", "colB"]
         self.assertEqual(list(got), column_names)
-        self.assertEqual(len(got.index), 6)
+
+        # Do we have the expected number of rows?
+        total_pages = len(streams) * len(mock_pages)
+        total_rows = len(page_items) * total_pages
+        self.assertEqual(len(got.index), total_rows)
+
         # Make sure that this test looped through multiple progress intervals.
         self.assertGreaterEqual(mock_wait.call_count, 2)
 
+        # Make sure that this test pushed to the progress queue.
+        self.assertEqual(mock_queue().put_nowait.call_count, total_pages)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(tqdm is None, "Requires `tqdm`")
+    @mock.patch("tqdm.tqdm")
+    def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+        from google.cloud.bigquery_storage_v1beta1 import reader
+
+        # Speed up testing.
+        mut._PROGRESS_INTERVAL = 0.01
+
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        streams = [
+            # Use two streams so we can check that progress bar updates are
+            # sent from each stream.
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
+        ]
+        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
+        session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]})
+        bqstorage_client.create_read_session.return_value = session
+
+        mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
+        bqstorage_client.read_rows.return_value = mock_rowstream
+
+        mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_rowstream.rows.return_value = mock_rows
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        page_items = [-1, 0, 1]
+        type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))
+
+        def blocking_to_dataframe(*args, **kwargs):
+            # Sleep for longer than the waiting interval. This ensures the
+            # progress_queue gets written to more than once because it gives
+            # the worker->progress updater time to sum intermediate updates.
+            time.sleep(2 * mut._PROGRESS_INTERVAL)
+            return pandas.DataFrame({"testcol": page_items})
+
+        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
+        type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
+
+        schema = [schema.SchemaField("testcol", "IGNORED")]
+
+        row_iterator = mut.RowIterator(
+            _mock_client(),
+            None,  # api_request: ignored
+            None,  # path: ignored
+            schema,
+            table=mut.TableReference.from_string("proj.dset.tbl"),
+            selected_fields=schema,
+        )
+
+        row_iterator.to_dataframe(
+            bqstorage_client=bqstorage_client, progress_bar_type="tqdm"
+        )
+
+        # Make sure the progress bar was updated with the total row count,
+        # summed across the pages from each stream.
+        total_pages = len(streams) * len(mock_pages)
+        expected_total_rows = total_pages * len(page_items)
+        progress_updates = [
+            args[0] for args, kwargs in tqdm_mock().update.call_args_list
+        ]
+        # Should have sent >1 update due to delay in blocking_to_dataframe.
+        self.assertGreater(len(progress_updates), 1)
+        self.assertEqual(sum(progress_updates), expected_total_rows)
+        tqdm_mock().close.assert_called_once()
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
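
Reviewer note (not part of the patch): a minimal usage sketch of the feature this diff adds, passing progress_bar_type="tqdm" to RowIterator.to_dataframe(). The project, dataset, and table names are placeholders, and this assumes pandas, tqdm, and google-cloud-bigquery-storage are installed.

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    # Placeholder table reference; substitute a real table.
    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
    rows = client.list_rows(client.get_table(table_ref))

    # With a progress bar type set, worker threads report per-page row counts
    # while the main thread renders the tqdm progress bar.
    df = rows.to_dataframe(
        bqstorage_client=bqstorage_client, progress_bar_type="tqdm"
    )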
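Reviewer note (not part of the patch): the threading arrangement in _to_dataframe_bqstorage can be hard to see through the diff noise. Below is a self-contained sketch of the same pattern under simplified assumptions: stream workers produce per-page row counts, a middle thread batches them (like _process_worker_updates), and only the main thread touches the display (like _process_progress_updates). All names here are illustrative, not part of the library.

    import queue
    import threading
    import time

    _PROGRESS_INTERVAL = 0.2

    worker_queue = queue.Queue()    # workers -> aggregator (rows per page)
    progress_queue = queue.Queue()  # aggregator -> main thread (summed rows)
    finished = threading.Event()

    def parse_stream(num_pages):
        """Producer: one thread per read stream, reporting rows per page."""
        for _ in range(num_pages):
            time.sleep(0.05)  # stand-in for page.to_dataframe()
            try:
                worker_queue.put_nowait(100)  # 100 rows parsed on this page
            except queue.Full:
                pass  # dropping an update is fine; never block parsing

    def sum_updates():
        """Aggregator: batch small updates into periodic, larger ones."""
        total = 0
        last_flush = time.time()
        while True:
            try:
                total += worker_queue.get(timeout=_PROGRESS_INTERVAL)
                if time.time() - last_flush > _PROGRESS_INTERVAL / 3:
                    progress_queue.put(total)
                    total = 0
                    last_flush = time.time()
            except queue.Empty:
                if finished.is_set():
                    progress_queue.put(total)  # flush the remainder
                    return

    workers = [threading.Thread(target=parse_stream, args=(3,)) for _ in range(2)]
    aggregator = threading.Thread(target=sum_updates)
    for thread in workers + [aggregator]:
        thread.start()

    # Main thread owns stdout, so only it reports progress.
    while any(worker.is_alive() for worker in workers):
        time.sleep(_PROGRESS_INTERVAL)
        while not progress_queue.empty():
            print("rows so far +", progress_queue.get_nowait())

    finished.set()
    aggregator.join()
    while not progress_queue.empty():
        print("rows so far +", progress_queue.get_nowait())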