From 6e3037c47adac3c99c46b05d7b4cd4df1172a8d6 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Apr 2019 15:58:34 -0700 Subject: [PATCH 1/3] fix: `to_dataframe` respects `progress_bar_type` with BQ Storage API * Add unit test for progress bar. * Add test for full queue. --- bigquery/google/cloud/bigquery/table.py | 57 ++++++++++++- bigquery/tests/unit/test_table.py | 102 +++++++++++++++++++++--- 2 files changed, 144 insertions(+), 15 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 101e754d118f..92c3e3ede08f 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -25,6 +25,7 @@ import warnings import six +from six.moves import queue try: from google.cloud import bigquery_storage_v1beta1 @@ -66,7 +67,7 @@ ) _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"' _MARKER = object() -_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds. +_PROGRESS_INTERVAL = 0.2 # Time between download status updates, in seconds. def _reference_getter(table): @@ -1274,6 +1275,16 @@ def __repr__(self): return "Row({}, {})".format(self._xxx_values, f2i) +class _FakeQueue(object): + """A fake Queue class that does nothing. + + This is used when there is no progress bar to send updates to. + """ + + def put_nowait(self, item): + """Don't actually do anything with the item.""" + + class RowIterator(HTTPIterator): """A class for iterating through HTTP/JSON API row list responses. 
@@ -1392,7 +1403,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None): return pandas.concat(frames) def _to_dataframe_bqstorage_stream( - self, bqstorage_client, dtypes, columns, session, stream + self, bqstorage_client, dtypes, columns, session, stream, progress_queue ): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) rowstream = bqstorage_client.read_rows(position).rows(session) @@ -1403,6 +1414,13 @@ def _to_dataframe_bqstorage_stream( return frames.append(page.to_dataframe(dtypes=dtypes)) + try: + progress_queue.put_nowait(page.num_items) + except queue.Full: + # It's okay if we miss a few progress updates. Don't slow + # down parsing for that. + pass + # Avoid errors on unlucky streams with no blocks. pandas.concat # will fail on an empty list. if not frames: @@ -1412,7 +1430,23 @@ def _to_dataframe_bqstorage_stream( # the end using manually-parsed schema. return pandas.concat(frames)[columns] - def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): + def _process_progress_updates(self, progress_queue, progress_bar): + if progress_bar is None: + return + + # Output all updates since the last interval. + while True: + try: + next_update = progress_queue.get_nowait() + progress_bar.update(next_update) + except queue.Empty: + break + + if self._to_dataframe_finished: + progress_bar.close() + return + + def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" if bigquery_storage_v1beta1 is None: raise ValueError(_NO_BQSTORAGE_ERROR) @@ -1451,6 +1485,11 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): # See: https://stackoverflow.com/a/29237343/101923 self._to_dataframe_finished = False + # Create a queue to track progress updates across threads. 
+ progress_queue = _FakeQueue() + if progress_bar is not None: + progress_queue = queue.Queue() + def get_frames(pool): frames = [] @@ -1466,6 +1505,7 @@ def get_frames(pool): columns, session, stream, + progress_queue, ) for stream in session.streams ] @@ -1475,6 +1515,11 @@ def get_frames(pool): not_done, timeout=_PROGRESS_INTERVAL ) frames.extend([future.result() for future in done]) + + # The progress bar needs to update on the main thread to avoid + # contention over stdout / stderr. + self._process_progress_updates(progress_queue, progress_bar) + return frames with concurrent.futures.ThreadPoolExecutor() as pool: @@ -1486,6 +1531,8 @@ def get_frames(pool): # definition (enforced by the global interpreter lock). self._to_dataframe_finished = True + # Update the progress bar one last time to close it. + self._process_progress_updates(progress_queue, progress_bar) return pandas.concat(frames) def _get_progress_bar(self, progress_bar_type): @@ -1585,7 +1632,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non if bqstorage_client is not None: try: - return self._to_dataframe_bqstorage(bqstorage_client, dtypes) + return self._to_dataframe_bqstorage( + bqstorage_client, dtypes, progress_bar=progress_bar + ) except google.api_core.exceptions.Forbidden: # Don't hide errors such as insufficient permissions to create # a read session, or the API is not enabled. 
Both of those are diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index ef397195882f..106b7d1ee585 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -22,6 +22,7 @@ import mock import pytest import six +from six.moves import queue import google.api_core.exceptions @@ -1816,9 +1817,12 @@ def test_to_dataframe_w_bqstorage_nonempty(self): bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) - session = bigquery_storage_v1beta1.types.ReadSession( - streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}] - ) + streams = [ + # Use two streams we want to check frames are read from each stream. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + session = bigquery_storage_v1beta1.types.ReadSession(streams=streams) session.avro_schema.schema = json.dumps( { "fields": [ @@ -1836,20 +1840,25 @@ def test_to_dataframe_w_bqstorage_nonempty(self): mock_rows = mock.create_autospec(reader.ReadRowsIterable) mock_rowstream.rows.return_value = mock_rows + page_items = [ + {"colA": 1, "colB": "abc", "colC": 2.0}, + {"colA": -1, "colB": "def", "colC": 4.0}, + ] def blocking_to_dataframe(*args, **kwargs): # Sleep for longer than the waiting interval so that we know we're # only reading one page per loop at most. 
time.sleep(2 * mut._PROGRESS_INTERVAL) - return pandas.DataFrame( - {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, - columns=["colA", "colB", "colC"], - ) + return pandas.DataFrame(page_items, columns=["colA", "colB", "colC"]) mock_page = mock.create_autospec(reader.ReadRowsPage) mock_page.to_dataframe.side_effect = blocking_to_dataframe - mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) - type(mock_rows).pages = mock_pages + mock_pages = (mock_page, mock_page, mock_page) + type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) + + # Test that full queue errors are ignored. + mock_queue = mock.create_autospec(mut._FakeQueue) + mock_queue().put_nowait.side_effect = queue.Full schema = [ schema.SchemaField("colA", "IGNORED"), @@ -1866,17 +1875,88 @@ def blocking_to_dataframe(*args, **kwargs): selected_fields=schema, ) - with mock.patch( + with mock.patch.object(mut, "_FakeQueue", mock_queue), mock.patch( "concurrent.futures.wait", wraps=concurrent.futures.wait ) as mock_wait: got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + # Are the columns in the expected order? column_names = ["colA", "colC", "colB"] self.assertEqual(list(got), column_names) - self.assertEqual(len(got.index), 6) + + # Have expected number of rows? + total_pages = len(streams) * len(mock_pages) + total_rows = len(page_items) * total_pages + self.assertEqual(len(got.index), total_rows) + # Make sure that this test looped through multiple progress intervals. self.assertGreaterEqual(mock_wait.call_count, 2) + # Make sure that this test pushed to the progress queue. 
+ self.assertEqual(mock_queue().put_nowait.call_count, total_pages) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + @unittest.skipIf(tqdm is None, "Requires `tqdm`") + @mock.patch("tqdm.tqdm") + def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + # Speed up testing. + mut._PROGRESS_INTERVAL = 0.01 + + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + streams = [ + # Use two streams we want to check that progress bar updates are + # sent from each stream. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + session = bigquery_storage_v1beta1.types.ReadSession(streams=streams) + session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]}) + bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + + mock_page = mock.create_autospec(reader.ReadRowsPage) + page_items = [-1, 0, 1] + type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items)) + mock_page.to_dataframe.return_value = pandas.DataFrame({"testcol": page_items}) + mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page) + type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) + + schema = [schema.SchemaField("testcol", "IGNORED")] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + 
selected_fields=schema, + ) + + row_iterator.to_dataframe( + bqstorage_client=bqstorage_client, progress_bar_type="tqdm" + ) + + # Make sure that this test updated the progress bar once per page from + # each stream. + total_pages = len(streams) * len(mock_pages) + self.assertEqual(tqdm_mock().update.call_count, total_pages) + tqdm_mock().update.assert_called_with(len(page_items)) + tqdm_mock().close.assert_called_once() + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" From 71112b0a9e6fc4c25346c596567d5a82cd2f8d68 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 22 Apr 2019 17:28:25 -0700 Subject: [PATCH 2/3] Add worker queue for progress bar to prevent lost tqdm updates. The worker queue runs in a background thread, so it's more likely to be able to keep up with the other workers that are adding to the worker queue. --- bigquery/google/cloud/bigquery/table.py | 54 ++++++++++++++++++++++--- bigquery/tests/unit/test_table.py | 11 +++-- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 92c3e3ede08f..d50fec487a31 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -22,6 +22,8 @@ import datetime import json import operator +import threading +import time import warnings import six @@ -69,6 +71,11 @@ _MARKER = object() _PROGRESS_INTERVAL = 0.2 # Time between download status updates, in seconds. +# Send multiple updates from the worker threads, so there are at least a few +# waiting next time the progress bar is updated. 
+_PROGRESS_UPDATES_PER_INTERVAL = 3 +_PROGRESS_WORKER_INTERVAL = _PROGRESS_INTERVAL / _PROGRESS_UPDATES_PER_INTERVAL + def _reference_getter(table): """A :class:`~google.cloud.bigquery.table.TableReference` pointing to @@ -1275,7 +1282,7 @@ def __repr__(self): return "Row({}, {})".format(self._xxx_values, f2i) -class _FakeQueue(object): +class _NoopProgressBarQueue(object): """A fake Queue class that does nothing. This is used when there is no progress bar to send updates to. @@ -1403,7 +1410,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None): return pandas.concat(frames) def _to_dataframe_bqstorage_stream( - self, bqstorage_client, dtypes, columns, session, stream, progress_queue + self, bqstorage_client, dtypes, columns, session, stream, worker_queue ): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) rowstream = bqstorage_client.read_rows(position).rows(session) @@ -1415,7 +1422,7 @@ def _to_dataframe_bqstorage_stream( frames.append(page.to_dataframe(dtypes=dtypes)) try: - progress_queue.put_nowait(page.num_items) + worker_queue.put_nowait(page.num_items) except queue.Full: # It's okay if we miss a few progress updates. Don't slow # down parsing for that. @@ -1430,6 +1437,30 @@ def _to_dataframe_bqstorage_stream( # the end using manually-parsed schema. return pandas.concat(frames)[columns] + def _process_worker_updates(self, worker_queue, progress_queue): + last_update_time = time.time() + current_update = 0 + + # Sum all updates in a constant loop. + while True: + try: + current_update += worker_queue.get(timeout=_PROGRESS_INTERVAL) + + # Time to send to the progress bar queue? + current_time = time.time() + elapsed_time = current_time - last_update_time + if elapsed_time > _PROGRESS_WORKER_INTERVAL: + progress_queue.put(current_update) + last_update_time = current_time + current_update = 0 + + except queue.Empty: + # Keep going, unless there probably aren't going to be any + # additional updates. 
+ if self._to_dataframe_finished: + progress_queue.put(current_update) + return + def _process_progress_updates(self, progress_queue, progress_bar): if progress_bar is None: return @@ -1486,9 +1517,16 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None): self._to_dataframe_finished = False # Create a queue to track progress updates across threads. - progress_queue = _FakeQueue() + worker_queue = _NoopProgressBarQueue() + progress_queue = None + progress_thread = None if progress_bar is not None: + worker_queue = queue.Queue() progress_queue = queue.Queue() + progress_thread = threading.Thread( + target=self._process_worker_updates, args=(worker_queue, progress_queue) + ) + progress_thread.start() def get_frames(pool): frames = [] @@ -1505,7 +1543,7 @@ def get_frames(pool): columns, session, stream, - progress_queue, + worker_queue, ) for stream in session.streams ] @@ -1531,6 +1569,12 @@ def get_frames(pool): # definition (enforced by the global interpreter lock). self._to_dataframe_finished = True + # Shutdown all background threads, now that they should know to + # exit early. + pool.shutdown(wait=True) + if progress_thread is not None: + progress_thread.join() + # Update the progress bar one last time to close it. self._process_progress_updates(progress_queue, progress_bar) return pandas.concat(frames) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 106b7d1ee585..d2c241f4153a 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1857,7 +1857,7 @@ def blocking_to_dataframe(*args, **kwargs): type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) # Test that full queue errors are ignored. 
- mock_queue = mock.create_autospec(mut._FakeQueue) + mock_queue = mock.create_autospec(mut._NoopProgressBarQueue) mock_queue().put_nowait.side_effect = queue.Full schema = [ @@ -1875,7 +1875,7 @@ def blocking_to_dataframe(*args, **kwargs): selected_fields=schema, ) - with mock.patch.object(mut, "_FakeQueue", mock_queue), mock.patch( + with mock.patch.object(mut, "_NoopProgressBarQueue", mock_queue), mock.patch( "concurrent.futures.wait", wraps=concurrent.futures.wait ) as mock_wait: got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) @@ -1953,8 +1953,11 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock): # Make sure that this test updated the progress bar once per page from # each stream. total_pages = len(streams) * len(mock_pages) - self.assertEqual(tqdm_mock().update.call_count, total_pages) - tqdm_mock().update.assert_called_with(len(page_items)) + expected_total_rows = total_pages * len(page_items) + actual_total_rows = sum( + [args[0] for args, kwargs in tqdm_mock().update.call_args_list] + ) + self.assertEqual(actual_total_rows, expected_total_rows) tqdm_mock().close.assert_called_once() @unittest.skipIf(pandas is None, "Requires `pandas`") From e69b1b41338086b216e9636ab198a86557dbda5c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 23 Apr 2019 13:00:29 -0700 Subject: [PATCH 3/3] Test that progress bar updates more than once. 
--- bigquery/tests/unit/test_table.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index d2c241f4153a..18ca125e804c 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1927,11 +1927,18 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock): mock_rows = mock.create_autospec(reader.ReadRowsIterable) mock_rowstream.rows.return_value = mock_rows - mock_page = mock.create_autospec(reader.ReadRowsPage) page_items = [-1, 0, 1] type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items)) - mock_page.to_dataframe.return_value = pandas.DataFrame({"testcol": page_items}) + + def blocking_to_dataframe(*args, **kwargs): + # Sleep for longer than the waiting interval. This ensures the + # progress_queue gets written to more than once because it gives + # the worker->progress updater time to sum intermediate updates. + time.sleep(2 * mut._PROGRESS_INTERVAL) + return pandas.DataFrame({"testcol": page_items}) + + mock_page.to_dataframe.side_effect = blocking_to_dataframe mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page) type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) @@ -1954,10 +1961,12 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock): # each stream. total_pages = len(streams) * len(mock_pages) expected_total_rows = total_pages * len(page_items) - actual_total_rows = sum( - [args[0] for args, kwargs in tqdm_mock().update.call_args_list] - ) - self.assertEqual(actual_total_rows, expected_total_rows) + progress_updates = [ + args[0] for args, kwargs in tqdm_mock().update.call_args_list + ] + # Should have sent >1 update due to delay in blocking_to_dataframe. 
+ self.assertGreater(len(progress_updates), 1) + self.assertEqual(sum(progress_updates), expected_total_rows) tqdm_mock().close.assert_called_once() @unittest.skipIf(pandas is None, "Requires `pandas`")