Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 07438ca

Browse files
feat: improve timeout structure (#819)
1 parent eedde1e commit 07438ca

File tree

12 files changed

+469
-267
lines changed

12 files changed

+469
-267
lines changed

.github/.OwlBot.lock.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@
1313
# limitations under the License.
1414
docker:
1515
image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest
16-
digest: sha256:240b5bcc2bafd450912d2da2be15e62bc6de2cf839823ae4bf94d4f392b451dc
17-
# created: 2023-06-03T21:25:37.968717478Z
16+
digest: sha256:ddf4551385d566771dc713090feb7b4c1164fb8a698fe52bbe7670b24236565b
17+
# created: 2023-06-27T13:04:21.96690344Z

google/cloud/bigtable/data/_async/_mutate_rows.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ def __init__(
5252
table: "TableAsync",
5353
mutation_entries: list["RowMutationEntry"],
5454
operation_timeout: float,
55-
per_request_timeout: float | None,
55+
attempt_timeout: float | None,
5656
):
5757
"""
5858
Args:
5959
- gapic_client: the client to use for the mutate_rows call
6060
- table: the table associated with the request
6161
- mutation_entries: a list of RowMutationEntry objects to send to the server
62-
- operation_timeout: the timeout t o use for the entire operation, in seconds.
63-
- per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds.
62+
- operation_timeout: the timeout to use for the entire operation, in seconds.
63+
- attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
6464
If not specified, the request will run until operation_timeout is reached.
6565
"""
6666
# check that mutations are within limits
@@ -99,7 +99,7 @@ def __init__(
9999
self._operation = _convert_retry_deadline(retry_wrapped, operation_timeout)
100100
# initialize state
101101
self.timeout_generator = _attempt_timeout_generator(
102-
per_request_timeout, operation_timeout
102+
attempt_timeout, operation_timeout
103103
)
104104
self.mutations = mutation_entries
105105
self.remaining_indices = list(range(len(self.mutations)))

google/cloud/bigtable/data/_async/_read_rows.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ def __init__(
6363
client: BigtableAsyncClient,
6464
*,
6565
operation_timeout: float = 600.0,
66-
per_request_timeout: float | None = None,
66+
attempt_timeout: float | None = None,
6767
):
6868
"""
6969
Args:
7070
- request: the request dict to send to the Bigtable API
7171
- client: the Bigtable client to use to make the request
7272
- operation_timeout: the timeout to use for the entire operation, in seconds
73-
- per_request_timeout: the timeout to use when waiting for each individual grpc request, in seconds
73+
- attempt_timeout: the timeout to use when waiting for each individual grpc request, in seconds
7474
If not specified, defaults to operation_timeout
7575
"""
7676
self._last_emitted_row_key: bytes | None = None
@@ -79,7 +79,7 @@ def __init__(
7979
self.operation_timeout = operation_timeout
8080
# use generator to lower per-attempt timeout as we approach operation_timeout deadline
8181
attempt_timeout_gen = _attempt_timeout_generator(
82-
per_request_timeout, operation_timeout
82+
attempt_timeout, operation_timeout
8383
)
8484
row_limit = request.get("rows_limit", 0)
8585
# lock in paramters for retryable wrapper

google/cloud/bigtable/data/_async/client.py

Lines changed: 233 additions & 135 deletions
Large diffs are not rendered by default.

google/cloud/bigtable/data/_async/mutations_batcher.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from google.cloud.bigtable.data.mutations import RowMutationEntry
2424
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
2525
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
26+
from google.cloud.bigtable.data._helpers import _validate_timeouts
2627

2728
from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
2829
from google.cloud.bigtable.data._async._mutate_rows import (
@@ -189,7 +190,7 @@ def __init__(
189190
flow_control_max_mutation_count: int = 100_000,
190191
flow_control_max_bytes: int = 100 * _MB_SIZE,
191192
batch_operation_timeout: float | None = None,
192-
batch_per_request_timeout: float | None = None,
193+
batch_attempt_timeout: float | None = None,
193194
):
194195
"""
195196
Args:
@@ -202,26 +203,20 @@ def __init__(
202203
- flow_control_max_mutation_count: Maximum number of inflight mutations.
203204
- flow_control_max_bytes: Maximum number of inflight bytes.
204205
- batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None,
205-
table default_operation_timeout will be used
206-
- batch_per_request_timeout: timeout for each individual request, in seconds. If None,
207-
table default_per_request_timeout will be used
206+
table default_mutate_rows_operation_timeout will be used
207+
- batch_attempt_timeout: timeout for each individual request, in seconds. If None,
208+
table default_mutate_rows_attempt_timeout will be used, or batch_operation_timeout
209+
if that is also None.
208210
"""
209211
self._operation_timeout: float = (
210-
batch_operation_timeout or table.default_operation_timeout
212+
batch_operation_timeout or table.default_mutate_rows_operation_timeout
211213
)
212-
self._per_request_timeout: float = (
213-
batch_per_request_timeout
214-
or table.default_per_request_timeout
214+
self._attempt_timeout: float = (
215+
batch_attempt_timeout
216+
or table.default_mutate_rows_attempt_timeout
215217
or self._operation_timeout
216218
)
217-
if self._operation_timeout <= 0:
218-
raise ValueError("batch_operation_timeout must be greater than 0")
219-
if self._per_request_timeout <= 0:
220-
raise ValueError("batch_per_request_timeout must be greater than 0")
221-
if self._per_request_timeout > self._operation_timeout:
222-
raise ValueError(
223-
"batch_per_request_timeout must be less than batch_operation_timeout"
224-
)
219+
_validate_timeouts(self._operation_timeout, self._attempt_timeout)
225220
self.closed: bool = False
226221
self._table = table
227222
self._staged_entries: list[RowMutationEntry] = []
@@ -346,7 +341,7 @@ async def _execute_mutate_rows(
346341
347342
Args:
348343
- batch: list of RowMutationEntry objects to send to server
349-
- timeout: timeout in seconds. Used as operation_timeout and per_request_timeout.
344+
- timeout: timeout in seconds. Used as operation_timeout and attempt_timeout.
350345
If not given, will use table defaults
351346
Returns:
352347
- list of FailedMutationEntryError objects for mutations that failed.
@@ -361,7 +356,7 @@ async def _execute_mutate_rows(
361356
self._table,
362357
batch,
363358
operation_timeout=self._operation_timeout,
364-
per_request_timeout=self._per_request_timeout,
359+
attempt_timeout=self._attempt_timeout,
365360
)
366361
await operation.start()
367362
except MutationsExceptionGroup as e:

google/cloud/bigtable/data/_helpers.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,26 @@ def wrapper(*args, **kwargs):
109109
handle_error()
110110

111111
return wrapper_async if iscoroutinefunction(func) else wrapper
112+
113+
114+
def _validate_timeouts(
115+
operation_timeout: float, attempt_timeout: float | None, allow_none: bool = False
116+
):
117+
"""
118+
Helper function that will verify that timeout values are valid, and raise
119+
an exception if they are not.
120+
121+
Args:
122+
- operation_timeout: The timeout value to use for the entire operation, in seconds.
123+
- attempt_timeout: The timeout value to use for each attempt, in seconds.
124+
- allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception.
125+
Raises:
126+
- ValueError if operation_timeout or attempt_timeout are invalid.
127+
"""
128+
if operation_timeout <= 0:
129+
raise ValueError("operation_timeout must be greater than 0")
130+
if not allow_none and attempt_timeout is None:
131+
raise ValueError("attempt_timeout must not be None")
132+
elif attempt_timeout is not None:
133+
if attempt_timeout <= 0:
134+
raise ValueError("attempt_timeout must be greater than 0")

noxfile.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,9 @@ def docfx(session):
366366

367367
session.install("-e", ".")
368368
session.install(
369-
"sphinx==4.0.1",
369+
"gcp-sphinx-docfx-yaml",
370370
"alabaster",
371371
"recommonmark",
372-
"gcp-sphinx-docfx-yaml",
373372
)
374373

375374
shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True)

tests/unit/data/_async/test__mutate_rows.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def _make_one(self, *args, **kwargs):
4848
kwargs["table"] = kwargs.pop("table", AsyncMock())
4949
kwargs["mutation_entries"] = kwargs.pop("mutation_entries", [])
5050
kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5)
51-
kwargs["per_request_timeout"] = kwargs.pop("per_request_timeout", 0.1)
51+
kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1)
5252
return self._target_class()(*args, **kwargs)
5353

5454
async def _mock_stream(self, mutation_list, error_dict):
@@ -267,7 +267,7 @@ async def test_run_attempt_single_entry_success(self):
267267
mock_gapic_fn = self._make_mock_gapic({0: mutation})
268268
instance = self._make_one(
269269
mutation_entries=[mutation],
270-
per_request_timeout=expected_timeout,
270+
attempt_timeout=expected_timeout,
271271
)
272272
with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn):
273273
await instance._run_attempt()

tests/unit/data/_async/test__read_rows.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def test_ctor(self):
8989
request,
9090
client,
9191
operation_timeout=expected_operation_timeout,
92-
per_request_timeout=expected_request_timeout,
92+
attempt_timeout=expected_request_timeout,
9393
)
9494
assert time_gen_mock.call_count == 1
9595
time_gen_mock.assert_called_once_with(

0 commit comments

Comments
 (0)