Skip to content

Commit 0971781

Browse files
committed
Use `finally` so that `finished` is always set when the wait loop ends, whether it completes normally or raises.
Update tests to ensure the wait loop runs through multiple progress intervals.
1 parent ff0a26b commit 0971781

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

bigquery/google/cloud/bigquery/table.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,17 +1403,17 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
14031403
if not session.streams:
14041404
return pandas.DataFrame(columns=columns)
14051405

1406-
# Use exit_early to notify worker threads when to quit. See:
1406+
# Use finished to notify worker threads when to quit. See:
14071407
# https://stackoverflow.com/a/29237343/101923
1408-
exit_early = False
1408+
finished = False
14091409

14101410
def get_dataframe(stream):
14111411
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
14121412
rowstream = bqstorage_client.read_rows(position).rows(session)
14131413

14141414
frames = []
14151415
for page in rowstream.pages:
1416-
if exit_early:
1416+
if finished:
14171417
return
14181418
frames.append(page.to_dataframe(dtypes=dtypes))
14191419

@@ -1446,12 +1446,11 @@ def get_frames(pool):
14461446
with concurrent.futures.ThreadPoolExecutor() as pool:
14471447
try:
14481448
frames = get_frames(pool)
1449-
except:
1449+
finally:
14501450
# No need for a lock because reading/replacing a variable is an
14511451
# atomic operation in CPython (a consequence of the global
14521452
# interpreter lock, not a guarantee of the language itself).
1453-
exit_early = True
1454-
raise
1453+
finished = True
14551454

14561455
# Use [columns] to ensure column order matches manually-parsed schema.
14571456
return pandas.concat(frames)[columns]

bigquery/tests/unit/test_table.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import concurrent.futures
1516
import itertools
1617
import json
1718
import time
@@ -1784,6 +1785,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
17841785
from google.cloud.bigquery import table as mut
17851786
from google.cloud.bigquery_storage_v1beta1 import reader
17861787

1788+
# Speed up testing.
1789+
mut._PROGRESS_INTERVAL = 0.01
1790+
17871791
bqstorage_client = mock.create_autospec(
17881792
bigquery_storage_v1beta1.BigQueryStorageClient
17891793
)
@@ -1808,12 +1812,18 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
18081812
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
18091813
mock_rowstream.rows.return_value = mock_rows
18101814

1815+
def blocking_to_dataframe(*args, **kwargs):
1816+
# Sleep for longer than the waiting interval so that we know we're
1817+
# only reading one page per loop at most.
1818+
time.sleep(2 * mut._PROGRESS_INTERVAL)
1819+
return pandas.DataFrame(
1820+
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
1821+
columns=["colA", "colB", "colC"],
1822+
)
1823+
18111824
mock_page = mock.create_autospec(reader.ReadRowsPage)
1812-
mock_page.to_dataframe.return_value = pandas.DataFrame(
1813-
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
1814-
columns=["colA", "colB", "colC"],
1815-
)
1816-
mock_pages = mock.PropertyMock(return_value=(mock_page,))
1825+
mock_page.to_dataframe.side_effect = blocking_to_dataframe
1826+
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
18171827
type(mock_rows).pages = mock_pages
18181828

18191829
schema = [
@@ -1831,11 +1841,16 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
18311841
selected_fields=schema,
18321842
)
18331843

1834-
got = row_iterator.to_dataframe(bqstorage_client)
1844+
with mock.patch(
1845+
"concurrent.futures.wait", wraps=concurrent.futures.wait
1846+
) as mock_wait:
1847+
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
18351848

18361849
column_names = ["colA", "colC", "colB"]
18371850
self.assertEqual(list(got), column_names)
1838-
self.assertEqual(len(got.index), 2)
1851+
self.assertEqual(len(got.index), 6)
1852+
# Make sure that this test looped through multiple progress intervals.
1853+
self.assertGreaterEqual(mock_wait.call_count, 2)
18391854

18401855
@unittest.skipIf(pandas is None, "Requires `pandas`")
18411856
@unittest.skipIf(

0 commit comments

Comments
 (0)