Skip to content

Commit fa9a239

Browse files
feat: Triger bidi conformance tests (#1757)
feat: Triger bidi conformance tests Changes / fixes - When (read/write) stream breaks, don't do `.close` on it. (this leads to reading the same error message again and again until retry loop times out) - Bidi Conf test were ran directly as a module , change that so that they get triggered in Kokoro (done that by adding `test_` so that pytest pickup the tests) - Provide a way to create anonymous connection in grpc client. - Unit tests and other minor changes. ~blocked because of b/489420625~ --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 2d83fe1 commit fa9a239

File tree

11 files changed

+447
-181
lines changed

11 files changed

+447
-181
lines changed

packages/google-cloud-storage/google/cloud/storage/asyncio/async_appendable_object_writer.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
_extract_bidi_writes_redirect_proto,
4444
)
4545

46-
4746
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
4847
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB
4948
_BIDI_WRITE_REDIRECTED_TYPE_URL = (
@@ -289,8 +288,7 @@ async def _do_open():
289288
await self.write_obj_stream.close()
290289
except Exception as e:
291290
logger.warning(
292-
"Error closing previous write stream during open retry. Got exception: ",
293-
{e},
291+
f"Error closing previous write stream during open retry. Got exception: {e}"
294292
)
295293
self.write_obj_stream = None
296294
self._is_stream_open = False
@@ -383,8 +381,6 @@ async def generator():
383381
logger.info(
384382
f"Re-opening the stream with attempt_count: {attempt_count}"
385383
)
386-
if self.write_obj_stream and self.write_obj_stream.is_stream_open:
387-
await self.write_obj_stream.close()
388384

389385
current_metadata = list(metadata) if metadata else []
390386
if write_state.routing_token:

packages/google-cloud-storage/google/cloud/storage/asyncio/async_grpc_client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
DEFAULT_CLIENT_INFO,
2020
)
2121
from google.cloud.storage import __version__
22+
import grpc
23+
from google.auth import credentials as auth_credentials
2224

2325

2426
class AsyncGrpcClient:
@@ -52,6 +54,12 @@ def __init__(
5254
*,
5355
attempt_direct_path=True,
5456
):
57+
if isinstance(credentials, auth_credentials.AnonymousCredentials):
58+
self._grpc_client = self._create_anonymous_client(
59+
client_options, credentials
60+
)
61+
return
62+
5563
if client_info is None:
5664
client_info = DEFAULT_CLIENT_INFO
5765
client_info.client_library_version = __version__
@@ -68,6 +76,21 @@ def __init__(
6876
attempt_direct_path=attempt_direct_path,
6977
)
7078

79+
def _create_anonymous_client(self, client_options, credentials):
80+
channel = grpc.aio.insecure_channel(client_options.api_endpoint)
81+
transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
82+
channel=channel, credentials=credentials
83+
)
84+
return storage_v2.StorageAsyncClient(transport=transport)
85+
86+
@classmethod
87+
def _create_insecure_grpc_client(cls, client_options):
88+
return cls(
89+
credentials=auth_credentials.AnonymousCredentials(),
90+
client_options=client_options,
91+
attempt_direct_path=False,
92+
)
93+
7194
def _create_async_grpc_client(
7295
self,
7396
credentials=None,

packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
from google.cloud import _storage_v2
4242
from google.cloud.storage._helpers import generate_random_56_bit_integer
4343

44-
4544
_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
4645
_BIDI_READ_REDIRECTED_TYPE_URL = (
4746
"type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
@@ -230,7 +229,6 @@ def __init__(
230229
self.persisted_size: Optional[int] = None # updated after opening the stream
231230
self._open_retries: int = 0
232231

233-
234232
async def __aenter__(self):
235233
"""Opens the underlying bidi-gRPC connection to read from the object."""
236234
await self.open()
@@ -243,6 +241,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
243241

244242
def _on_open_error(self, exc):
245243
"""Extracts routing token and read handle on redirect error during open."""
244+
logger.warning(f"Error occurred while opening MRD: {exc}")
246245
routing_token, read_handle = _handle_redirect(exc)
247246
if routing_token:
248247
self._routing_token = routing_token
@@ -432,7 +431,7 @@ async def generator():
432431

433432
if attempt_count > 1:
434433
logger.info(
435-
f"Resuming download (attempt {attempt_count - 1}) for {len(requests)} ranges."
434+
f"Resuming download (attempt {attempt_count}) for {len(requests)} ranges."
436435
)
437436

438437
async with lock:
@@ -453,11 +452,7 @@ async def generator():
453452
logger.info(
454453
f"Re-opening stream with routing token: {current_token}"
455454
)
456-
# Close existing stream if any
457-
if self.read_obj_str and self.read_obj_str.is_stream_open:
458-
await self.read_obj_str.close()
459455

460-
# Re-initialize stream
461456
self.read_obj_str = _AsyncReadObjectStream(
462457
client=self.client.grpc_client,
463458
bucket_name=self.bucket_name,

packages/google-cloud-storage/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ async def attempt():
5858
return
5959
except Exception as e:
6060
if retry_policy._predicate(e):
61-
logger.info(
61+
logger.warning(
6262
f"Bidi stream operation failed: {e}. Attempting state recovery and retry."
6363
)
6464
await self._strategy.recover_state_on_failure(e, state)

packages/google-cloud-storage/noxfile.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ def conftest_retry(session):
236236
session.install(
237237
"pytest",
238238
"pytest-xdist",
239+
"pytest-asyncio",
239240
"grpcio",
240241
"grpcio-status",
241242
"grpc-google-iam-v1",
@@ -247,13 +248,15 @@ def conftest_retry(session):
247248
# Run #CPU processes in parallel if no test session arguments are passed in.
248249
if session.posargs:
249250
test_cmd = [
250-
"py.test",
251-
"--quiet",
251+
"pytest",
252+
"-vv",
253+
"-s",
254+
# "--quiet",
252255
conformance_test_folder_path,
253256
*session.posargs,
254257
]
255258
else:
256-
test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path]
259+
test_cmd = ["pytest", "-vv", "-s", "-n", "auto", conformance_test_folder_path]
257260

258261
# Run py.test against the conformance tests.
259262
session.run(*test_cmd, env={"DOCKER_API_VERSION": "1.39"})
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import time
2+
import requests
3+
import traceback
4+
5+
def start_grpc_server(grpc_endpoint, http_endpoint):
6+
"""Starts the testbench gRPC server if it's not already running.
7+
8+
this essentially makes -
9+
10+
`curl -s --retry 5 --retry-max-time 40 "http://localhost:9000/start_grpc?port=8888"`
11+
"""
12+
start_time = time.time()
13+
max_time = 40
14+
retries = 5
15+
port = grpc_endpoint.split(":")[-1]
16+
url = f"{http_endpoint}/start_grpc?port={port}"
17+
18+
for i in range(retries):
19+
try:
20+
response = requests.get(url, timeout=10)
21+
if response.status_code == 200:
22+
return
23+
except requests.exceptions.RequestException:
24+
print("Failed to create grpc server", traceback.format_exc())
25+
raise
26+
27+
elapsed_time = time.time() - start_time
28+
if elapsed_time >= max_time:
29+
raise RuntimeError("Failed to start gRPC server within the time limit.")
30+
31+
# backoff
32+
time.sleep(1)

0 commit comments

Comments
 (0)