Skip to content

Commit 6fc4af2

Browse files
committed
fix: KeyboardInterrupt during to_dataframe (with BQ Storage API) no longer hangs
I noticed in manually testing `to_dataframe` that it would stop the current cell when I hit Ctrl-C, but data kept on downloading in the background. Trying to exit the Python shell, I'd notice that it would hang until I pressed Ctrl-C a few more times. Rather than getting the DataFrame for each stream in one big chunk, loop through each block and exit early if the function needs to quit. This follows the pattern at https://stackoverflow.com/a/29237343/101923. Update tests to ensure multiple progress-interval loops.
1 parent 11efc46 commit 6fc4af2

File tree

3 files changed

+216
-16
lines changed

3 files changed

+216
-16
lines changed

bigquery/google/cloud/bigquery/table.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
)
5757
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
5858
_MARKER = object()
59+
_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds.
5960

6061

6162
def _reference_getter(table):
@@ -1402,16 +1403,56 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
14021403
if not session.streams:
14031404
return pandas.DataFrame(columns=columns)
14041405

1406+
# Use finished to notify worker threads when to quit. See:
1407+
# https://stackoverflow.com/a/29237343/101923
1408+
finished = False
1409+
14051410
def get_dataframe(stream):
14061411
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
1407-
rowstream = bqstorage_client.read_rows(position)
1408-
return rowstream.to_dataframe(session, dtypes=dtypes)
1412+
rowstream = bqstorage_client.read_rows(position).rows(session)
1413+
1414+
frames = []
1415+
for page in rowstream.pages:
1416+
if finished:
1417+
return
1418+
frames.append(page.to_dataframe(dtypes=dtypes))
1419+
1420+
# Avoid errors on unlucky streams with no blocks. pandas.concat
1421+
# will fail on an empty list.
1422+
if not frames:
1423+
return pandas.DataFrame(columns=columns)
1424+
1425+
# page.to_dataframe() does not preserve column order. Rearrange at
1426+
# the end using manually-parsed schema.
1427+
return pandas.concat(frames)[columns]
1428+
1429+
def get_frames(pool):
1430+
frames = []
1431+
1432+
# Manually submit jobs and wait for download to complete rather
1433+
# than using pool.map because pool.map continues running in the
1434+
# background even if there is an exception on the main thread.
1435+
not_done = [
1436+
pool.submit(get_dataframe, stream) for stream in session.streams
1437+
]
1438+
1439+
while not_done:
1440+
done, not_done = concurrent.futures.wait(
1441+
not_done, timeout=_PROGRESS_INTERVAL
1442+
)
1443+
frames.extend([future.result() for future in done])
1444+
return frames
14091445

14101446
with concurrent.futures.ThreadPoolExecutor() as pool:
1411-
frames = pool.map(get_dataframe, session.streams)
1412-
1413-
# rowstream.to_dataframe() does not preserve column order. Rearrange at
1414-
# the end using manually-parsed schema.
1447+
try:
1448+
frames = get_frames(pool)
1449+
finally:
1450+
# No need for a lock because reading/replacing a variable is
1451+
# defined to be an atomic operation in the Python language
1452+
# definition (enforced by the global interpreter lock).
1453+
finished = True
1454+
1455+
# Use [columns] to ensure column order matches manually-parsed schema.
14151456
return pandas.concat(frames)[columns]
14161457

14171458
def _get_progress_bar(self, progress_bar_type):

bigquery/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
]
3838
extras = {
3939
"bqstorage": [
40-
"google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev",
40+
"google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev",
4141
"fastavro>=0.21.2",
4242
],
4343
"pandas": ["pandas>=0.17.1"],

bigquery/tests/unit/test_table.py

Lines changed: 168 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import concurrent.futures
1516
import itertools
1617
import json
18+
import time
1719
import unittest
1820
import warnings
1921

@@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self):
17051707
@unittest.skipIf(
17061708
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
17071709
)
1708-
def test_to_dataframe_w_bqstorage_empty(self):
1710+
def test_to_dataframe_w_bqstorage_no_streams(self):
17091711
from google.cloud.bigquery import schema
17101712
from google.cloud.bigquery import table as mut
17111713

@@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self):
17461748
@unittest.skipIf(
17471749
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
17481750
)
1749-
def test_to_dataframe_w_bqstorage_nonempty(self):
1751+
def test_to_dataframe_w_bqstorage_empty_streams(self):
17501752
from google.cloud.bigquery import schema
17511753
from google.cloud.bigquery import table as mut
17521754
from google.cloud.bigquery_storage_v1beta1 import reader
17531755

1756+
bqstorage_client = mock.create_autospec(
1757+
bigquery_storage_v1beta1.BigQueryStorageClient
1758+
)
1759+
session = bigquery_storage_v1beta1.types.ReadSession(
1760+
streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
1761+
)
1762+
session.avro_schema.schema = json.dumps(
1763+
{
1764+
"fields": [
1765+
{"name": "colA"},
1766+
# Not alphabetical to test column order.
1767+
{"name": "colC"},
1768+
{"name": "colB"},
1769+
]
1770+
}
1771+
)
1772+
bqstorage_client.create_read_session.return_value = session
1773+
17541774
mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
1755-
mock_rowstream.to_dataframe.return_value = pandas.DataFrame(
1756-
[
1757-
{"colA": 1, "colB": "abc", "colC": 2.0},
1758-
{"colA": -1, "colB": "def", "colC": 4.0},
1759-
]
1775+
bqstorage_client.read_rows.return_value = mock_rowstream
1776+
1777+
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
1778+
mock_rowstream.rows.return_value = mock_rows
1779+
mock_pages = mock.PropertyMock(return_value=())
1780+
type(mock_rows).pages = mock_pages
1781+
1782+
schema = [
1783+
schema.SchemaField("colA", "IGNORED"),
1784+
schema.SchemaField("colC", "IGNORED"),
1785+
schema.SchemaField("colB", "IGNORED"),
1786+
]
1787+
1788+
row_iterator = mut.RowIterator(
1789+
_mock_client(),
1790+
None, # api_request: ignored
1791+
None, # path: ignored
1792+
schema,
1793+
table=mut.TableReference.from_string("proj.dset.tbl"),
1794+
selected_fields=schema,
17601795
)
1796+
1797+
got = row_iterator.to_dataframe(bqstorage_client)
1798+
1799+
column_names = ["colA", "colC", "colB"]
1800+
self.assertEqual(list(got), column_names)
1801+
self.assertTrue(got.empty)
1802+
1803+
@unittest.skipIf(pandas is None, "Requires `pandas`")
1804+
@unittest.skipIf(
1805+
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
1806+
)
1807+
def test_to_dataframe_w_bqstorage_nonempty(self):
1808+
from google.cloud.bigquery import schema
1809+
from google.cloud.bigquery import table as mut
1810+
from google.cloud.bigquery_storage_v1beta1 import reader
1811+
1812+
# Speed up testing.
1813+
mut._PROGRESS_INTERVAL = 0.01
1814+
17611815
bqstorage_client = mock.create_autospec(
17621816
bigquery_storage_v1beta1.BigQueryStorageClient
17631817
)
@@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
17751829
}
17761830
)
17771831
bqstorage_client.create_read_session.return_value = session
1832+
1833+
mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
17781834
bqstorage_client.read_rows.return_value = mock_rowstream
1835+
1836+
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
1837+
mock_rowstream.rows.return_value = mock_rows
1838+
1839+
def blocking_to_dataframe(*args, **kwargs):
1840+
# Sleep for longer than the waiting interval so that we know we're
1841+
# only reading one page per loop at most.
1842+
time.sleep(2 * mut._PROGRESS_INTERVAL)
1843+
return pandas.DataFrame(
1844+
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
1845+
columns=["colA", "colB", "colC"],
1846+
)
1847+
1848+
mock_page = mock.create_autospec(reader.ReadRowsPage)
1849+
mock_page.to_dataframe.side_effect = blocking_to_dataframe
1850+
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
1851+
type(mock_rows).pages = mock_pages
1852+
17791853
schema = [
17801854
schema.SchemaField("colA", "IGNORED"),
17811855
schema.SchemaField("colC", "IGNORED"),
@@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
17911865
selected_fields=schema,
17921866
)
17931867

1794-
got = row_iterator.to_dataframe(bqstorage_client)
1868+
with mock.patch(
1869+
"concurrent.futures.wait", wraps=concurrent.futures.wait
1870+
) as mock_wait:
1871+
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
1872+
17951873
column_names = ["colA", "colC", "colB"]
17961874
self.assertEqual(list(got), column_names)
1797-
self.assertEqual(len(got.index), 2)
1875+
self.assertEqual(len(got.index), 6)
1876+
# Make sure that this test looped through multiple progress intervals.
1877+
self.assertGreaterEqual(mock_wait.call_count, 2)
1878+
1879+
@unittest.skipIf(pandas is None, "Requires `pandas`")
1880+
@unittest.skipIf(
1881+
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
1882+
)
1883+
def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
1884+
from google.cloud.bigquery import schema
1885+
from google.cloud.bigquery import table as mut
1886+
from google.cloud.bigquery_storage_v1beta1 import reader
1887+
1888+
# Speed up testing.
1889+
mut._PROGRESS_INTERVAL = 0.01
1890+
1891+
bqstorage_client = mock.create_autospec(
1892+
bigquery_storage_v1beta1.BigQueryStorageClient
1893+
)
1894+
session = bigquery_storage_v1beta1.types.ReadSession(
1895+
streams=[
1896+
# Use two streams because one will fail with a
1897+
# KeyboardInterrupt, and we want to check that the other stream
1898+
# ends early.
1899+
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
1900+
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
1901+
]
1902+
)
1903+
session.avro_schema.schema = json.dumps(
1904+
{"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
1905+
)
1906+
bqstorage_client.create_read_session.return_value = session
1907+
1908+
def blocking_to_dataframe(*args, **kwargs):
1909+
# Sleep for longer than the waiting interval so that we know we're
1910+
# only reading one page per loop at most.
1911+
time.sleep(2 * mut._PROGRESS_INTERVAL)
1912+
return pandas.DataFrame(
1913+
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
1914+
columns=["colA", "colB", "colC"],
1915+
)
1916+
1917+
mock_page = mock.create_autospec(reader.ReadRowsPage)
1918+
mock_page.to_dataframe.side_effect = blocking_to_dataframe
1919+
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
1920+
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
1921+
type(mock_rows).pages = mock_pages
1922+
mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
1923+
mock_rowstream.rows.return_value = mock_rows
1924+
1925+
mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable)
1926+
mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt)
1927+
type(mock_cancelled_rows).pages = mock_cancelled_pages
1928+
mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream)
1929+
mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows
1930+
1931+
bqstorage_client.read_rows.side_effect = (
1932+
mock_cancelled_rowstream,
1933+
mock_rowstream,
1934+
)
1935+
1936+
schema = [
1937+
schema.SchemaField("colA", "IGNORED"),
1938+
schema.SchemaField("colB", "IGNORED"),
1939+
schema.SchemaField("colC", "IGNORED"),
1940+
]
1941+
1942+
row_iterator = mut.RowIterator(
1943+
_mock_client(),
1944+
None, # api_request: ignored
1945+
None, # path: ignored
1946+
schema,
1947+
table=mut.TableReference.from_string("proj.dset.tbl"),
1948+
selected_fields=schema,
1949+
)
1950+
1951+
with pytest.raises(KeyboardInterrupt):
1952+
row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
1953+
1954+
# Should not have fetched the third page of results because exit_early
1955+
# should have been set.
1956+
self.assertLessEqual(mock_page.to_dataframe.call_count, 2)
17981957

17991958
@unittest.skipIf(pandas is None, "Requires `pandas`")
18001959
@unittest.skipIf(

0 commit comments

Comments (0)