1515"""User-friendly container for Google Cloud Bigtable Table."""
1616
1717
18- import six
19-
20- from google .api_core .exceptions import RetryError
2118from google .api_core .exceptions import Aborted
2219from google .api_core .exceptions import DeadlineExceeded
23- from google .api_core .exceptions import ServiceUnavailable
2420from google .api_core .exceptions import from_grpc_status
25- from google .api_core .retry import Retry
21+ from google .api_core .exceptions import RetryError
22+ from google .api_core .exceptions import ServiceUnavailable
2623from google .api_core .retry import if_exception_type
24+ from google .api_core .retry import Retry
2725from google .cloud ._helpers import _to_bytes
2826from google .cloud .bigtable ._generated import (
2927 bigtable_pb2 as data_messages_v2_pb2 )
4139
4240
4341# Maximum number of mutations in bulk (MutateRowsRequest message):
44- # https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest
42+ # (https://cloud.google.com/bigtable/docs/reference/data/rpc/
43+ # google.bigtable.v2#google.bigtable.v2.MutateRowRequest)
4544_MAX_BULK_MUTATIONS = 100000
4645
4746DEFAULT_RETRY = Retry (
48- predicate = if_exception_type ((Aborted ,
49- DeadlineExceeded ,
50- ServiceUnavailable )),
51- initial = 1.0 ,
52- maximum = 15.0 ,
53- multiplier = 2.0 ,
54- deadline = 60.0 * 2.0 )
47+ predicate = if_exception_type (
48+ (
49+ Aborted ,
50+ DeadlineExceeded ,
51+ ServiceUnavailable ,
52+ ),
53+ ),
54+ initial = 1.0 ,
55+ maximum = 15.0 ,
56+ multiplier = 2.0 ,
57+ deadline = 120.0 , # 2 minutes
58+ )
59+ """The default retry stategy to be used on retry-able errors.
60+
61+ Used by :meth:`~google.cloud.bigtable.table.Table.mutate_rows`.
62+ """
5563
5664
5765class TableMismatchError (ValueError ):
@@ -320,16 +328,22 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY):
320328 If some of the rows weren't updated, it would not remove mutations.
321329 They can be applied to the row separately.
322330 If row mutations finished successfully, they would be cleaned up.
323- Optionally specify a `retry` to re-attempt rows that return transient
324- errors, until all rows succeed or the deadline is reached.
331+
332+ Optionally, a ``retry`` strategy can be specified to re-attempt
333+ mutations on rows that return transient errors. This method will retry
334+ until all rows succeed or until the request deadline is reached. To
335+ specify a ``retry`` strategy of "do-nothing", a deadline of ``0.0``
336+ can be specified.
325337
326338 :type rows: list
327339 :param rows: List or other iterable of :class:`.DirectRow` instances.
328340
329341 :type retry: :class:`~google.api_core.retry.Retry`
330- :param retry: (Optional) Retry delay and deadline arguments. Can be
331- specified using ``DEFAULT_RETRY.with_delay`` and/or
332- ``DEFAULT_RETRY.with_deadline``.
342+ :param retry:
343+ (Optional) Retry delay and deadline arguments. To override, the
344+ default value :attr:`DEFAULT_RETRY` can be used and modified with
345+ the :meth:`~google.api_core.retry.Retry.with_delay` method or the
346+ :meth:`~google.api_core.retry.Retry.with_deadline` method.
333347
334348 :rtype: list
335349 :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
@@ -392,13 +406,13 @@ class _RetryableMutateRowsWorker(object):
392406 StatusCode .ABORTED .value [0 ],
393407 StatusCode .UNAVAILABLE .value [0 ],
394408 )
409+ # pylint: enable=unsubscriptable-object
395410
396411 def __init__ (self , client , table_name , rows ):
397412 self .client = client
398413 self .table_name = table_name
399414 self .rows = rows
400- self .responses_statuses = [
401- None for _ in six .moves .xrange (len (self .rows ))]
415+ self .responses_statuses = [None ] * len (self .rows )
402416
403417 def __call__ (self , retry = DEFAULT_RETRY ):
404418 """Attempt to mutate all rows and retry rows with transient errors.
@@ -412,13 +426,14 @@ def __call__(self, retry=DEFAULT_RETRY):
412426 sent. These will be in the same order as the ``rows``.
413427 """
414428 try :
415- retry (self .__class__ . _do_mutate_retryable_rows )(self )
429+ retry (self ._do_mutate_retryable_rows )()
416430 except (RetryError , ValueError ) as err :
417431 # Upon timeout or sleep generator error, return responses_statuses
418432 pass
419433 return self .responses_statuses
420434
421- def _is_retryable (self , status ): # pylint: disable=no-self-use
435+ @staticmethod
436+ def _is_retryable (status ):
422437 return (status is None or
423438 status .code in _RetryableMutateRowsWorker .RETRY_CODES )
424439
@@ -429,17 +444,23 @@ def _do_mutate_retryable_rows(self):
429444 in a transient error in a previous call.
430445
431446 :rtype: list
432- :return: ``responses_statuses`` (`google.rpc.status_pb2.Status`)
433- :raises: :exc:`~google.api_core.exceptions.ServiceUnavailable` if any
434- row returned a transient error. An artificial exception
435- to work with ``DEFAULT_RETRY``.
447+ :return: The responses statuses, which is a list of
448+ :class:`~google.rpc.status_pb2.Status`.
449+ :raises: One of the following:
450+
451+ * :exc:`~google.api_core.exceptions.ServiceUnavailable` if any
452+ row returned a transient error. This is "artificial" in
453+ the sense that we intentionally raise the error because it
454+ will be caught by the retry strategy.
455+ * :exc:`RuntimeError` if the number of responses doesn't
456+ match the number of rows that were retried
436457 """
437458 retryable_rows = []
438459 index_into_all_rows = []
439- for i , status in enumerate (self .responses_statuses ):
460+ for index , status in enumerate (self .responses_statuses ):
440461 if self ._is_retryable (status ):
441- retryable_rows .append (self .rows [i ])
442- index_into_all_rows .append (i )
462+ retryable_rows .append (self .rows [index ])
463+ index_into_all_rows .append (index )
443464
444465 if not retryable_rows :
445466 # All mutations are either successful or non-retryable now.
@@ -462,11 +483,15 @@ def _do_mutate_retryable_rows(self):
462483 if entry .status .code == 0 :
463484 self .rows [index ].clear ()
464485
465- assert len (retryable_rows ) == num_responses
486+ if len (retryable_rows ) != num_responses :
487+ raise RuntimeError (
488+ 'Unexpected the number of responses' , num_responses ,
489+ 'Expected' , len (retryable_rows ))
466490
467491 if num_retryable_responses :
468492 raise from_grpc_status (StatusCode .UNAVAILABLE ,
469493 'MutateRows retryable error.' )
494+
470495 return self .responses_statuses
471496
472497
0 commit comments