1818
1919import copy
2020import datetime
21+ import json
2122import operator
2223import warnings
2324
@@ -1242,6 +1243,17 @@ class RowIterator(HTTPIterator):
12421243 page_size (int, optional): The number of items to return per page.
12431244 extra_params (Dict[str, object]):
12441245 Extra query string parameters for the API call.
1246+ table (Union[ \
1247+ :class:`~google.cloud.bigquery.table.Table`, \
1248+ :class:`~google.cloud.bigquery.table.TableReference`, \
1249+ ]):
1250+ Optional. The table which these rows belong to, or a reference to
1251+ it. Used to call the BigQuery Storage API to fetch rows.
1252+ selected_fields (Sequence[ \
1253+ google.cloud.bigquery.schema.SchemaField, \
1254+ ]):
1255+ Optional. A subset of columns to select from this table.
1256+
12451257 """
12461258
12471259 def __init__ (
@@ -1254,6 +1266,8 @@ def __init__(
12541266 max_results = None ,
12551267 page_size = None ,
12561268 extra_params = None ,
1269+ table = None ,
1270+ selected_fields = None ,
12571271 ):
12581272 super (RowIterator , self ).__init__ (
12591273 client ,
@@ -1271,6 +1285,9 @@ def __init__(
12711285 self ._field_to_index = _helpers ._field_to_index_mapping (schema )
12721286 self ._total_rows = None
12731287 self ._page_size = page_size
1288+ self ._table = table
1289+ self ._selected_fields = selected_fields
1290+ self ._project = client .project
12741291
12751292 def _get_next_page_response (self ):
12761293 """Requests the next page from the path provided.
@@ -1296,9 +1313,81 @@ def total_rows(self):
12961313 """int: The total number of rows in the table."""
12971314 return self ._total_rows
12981315
1299- def to_dataframe (self ):
1316+ def _to_dataframe_tabledata_list (self ):
1317+ """Use (slower, but free) tabledata.list to construct a DataFrame."""
1318+ column_headers = [field .name for field in self .schema ]
1319+ # Use generator, rather than pulling the whole rowset into memory.
1320+ rows = (row .values () for row in iter (self ))
1321+ return pandas .DataFrame (rows , columns = column_headers )
1322+
1323+ def _to_dataframe_bqstorage (self , bqstorage_client ):
1324+ """Use (faster, but billable) BQ Storage API to construct DataFrame."""
1325+ import concurrent .futures
1326+ from google .cloud import bigquery_storage_v1beta1
1327+
1328+ if "$" in self ._table .table_id :
1329+ raise ValueError (
1330+ "Reading from a specific partition is not currently supported."
1331+ )
1332+ if "@" in self ._table .table_id :
1333+ raise ValueError (
1334+ "Reading from a specific snapshot is not currently supported."
1335+ )
1336+
1337+ read_options = bigquery_storage_v1beta1 .types .TableReadOptions ()
1338+ if self ._selected_fields is not None :
1339+ for field in self ._selected_fields :
1340+ read_options .selected_fields .append (field .name )
1341+
1342+ session = bqstorage_client .create_read_session (
1343+ self ._table .to_bqstorage (),
1344+ "projects/{}" .format (self ._project ),
1345+ read_options = read_options ,
1346+ )
1347+
1348+ # We need to parse the schema manually so that we can rearrange the
1349+ # columns.
1350+ schema = json .loads (session .avro_schema .schema )
1351+ columns = [field ["name" ] for field in schema ["fields" ]]
1352+
1353+ # Avoid reading rows from an empty table. pandas.concat will fail on an
1354+ # empty list.
1355+ if not session .streams :
1356+ return pandas .DataFrame (columns = columns )
1357+
1358+ def get_dataframe (stream ):
1359+ position = bigquery_storage_v1beta1 .types .StreamPosition (stream = stream )
1360+ rowstream = bqstorage_client .read_rows (position )
1361+ return rowstream .to_dataframe (session )
1362+
1363+ with concurrent .futures .ThreadPoolExecutor () as pool :
1364+ frames = pool .map (get_dataframe , session .streams )
1365+
1366+ # rowstream.to_dataframe() does not preserve column order. Rearrange at
1367+ # the end using manually-parsed schema.
1368+ return pandas .concat (frames )[columns ]
1369+
1370+ def to_dataframe (self , bqstorage_client = None ):
13001371 """Create a pandas DataFrame from the query results.
13011372
1373+ Args:
1374+ bqstorage_client ( \
1375+ google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
1376+ ):
1377+ Optional. A BigQuery Storage API client. If supplied, use the
1378+ faster BigQuery Storage API to fetch rows from BigQuery. This
1379+ API is a billable API.
1380+
1381+ This method requires the ``fastavro`` and
1382+ ``google-cloud-bigquery-storage`` libraries.
1383+
1384+ Reading from a specific partition or snapshot is not
1385+ currently supported by this method.
1386+
1387+ **Caution**: There is a known issue reading small anonymous
1388+ query result tables with the BQ Storage API. Write your query
1389+ results to a destination table to work around this issue.
1390+
13021391 Returns:
13031392 pandas.DataFrame:
13041393 A :class:`~pandas.DataFrame` populated with row data and column
@@ -1312,11 +1401,10 @@ def to_dataframe(self):
13121401 if pandas is None :
13131402 raise ValueError (_NO_PANDAS_ERROR )
13141403
1315- column_headers = [field .name for field in self .schema ]
1316- # Use generator, rather than pulling the whole rowset into memory.
1317- rows = (row .values () for row in iter (self ))
1318-
1319- return pandas .DataFrame (rows , columns = column_headers )
1404+ if bqstorage_client is not None :
1405+ return self ._to_dataframe_bqstorage (bqstorage_client )
1406+ else :
1407+ return self ._to_dataframe_tabledata_list ()
13201408
13211409
13221410class _EmptyRowIterator (object ):
0 commit comments