Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 8708a25

Browse files
feat: add test proxy (#836)
1 parent b6d232a commit 8708a25

19 files changed

+2057
-7
lines changed

.kokoro/presubmit/conformance.cfg

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Format: //devtools/kokoro/config/proto/build.proto
2+
3+
env_vars: {
4+
key: "NOX_SESSION"
5+
value: "conformance"
6+
}

google/cloud/bigtable/data/_async/_mutate_rows.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
from typing import TYPE_CHECKING
18+
import asyncio
1819
import functools
1920

2021
from google.api_core import exceptions as core_exceptions
@@ -183,6 +184,13 @@ async def _run_attempt(self):
183184
self._handle_entry_error(orig_idx, entry_error)
184185
# remove processed entry from active list
185186
del active_request_indices[result.index]
187+
except asyncio.CancelledError:
188+
# when retry wrapper timeout expires, the operation is cancelled
189+
# make sure incomplete indices are tracked,
190+
# but don't record exception (it will be raised by wrapper)
191+
# TODO: remove asyncio.wait_for in retry wrapper. Let grpc call handle expiration
192+
self.remaining_indices.extend(active_request_indices.values())
193+
raise
186194
except Exception as exc:
187195
# add this exception to list for each mutation that wasn't
188196
# already handled, and update remaining_indices if mutation is retryable

google/cloud/bigtable/data/_async/client.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import warnings
3131
import sys
3232
import random
33+
import os
3334

3435
from collections import namedtuple
3536

@@ -38,10 +39,12 @@
3839
from google.cloud.bigtable_v2.services.bigtable.async_client import DEFAULT_CLIENT_INFO
3940
from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
4041
PooledBigtableGrpcAsyncIOTransport,
42+
PooledChannel,
4143
)
4244
from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
4345
from google.cloud.client import ClientWithProject
4446
from google.api_core.exceptions import GoogleAPICallError
47+
from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore
4548
from google.api_core import retry_async as retries
4649
from google.api_core import exceptions as core_exceptions
4750
from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync
@@ -150,18 +153,35 @@ def __init__(
150153
# keep track of table objects associated with each instance
151154
# only remove instance from _active_instances when all associated tables remove it
152155
self._instance_owners: dict[_WarmedInstanceKey, Set[int]] = {}
153-
# attempt to start background tasks
154156
self._channel_init_time = time.monotonic()
155157
self._channel_refresh_tasks: list[asyncio.Task[None]] = []
156-
try:
157-
self._start_background_channel_refresh()
158-
except RuntimeError:
158+
self._emulator_host = os.getenv(BIGTABLE_EMULATOR)
159+
if self._emulator_host is not None:
160+
# connect to an emulator host
159161
warnings.warn(
160-
f"{self.__class__.__name__} should be started in an "
161-
"asyncio event loop. Channel refresh will not be started",
162+
"Connecting to Bigtable emulator at {}".format(self._emulator_host),
162163
RuntimeWarning,
163164
stacklevel=2,
164165
)
166+
self.transport._grpc_channel = PooledChannel(
167+
pool_size=pool_size,
168+
host=self._emulator_host,
169+
insecure=True,
170+
)
171+
# refresh cached stubs to use emulator pool
172+
self.transport._stubs = {}
173+
self.transport._prep_wrapped_messages(client_info)
174+
else:
175+
# attempt to start background channel refresh tasks
176+
try:
177+
self._start_background_channel_refresh()
178+
except RuntimeError:
179+
warnings.warn(
180+
f"{self.__class__.__name__} should be started in an "
181+
"asyncio event loop. Channel refresh will not be started",
182+
RuntimeWarning,
183+
stacklevel=2,
184+
)
165185

166186
@staticmethod
167187
def _client_version() -> str:
@@ -176,7 +196,7 @@ def _start_background_channel_refresh(self) -> None:
176196
Raises:
177197
- RuntimeError if not called in an asyncio event loop
178198
"""
179-
if not self._channel_refresh_tasks:
199+
if not self._channel_refresh_tasks and not self._emulator_host:
180200
# raise RuntimeError if there is no event loop
181201
asyncio.get_running_loop()
182202
for channel_idx in range(self.transport.pool_size):

noxfile.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,27 @@ def system_emulated(session):
278278
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
279279

280280

281+
@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
282+
def conformance(session):
283+
"""
284+
Run the set of shared bigtable conformance tests
285+
"""
286+
TEST_REPO_URL = "https://github.com/googleapis/cloud-bigtable-clients-test.git"
287+
CLONE_REPO_DIR = "cloud-bigtable-clients-test"
288+
# install dependencies
289+
constraints_path = str(
290+
CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt"
291+
)
292+
install_unittest_dependencies(session, "-c", constraints_path)
293+
with session.chdir("test_proxy"):
294+
# download the conformance test suite
295+
clone_dir = os.path.join(CURRENT_DIRECTORY, CLONE_REPO_DIR)
296+
if not os.path.exists(clone_dir):
297+
print("downloading copy of test repo")
298+
session.run("git", "clone", TEST_REPO_URL, CLONE_REPO_DIR, external=True)
299+
session.run("bash", "-e", "run_tests.sh", external=True)
300+
301+
281302
@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
282303
def system(session):
283304
"""Run the system test suite."""

test_proxy/README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# CBT Python Test Proxy
2+
3+
The CBT test proxy is intended for running conformance tests for Cloud Bigtable Python Client.
4+
5+
## Option 1: Run Tests with Nox
6+
7+
You can run the conformance tests in a single line by calling `nox -s conformance` from the repo root
8+
9+
10+
```
11+
cd python-bigtable/test_proxy
12+
nox -s conformance
13+
```
14+
15+
## Option 2: Run processes manually
16+
17+
### Start test proxy
18+
19+
You can use `test_proxy.py` to launch a new test proxy process directly
20+
21+
```
22+
cd python-bigtable/test_proxy
23+
python test_proxy.py
24+
```
25+
26+
The port can be set by passing in an extra positional argument
27+
28+
```
29+
cd python-bigtable/test_proxy
30+
python test_proxy.py --port 8080
31+
```
32+
33+
You can run the test proxy against the previous `v2` client by running it with the `--legacy-client` flag:
34+
35+
```
36+
python test_proxy.py --legacy-client
37+
```
38+
39+
### Run the test cases
40+
41+
Prerequisites:
42+
- If you have not already done so, [install golang](https://go.dev/doc/install).
43+
- Before running tests, [launch an instance of the test proxy](#start-test-proxy)
44+
in a separate shell session, and make note of the port
45+
46+
47+
Clone and navigate to the go test library:
48+
49+
```
50+
git clone https://github.com/googleapis/cloud-bigtable-clients-test.git
51+
cd cloud-bigtable-clients-test/tests
52+
```
53+
54+
55+
Launch the tests
56+
57+
```
58+
go test -v -proxy_addr=:50055
59+
```
60+

0 commit comments

Comments
 (0)