diff --git a/google/cloud/bigtable/table.py b/google/cloud/bigtable/table.py index 95fb55c50..bff4b7a2a 100644 --- a/google/cloud/bigtable/table.py +++ b/google/cloud/bigtable/table.py @@ -53,6 +53,9 @@ _MAX_BULK_MUTATIONS = 100000 VIEW_NAME_ONLY = enums.Table.View.NAME_ONLY +RETRYABLE_MUTATION_ERRORS = (Aborted, DeadlineExceeded, ServiceUnavailable) +"""Errors which can be retried during row mutation.""" + class _BigtableRetryableError(Exception): """Retry-able error expected by the default retry strategy.""" @@ -1039,10 +1042,8 @@ class _RetryableMutateRowsWorker(object): are retryable, any subsequent call on this callable will be a no-op. """ - RETRY_CODES = ( - Aborted.grpc_status_code.value[0], - DeadlineExceeded.grpc_status_code.value[0], - ServiceUnavailable.grpc_status_code.value[0], + RETRY_CODES = tuple( + retryable.grpc_status_code.value[0] for retryable in RETRYABLE_MUTATION_ERRORS ) def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None): @@ -1125,7 +1126,7 @@ def _do_mutate_retryable_rows(self): retry=None, **kwargs ) - except (ServiceUnavailable, DeadlineExceeded, Aborted): + except RETRYABLE_MUTATION_ERRORS: # If an exception, considered retryable by `RETRY_CODES`, is # returned from the initial call, consider # it to be retryable. Wrap as a Bigtable Retryable Error. 
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index ccb8350a3..5992796fa 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -1533,6 +1533,8 @@ class Test__RetryableMutateRowsWorker(unittest.TestCase): SUCCESS = StatusCode.OK.value[0] RETRYABLE_1 = StatusCode.DEADLINE_EXCEEDED.value[0] RETRYABLE_2 = StatusCode.ABORTED.value[0] + RETRYABLE_3 = StatusCode.UNAVAILABLE.value[0] + RETRYABLES = (RETRYABLE_1, RETRYABLE_2, RETRYABLE_3) NON_RETRYABLE = StatusCode.CANCELLED.value[0] @staticmethod @@ -1711,31 +1713,21 @@ def test_callable_retry(self): self.assertEqual(client._table_data_client.mutate_rows.call_count, 2) self.assertEqual(result, expected_result) - def test_do_mutate_retryable_rows_empty_rows(self): - from google.cloud.bigtable_admin_v2.services.bigtable_table_admin import ( - client as bigtable_table_admin, - ) - - table_api = mock.create_autospec(bigtable_table_admin.BigtableTableAdminClient) - credentials = _make_credentials() - client = self._make_client( - project="project-id", credentials=credentials, admin=True - ) - client._table_admin_client = table_api - instance = client.instance(instance_id=self.INSTANCE_ID) - table = self._make_table(self.TABLE_ID, instance) - - worker = self._make_worker(client, table.name, []) - statuses = worker._do_mutate_retryable_rows() - - self.assertEqual(len(statuses), 0) - - def test_do_mutate_retryable_rows(self): + def _do_mutate_retryable_rows_helper( + self, + row_cells, + responses, + prior_statuses=None, + expected_result=None, + raising_retry=False, + retryable_error=False, + timeout=None, + ): + from google.api_core.exceptions import ServiceUnavailable from google.cloud.bigtable.row import DirectRow + from google.cloud.bigtable.table import _BigtableRetryableError from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.cloud.bigtable_admin_v2.services.bigtable_table_admin import ( - client as bigtable_table_admin, - ) + from 
google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2 # Setup: # - Mutate 2 rows. @@ -1745,101 +1737,160 @@ def test_do_mutate_retryable_rows(self): # - Expect [success, non-retryable] data_api = mock.create_autospec(BigtableClient) - table_api = mock.create_autospec(bigtable_table_admin.BigtableTableAdminClient) credentials = _make_credentials() client = self._make_client( project="project-id", credentials=credentials, admin=True ) client._table_data_client = data_api - client._table_admin_client = table_api instance = client.instance(instance_id=self.INSTANCE_ID) table = self._make_table(self.TABLE_ID, instance) - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") + rows = [] + for row_key, cell_data in row_cells: + row = DirectRow(row_key=row_key, table=table) + row.set_cell(*cell_data) + rows.append(row) - response = self._make_responses([self.SUCCESS, self.NON_RETRYABLE]) + response = self._make_responses(responses) - # Patch the stub used by the API method. 
- client._table_data_client.mutate_rows.side_effect = [[response]] - table._instance._client._table_data_client = data_api - table._instance._client._table_admin_client = table_api + if retryable_error: + data_api.mutate_rows.side_effect = ServiceUnavailable("testing") + else: + data_api.mutate_rows.side_effect = [[response]] - worker = self._make_worker(client, table.name, [row_1, row_2]) - statuses = worker._do_mutate_retryable_rows() + worker = self._make_worker(client, table.name, rows=rows) + if prior_statuses is not None: + assert len(prior_statuses) == len(rows) + worker.responses_statuses = self._make_responses_statuses(prior_statuses) - result = [status.code for status in statuses] - expected_result = [self.SUCCESS, self.NON_RETRYABLE] + expected_entries = [] + for row, prior_status in zip(rows, worker.responses_statuses): - self.assertEqual(result, expected_result) + if prior_status is None or prior_status.code in self.RETRYABLES: + mutations = row._get_mutations().copy() # row clears on success + entry = data_messages_v2_pb2.MutateRowsRequest.Entry( + row_key=row.row_key, mutations=mutations, + ) + expected_entries.append(entry) - def test_do_mutate_retryable_rows_retry(self): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import _BigtableRetryableError - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.cloud.bigtable_admin_v2.services.bigtable_table_admin import ( - client as bigtable_table_admin, - ) + expected_kwargs = {} + if timeout is not None: + worker.timeout = timeout + expected_kwargs["timeout"] = mock.ANY + if retryable_error or raising_retry: + with self.assertRaises(_BigtableRetryableError): + worker._do_mutate_retryable_rows() + statuses = worker.responses_statuses + else: + statuses = worker._do_mutate_retryable_rows() + + if not retryable_error: + result = [status.code for status in statuses] + + if expected_result is None: + expected_result = responses + + 
self.assertEqual(result, expected_result) + + if len(responses) == 0 and not retryable_error: + data_api.mutate_rows.assert_not_called() + else: + data_api.mutate_rows.assert_called_once_with( + table_name=table.name, + entries=expected_entries, + app_profile_id=None, + retry=None, + **expected_kwargs, + ) + if timeout is not None: + called = data_api.mutate_rows.mock_calls[0] + self.assertEqual(called.kwargs["timeout"]._deadline, timeout) + + def test_do_mutate_retryable_rows_empty_rows(self): + # # Setup: - # - Mutate 3 rows. + # - No mutated rows. # Action: - # - Initial attempt will mutate all 3 rows. + # - No API call made. # Expectation: - # - Second row returns retryable error code, so expect a raise. - # - State of responses_statuses should be - # [success, retryable, non-retryable] + # - No change. + # + row_cells = [] + responses = [] - data_api = mock.create_autospec(BigtableClient) - table_api = mock.create_autospec(bigtable_table_admin.BigtableTableAdminClient) - credentials = _make_credentials() - client = self._make_client( - project="project-id", credentials=credentials, admin=True - ) - client._table_data_client = data_api - client._table_admin_client = table_api - instance = client.instance(instance_id=self.INSTANCE_ID) - table = self._make_table(self.TABLE_ID, instance) + self._do_mutate_retryable_rows_helper(row_cells, responses) - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") - row_3 = DirectRow(row_key=b"row_key_3", table=table) - row_3.set_cell("cf", b"col", b"value3") + def test_do_mutate_retryable_rows_w_timeout(self): + # + # Setup: + # - Mutate 2 rows. + # Action: + # - Initial attempt will mutate all 2 rows. + # Expectation: + # - No retryable error codes, so don't expect a raise. + # - State of responses_statuses should be [success, non-retryable]. 
+ # + row_cells = [ + (b"row_key_1", ("cf", b"col", b"value1")), + (b"row_key_2", ("cf", b"col", b"value2")), + ] - response = self._make_responses( - [self.SUCCESS, self.RETRYABLE_1, self.NON_RETRYABLE] - ) + responses = [self.SUCCESS, self.NON_RETRYABLE] - # Patch the stub used by the API method. - client._table_data_client.mutate_rows.side_effect = [[response]] + timeout = 5 # seconds - table._instance._client._table_data_client = data_api - table._instance._client._table_admin_client = table_api + self._do_mutate_retryable_rows_helper( + row_cells, responses, timeout=timeout, + ) - worker = self._make_worker(client, table.name, [row_1, row_2, row_3]) + def test_do_mutate_retryable_rows_w_retryable_error(self): + # + # Setup: + # - Mutate 2 rows. + # Action: + # - Initial attempt raises a retryable error (ServiceUnavailable). + # Expectation: + # - The error is re-raised as _BigtableRetryableError. + # - Row statuses are not checked, since no response was received. + # + row_cells = [ + (b"row_key_1", ("cf", b"col", b"value1")), + (b"row_key_2", ("cf", b"col", b"value2")), + ] - with self.assertRaises(_BigtableRetryableError): - worker._do_mutate_retryable_rows() + responses = () - statuses = worker.responses_statuses - result = [status.code for status in statuses] - expected_result = [self.SUCCESS, self.RETRYABLE_1, self.NON_RETRYABLE] + self._do_mutate_retryable_rows_helper( + row_cells, responses, retryable_error=True, + ) - self.assertEqual(result, expected_result) + def test_do_mutate_retryable_rows_retry(self): + # + # Setup: + # - Mutate 3 rows. + # Action: + # - Initial attempt will mutate all 3 rows. + # Expectation: + # - Second row returns retryable error code, so expect a raise. 
+ # - State of responses_statuses should be + # [success, retryable, non-retryable] + # + row_cells = [ + (b"row_key_1", ("cf", b"col", b"value1")), + (b"row_key_2", ("cf", b"col", b"value2")), + (b"row_key_3", ("cf", b"col", b"value3")), + ] - def test_do_mutate_retryable_rows_second_retry(self): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import _BigtableRetryableError - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.cloud.bigtable_admin_v2.services.bigtable_table_admin import ( - client as bigtable_table_admin, + responses = [self.SUCCESS, self.RETRYABLE_1, self.NON_RETRYABLE] + + self._do_mutate_retryable_rows_helper( + row_cells, responses, raising_retry=True, ) + def test_do_mutate_retryable_rows_second_retry(self): + # # Setup: # - Mutate 4 rows. # - First try results: @@ -1853,45 +1904,23 @@ def test_do_mutate_retryable_rows_second_retry(self): # so expect a raise. # - Exception contains response whose index should be '3' even though # only two rows were retried. 
+ # + row_cells = [ + (b"row_key_1", ("cf", b"col", b"value1")), + (b"row_key_2", ("cf", b"col", b"value2")), + (b"row_key_3", ("cf", b"col", b"value3")), + (b"row_key_4", ("cf", b"col", b"value4")), + ] - data_api = mock.create_autospec(BigtableClient) - table_api = mock.create_autospec(bigtable_table_admin.BigtableTableAdminClient) - credentials = _make_credentials() - client = self._make_client( - project="project-id", credentials=credentials, admin=True - ) - client._table_data_client = data_api - client._table_admin_client = table_api - instance = client.instance(instance_id=self.INSTANCE_ID) - table = self._make_table(self.TABLE_ID, instance) - - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") - row_3 = DirectRow(row_key=b"row_key_3", table=table) - row_3.set_cell("cf", b"col", b"value3") - row_4 = DirectRow(row_key=b"row_key_4", table=table) - row_4.set_cell("cf", b"col", b"value4") - - response = self._make_responses([self.SUCCESS, self.RETRYABLE_1]) - - # Patch the stub used by the API method. 
- client._table_data_client.mutate_rows.side_effect = [[response]] - - table._instance._client._table_data_client = data_api - table._instance._client._table_admin_client = table_api - - worker = self._make_worker(client, table.name, [row_1, row_2, row_3, row_4]) - worker.responses_statuses = self._make_responses_statuses( - [self.SUCCESS, self.RETRYABLE_1, self.NON_RETRYABLE, self.RETRYABLE_2] - ) + responses = [self.SUCCESS, self.RETRYABLE_1] - with self.assertRaises(_BigtableRetryableError): - worker._do_mutate_retryable_rows() + prior_statuses = [ + self.SUCCESS, + self.RETRYABLE_1, + self.NON_RETRYABLE, + self.RETRYABLE_2, + ] - statuses = worker.responses_statuses - result = [status.code for status in statuses] expected_result = [ self.SUCCESS, self.SUCCESS, @@ -1899,15 +1928,16 @@ def test_do_mutate_retryable_rows_second_retry(self): self.RETRYABLE_1, ] - self.assertEqual(result, expected_result) - - def test_do_mutate_retryable_rows_second_try(self): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.cloud.bigtable_admin_v2.services.bigtable_table_admin import ( - client as bigtable_table_admin, + self._do_mutate_retryable_rows_helper( + row_cells, + responses, + prior_statuses=prior_statuses, + expected_result=expected_result, + raising_retry=True, ) + def test_do_mutate_retryable_rows_second_try(self): + # # Setup: # - Mutate 4 rows. 
# - First try results: @@ -1917,43 +1947,23 @@ def test_do_mutate_retryable_rows_second_try(self): # Expectation: # - After second try: # [success, non-retryable, non-retryable, success] + # + row_cells = [ + (b"row_key_1", ("cf", b"col", b"value1")), + (b"row_key_2", ("cf", b"col", b"value2")), + (b"row_key_3", ("cf", b"col", b"value3")), + (b"row_key_4", ("cf", b"col", b"value4")), + ] - data_api = mock.create_autospec(BigtableClient) - table_api = mock.create_autospec(bigtable_table_admin.BigtableTableAdminClient) - credentials = _make_credentials() - client = self._make_client( - project="project-id", credentials=credentials, admin=True - ) - client._table_data_client = data_api - client._table_admin_client = table_api - instance = client.instance(instance_id=self.INSTANCE_ID) - table = self._make_table(self.TABLE_ID, instance) - - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") - row_3 = DirectRow(row_key=b"row_key_3", table=table) - row_3.set_cell("cf", b"col", b"value3") - row_4 = DirectRow(row_key=b"row_key_4", table=table) - row_4.set_cell("cf", b"col", b"value4") - - response = self._make_responses([self.NON_RETRYABLE, self.SUCCESS]) - - # Patch the stub used by the API method. 
- client._table_data_client.mutate_rows.side_effect = [[response]] - - table._instance._client._table_data_client = data_api - table._instance._client._table_admin_client = table_api - - worker = self._make_worker(client, table.name, [row_1, row_2, row_3, row_4]) - worker.responses_statuses = self._make_responses_statuses( - [self.SUCCESS, self.RETRYABLE_1, self.NON_RETRYABLE, self.RETRYABLE_2] - ) + responses = [self.NON_RETRYABLE, self.SUCCESS] - statuses = worker._do_mutate_retryable_rows() + prior_statuses = [ + self.SUCCESS, + self.RETRYABLE_1, + self.NON_RETRYABLE, + self.RETRYABLE_2, + ] - result = [status.code for status in statuses] expected_result = [ self.SUCCESS, self.NON_RETRYABLE, @@ -1961,14 +1971,15 @@ def test_do_mutate_retryable_rows_second_try(self): self.SUCCESS, ] - self.assertEqual(result, expected_result) - - def test_do_mutate_retryable_rows_second_try_no_retryable(self): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable_admin_v2.services.bigtable_table_admin import ( - client as bigtable_table_admin, + self._do_mutate_retryable_rows_helper( + row_cells, + responses, + prior_statuses=prior_statuses, + expected_result=expected_result, ) + def test_do_mutate_retryable_rows_second_try_no_retryable(self): + # # Setup: # - Mutate 2 rows. # - First try results: [success, non-retryable] @@ -1976,69 +1987,41 @@ def test_do_mutate_retryable_rows_second_try_no_retryable(self): # - Second try has no row to retry. 
# Expectation: # - After second try: [success, non-retryable] + # + row_cells = [ + (b"row_key_1", ("cf", b"col", b"value1")), + (b"row_key_2", ("cf", b"col", b"value2")), + ] - table_api = mock.create_autospec(bigtable_table_admin.BigtableTableAdminClient) - credentials = _make_credentials() - client = self._make_client( - project="project-id", credentials=credentials, admin=True - ) - client._table_admin_client = table_api - instance = client.instance(instance_id=self.INSTANCE_ID) - table = self._make_table(self.TABLE_ID, instance) - - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") - - worker = self._make_worker(client, table.name, [row_1, row_2]) - worker.responses_statuses = self._make_responses_statuses( - [self.SUCCESS, self.NON_RETRYABLE] - ) - - table._instance._client._table_admin_client = table_api - - statuses = worker._do_mutate_retryable_rows() - - result = [status.code for status in statuses] - expected_result = [self.SUCCESS, self.NON_RETRYABLE] + responses = [] # no calls will be made - self.assertEqual(result, expected_result) + prior_statuses = [ + self.SUCCESS, + self.NON_RETRYABLE, + ] - def test_do_mutate_retryable_rows_mismatch_num_responses(self): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.cloud.bigtable_admin_v2.services.bigtable_table_admin import ( - client as bigtable_table_admin, - ) + expected_result = [ + self.SUCCESS, + self.NON_RETRYABLE, + ] - data_api = mock.create_autospec(BigtableClient) - table_api = mock.create_autospec(bigtable_table_admin.BigtableTableAdminClient) - credentials = _make_credentials() - client = self._make_client( - project="project-id", credentials=credentials, admin=True + self._do_mutate_retryable_rows_helper( + row_cells, + responses, + prior_statuses=prior_statuses, + 
expected_result=expected_result, ) - client._table_data_client = data_api - client._table_admin_client = table_api - instance = client.instance(instance_id=self.INSTANCE_ID) - table = self._make_table(self.TABLE_ID, instance) - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") - - response = self._make_responses([self.SUCCESS]) - - # Patch the stub used by the API method. - client._table_data_client.mutate_rows.side_effect = [[response]] + def test_do_mutate_retryable_rows_mismatch_num_responses(self): + row_cells = [ + (b"row_key_1", ("cf", b"col", b"value1")), + (b"row_key_2", ("cf", b"col", b"value2")), + ] - table._instance._client._table_data_client = data_api - table._instance._client._table_admin_client = table_api + responses = [self.SUCCESS] - worker = self._make_worker(client, table.name, [row_1, row_2]) with self.assertRaises(RuntimeError): - worker._do_mutate_retryable_rows() + self._do_mutate_retryable_rows_helper(row_cells, responses) class Test__create_row_request(unittest.TestCase):