|
18 | 18 | from google.api_core.exceptions import RetryError |
19 | 19 | from google.api_core.retry import if_exception_type |
20 | 20 | from google.api_core.retry import Retry |
| 21 | +from google.cloud._helpers import _to_bytes |
21 | 22 | from google.cloud.bigtable._generated import ( |
22 | 23 | bigtable_pb2 as data_messages_v2_pb2) |
23 | 24 | from google.cloud.bigtable._generated import ( |
|
30 | 31 | from google.cloud.bigtable.row import ConditionalRow |
31 | 32 | from google.cloud.bigtable.row import DirectRow |
32 | 33 | from google.cloud.bigtable.row_data import PartialRowsData |
33 | | -from google.gax import RetryOptions, BackoffSettings |
34 | | -from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request |
35 | 34 | from grpc import StatusCode |
36 | 35 |
|
37 | | -BACKOFF_SETTINGS = BackoffSettings( |
38 | | - initial_retry_delay_millis=10, |
39 | | - retry_delay_multiplier=1.3, |
40 | | - max_retry_delay_millis=30000, |
41 | | - initial_rpc_timeout_millis=25 * 60 * 1000, |
42 | | - rpc_timeout_multiplier=1.0, |
43 | | - max_rpc_timeout_millis=25 * 60 * 1000, |
44 | | - total_timeout_millis=30 * 60 * 1000 |
45 | | -) |
46 | | - |
47 | | -RETRY_CODES = [ |
48 | | - StatusCode.DEADLINE_EXCEEDED, |
49 | | - StatusCode.ABORTED, |
50 | | - StatusCode.INTERNAL, |
51 | | - StatusCode.UNAVAILABLE |
52 | | -] |
53 | | - |
54 | 36 |
|
55 | 37 | # Maximum number of mutations in bulk (MutateRowsRequest message): |
56 | 38 | # (https://cloud.google.com/bigtable/docs/reference/data/rpc/ |
@@ -295,7 +277,7 @@ def read_row(self, row_key, filter_=None): |
295 | 277 | return rows_data.rows[row_key] |
296 | 278 |
|
297 | 279 | def read_rows(self, start_key=None, end_key=None, limit=None, |
298 | | - filter_=None, end_inclusive=False, backoff_settings=None): |
| 280 | + filter_=None, end_inclusive=False): |
299 | 281 | """Read rows from this table. |
300 | 282 |
|
301 | 283 | :type start_key: bytes |
@@ -326,18 +308,13 @@ def read_rows(self, start_key=None, end_key=None, limit=None, |
326 | 308 | :returns: A :class:`.PartialRowsData` convenience wrapper for consuming |
327 | 309 | the streamed results. |
328 | 310 | """ |
| 311 | + request_pb = _create_row_request( |
| 312 | + self.name, start_key=start_key, end_key=end_key, filter_=filter_, |
| 313 | + limit=limit, end_inclusive=end_inclusive) |
329 | 314 | client = self._instance._client |
330 | | - if backoff_settings is None: |
331 | | - backoff_settings = BACKOFF_SETTINGS |
332 | | - RETRY_OPTIONS = RetryOptions( |
333 | | - retry_codes=RETRY_CODES, |
334 | | - backoff_settings=backoff_settings |
335 | | - ) |
336 | | - |
337 | | - retrying_iterator = ReadRowsIterator(client, self.name, start_key, |
338 | | - end_key, filter_, limit, |
339 | | - end_inclusive, RETRY_OPTIONS) |
340 | | - return PartialRowsData(retrying_iterator) |
| 315 | + response_iterator = client._data_stub.ReadRows(request_pb) |
| 316 | + # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` |
| 317 | + return PartialRowsData(response_iterator) |
341 | 318 |
|
342 | 319 | def mutate_rows(self, rows, retry=DEFAULT_RETRY): |
343 | 320 | """Mutates multiple rows in bulk. |
@@ -518,6 +495,74 @@ def _do_mutate_retryable_rows(self): |
518 | 495 | return self.responses_statuses |
519 | 496 |
|
520 | 497 |
|
| 498 | +def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, |
| 499 | + filter_=None, limit=None, end_inclusive=False): |
| 500 | + """Creates a request to read rows in a table. |
| 501 | +
|
| 502 | + :type table_name: str |
| 503 | + :param table_name: The name of the table to read from. |
| 504 | +
|
| 505 | + :type row_key: bytes |
| 506 | + :param row_key: (Optional) The key of a specific row to read from. |
| 507 | +
|
| 508 | + :type start_key: bytes |
| 509 | + :param start_key: (Optional) The beginning of a range of row keys to |
| 510 | + read from. The range will include ``start_key``. If |
| 511 | + left empty, will be interpreted as the empty string. |
| 512 | +
|
| 513 | + :type end_key: bytes |
| 514 | + :param end_key: (Optional) The end of a range of row keys to read from. |
| 515 | + The range will not include ``end_key``. If left empty, |
| 516 | + will be interpreted as an infinite string. |
| 517 | +
|
| 518 | + :type filter_: :class:`.RowFilter` |
| 519 | + :param filter_: (Optional) The filter to apply to the contents of the |
| 520 | + specified row(s). If unset, reads the entire table. |
| 521 | +
|
| 522 | + :type limit: int |
| 523 | + :param limit: (Optional) The read will terminate after committing to N |
| 524 | + rows' worth of results. The default (zero) is to return |
| 525 | + all results. |
| 526 | +
|
| 527 | + :type end_inclusive: bool |
| 528 | + :param end_inclusive: (Optional) Whether the ``end_key`` should be |
| 529 | + considered inclusive. The default is False (exclusive). |
| 530 | +
|
| 531 | + :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` |
| 532 | + :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. |
| 533 | + :raises: :class:`ValueError <exceptions.ValueError>` if both |
| 534 | + ``row_key`` and one of ``start_key`` and ``end_key`` are set |
| 535 | + """ |
| 536 | + request_kwargs = {'table_name': table_name} |
| 537 | + if (row_key is not None and |
| 538 | + (start_key is not None or end_key is not None)): |
| 539 | + raise ValueError('Row key and row range cannot be ' |
| 540 | + 'set simultaneously') |
| 541 | + range_kwargs = {} |
| 542 | + if start_key is not None or end_key is not None: |
| 543 | + if start_key is not None: |
| 544 | + range_kwargs['start_key_closed'] = _to_bytes(start_key) |
| 545 | + if end_key is not None: |
| 546 | + end_key_key = 'end_key_open' |
| 547 | + if end_inclusive: |
| 548 | + end_key_key = 'end_key_closed' |
| 549 | + range_kwargs[end_key_key] = _to_bytes(end_key) |
| 550 | + if filter_ is not None: |
| 551 | + request_kwargs['filter'] = filter_.to_pb() |
| 552 | + if limit is not None: |
| 553 | + request_kwargs['rows_limit'] = limit |
| 554 | + |
| 555 | + message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) |
| 556 | + |
| 557 | + if row_key is not None: |
| 558 | + message.rows.row_keys.append(_to_bytes(row_key)) |
| 559 | + |
| 560 | + if range_kwargs: |
| 561 | + message.rows.row_ranges.add(**range_kwargs) |
| 562 | + |
| 563 | + return message |
| 564 | + |
| 565 | + |
521 | 566 | def _mutate_rows_request(table_name, rows): |
522 | 567 | """Creates a request to mutate rows in a table. |
523 | 568 |
|
|
0 commit comments