Skip to content

Commit df5500e

Browse files
committed
Add page iterator to ReadRowsStream
This allows readers to read blocks (called pages for compatibility with BigQuery client library) one at a time from a stream. This enables use cases such as progress bar support and streaming workers that expect pandas DataFrames.
1 parent 958935e commit df5500e

File tree

2 files changed

+332
-32
lines changed

2 files changed

+332
-32
lines changed

bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py

Lines changed: 182 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
google.api_core.exceptions.ServiceUnavailable,
3939
)
4040
_FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"
41+
_PANDAS_REQUIRED = "pandas is required to create a DataFrame"
4142

4243

4344
class ReadRowsStream(object):
@@ -156,9 +157,7 @@ def rows(self, read_session):
156157
if fastavro is None:
157158
raise ImportError(_FASTAVRO_REQUIRED)
158159

159-
avro_schema, _ = _avro_schema(read_session)
160-
blocks = (_avro_rows(block, avro_schema) for block in self)
161-
return itertools.chain.from_iterable(blocks)
160+
return ReadRowsIterable(self, read_session)
162161

163162
def to_dataframe(self, read_session, dtypes=None):
164163
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -192,29 +191,186 @@ def to_dataframe(self, read_session, dtypes=None):
192191
if fastavro is None:
193192
raise ImportError(_FASTAVRO_REQUIRED)
194193
if pandas is None:
195-
raise ImportError("pandas is required to create a DataFrame")
194+
raise ImportError(_PANDAS_REQUIRED)
196195

197-
if dtypes is None:
198-
dtypes = {}
196+
return self.rows(read_session).to_dataframe(dtypes=dtypes)
197+
198+
199+
class ReadRowsIterable(object):
    """An iterable of rows from a read session.

    Args:
        reader (google.cloud.bigquery_storage_v1beta1.reader.ReadRowsStream):
            A read rows stream.
        read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession):
            A read session. This is required because it contains the schema
            used in the stream blocks.
    """

    # This class is modelled after the google.cloud.bigquery.table.RowIterator
    # and aims to be API compatible where possible.

    def __init__(self, reader, read_session):
        # Status of the most recently received block. Updated as ``pages``
        # is consumed, so ``total_rows`` may change over time.
        self._status = None
        self._reader = reader
        self._read_session = read_session

    @property
    def total_rows(self):
        """int: Number of estimated rows in the current stream.

        May change over time. Is :data:`None` before the first block's
        status has been received.
        """
        return getattr(self._status, "estimated_row_count", None)

    @property
    def pages(self):
        """A generator of all pages in the stream.

        Returns:
            types.GeneratorType[google.cloud.bigquery_storage_v1beta1.ReadRowsPage]:
                A generator of pages.
        """
        # Each page is an iterator of rows. But also has num_items,
        # remaining, and to_dataframe, mirroring
        # google.api_core.page_iterator.Page.
        avro_schema, column_names = _avro_schema(self._read_session)
        for block in self._reader:
            self._status = block.status
            yield ReadRowsPage(avro_schema, column_names, block)

    def __iter__(self):
        """Iterator for each row in all pages."""
        for page in self.pages:
            for row in page:
                yield row

    def to_dataframe(self, dtypes=None):
        """Create a :class:`pandas.DataFrame` of all rows in the stream.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row blocks.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names pandas ``dtype``s. The
                provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the stream.
        """
        if pandas is None:
            raise ImportError(_PANDAS_REQUIRED)

        frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]
        if frames:
            return pandas.concat(frames)

        # Bug fix: a stream with no blocks yields no frames, and
        # ``pandas.concat([])`` raises a ValueError. Return an empty frame
        # with the session's columns instead.
        _, column_names = _avro_schema(self._read_session)
        return pandas.DataFrame(columns=column_names)
208277

209278

210-
def _to_dataframe_with_dtypes(rows, column_names, dtypes):
211-
columns = collections.defaultdict(list)
212-
for row in rows:
213-
for column in row:
214-
columns[column].append(row[column])
215-
for column in dtypes:
216-
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
217-
return pandas.DataFrame(columns, columns=column_names)
279+
class ReadRowsPage(object):
    """An iterator of rows from a read session block.

    Args:
        avro_schema (fastavro.schema):
            A parsed Avro schema, using :func:`fastavro.schema.parse_schema`
        column_names (Tuple[str]]):
            A read session's column names (in requested order).
        block (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse):
            A block of data from a read rows stream.
    """

    # This class is modeled after google.api_core.page_iterator.Page and aims
    # to provide API compatibility where possible.

    def __init__(self, avro_schema, column_names, block):
        self._avro_schema = avro_schema
        self._column_names = column_names
        self._block = block
        # Parsing is deferred until a row or a counter is first requested;
        # ``_iter_rows is None`` marks the not-yet-parsed state.
        self._iter_rows = None
        self._num_items = None
        self._remaining = None

    def _parse_block(self):
        """Parse metadata and rows from the block only once."""
        if self._iter_rows is None:
            row_count = self._block.avro_rows.row_count
            self._num_items = row_count
            self._remaining = row_count
            self._iter_rows = iter(_avro_rows(self._block, self._avro_schema))

    @property
    def num_items(self):
        """int: Total items in the page."""
        self._parse_block()
        return self._num_items

    @property
    def remaining(self):
        """int: Remaining items in the page."""
        self._parse_block()
        return self._remaining

    def __iter__(self):
        """A ``ReadRowsPage`` is an iterator."""
        return self

    def next(self):
        """Get the next row in the page."""
        self._parse_block()
        if self._remaining > 0:
            self._remaining -= 1
        # The underlying iterator raises StopIteration once the page is
        # exhausted. The builtin works on both Python 2.6+ and Python 3.
        return next(self._iter_rows)

    # Alias needed for Python 2/3 support.
    __next__ = next

    def to_dataframe(self, dtypes=None):
        """Create a :class:`pandas.DataFrame` of rows in the page.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row blocks.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names pandas ``dtype``s. The
                provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the stream.
        """
        if pandas is None:
            raise ImportError(_PANDAS_REQUIRED)

        if dtypes is None:
            dtypes = {}

        columns = collections.defaultdict(list)
        for row in self:
            for name in row:
                columns[name].append(row[name])
        for name, dtype in dtypes.items():
            columns[name] = pandas.Series(columns[name], dtype=dtype)
        return pandas.DataFrame(columns, columns=self._column_names)
218374

219375

220376
def _avro_schema(read_session):
@@ -242,12 +398,13 @@ def _avro_rows(block, avro_schema):
242398
"""Parse all rows in a stream block.
243399
244400
Args:
245-
read_session ( \
246-
~google.cloud.bigquery_storage_v1beta1.types.ReadSession \
401+
block ( \
402+
~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse \
247403
):
248-
The read session associated with this read rows stream. This
249-
contains the schema, which is required to parse the data
250-
blocks.
404+
A block containing Avro bytes to parse into rows.
405+
avro_schema (fastavro.schema):
406+
A parsed Avro schema, used to deserialize the bytes in the
407+
block.
251408
252409
Returns:
253410
Iterable[Mapping]:

0 commit comments

Comments
 (0)