1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ import concurrent .futures
1516import itertools
1617import json
1718import time
@@ -1784,6 +1785,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
17841785 from google .cloud .bigquery import table as mut
17851786 from google .cloud .bigquery_storage_v1beta1 import reader
17861787
1788+ # Speed up testing.
1789+ mut ._PROGRESS_INTERVAL = 0.01
1790+
17871791 bqstorage_client = mock .create_autospec (
17881792 bigquery_storage_v1beta1 .BigQueryStorageClient
17891793 )
@@ -1808,12 +1812,18 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
18081812 mock_rows = mock .create_autospec (reader .ReadRowsIterable )
18091813 mock_rowstream .rows .return_value = mock_rows
18101814
1815+ def blocking_to_dataframe (* args , ** kwargs ):
1816+ # Sleep for longer than the waiting interval so that we know we're
1817+ # only reading one page per loop at most.
1818+ time .sleep (2 * mut ._PROGRESS_INTERVAL )
1819+ return pandas .DataFrame (
1820+ {"colA" : [1 , - 1 ], "colB" : ["abc" , "def" ], "colC" : [2.0 , 4.0 ]},
1821+ columns = ["colA" , "colB" , "colC" ],
1822+ )
1823+
18111824 mock_page = mock .create_autospec (reader .ReadRowsPage )
1812- mock_page .to_dataframe .return_value = pandas .DataFrame (
1813- {"colA" : [1 , - 1 ], "colB" : ["abc" , "def" ], "colC" : [2.0 , 4.0 ]},
1814- columns = ["colA" , "colB" , "colC" ],
1815- )
1816- mock_pages = mock .PropertyMock (return_value = (mock_page ,))
1825+ mock_page .to_dataframe .side_effect = blocking_to_dataframe
1826+ mock_pages = mock .PropertyMock (return_value = (mock_page , mock_page , mock_page ))
18171827 type(mock_rows ).pages = mock_pages
18181828
18191829 schema = [
@@ -1831,11 +1841,16 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
18311841 selected_fields = schema ,
18321842 )
18331843
1834- got = row_iterator .to_dataframe (bqstorage_client )
1844+ with mock .patch (
1845+ "concurrent.futures.wait" , wraps = concurrent .futures .wait
1846+ ) as mock_wait :
1847+ got = row_iterator .to_dataframe (bqstorage_client = bqstorage_client )
18351848
18361849 column_names = ["colA" , "colC" , "colB" ]
18371850 self .assertEqual (list (got ), column_names )
1838- self .assertEqual (len (got .index ), 2 )
1851+ self .assertEqual (len (got .index ), 6 )
1852+ # Make sure that this test looped through multiple progress intervals.
1853+ self .assertGreaterEqual (mock_wait .call_count , 2 )
18391854
18401855 @unittest .skipIf (pandas is None , "Requires `pandas`" )
18411856 @unittest .skipIf (
0 commit comments