Skip to content

Commit dfe8a41

Browse files
authored
Revert "Add RPC retries to Bigtable (googleapis#3811)" (googleapis#4524)
This reverts commit 5a0e549 / PR googleapis#3811.
1 parent 7bc2860 commit dfe8a41

File tree

9 files changed

+88
-687
lines changed

9 files changed

+88
-687
lines changed

.circleci/config.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ jobs:
6262
name: Run tests - google.cloud.bigtable
6363
command: |
6464
if [[ -n $(grep bigtable ~/target_packages) ]]; then
65-
test_utils/scripts/circleci/prepare_bigtable.sh
66-
export DOWNLOAD_BIGTABLE_TEST_SERVER=0
6765
nox -f bigtable/nox.py
6866
fi
6967
- run:

bigtable/google/cloud/bigtable/retry.py

Lines changed: 0 additions & 205 deletions
This file was deleted.

bigtable/google/cloud/bigtable/row_data.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -267,10 +267,6 @@ def consume_next(self):
267267

268268
self._last_scanned_row_key = response.last_scanned_row_key
269269

270-
if hasattr(self._response_iterator, 'responses_for_row'):
271-
if (self._response_iterator.responses_for_row == 1):
272-
self._clear_accumulated_row()
273-
274270
row = self._row
275271
cell = self._cell
276272

@@ -304,10 +300,6 @@ def consume_next(self):
304300

305301
if chunk.commit_row:
306302
self._save_current_row()
307-
if hasattr(self._response_iterator, 'set_start_key'):
308-
self._response_iterator.set_start_key(chunk.row_key)
309-
if hasattr(self._response_iterator, 'clear_responses_for_row'):
310-
self._response_iterator.clear_responses_for_row()
311303
row = cell = None
312304
continue
313305

@@ -353,11 +345,6 @@ def _validate_chunk_status(chunk):
353345
# No negative value_size (inferred as a general constraint).
354346
_raise_if(chunk.value_size < 0)
355347

356-
def _clear_accumulated_row(self):
357-
self._row = None
358-
self._cell = None
359-
self._previous_cell = None
360-
361348
def _validate_chunk_new_row(self, chunk):
362349
"""Helper for :meth:`_validate_chunk`."""
363350
assert self.state == self.NEW_ROW

bigtable/google/cloud/bigtable/table.py

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.api_core.exceptions import RetryError
1919
from google.api_core.retry import if_exception_type
2020
from google.api_core.retry import Retry
21+
from google.cloud._helpers import _to_bytes
2122
from google.cloud.bigtable._generated import (
2223
bigtable_pb2 as data_messages_v2_pb2)
2324
from google.cloud.bigtable._generated import (
@@ -30,27 +31,8 @@
3031
from google.cloud.bigtable.row import ConditionalRow
3132
from google.cloud.bigtable.row import DirectRow
3233
from google.cloud.bigtable.row_data import PartialRowsData
33-
from google.gax import RetryOptions, BackoffSettings
34-
from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request
3534
from grpc import StatusCode
3635

37-
BACKOFF_SETTINGS = BackoffSettings(
38-
initial_retry_delay_millis=10,
39-
retry_delay_multiplier=1.3,
40-
max_retry_delay_millis=30000,
41-
initial_rpc_timeout_millis=25 * 60 * 1000,
42-
rpc_timeout_multiplier=1.0,
43-
max_rpc_timeout_millis=25 * 60 * 1000,
44-
total_timeout_millis=30 * 60 * 1000
45-
)
46-
47-
RETRY_CODES = [
48-
StatusCode.DEADLINE_EXCEEDED,
49-
StatusCode.ABORTED,
50-
StatusCode.INTERNAL,
51-
StatusCode.UNAVAILABLE
52-
]
53-
5436

5537
# Maximum number of mutations in bulk (MutateRowsRequest message):
5638
# (https://cloud.google.com/bigtable/docs/reference/data/rpc/
@@ -295,7 +277,7 @@ def read_row(self, row_key, filter_=None):
295277
return rows_data.rows[row_key]
296278

297279
def read_rows(self, start_key=None, end_key=None, limit=None,
298-
filter_=None, end_inclusive=False, backoff_settings=None):
280+
filter_=None, end_inclusive=False):
299281
"""Read rows from this table.
300282
301283
:type start_key: bytes
@@ -326,18 +308,13 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
326308
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
327309
the streamed results.
328310
"""
311+
request_pb = _create_row_request(
312+
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
313+
limit=limit, end_inclusive=end_inclusive)
329314
client = self._instance._client
330-
if backoff_settings is None:
331-
backoff_settings = BACKOFF_SETTINGS
332-
RETRY_OPTIONS = RetryOptions(
333-
retry_codes=RETRY_CODES,
334-
backoff_settings=backoff_settings
335-
)
336-
337-
retrying_iterator = ReadRowsIterator(client, self.name, start_key,
338-
end_key, filter_, limit,
339-
end_inclusive, RETRY_OPTIONS)
340-
return PartialRowsData(retrying_iterator)
315+
response_iterator = client._data_stub.ReadRows(request_pb)
316+
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
317+
return PartialRowsData(response_iterator)
341318

342319
def mutate_rows(self, rows, retry=DEFAULT_RETRY):
343320
"""Mutates multiple rows in bulk.
@@ -518,6 +495,74 @@ def _do_mutate_retryable_rows(self):
518495
return self.responses_statuses
519496

520497

498+
def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
499+
filter_=None, limit=None, end_inclusive=False):
500+
"""Creates a request to read rows in a table.
501+
502+
:type table_name: str
503+
:param table_name: The name of the table to read from.
504+
505+
:type row_key: bytes
506+
:param row_key: (Optional) The key of a specific row to read from.
507+
508+
:type start_key: bytes
509+
:param start_key: (Optional) The beginning of a range of row keys to
510+
read from. The range will include ``start_key``. If
511+
left empty, will be interpreted as the empty string.
512+
513+
:type end_key: bytes
514+
:param end_key: (Optional) The end of a range of row keys to read from.
515+
The range will not include ``end_key``. If left empty,
516+
will be interpreted as an infinite string.
517+
518+
:type filter_: :class:`.RowFilter`
519+
:param filter_: (Optional) The filter to apply to the contents of the
520+
specified row(s). If unset, reads the entire table.
521+
522+
:type limit: int
523+
:param limit: (Optional) The read will terminate after committing to N
524+
rows' worth of results. The default (zero) is to return
525+
all results.
526+
527+
:type end_inclusive: bool
528+
:param end_inclusive: (Optional) Whether the ``end_key`` should be
529+
considered inclusive. The default is False (exclusive).
530+
531+
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
532+
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
533+
:raises: :class:`ValueError <exceptions.ValueError>` if both
534+
``row_key`` and one of ``start_key`` and ``end_key`` are set
535+
"""
536+
request_kwargs = {'table_name': table_name}
537+
if (row_key is not None and
538+
(start_key is not None or end_key is not None)):
539+
raise ValueError('Row key and row range cannot be '
540+
'set simultaneously')
541+
range_kwargs = {}
542+
if start_key is not None or end_key is not None:
543+
if start_key is not None:
544+
range_kwargs['start_key_closed'] = _to_bytes(start_key)
545+
if end_key is not None:
546+
end_key_key = 'end_key_open'
547+
if end_inclusive:
548+
end_key_key = 'end_key_closed'
549+
range_kwargs[end_key_key] = _to_bytes(end_key)
550+
if filter_ is not None:
551+
request_kwargs['filter'] = filter_.to_pb()
552+
if limit is not None:
553+
request_kwargs['rows_limit'] = limit
554+
555+
message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)
556+
557+
if row_key is not None:
558+
message.rows.row_keys.append(_to_bytes(row_key))
559+
560+
if range_kwargs:
561+
message.rows.row_ranges.add(**range_kwargs)
562+
563+
return message
564+
565+
521566
def _mutate_rows_request(table_name, rows):
522567
"""Creates a request to mutate rows in a table.
523568

0 commit comments

Comments
 (0)