Skip to content

Commit 0e7b01f

Browse files
committed
Making BigQuery table.fetch_data() into an iterator.
1 parent 092dd09 commit 0e7b01f

7 files changed

Lines changed: 113 additions & 59 deletions

File tree

bigquery/google/cloud/bigquery/table.py

Lines changed: 66 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,8 @@
3232
from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
3333
from google.cloud.streaming.transfer import Upload
3434
from google.cloud.bigquery.schema import SchemaField
35-
from google.cloud.bigquery._helpers import _rows_from_json
35+
from google.cloud.bigquery._helpers import _row_from_json
36+
from google.cloud.iterator import Iterator
3637

3738

3839
_TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'"
@@ -653,47 +654,36 @@ def fetch_data(self, max_results=None, page_token=None, client=None):
653654
up-to-date with the schema as defined on the back-end: if the
654655
two schemas are not identical, the values returned may be
655656
incomplete. To ensure that the local copy of the schema is
656-
up-to-date, call the table's ``reload`` method.
657+
up-to-date, call :meth:`reload`.
657658
658659
:type max_results: int
659-
:param max_results: (Optional) maximum number of rows to return.
660+
:param max_results: (Optional) Maximum number of rows to return.
660661
661662
:type page_token: str
662-
:param page_token:
663-
(Optional) token representing a cursor into the table's rows.
664-
665-
:type client: :class:`~google.cloud.bigquery.client.Client` or
666-
``NoneType``
667-
:param client: the client to use. If not passed, falls back to the
668-
``client`` stored on the current dataset.
669-
670-
:rtype: tuple
671-
:returns: ``(row_data, total_rows, page_token)``, where ``row_data``
672-
is a list of tuples, one per result row, containing only
673-
the values; ``total_rows`` is a count of the total number
674-
of rows in the table; and ``page_token`` is an opaque
675-
string which can be used to fetch the next batch of rows
676-
(``None`` if no further batches can be fetched).
663+
:param page_token: (Optional) Token representing a cursor into the
664+
table's rows.
665+
666+
:type client: :class:`~google.cloud.bigquery.client.Client`
667+
:param client: (Optional) The client to use. If not passed, falls
668+
back to the ``client`` stored on the current dataset.
669+
670+
:rtype: :class:`~google.cloud.iterator.Iterator`
671+
:returns: Iterator of row data :class:`tuple`s. Each page in the
672+
iterator will have the ``total_rows`` attribute set,
673+
which counts the total number of rows **in the table**
674+
(this is distinct from the total number of rows in the
675+
current page: ``iterator.page.num_items``).
677676
"""
678677
client = self._require_client(client)
679-
params = {}
680-
681-
if max_results is not None:
682-
params['maxResults'] = max_results
683-
684-
if page_token is not None:
685-
params['pageToken'] = page_token
686-
687-
response = client.connection.api_request(method='GET',
688-
path='%s/data' % self.path,
689-
query_params=params)
690-
total_rows = response.get('totalRows')
691-
if total_rows is not None:
692-
total_rows = int(total_rows)
693-
page_token = response.get('pageToken')
694-
rows_data = _rows_from_json(response.get('rows', ()), self._schema)
695-
696-
return rows_data, total_rows, page_token
678+
path = '%s/data' % (self.path,)
679+
iterator = Iterator(client=client, path=path,
680+
item_to_value=_item_to_row, items_key='rows',
681+
page_token=page_token, max_results=max_results,
682+
page_start=_rows_page_start)
683+
iterator.schema = self._schema
684+
# Over-ride the key used to retrieve the next page token.
685+
iterator._NEXT_TOKEN = 'pageToken'
686+
return iterator
697687

698688
def insert_data(self,
699689
rows,
@@ -1083,6 +1073,46 @@ def _build_schema_resource(fields):
10831073
return infos
10841074

10851075

1076+
def _item_to_row(iterator, resource):
1077+
"""Convert a JSON row to the native object.
1078+
1079+
.. note::
1080+
1081+
This assumes that the ``schema`` attribute has been
1082+
added to the iterator after being created, which
1083+
should be done by the caller.
1084+
1085+
:type iterator: :class:`~google.cloud.iterator.Iterator`
1086+
:param iterator: The iterator that is currently in use.
1087+
1088+
:type resource: dict
1089+
:param resource: An item to be converted to a row.
1090+
1091+
:rtype: tuple
1092+
:returns: The next row in the page.
1093+
"""
1094+
return _row_from_json(resource, iterator.schema)
1095+
1096+
1097+
# pylint: disable=unused-argument
1098+
def _rows_page_start(iterator, page, response):
1099+
"""Grab total rows after a :class:`~google.cloud.iterator.Page` started.
1100+
1101+
:type iterator: :class:`~google.cloud.iterator.Iterator`
1102+
:param iterator: The iterator that is currently in use.
1103+
1104+
:type page: :class:`~google.cloud.iterator.Page`
1105+
:param page: The page that was just created.
1106+
1107+
:type response: dict
1108+
:param response: The JSON API response for a page of rows in a table.
1109+
"""
1110+
total_rows = response.get('totalRows')
1111+
if total_rows is not None:
1112+
page.total_rows = int(total_rows)
1113+
# pylint: enable=unused-argument
1114+
1115+
10861116
class _UploadConfig(object):
10871117
"""Faux message FBO apitools' 'configure_request'."""
10881118
accept = ['*/*']

bigquery/unit_tests/test_table.py

Lines changed: 21 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1068,7 +1068,11 @@ def _bigquery_timestamp_float_repr(ts_float):
10681068
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
10691069
schema=[full_name, age, joined])
10701070

1071-
rows, total_rows, page_token = table.fetch_data()
1071+
iterator = table.fetch_data()
1072+
iterator.update_page()
1073+
rows = list(iterator.page)
1074+
total_rows = iterator.page.total_rows
1075+
page_token = iterator.next_page_token
10721076

10731077
self.assertEqual(len(rows), 4)
10741078
self.assertEqual(rows[0], ('Phred Phlyntstone', 32, WHEN))
@@ -1129,9 +1133,12 @@ def test_fetch_data_w_alternate_client(self):
11291133
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
11301134
schema=[full_name, age, voter, score])
11311135

1132-
rows, total_rows, page_token = table.fetch_data(client=client2,
1133-
max_results=MAX,
1134-
page_token=TOKEN)
1136+
iterator = table.fetch_data(
1137+
client=client2, max_results=MAX, page_token=TOKEN)
1138+
iterator.update_page()
1139+
rows = list(iterator.page)
1140+
total_rows = getattr(iterator.page, 'total_rows', None)
1141+
page_token = iterator.next_page_token
11351142

11361143
self.assertEqual(len(rows), 4)
11371144
self.assertEqual(rows[0], ('Phred Phlyntstone', 32, True, 3.1415926))
@@ -1177,7 +1184,11 @@ def test_fetch_data_w_repeated_fields(self):
11771184
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
11781185
schema=[full_name, struct])
11791186

1180-
rows, total_rows, page_token = table.fetch_data()
1187+
iterator = table.fetch_data()
1188+
iterator.update_page()
1189+
rows = list(iterator.page)
1190+
total_rows = iterator.page.total_rows
1191+
page_token = iterator.next_page_token
11811192

11821193
self.assertEqual(len(rows), 1)
11831194
self.assertEqual(rows[0][0], ['red', 'green'])
@@ -1227,7 +1238,11 @@ def test_fetch_data_w_record_schema(self):
12271238
table = self._makeOne(self.TABLE_NAME, dataset=dataset,
12281239
schema=[full_name, phone])
12291240

1230-
rows, total_rows, page_token = table.fetch_data()
1241+
iterator = table.fetch_data()
1242+
iterator.update_page()
1243+
rows = list(iterator.page)
1244+
total_rows = iterator.page.total_rows
1245+
page_token = iterator.next_page_token
12311246

12321247
self.assertEqual(len(rows), 3)
12331248
self.assertEqual(rows[0][0], 'Phred Phlyntstone')

core/google/cloud/iterator.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -297,6 +297,7 @@ class HTTPIterator(Iterator):
297297

298298
_PAGE_TOKEN = 'pageToken'
299299
_MAX_RESULTS = 'maxResults'
300+
_NEXT_TOKEN = 'nextPageToken'
300301
_RESERVED_PARAMS = frozenset([_PAGE_TOKEN, _MAX_RESULTS])
301302
_HTTP_METHOD = 'GET'
302303

docs/bigquery-usage.rst

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -209,16 +209,17 @@ Run a query which can be expected to complete within bounded time:
209209
:start-after: [START client_run_sync_query]
210210
:end-before: [END client_run_sync_query]
211211

212-
If the rows returned by the query do not fit into the inital response,
213-
then we need to fetch the remaining rows via ``fetch_data``:
212+
If the rows returned by the query do not fit into the initial response,
213+
then we need to fetch the remaining rows via
214+
:meth:`~google.cloud.bigquery.query.QueryResults.fetch_data`:
214215

215216
.. literalinclude:: bigquery_snippets.py
216217
:start-after: [START client_run_sync_query_paged]
217218
:end-before: [END client_run_sync_query_paged]
218219

219220
If the query takes longer than the timeout allowed, ``query.complete``
220221
will be ``False``. In that case, we need to poll the associated job until
221-
it is done, and then fetch the reuslts:
222+
it is done, and then fetch the results:
222223

223224
.. literalinclude:: bigquery_snippets.py
224225
:start-after: [START client_run_sync_query_timeout]

docs/bigquery_snippets.py

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -341,7 +341,9 @@ def _warm_up_inserted_table_data(table):
341341

342342
while len(rows) == 0 and counter > 0:
343343
counter -= 1
344-
rows, _, _ = table.fetch_data()
344+
iterator = table.fetch_data()
345+
iterator.update_page()
346+
rows = list(iterator.page)
345347
if len(rows) == 0:
346348
time.sleep(5)
347349

@@ -376,13 +378,8 @@ def do_something(row):
376378
found_rows.append(row)
377379

378380
# [START table_fetch_data]
379-
rows, _, token = table.fetch_data()
380-
while True:
381-
for row in rows:
382-
do_something(row)
383-
if token is None:
384-
break
385-
rows, _, token = table.fetch_data(page_token=token)
381+
for row in table.fetch_data():
382+
do_something(row)
386383
# [END table_fetch_data]
387384

388385
assert len(found_rows) == len(ROWS_TO_INSERT)
@@ -424,7 +421,11 @@ def table_upload_from_file(client, to_delete):
424421

425422
_warm_up_inserted_table_data(table)
426423

427-
rows, total, token = table.fetch_data()
424+
iterator = table.fetch_data()
425+
iterator.update_page()
426+
rows = list(iterator.page)
427+
total = iterator.page.total_rows
428+
token = iterator.next_page_token
428429

429430
assert len(rows) == total == 2
430431
assert token is None

scripts/run_pylint.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -72,7 +72,7 @@
7272
}
7373
TEST_RC_REPLACEMENTS = {
7474
'FORMAT': {
75-
'max-module-lines': 1960,
75+
'max-module-lines': 2000,
7676
},
7777
}
7878

system_tests/bigquery.py

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -263,6 +263,12 @@ def test_update_table(self):
263263
self.assertEqual(found.field_type, expected.field_type)
264264
self.assertEqual(found.mode, expected.mode)
265265

266+
@staticmethod
267+
def _fetch_single_page(table):
268+
iterator = table.fetch_data()
269+
iterator.update_page()
270+
return list(iterator.page)
271+
266272
def test_insert_data_then_dump_table(self):
267273
import datetime
268274
from google.cloud._helpers import UTC
@@ -303,11 +309,11 @@ def test_insert_data_then_dump_table(self):
303309
def _has_rows(result):
304310
return len(result[0]) > 0
305311

306-
# Allow for 90 seconds of "warm up" before rows visible. See:
312+
# Allow for "warm up" before rows visible. See:
307313
# https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
308314
# 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
309315
retry = RetryResult(_has_rows, max_tries=8)
310-
rows, _, _ = retry(table.fetch_data)()
316+
rows = retry(self._fetch_single_page)(table)
311317

312318
by_age = operator.itemgetter(1)
313319
self.assertEqual(sorted(rows, key=by_age),
@@ -361,7 +367,7 @@ def _job_done(instance):
361367

362368
self.assertEqual(job.output_rows, len(ROWS))
363369

364-
rows, _, _ = table.fetch_data()
370+
rows = self._fetch_single_page(table)
365371
by_age = operator.itemgetter(1)
366372
self.assertEqual(sorted(rows, key=by_age),
367373
sorted(ROWS, key=by_age))
@@ -431,7 +437,7 @@ def _job_done(instance):
431437
retry = RetryInstanceState(_job_done, max_tries=8)
432438
retry(job.reload)()
433439

434-
rows, _, _ = table.fetch_data()
440+
rows = self._fetch_single_page(table)
435441
by_age = operator.itemgetter(1)
436442
self.assertEqual(sorted(rows, key=by_age),
437443
sorted(ROWS, key=by_age))

0 commit comments

Comments (0)