Skip to content

Commit c8521b8

Browse files
authored
bigquery: add streaming buffer info (#4161)
Unfortunately there's no good way to write a system test for this, since you can never be sure that one gets created. But I informally verified that the code works by running create_rows a lot until I got a streaming buffer.
1 parent a95f1cc commit c8521b8

2 files changed

Lines changed: 39 additions & 0 deletions

File tree

bigquery/google/cloud/bigquery/table.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,12 @@ def view_use_legacy_sql(self, value):
524524
self._properties['view'] = {}
525525
self._properties['view']['useLegacySql'] = value
526526

527+
@property
528+
def streaming_buffer(self):
529+
sb = self._properties.get('streamingBuffer')
530+
if sb is not None:
531+
return StreamingBuffer(sb)
532+
527533
@classmethod
528534
def from_api_repr(cls, resource):
529535
"""Factory: construct a table given its API representation
@@ -658,6 +664,23 @@ def _row_from_mapping(mapping, schema):
658664
return tuple(row)
659665

660666

667+
class StreamingBuffer(object):
668+
"""Information about a table's streaming buffer.
669+
670+
See https://cloud.google.com/bigquery/streaming-data-into-bigquery.
671+
672+
:type resource: dict
673+
:param resource: streaming buffer representation returned from the API
674+
"""
675+
676+
def __init__(self, resource):
677+
self.estimated_bytes = int(resource['estimatedBytes'])
678+
self.estimated_rows = int(resource['estimatedRows'])
679+
# time is in milliseconds since the epoch.
680+
self.oldest_entry_time = _datetime_from_microseconds(
681+
1000.0 * int(resource['oldestEntryTime']))
682+
683+
661684
def _parse_schema_resource(info):
662685
"""Parse a resource fragment into a schema field.
663686

bigquery/tests/unit/test_table.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ def _setUpConstants(self):
174174
self.RESOURCE_URL = 'http://example.com/path/to/resource'
175175
self.NUM_BYTES = 12345
176176
self.NUM_ROWS = 67
177+
self.NUM_EST_BYTES = 1234
178+
self.NUM_EST_ROWS = 23
177179

178180
def _makeResource(self):
179181
self._setUpConstants()
@@ -194,6 +196,10 @@ def _makeResource(self):
194196
'numRows': self.NUM_ROWS,
195197
'numBytes': self.NUM_BYTES,
196198
'type': 'TABLE',
199+
'streamingBuffer': {
200+
'estimatedRows': str(self.NUM_EST_ROWS),
201+
'estimatedBytes': str(self.NUM_EST_BYTES),
202+
'oldestEntryTime': self.WHEN_TS * 1000},
197203
}
198204

199205
def _verifyReadonlyResourceProperties(self, table, resource):
@@ -222,6 +228,16 @@ def _verifyReadonlyResourceProperties(self, table, resource):
222228
else:
223229
self.assertIsNone(table.self_link)
224230

231+
if 'streamingBuffer' in resource:
232+
self.assertEqual(table.streaming_buffer.estimated_rows,
233+
self.NUM_EST_ROWS)
234+
self.assertEqual(table.streaming_buffer.estimated_bytes,
235+
self.NUM_EST_BYTES)
236+
self.assertEqual(table.streaming_buffer.oldest_entry_time,
237+
self.WHEN)
238+
else:
239+
self.assertIsNone(table.streaming_buffer)
240+
225241
self.assertEqual(table.full_table_id, self.TABLE_FULL_ID)
226242
self.assertEqual(table.table_type,
227243
'TABLE' if 'view' not in resource else 'VIEW')

0 commit comments

Comments
 (0)