Skip to content

Commit 9f54754

Browse files
tswastparthea
authored and committed
Add page iterator to ReadRowsStream (#7680)
* Add page iterator to ReadRowsStream This allows readers to read blocks (called pages for compatibility with BigQuery client library) one at a time from a stream. This enables use cases such as progress bar support and streaming workers that expect pandas DataFrames.
1 parent 192e119 commit 9f54754

File tree

2 files changed

+329
-33
lines changed

2 files changed

+329
-33
lines changed

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1beta1/reader.py

Lines changed: 182 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from __future__ import absolute_import
1616

1717
import collections
18-
import itertools
1918
import json
2019

2120
try:
@@ -38,6 +37,7 @@
3837
google.api_core.exceptions.ServiceUnavailable,
3938
)
4039
_FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"
40+
_PANDAS_REQUIRED = "pandas is required to create a DataFrame"
4141

4242

4343
class ReadRowsStream(object):
@@ -156,9 +156,7 @@ def rows(self, read_session):
156156
if fastavro is None:
157157
raise ImportError(_FASTAVRO_REQUIRED)
158158

159-
avro_schema, _ = _avro_schema(read_session)
160-
blocks = (_avro_rows(block, avro_schema) for block in self)
161-
return itertools.chain.from_iterable(blocks)
159+
return ReadRowsIterable(self, read_session)
162160

163161
def to_dataframe(self, read_session, dtypes=None):
164162
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -192,29 +190,186 @@ def to_dataframe(self, read_session, dtypes=None):
192190
if fastavro is None:
193191
raise ImportError(_FASTAVRO_REQUIRED)
194192
if pandas is None:
195-
raise ImportError("pandas is required to create a DataFrame")
193+
raise ImportError(_PANDAS_REQUIRED)
196194

197-
if dtypes is None:
198-
dtypes = {}
195+
return self.rows(read_session).to_dataframe(dtypes=dtypes)
196+
197+
198+
class ReadRowsIterable(object):
199+
"""An iterable of rows from a read session.
200+
201+
Args:
202+
reader (google.cloud.bigquery_storage_v1beta1.reader.ReadRowsStream):
203+
A read rows stream.
204+
read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession):
205+
A read session. This is required because it contains the schema
206+
used in the stream blocks.
207+
"""
208+
209+
# This class is modeled after the google.cloud.bigquery.table.RowIterator
210+
# and aims to be API compatible where possible.
211+
212+
def __init__(self, reader, read_session):
213+
self._status = None
214+
self._reader = reader
215+
self._read_session = read_session
216+
217+
@property
218+
def total_rows(self):
219+
"""int: Number of estimated rows in the current stream.
220+
221+
May change over time.
222+
"""
223+
return getattr(self._status, "estimated_row_count", None)
224+
225+
@property
226+
def pages(self):
227+
"""A generator of all pages in the stream.
228+
229+
Returns:
230+
types.GeneratorType[google.cloud.bigquery_storage_v1beta1.ReadRowsPage]:
231+
A generator of pages.
232+
"""
233+
# Each page is an iterator of rows. But also has num_items, remaining,
234+
# and to_dataframe.
235+
avro_schema, column_names = _avro_schema(self._read_session)
236+
for block in self._reader:
237+
self._status = block.status
238+
yield ReadRowsPage(avro_schema, column_names, block)
239+
240+
def __iter__(self):
241+
"""Iterator for each row in all pages."""
242+
for page in self.pages:
243+
for row in page:
244+
yield row
245+
246+
def to_dataframe(self, dtypes=None):
247+
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
248+
249+
This method requires the pandas library to create a data frame and the
250+
fastavro library to parse row blocks.
251+
252+
.. warning::
253+
DATETIME columns are not supported. They are currently parsed as
254+
strings in the fastavro library.
255+
256+
Args:
257+
dtypes ( \
258+
Map[str, Union[str, pandas.Series.dtype]] \
259+
):
260+
Optional. A dictionary of column names to pandas ``dtype``s. The
261+
provided ``dtype`` is used when constructing the series for
262+
the column specified. Otherwise, the default pandas behavior
263+
is used.
264+
265+
Returns:
266+
pandas.DataFrame:
267+
A data frame of all rows in the stream.
268+
"""
269+
if pandas is None:
270+
raise ImportError(_PANDAS_REQUIRED)
199271

200-
avro_schema, column_names = _avro_schema(read_session)
201272
frames = []
202-
for block in self:
203-
dataframe = _to_dataframe_with_dtypes(
204-
_avro_rows(block, avro_schema), column_names, dtypes
205-
)
206-
frames.append(dataframe)
273+
for page in self.pages:
274+
frames.append(page.to_dataframe(dtypes=dtypes))
207275
return pandas.concat(frames)
208276

209277

210-
def _to_dataframe_with_dtypes(rows, column_names, dtypes):
211-
columns = collections.defaultdict(list)
212-
for row in rows:
213-
for column in row:
214-
columns[column].append(row[column])
215-
for column in dtypes:
216-
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
217-
return pandas.DataFrame(columns, columns=column_names)
278+
class ReadRowsPage(object):
279+
"""An iterator of rows from a read session block.
280+
281+
Args:
282+
avro_schema (fastavro.schema):
283+
A parsed Avro schema, using :func:`fastavro.schema.parse_schema`
284+
column_names (Tuple[str]):
285+
A read session's column names (in requested order).
286+
block (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse):
287+
A block of data from a read rows stream.
288+
"""
289+
290+
# This class is modeled after google.api_core.page_iterator.Page and aims
291+
# to provide API compatibility where possible.
292+
293+
def __init__(self, avro_schema, column_names, block):
294+
self._avro_schema = avro_schema
295+
self._column_names = column_names
296+
self._block = block
297+
self._iter_rows = None
298+
self._num_items = None
299+
self._remaining = None
300+
301+
def _parse_block(self):
302+
"""Parse metadata and rows from the block only once."""
303+
if self._iter_rows is not None:
304+
return
305+
306+
rows = _avro_rows(self._block, self._avro_schema)
307+
self._num_items = self._block.avro_rows.row_count
308+
self._remaining = self._block.avro_rows.row_count
309+
self._iter_rows = iter(rows)
310+
311+
@property
312+
def num_items(self):
313+
"""int: Total items in the page."""
314+
self._parse_block()
315+
return self._num_items
316+
317+
@property
318+
def remaining(self):
319+
"""int: Remaining items in the page."""
320+
self._parse_block()
321+
return self._remaining
322+
323+
def __iter__(self):
324+
"""A ``ReadRowsPage`` is an iterator."""
325+
return self
326+
327+
def next(self):
328+
"""Get the next row in the page."""
329+
self._parse_block()
330+
if self._remaining > 0:
331+
self._remaining -= 1
332+
return six.next(self._iter_rows)
333+
334+
# Alias needed for Python 2/3 support.
335+
__next__ = next
336+
337+
def to_dataframe(self, dtypes=None):
338+
"""Create a :class:`pandas.DataFrame` of rows in the page.
339+
340+
This method requires the pandas library to create a data frame and the
341+
fastavro library to parse row blocks.
342+
343+
.. warning::
344+
DATETIME columns are not supported. They are currently parsed as
345+
strings in the fastavro library.
346+
347+
Args:
348+
dtypes ( \
349+
Map[str, Union[str, pandas.Series.dtype]] \
350+
):
351+
Optional. A dictionary of column names to pandas ``dtype``s. The
352+
provided ``dtype`` is used when constructing the series for
353+
the column specified. Otherwise, the default pandas behavior
354+
is used.
355+
356+
Returns:
357+
pandas.DataFrame:
358+
A data frame of all rows in the stream.
359+
"""
360+
if pandas is None:
361+
raise ImportError(_PANDAS_REQUIRED)
362+
363+
if dtypes is None:
364+
dtypes = {}
365+
366+
columns = collections.defaultdict(list)
367+
for row in self:
368+
for column in row:
369+
columns[column].append(row[column])
370+
for column in dtypes:
371+
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
372+
return pandas.DataFrame(columns, columns=self._column_names)
218373

219374

220375
def _avro_schema(read_session):
@@ -242,12 +397,13 @@ def _avro_rows(block, avro_schema):
242397
"""Parse all rows in a stream block.
243398
244399
Args:
245-
read_session ( \
246-
~google.cloud.bigquery_storage_v1beta1.types.ReadSession \
400+
block ( \
401+
~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse \
247402
):
248-
The read session associated with this read rows stream. This
249-
contains the schema, which is required to parse the data
250-
blocks.
403+
A block containing Avro bytes to parse into rows.
404+
avro_schema (fastavro.schema):
405+
A parsed Avro schema, used to deserialize the bytes in the
406+
block.
251407
252408
Returns:
253409
Iterable[Mapping]:

0 commit comments

Comments
 (0)