Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 359a23c

Browse files
committed
adding more changes
1 parent 99b20b7 commit 359a23c

16 files changed

+393
-614
lines changed

google/cloud/storage/_experimental/asyncio/_utils.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,3 @@ def raise_if_no_fast_crc32c():
3333
"C extension is required for faster data integrity checks."
3434
"For more information, see https://github.com/googleapis/python-crc32c."
3535
)
36-
37-
38-
def update_write_handle_if_exists(obj, response):
39-
"""Update the write_handle attribute of an object if it exists in the response."""
40-
if hasattr(response, "write_handle") and response.write_handle is not None:
41-
obj.write_handle = response.write_handle

google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class _AsyncAbstractObjectStream(abc.ABC):
3232
:param generation_number: (Optional) If present, selects a specific revision of
3333
this object.
3434
35-
:type handle: Any
35+
:type handle: bytes
3636
:param handle: (Optional) The handle for the object, could be read_handle or
3737
write_handle, based on how the stream is used.
3838
"""
@@ -42,13 +42,13 @@ def __init__(
4242
bucket_name: str,
4343
object_name: str,
4444
generation_number: Optional[int] = None,
45-
handle: Optional[Any] = None,
45+
handle: Optional[bytes] = None,
4646
) -> None:
4747
super().__init__()
4848
self.bucket_name: str = bucket_name
4949
self.object_name: str = object_name
5050
self.generation_number: Optional[int] = generation_number
51-
self.handle: Optional[Any] = handle
51+
self.handle: Optional[bytes] = handle
5252

5353
@abc.abstractmethod
5454
async def open(self) -> None:

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from google.cloud._storage_v2.types.storage import BidiWriteObjectRequest
3737

3838

39-
from . import _utils
39+
from ._utils import raise_if_no_fast_crc32c
4040
from google.cloud import _storage_v2
4141
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
4242
AsyncGrpcClient,
@@ -117,8 +117,8 @@ def __init__(
117117
client: AsyncGrpcClient.grpc_client,
118118
bucket_name: str,
119119
object_name: str,
120-
generation: Optional[int] = None,
121-
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
120+
generation=None,
121+
write_handle=None,
122122
writer_options: Optional[dict] = None,
123123
):
124124
"""
@@ -164,7 +164,7 @@ def __init__(
164164
:type object_name: str
165165
:param object_name: The name of the GCS Appendable Object to be written.
166166
167-
:type generation: Optional[int]
167+
:type generation: int
168168
:param generation: (Optional) If present, creates writer for that
169169
specific revision of that object. Use this to append data to an
170170
existing Appendable Object.
@@ -174,10 +174,10 @@ def __init__(
174174
overwriting existing objects).
175175
176176
Warning: If `None`, a new object is created. If an object with the
177-
same name already exists, it will be overwritten the moment
177+
same name already exists, it will be overwritten the moment
178178
`writer.open()` is called.
179179
180-
:type write_handle: _storage_v2.BidiWriteHandle
180+
:type write_handle: bytes
181181
:param write_handle: (Optional) A handle for writing the object.
182182
If provided, opening the bidi-gRPC connection will be faster.
183183
@@ -189,7 +189,7 @@ def __init__(
189189
servers. Default is `_DEFAULT_FLUSH_INTERVAL_BYTES`.
190190
Must be a multiple of `_MAX_CHUNK_SIZE_BYTES`.
191191
"""
192-
_utils.raise_if_no_fast_crc32c()
192+
raise_if_no_fast_crc32c()
193193
self.client = client
194194
self.bucket_name = bucket_name
195195
self.object_name = object_name
@@ -372,12 +372,13 @@ async def append(
372372
attempt_count = 0
373373

374374
def send_and_recv_generator(
375-
requests_generator,
375+
requests: List[BidiWriteObjectRequest],
376376
state: dict[str, _WriteState],
377377
metadata: Optional[List[Tuple[str, str]]] = None,
378378
):
379379
async def generator():
380380
nonlocal attempt_count
381+
nonlocal requests
381382
attempt_count += 1
382383
resp = None
383384
async with self._lock:
@@ -414,33 +415,16 @@ async def generator():
414415
write_state.bytes_sent = write_state.persisted_size
415416
write_state.bytes_since_last_flush = 0
416417

417-
# Process requests from the generator
418-
# Strategy handles state_lookup and flush on the last request,
419-
# so we just stream requests directly
420-
for chunk_req in requests_generator:
421-
# Check if this is an open/state-lookup request (no checksummed_data)
422-
if chunk_req.state_lookup and not chunk_req.checksummed_data:
423-
# This is an open request - send it and get response
424-
await self.write_obj_stream.send(chunk_req)
425-
resp = await self.write_obj_stream.recv()
426-
427-
# Update state from open response
428-
if resp:
429-
if resp.persisted_size is not None:
430-
self.persisted_size = resp.persisted_size
431-
write_state.persisted_size = resp.persisted_size
432-
self.offset = self.persisted_size
433-
if resp.write_handle:
434-
self.write_handle = resp.write_handle
435-
write_state.write_handle = resp.write_handle
436-
continue
437-
438-
# This is a data request - send it
418+
requests = strategy.generate_requests(state)
419+
420+
num_requests = len(requests)
421+
for i, chunk_req in enumerate(requests):
422+
if i == num_requests - 1:
423+
chunk_req.state_lookup = True
424+
chunk_req.flush = True
439425
await self.write_obj_stream.send(chunk_req)
440426

441-
# Get final response from the last request (which has state_lookup=True)
442427
resp = await self.write_obj_stream.recv()
443-
444428
if resp:
445429
if resp.persisted_size is not None:
446430
self.persisted_size = resp.persisted_size
@@ -584,6 +568,7 @@ async def finalize(self) -> _storage_v2.Object:
584568
def is_stream_open(self) -> bool:
585569
return self._is_stream_open
586570

571+
587572
# helper methods.
588573
async def append_from_string(self, data: str):
589574
"""

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ async def create_mrd(
129129
bucket_name: str,
130130
object_name: str,
131131
generation_number: Optional[int] = None,
132-
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
132+
read_handle: Optional[bytes] = None,
133133
retry_policy: Optional[AsyncRetry] = None,
134134
metadata: Optional[List[Tuple[str, str]]] = None,
135135
) -> AsyncMultiRangeDownloader:
@@ -149,7 +149,7 @@ async def create_mrd(
149149
:param generation_number: (Optional) If present, selects a specific
150150
revision of this object.
151151
152-
:type read_handle: _storage_v2.BidiReadHandle
152+
:type read_handle: bytes
153153
:param read_handle: (Optional) An existing handle for reading the object.
154154
If provided, opening the bidi-gRPC connection will be faster.
155155
@@ -172,7 +172,7 @@ def __init__(
172172
bucket_name: str,
173173
object_name: str,
174174
generation_number: Optional[int] = None,
175-
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
175+
read_handle: Optional[bytes] = None,
176176
) -> None:
177177
"""Constructor for AsyncMultiRangeDownloader, clients are not adviced to
178178
use it directly. Instead it's advised to use the classmethod `create_mrd`.
@@ -190,7 +190,7 @@ def __init__(
190190
:param generation_number: (Optional) If present, selects a specific revision of
191191
this object.
192192
193-
:type read_handle: _storage_v2.BidiReadHandle
193+
:type read_handle: bytes
194194
:param read_handle: (Optional) An existing read handle.
195195
"""
196196

@@ -200,7 +200,7 @@ def __init__(
200200
self.bucket_name = bucket_name
201201
self.object_name = object_name
202202
self.generation_number = generation_number
203-
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle
203+
self.read_handle = read_handle
204204
self.read_obj_str: Optional[_AsyncReadObjectStream] = None
205205
self._is_stream_open: bool = False
206206
self._routing_token: Optional[str] = None
@@ -385,7 +385,7 @@ async def download_ranges(
385385
attempt_count = 0
386386

387387
def send_ranges_and_get_bytes(
388-
requests_generator,
388+
requests: List[_storage_v2.ReadRange],
389389
state: Dict[str, Any],
390390
metadata: Optional[List[Tuple[str, str]]] = None,
391391
):
@@ -395,7 +395,7 @@ async def generator():
395395

396396
if attempt_count > 1:
397397
logger.info(
398-
f"Resuming download (attempt {attempt_count - 1})."
398+
f"Resuming download (attempt {attempt_count - 1}) for {len(requests)} ranges."
399399
)
400400

401401
async with lock:
@@ -444,28 +444,17 @@ async def generator():
444444
)
445445
self._is_stream_open = True
446446

447-
# Stream requests directly without materializing
448-
pending_read_ids = set()
449-
current_batch = []
450-
451-
for read_range in requests_generator:
452-
pending_read_ids.add(read_range.read_id)
453-
current_batch.append(read_range)
454-
455-
# Send batch when it reaches max size
456-
if len(current_batch) >= _MAX_READ_RANGES_PER_BIDI_READ_REQUEST:
457-
await self.read_obj_str.send(
458-
_storage_v2.BidiReadObjectRequest(read_ranges=current_batch)
459-
)
460-
current_batch = []
447+
pending_read_ids = {r.read_id for r in requests}
461448

462-
# Send remaining partial batch
463-
if current_batch:
449+
# Send Requests
450+
for i in range(
451+
0, len(requests), _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
452+
):
453+
batch = requests[i : i + _MAX_READ_RANGES_PER_BIDI_READ_REQUEST]
464454
await self.read_obj_str.send(
465-
_storage_v2.BidiReadObjectRequest(read_ranges=current_batch)
455+
_storage_v2.BidiReadObjectRequest(read_ranges=batch)
466456
)
467457

468-
# Receive responses
469458
while pending_read_ids:
470459
response = await self.read_obj_str.recv()
471460
if response is None:
@@ -504,4 +493,4 @@ async def close(self):
504493

505494
@property
506495
def is_stream_open(self) -> bool:
507-
return self._is_stream_open
496+
return self._is_stream_open

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream):
5151
:param generation_number: (Optional) If present, selects a specific revision of
5252
this object.
5353
54-
:type read_handle: _storage_v2.BidiReadHandle
54+
:type read_handle: bytes
5555
:param read_handle: (Optional) An existing handle for reading the object.
5656
If provided, opening the bidi-gRPC connection will be faster.
5757
"""
@@ -62,7 +62,7 @@ def __init__(
6262
bucket_name: str,
6363
object_name: str,
6464
generation_number: Optional[int] = None,
65-
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
65+
read_handle: Optional[bytes] = None,
6666
) -> None:
6767
if client is None:
6868
raise ValueError("client must be provided")
@@ -77,7 +77,7 @@ def __init__(
7777
generation_number=generation_number,
7878
)
7979
self.client: AsyncGrpcClient.grpc_client = client
80-
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle
80+
self.read_handle: Optional[bytes] = read_handle
8181

8282
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"
8383

@@ -195,4 +195,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:
195195

196196
@property
197197
def is_stream_open(self) -> bool:
198-
return self._is_stream_open
198+
return self._is_stream_open

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
2323
"""
2424
from typing import List, Optional, Tuple
25-
from . import _utils
2625
from google.cloud import _storage_v2
2726
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
2827
from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
@@ -60,7 +59,7 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream):
6059
same name already exists, it will be overwritten the moment
6160
`writer.open()` is called.
6261
63-
:type write_handle: _storage_v2.BidiWriteHandle
62+
:type write_handle: bytes
6463
:param write_handle: (Optional) An existing handle for writing the object.
6564
If provided, opening the bidi-gRPC connection will be faster.
6665
"""
@@ -125,9 +124,6 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
125124
# Created object type would be Appendable Object.
126125
# if `generation_number` == 0 new object will be created only if there
127126
# isn't any existing object.
128-
is_open_via_write_handle = (
129-
self.write_handle is not None and self.generation_number
130-
)
131127
if self.generation_number is None or self.generation_number == 0:
132128
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
133129
write_object_spec=_storage_v2.WriteObjectSpec(
@@ -196,8 +192,9 @@ async def close(self) -> None:
196192

197193
async def requests_done(self):
198194
"""Signals that all requests have been sent."""
195+
199196
await self.socket_like_rpc.send(None)
200-
_utils.update_write_handle_if_exists(self, await self.socket_like_rpc.recv())
197+
await self.socket_like_rpc.recv()
201198

202199
async def send(
203200
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest
@@ -239,3 +236,4 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
239236
@property
240237
def is_stream_open(self) -> bool:
241238
return self._is_stream_open
239+

google/cloud/storage/_experimental/asyncio/retry/base_strategy.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,16 @@ class _BaseResumptionStrategy(abc.ABC):
2727
"""
2828

2929
@abc.abstractmethod
30-
def generate_requests(self, state: Any):
31-
"""Generates requests based on the current state as a generator.
30+
def generate_requests(self, state: Any) -> Iterable[Any]:
31+
"""Generates the next batch of requests based on the current state.
3232
3333
This method is called at the beginning of each retry attempt. It should
34-
inspect the provided state object and yield request protos to send to
35-
the server. For example, a read strategy would use this to implement
36-
"Smarter Resumption" by creating smaller `ReadRange` requests for
37-
partially downloaded ranges. For bidi-writes, it will set the
38-
`write_offset` field to the persisted size received from the server
39-
in the next request.
40-
41-
This is a generator that yields requests incrementally rather than
42-
returning them all at once, allowing for better memory efficiency
43-
and on-demand generation.
34+
inspect the provided state object and generate the appropriate list of
35+
request protos to send to the server. For example, a read strategy
36+
would use this to implement "Smarter Resumption" by creating smaller
37+
`ReadRange` requests for partially downloaded ranges. For bidi-writes,
38+
it will set the `write_offset` field to the persisted size received
39+
from the server in the next request.
4440
4541
:type state: Any
4642
:param state: An object containing all the state needed for the

google/cloud/storage/_experimental/asyncio/retry/bidi_stream_retry_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ async def execute(self, initial_state: Any, retry_policy):
5050
state = initial_state
5151

5252
async def attempt():
53-
requests_generator = self._strategy.generate_requests(state)
54-
stream = self._send_and_recv(requests_generator, state)
53+
requests = self._strategy.generate_requests(state)
54+
stream = self._send_and_recv(requests, state)
5555
try:
5656
async for response in stream:
5757
self._strategy.update_state_from_response(response, state)

0 commit comments

Comments
 (0)