Skip to content

Commit 5a0e549

Browse files
calpeysertseaver
authored and committed
Add RPC retries to Bigtable (googleapis#3811)
1 parent 23f75db commit 5a0e549

9 files changed

Lines changed: 687 additions & 88 deletions

File tree

.circleci/config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ jobs:
6262
name: Run tests - google.cloud.bigtable
6363
command: |
6464
if [[ -n $(grep bigtable ~/target_packages) ]]; then
65+
test_utils/scripts/circleci/prepare_bigtable.sh
66+
export DOWNLOAD_BIGTABLE_TEST_SERVER=0
6567
nox -f bigtable/nox.py
6668
fi
6769
- run:
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
# Copyright 2017 Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Provides function wrappers that implement retrying."""
16+
17+
import random
18+
import time
19+
import six
20+
import sys
21+
22+
from google.cloud._helpers import _to_bytes
23+
from google.cloud.bigtable._generated import (
24+
bigtable_pb2 as data_messages_v2_pb2)
25+
from google.gax import config, errors
26+
from grpc import RpcError
27+
28+
29+
_MILLIS_PER_SECOND = 1000
30+
31+
32+
class ReadRowsIterator(object):
    """Iterator over 'ReadRows' responses that retries on certain failures.

    Wraps the streaming 'ReadRows' RPC and transparently re-opens the
    stream when a retryable :class:`grpc.RpcError` is raised, backing off
    exponentially (with jitter) between attempts until ``total_timeout``
    is exceeded.

    :type client: :class:`Client <google.cloud.bigtable.client.Client>`
    :param client: Client owning the data stub used to issue the RPC.

    :type name: str
    :param name: Fully-qualified name of the table being read.

    :type start_key: bytes
    :param start_key: (Optional) Beginning of the row-key range to read.

    :type end_key: bytes
    :param end_key: (Optional) End of the row-key range to read.

    :type filter_: :class:`.RowFilter`
    :param filter_: (Optional) Filter applied to the contents of the rows.

    :type limit: int
    :param limit: (Optional) Maximum number of rows to return.

    :type end_inclusive: bool
    :param end_inclusive: Whether ``end_key`` is considered inclusive.

    :type retry_options: :class:`google.gax.RetryOptions`
    :param retry_options: Retryable status codes plus backoff settings.
    """

    def __init__(self, client, name, start_key, end_key, filter_, limit,
                 end_inclusive, retry_options, **kwargs):
        self.client = client
        self.retry_options = retry_options
        self.name = name
        self.start_key = start_key
        # The first request includes ``start_key`` itself; after a retry
        # we resume *after* the last committed row (see ``set_start_key``).
        self.start_key_closed = True
        self.end_key = end_key
        self.filter_ = filter_
        self.limit = limit
        self.end_inclusive = end_inclusive
        backoff = retry_options.backoff_settings
        self.delay_mult = backoff.retry_delay_multiplier
        self.max_delay_millis = backoff.max_retry_delay_millis
        self.timeout_mult = backoff.rpc_timeout_multiplier
        # Convert millisecond settings to seconds for time.time() math.
        self.max_timeout = (
            backoff.max_rpc_timeout_millis / _MILLIS_PER_SECOND)
        self.total_timeout = (
            backoff.total_timeout_millis / _MILLIS_PER_SECOND)
        self._responses_for_row = 0
        self.set_stream()

    def set_start_key(self, start_key):
        """Set the row key at which this iterator will begin reading.

        Marks the key as open (exclusive) so that a retried stream resumes
        after the last row that was already committed.
        """
        self.start_key = start_key
        self.start_key_closed = False

    def set_stream(self):
        """(Re)open the read stream by issuing a 'ReadRows' RPC."""
        req_pb = _create_row_request(self.name, start_key=self.start_key,
                                     start_key_closed=self.start_key_closed,
                                     end_key=self.end_key,
                                     filter_=self.filter_, limit=self.limit,
                                     end_inclusive=self.end_inclusive)
        self.stream = self.client._data_stub.ReadRows(req_pb)

    @property
    def responses_for_row(self):
        """Number of responses consumed so far for the current row.

        If 1, then either this row is being read for the first time, or
        the most recent response required a retry, causing the row to be
        read again.

        :rtype: int
        :returns: Number of responses consumed so far for the current row.
        """
        return self._responses_for_row

    def clear_responses_for_row(self):
        """Signal that a new row has been started."""
        self._responses_for_row = 0

    def next(self, *args, **kwargs):
        """Read and return the next chunk from the stream.

        Retries on idempotent failure: an :class:`grpc.RpcError` whose
        status code appears in ``retry_options.retry_codes`` causes the
        stream to be re-opened after a jittered, exponentially growing
        delay. Any other error is re-raised immediately. ``StopIteration``
        from an exhausted stream propagates unchanged.

        :raises: :class:`google.gax.errors.RetryError` when
                 ``total_timeout`` elapses without a successful response.
        """
        delay = self.retry_options.backoff_settings.initial_retry_delay_millis
        exc = errors.RetryError('Retry total timeout exceeded before any '
                                'response was received')

        deadline = time.time() + self.total_timeout
        while time.time() < deadline:
            self._responses_for_row += 1
            try:
                return six.next(self.stream)
            except RpcError as error:
                code = config.exc_to_code(error)
                if code not in self.retry_options.retry_codes:
                    # Not safe to retry; propagate unchanged.
                    six.reraise(type(error), error)

                # pylint: disable=redefined-variable-type
                exc = errors.RetryError(
                    'Retry total timeout exceeded with exception', error)

                # Sleep a random duration which will, on average, equal
                # the expected delay (full jitter).
                to_sleep = random.uniform(0, delay * 2)
                time.sleep(to_sleep / _MILLIS_PER_SECOND)
                delay = min(delay * self.delay_mult, self.max_delay_millis)
                self._responses_for_row = 0
                self.set_stream()

        six.reraise(errors.RetryError, exc, sys.exc_info()[2])

    def __next__(self, *args, **kwargs):
        """Python 3 iterator protocol; delegates to :meth:`next`."""
        return self.next(*args, **kwargs)
138+
139+
140+
def _create_row_request(table_name, row_key=None, start_key=None,
141+
start_key_closed=True, end_key=None, filter_=None,
142+
limit=None, end_inclusive=False):
143+
"""Creates a request to read rows in a table.
144+
145+
:type table_name: str
146+
:param table_name: The name of the table to read from.
147+
148+
:type row_key: bytes
149+
:param row_key: (Optional) The key of a specific row to read from.
150+
151+
:type start_key: bytes
152+
:param start_key: (Optional) The beginning of a range of row keys to
153+
read from. The range will include ``start_key``. If
154+
left empty, will be interpreted as the empty string.
155+
156+
:type end_key: bytes
157+
:param end_key: (Optional) The end of a range of row keys to read from.
158+
The range will not include ``end_key``. If left empty,
159+
will be interpreted as an infinite string.
160+
161+
:type filter_: :class:`.RowFilter`
162+
:param filter_: (Optional) The filter to apply to the contents of the
163+
specified row(s). If unset, reads the entire table.
164+
165+
:type limit: int
166+
:param limit: (Optional) The read will terminate after committing to N
167+
rows' worth of results. The default (zero) is to return
168+
all results.
169+
170+
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
171+
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
172+
:raises: :class:`ValueError <exceptions.ValueError>` if both
173+
``row_key`` and one of ``start_key`` and ``end_key`` are set
174+
"""
175+
request_kwargs = {'table_name': table_name}
176+
if (row_key is not None and
177+
(start_key is not None or end_key is not None)):
178+
raise ValueError('Row key and row range cannot be '
179+
'set simultaneously')
180+
range_kwargs = {}
181+
if start_key is not None or end_key is not None:
182+
if start_key is not None:
183+
if start_key_closed:
184+
range_kwargs['start_key_closed'] = _to_bytes(start_key)
185+
else:
186+
range_kwargs['start_key_open'] = _to_bytes(start_key)
187+
if end_key is not None:
188+
end_key_key = 'end_key_open'
189+
if end_inclusive:
190+
end_key_key = 'end_key_closed'
191+
range_kwargs[end_key_key] = _to_bytes(end_key)
192+
if filter_ is not None:
193+
request_kwargs['filter'] = filter_.to_pb()
194+
if limit is not None:
195+
request_kwargs['rows_limit'] = limit
196+
197+
message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)
198+
199+
if row_key is not None:
200+
message.rows.row_keys.append(_to_bytes(row_key))
201+
202+
if range_kwargs:
203+
message.rows.row_ranges.add(**range_kwargs)
204+
205+
return message

bigtable/google/cloud/bigtable/row_data.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ def consume_next(self):
267267

268268
self._last_scanned_row_key = response.last_scanned_row_key
269269

270+
if hasattr(self._response_iterator, 'responses_for_row'):
271+
if (self._response_iterator.responses_for_row == 1):
272+
self._clear_accumulated_row()
273+
270274
row = self._row
271275
cell = self._cell
272276

@@ -300,6 +304,10 @@ def consume_next(self):
300304

301305
if chunk.commit_row:
302306
self._save_current_row()
307+
if hasattr(self._response_iterator, 'set_start_key'):
308+
self._response_iterator.set_start_key(chunk.row_key)
309+
if hasattr(self._response_iterator, 'clear_responses_for_row'):
310+
self._response_iterator.clear_responses_for_row()
303311
row = cell = None
304312
continue
305313

@@ -345,6 +353,11 @@ def _validate_chunk_status(chunk):
345353
# No negative value_size (inferred as a general constraint).
346354
_raise_if(chunk.value_size < 0)
347355

356+
def _clear_accumulated_row(self):
357+
self._row = None
358+
self._cell = None
359+
self._previous_cell = None
360+
348361
def _validate_chunk_new_row(self, chunk):
349362
"""Helper for :meth:`_validate_chunk`."""
350363
assert self.state == self.NEW_ROW

bigtable/google/cloud/bigtable/table.py

Lines changed: 31 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from google.api_core.exceptions import RetryError
1919
from google.api_core.retry import if_exception_type
2020
from google.api_core.retry import Retry
21-
from google.cloud._helpers import _to_bytes
2221
from google.cloud.bigtable._generated import (
2322
bigtable_pb2 as data_messages_v2_pb2)
2423
from google.cloud.bigtable._generated import (
@@ -31,8 +30,27 @@
3130
from google.cloud.bigtable.row import ConditionalRow
3231
from google.cloud.bigtable.row import DirectRow
3332
from google.cloud.bigtable.row_data import PartialRowsData
33+
from google.gax import RetryOptions, BackoffSettings
34+
from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request
3435
from grpc import StatusCode
3536

37+
# Default backoff for retried 'ReadRows' calls (durations in milliseconds).
BACKOFF_SETTINGS = BackoffSettings(
    initial_retry_delay_millis=10,
    retry_delay_multiplier=1.3,
    max_retry_delay_millis=30000,
    # Per-attempt RPC timeout: 25 minutes, with no growth between attempts.
    initial_rpc_timeout_millis=25 * 60 * 1000,
    rpc_timeout_multiplier=1.0,
    max_rpc_timeout_millis=25 * 60 * 1000,
    # Overall deadline across all attempts: 30 minutes.
    total_timeout_millis=30 * 60 * 1000
)

# Status codes considered safe to retry for the idempotent 'ReadRows' RPC.
RETRY_CODES = [
    StatusCode.DEADLINE_EXCEEDED,
    StatusCode.ABORTED,
    StatusCode.INTERNAL,
    StatusCode.UNAVAILABLE
]
53+
3654

3755
# Maximum number of mutations in bulk (MutateRowsRequest message):
3856
# (https://cloud.google.com/bigtable/docs/reference/data/rpc/
@@ -277,7 +295,7 @@ def read_row(self, row_key, filter_=None):
277295
return rows_data.rows[row_key]
278296

279297
def read_rows(self, start_key=None, end_key=None, limit=None,
280-
filter_=None, end_inclusive=False):
298+
filter_=None, end_inclusive=False, backoff_settings=None):
281299
"""Read rows from this table.
282300
283301
:type start_key: bytes
@@ -308,13 +326,18 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
308326
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
309327
the streamed results.
310328
"""
311-
request_pb = _create_row_request(
312-
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
313-
limit=limit, end_inclusive=end_inclusive)
314329
client = self._instance._client
315-
response_iterator = client._data_stub.ReadRows(request_pb)
316-
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
317-
return PartialRowsData(response_iterator)
330+
if backoff_settings is None:
331+
backoff_settings = BACKOFF_SETTINGS
332+
RETRY_OPTIONS = RetryOptions(
333+
retry_codes=RETRY_CODES,
334+
backoff_settings=backoff_settings
335+
)
336+
337+
retrying_iterator = ReadRowsIterator(client, self.name, start_key,
338+
end_key, filter_, limit,
339+
end_inclusive, RETRY_OPTIONS)
340+
return PartialRowsData(retrying_iterator)
318341

319342
def mutate_rows(self, rows, retry=DEFAULT_RETRY):
320343
"""Mutates multiple rows in bulk.
@@ -495,74 +518,6 @@ def _do_mutate_retryable_rows(self):
495518
return self.responses_statuses
496519

497520

498-
def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
499-
filter_=None, limit=None, end_inclusive=False):
500-
"""Creates a request to read rows in a table.
501-
502-
:type table_name: str
503-
:param table_name: The name of the table to read from.
504-
505-
:type row_key: bytes
506-
:param row_key: (Optional) The key of a specific row to read from.
507-
508-
:type start_key: bytes
509-
:param start_key: (Optional) The beginning of a range of row keys to
510-
read from. The range will include ``start_key``. If
511-
left empty, will be interpreted as the empty string.
512-
513-
:type end_key: bytes
514-
:param end_key: (Optional) The end of a range of row keys to read from.
515-
The range will not include ``end_key``. If left empty,
516-
will be interpreted as an infinite string.
517-
518-
:type filter_: :class:`.RowFilter`
519-
:param filter_: (Optional) The filter to apply to the contents of the
520-
specified row(s). If unset, reads the entire table.
521-
522-
:type limit: int
523-
:param limit: (Optional) The read will terminate after committing to N
524-
rows' worth of results. The default (zero) is to return
525-
all results.
526-
527-
:type end_inclusive: bool
528-
:param end_inclusive: (Optional) Whether the ``end_key`` should be
529-
considered inclusive. The default is False (exclusive).
530-
531-
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
532-
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
533-
:raises: :class:`ValueError <exceptions.ValueError>` if both
534-
``row_key`` and one of ``start_key`` and ``end_key`` are set
535-
"""
536-
request_kwargs = {'table_name': table_name}
537-
if (row_key is not None and
538-
(start_key is not None or end_key is not None)):
539-
raise ValueError('Row key and row range cannot be '
540-
'set simultaneously')
541-
range_kwargs = {}
542-
if start_key is not None or end_key is not None:
543-
if start_key is not None:
544-
range_kwargs['start_key_closed'] = _to_bytes(start_key)
545-
if end_key is not None:
546-
end_key_key = 'end_key_open'
547-
if end_inclusive:
548-
end_key_key = 'end_key_closed'
549-
range_kwargs[end_key_key] = _to_bytes(end_key)
550-
if filter_ is not None:
551-
request_kwargs['filter'] = filter_.to_pb()
552-
if limit is not None:
553-
request_kwargs['rows_limit'] = limit
554-
555-
message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)
556-
557-
if row_key is not None:
558-
message.rows.row_keys.append(_to_bytes(row_key))
559-
560-
if range_kwargs:
561-
message.rows.row_ranges.add(**range_kwargs)
562-
563-
return message
564-
565-
566521
def _mutate_rows_request(table_name, rows):
567522
"""Creates a request to mutate rows in a table.
568523

0 commit comments

Comments
 (0)