2222import mock
2323import pytest
2424import six
25+ from six .moves import queue
2526
2627import google .api_core .exceptions
2728
@@ -1815,9 +1816,12 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
18151816 bqstorage_client = mock .create_autospec (
18161817 bigquery_storage_v1beta1 .BigQueryStorageClient
18171818 )
1818- session = bigquery_storage_v1beta1 .types .ReadSession (
1819- streams = [{"name" : "/projects/proj/dataset/dset/tables/tbl/streams/1234" }]
1820- )
1819+ streams = [
1820+ # Use two streams we want to check frames are read from each stream.
1821+ {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/1234" },
1822+ {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/5678" },
1823+ ]
1824+ session = bigquery_storage_v1beta1 .types .ReadSession (streams = streams )
18211825 session .avro_schema .schema = json .dumps (
18221826 {
18231827 "fields" : [
@@ -1835,20 +1839,25 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
18351839
18361840 mock_rows = mock .create_autospec (reader .ReadRowsIterable )
18371841 mock_rowstream .rows .return_value = mock_rows
1842+ page_items = [
1843+ {"colA" : 1 , "colB" : "abc" , "colC" : 2.0 },
1844+ {"colA" : - 1 , "colB" : "def" , "colC" : 4.0 },
1845+ ]
18381846
18391847 def blocking_to_dataframe (* args , ** kwargs ):
18401848 # Sleep for longer than the waiting interval so that we know we're
18411849 # only reading one page per loop at most.
18421850 time .sleep (2 * mut ._PROGRESS_INTERVAL )
1843- return pandas .DataFrame (
1844- {"colA" : [1 , - 1 ], "colB" : ["abc" , "def" ], "colC" : [2.0 , 4.0 ]},
1845- columns = ["colA" , "colB" , "colC" ],
1846- )
1851+ return pandas .DataFrame (page_items , columns = ["colA" , "colB" , "colC" ])
18471852
18481853 mock_page = mock .create_autospec (reader .ReadRowsPage )
18491854 mock_page .to_dataframe .side_effect = blocking_to_dataframe
1850- mock_pages = mock .PropertyMock (return_value = (mock_page , mock_page , mock_page ))
1851- type(mock_rows ).pages = mock_pages
1855+ mock_pages = (mock_page , mock_page , mock_page )
1856+ type(mock_rows ).pages = mock .PropertyMock (return_value = mock_pages )
1857+
1858+ # Test that full queue errors are ignored.
1859+ mock_queue = mock .create_autospec (mut ._FakeQueue )
1860+ mock_queue ().put_nowait .side_effect = queue .Full
18521861
18531862 schema = [
18541863 schema .SchemaField ("colA" , "IGNORED" ),
@@ -1865,17 +1874,88 @@ def blocking_to_dataframe(*args, **kwargs):
18651874 selected_fields = schema ,
18661875 )
18671876
1868- with mock .patch (
1877+ with mock .patch . object ( mut , "_FakeQueue" , mock_queue ), mock . patch (
18691878 "concurrent.futures.wait" , wraps = concurrent .futures .wait
18701879 ) as mock_wait :
18711880 got = row_iterator .to_dataframe (bqstorage_client = bqstorage_client )
18721881
1882+ # Are the columns in the expected order?
18731883 column_names = ["colA" , "colC" , "colB" ]
18741884 self .assertEqual (list (got ), column_names )
1875- self .assertEqual (len (got .index ), 6 )
1885+
1886+ # Have expected number of rows?
1887+ total_pages = len (streams ) * len (mock_pages )
1888+ total_rows = len (page_items ) * total_pages
1889+ self .assertEqual (len (got .index ), total_rows )
1890+
18761891 # Make sure that this test looped through multiple progress intervals.
18771892 self .assertGreaterEqual (mock_wait .call_count , 2 )
18781893
1894+ # Make sure that this test pushed to the progress queue.
1895+ self .assertEqual (mock_queue ().put_nowait .call_count , total_pages )
1896+
1897+ @unittest .skipIf (pandas is None , "Requires `pandas`" )
1898+ @unittest .skipIf (
1899+ bigquery_storage_v1beta1 is None , "Requires `google-cloud-bigquery-storage`"
1900+ )
1901+ @unittest .skipIf (tqdm is None , "Requires `tqdm`" )
1902+ @mock .patch ("tqdm.tqdm" )
1903+ def test_to_dataframe_w_bqstorage_updates_progress_bar (self , tqdm_mock ):
1904+ from google .cloud .bigquery import schema
1905+ from google .cloud .bigquery import table as mut
1906+ from google .cloud .bigquery_storage_v1beta1 import reader
1907+
1908+ # Speed up testing.
1909+ mut ._PROGRESS_INTERVAL = 0.01
1910+
1911+ bqstorage_client = mock .create_autospec (
1912+ bigquery_storage_v1beta1 .BigQueryStorageClient
1913+ )
1914+ streams = [
1915+ # Use two streams we want to check that progress bar updates are
1916+ # sent from each stream.
1917+ {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/1234" },
1918+ {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/5678" },
1919+ ]
1920+ session = bigquery_storage_v1beta1 .types .ReadSession (streams = streams )
1921+ session .avro_schema .schema = json .dumps ({"fields" : [{"name" : "testcol" }]})
1922+ bqstorage_client .create_read_session .return_value = session
1923+
1924+ mock_rowstream = mock .create_autospec (reader .ReadRowsStream )
1925+ bqstorage_client .read_rows .return_value = mock_rowstream
1926+
1927+ mock_rows = mock .create_autospec (reader .ReadRowsIterable )
1928+ mock_rowstream .rows .return_value = mock_rows
1929+
1930+ mock_page = mock .create_autospec (reader .ReadRowsPage )
1931+ page_items = [- 1 , 0 , 1 ]
1932+ type(mock_page ).num_items = mock .PropertyMock (return_value = len (page_items ))
1933+ mock_page .to_dataframe .return_value = pandas .DataFrame ({"testcol" : page_items })
1934+ mock_pages = (mock_page , mock_page , mock_page , mock_page , mock_page )
1935+ type(mock_rows ).pages = mock .PropertyMock (return_value = mock_pages )
1936+
1937+ schema = [schema .SchemaField ("testcol" , "IGNORED" )]
1938+
1939+ row_iterator = mut .RowIterator (
1940+ _mock_client (),
1941+ None , # api_request: ignored
1942+ None , # path: ignored
1943+ schema ,
1944+ table = mut .TableReference .from_string ("proj.dset.tbl" ),
1945+ selected_fields = schema ,
1946+ )
1947+
1948+ row_iterator .to_dataframe (
1949+ bqstorage_client = bqstorage_client , progress_bar_type = "tqdm"
1950+ )
1951+
1952+ # Make sure that this test updated the progress bar once per page from
1953+ # each stream.
1954+ total_pages = len (streams ) * len (mock_pages )
1955+ self .assertEqual (tqdm_mock ().update .call_count , total_pages )
1956+ tqdm_mock ().update .assert_called_with (len (page_items ))
1957+ tqdm_mock ().close .assert_called_once ()
1958+
18791959 @unittest .skipIf (pandas is None , "Requires `pandas`" )
18801960 @unittest .skipIf (
18811961 bigquery_storage_v1beta1 is None , "Requires `google-cloud-bigquery-storage`"
0 commit comments