Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 9bc6375

Browse files
committed
feat: add support for opening via write handle
1 parent 35a9e05 commit 9bc6375

File tree

4 files changed

+126
-31
lines changed

4 files changed

+126
-31
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def __init__(
9696
:type object_name: str
9797
:param object_name: The name of the GCS Appendable Object to be written.
9898
99-
:type generation: int
99+
:type generation: Optional[int]
100100
:param generation: (Optional) If present, creates writer for that
101101
specific revision of that object. Use this to append data to an
102102
existing Appendable Object.

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ async def open(self) -> None:
120120
# Created object type would be Appendable Object.
121121
# if `generation_number` == 0 new object will be created only if there
122122
# isn't any existing object.
123+
is_open_via_write_handle = (
124+
self.write_handle is not None and self.generation_number
125+
)
123126
if self.generation_number is None or self.generation_number == 0:
124127
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
125128
write_object_spec=_storage_v2.WriteObjectSpec(
@@ -136,6 +139,7 @@ async def open(self) -> None:
136139
bucket=self._full_bucket_name,
137140
object=self.object_name,
138141
generation=self.generation_number,
142+
write_handle=self.write_handle,
139143
),
140144
)
141145
self.socket_like_rpc = AsyncBidiRpc(
@@ -145,25 +149,32 @@ async def open(self) -> None:
145149
await self.socket_like_rpc.open() # this is actually 1 send
146150
response = await self.socket_like_rpc.recv()
147151
self._is_stream_open = True
148-
149-
if not response.resource:
150-
raise ValueError(
151-
"Failed to obtain object resource after opening the stream"
152-
)
153-
if not response.resource.generation:
154-
raise ValueError(
155-
"Failed to obtain object generation after opening the stream"
156-
)
152+
if is_open_via_write_handle:
153+
# Don't use if not response.persisted_size because this will be true
154+
# if persisted_size==0 (0 is considered "Falsy" in Python)
155+
if response.persisted_size is None:
156+
raise ValueError(
157+
"Failed to obtain persisted_size after opening the stream via write_handle"
158+
)
159+
self.persisted_size = response.persisted_size
160+
else:
161+
if not response.resource:
162+
raise ValueError(
163+
"Failed to obtain object resource after opening the stream"
164+
)
165+
if not response.resource.generation:
166+
raise ValueError(
167+
"Failed to obtain object generation after opening the stream"
168+
)
169+
if not response.resource.size:
170+
# Appending to a 0 byte appendable object.
171+
self.persisted_size = 0
172+
else:
173+
self.persisted_size = response.resource.size
157174

158175
if not response.write_handle:
159176
raise ValueError("Failed to obtain write_handle after opening the stream")
160177

161-
if not response.resource.size:
162-
# Appending to a 0 byte appendable object.
163-
self.persisted_size = 0
164-
else:
165-
self.persisted_size = response.resource.size
166-
167178
self.generation_number = response.resource.generation
168179
self.write_handle = response.write_handle
169180

tests/system/test_zonal.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,56 @@ async def _run():
333333
event_loop.run_until_complete(_run())
334334

335335

336+
def test_wrd_open_with_write_handle(
337+
event_loop, grpc_client_direct, storage_client, blobs_to_delete
338+
):
339+
object_name = f"test_write_handl-{str(uuid.uuid4())[:4]}"
340+
341+
async def _run():
342+
# 1. Create an object and get its write_handle
343+
writer = AsyncAppendableObjectWriter(
344+
grpc_client_direct, _ZONAL_BUCKET, object_name
345+
)
346+
await writer.open()
347+
write_handle = writer.write_handle
348+
await writer.close()
349+
350+
# 2. Open a new writer using the obtained `write_handle` and generation
351+
new_writer = AsyncAppendableObjectWriter(
352+
grpc_client_direct,
353+
_ZONAL_BUCKET,
354+
object_name,
355+
write_handle=write_handle,
356+
generation=writer.generation,
357+
)
358+
await new_writer.open()
359+
# Verify that the new writer is open and has the same write_handle
360+
assert new_writer.is_stream_open
361+
assert new_writer.generation == writer.generation
362+
363+
# 3. Append some data using the new writer
364+
test_data = b"data_from_new_writer"
365+
await new_writer.append(test_data)
366+
await new_writer.close()
367+
368+
# 4. Verify the data was written correctly by reading it back
369+
mrd = AsyncMultiRangeDownloader(grpc_client_direct, _ZONAL_BUCKET, object_name)
370+
buffer = BytesIO()
371+
await mrd.open()
372+
await mrd.download_ranges([(0, 0, buffer)])
373+
await mrd.close()
374+
assert buffer.getvalue() == test_data
375+
376+
# Clean up
377+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
378+
del writer
379+
del new_writer
380+
del mrd
381+
gc.collect()
382+
383+
event_loop.run_until_complete(_run())
384+
385+
336386
def test_read_unfinalized_appendable_object_with_generation(
337387
storage_client, blobs_to_delete, event_loop, grpc_client_direct
338388
):

tests/unit/asyncio/test_async_write_object_stream.py

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
OBJECT = "my-object"
2626
GENERATION = 12345
2727
WRITE_HANDLE = b"test-handle"
28+
WRITE_HANDLE_PROTO = _storage_v2.BidiWriteHandle(handle=WRITE_HANDLE)
2829

2930

3031
@pytest.fixture
@@ -56,7 +57,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope
5657
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
5758
mock_response.resource.generation = GENERATION
5859
mock_response.resource.size = 0
59-
mock_response.write_handle = WRITE_HANDLE
60+
mock_response.write_handle = WRITE_HANDLE_PROTO
6061
socket_like_rpc.recv = AsyncMock(return_value=mock_response)
6162

6263
write_obj_stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
@@ -90,18 +91,16 @@ def test_async_write_object_stream_init(mock_client):
9091

9192
def test_async_write_object_stream_init_with_generation_and_handle(mock_client):
9293
"""Test the constructor with optional arguments."""
93-
generation = 12345
94-
write_handle = b"test-handle"
9594
stream = _AsyncWriteObjectStream(
9695
mock_client,
9796
BUCKET,
9897
OBJECT,
99-
generation_number=generation,
100-
write_handle=write_handle,
98+
generation_number=GENERATION,
99+
write_handle=WRITE_HANDLE_PROTO,
101100
)
102101

103-
assert stream.generation_number == generation
104-
assert stream.write_handle == write_handle
102+
assert stream.generation_number == GENERATION
103+
assert stream.write_handle == WRITE_HANDLE_PROTO
105104

106105

107106
def test_async_write_object_stream_init_raises_value_error():
@@ -131,7 +130,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
131130
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
132131
mock_response.resource.generation = GENERATION
133132
mock_response.resource.size = 0
134-
mock_response.write_handle = WRITE_HANDLE
133+
mock_response.write_handle = WRITE_HANDLE_PROTO
135134
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
136135

137136
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
@@ -144,7 +143,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
144143
socket_like_rpc.open.assert_called_once()
145144
socket_like_rpc.recv.assert_called_once()
146145
assert stream.generation_number == GENERATION
147-
assert stream.write_handle == WRITE_HANDLE
146+
assert stream.write_handle == WRITE_HANDLE_PROTO
148147
assert stream.persisted_size == 0
149148

150149

@@ -163,7 +162,7 @@ async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, moc
163162
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
164163
mock_response.resource.generation = GENERATION
165164
mock_response.resource.size = 0
166-
mock_response.write_handle = WRITE_HANDLE
165+
mock_response.write_handle = WRITE_HANDLE_PROTO
167166
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
168167

169168
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT, generation_number=0)
@@ -180,7 +179,7 @@ async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, moc
180179
socket_like_rpc.open.assert_called_once()
181180
socket_like_rpc.recv.assert_called_once()
182181
assert stream.generation_number == GENERATION
183-
assert stream.write_handle == WRITE_HANDLE
182+
assert stream.write_handle == WRITE_HANDLE_PROTO
184183
assert stream.persisted_size == 0
185184

186185

@@ -199,7 +198,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
199198
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
200199
mock_response.resource.size = 1024
201200
mock_response.resource.generation = GENERATION
202-
mock_response.write_handle = WRITE_HANDLE
201+
mock_response.write_handle = WRITE_HANDLE_PROTO
203202
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
204203

205204
stream = _AsyncWriteObjectStream(
@@ -214,7 +213,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
214213
socket_like_rpc.open.assert_called_once()
215214
socket_like_rpc.recv.assert_called_once()
216215
assert stream.generation_number == GENERATION
217-
assert stream.write_handle == WRITE_HANDLE
216+
assert stream.write_handle == WRITE_HANDLE_PROTO
218217
assert stream.persisted_size == 1024
219218

220219

@@ -233,7 +232,7 @@ async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_cli
233232
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
234233
mock_response.resource.generation = GENERATION
235234
mock_response.resource.size = 0
236-
mock_response.write_handle = WRITE_HANDLE
235+
mock_response.write_handle = WRITE_HANDLE_PROTO
237236
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
238237

239238
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
@@ -315,6 +314,41 @@ async def test_open_raises_error_on_missing_write_handle(
315314
await stream.open()
316315

317316

317+
@pytest.mark.asyncio
318+
@mock.patch(
319+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
320+
)
321+
async def test_open_raises_error_on_missing_persisted_size_with_write_handle(
322+
mock_async_bidi_rpc, mock_client
323+
):
324+
"""Test that open raises ValueError if persisted_size is None when opened via write_handle."""
325+
socket_like_rpc = mock.AsyncMock()
326+
mock_async_bidi_rpc.return_value = socket_like_rpc
327+
328+
#
329+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
330+
mock_response.persisted_size = None # This is the key part of the test
331+
mock_response.write_handle = (
332+
WRITE_HANDLE_PROTO # Ensure write_handle is present to avoid that error
333+
)
334+
socket_like_rpc.recv.return_value = mock_response
335+
336+
# ACT
337+
stream = _AsyncWriteObjectStream(
338+
mock_client,
339+
BUCKET,
340+
OBJECT,
341+
write_handle=WRITE_HANDLE_PROTO,
342+
generation_number=GENERATION,
343+
)
344+
345+
with pytest.raises(
346+
ValueError,
347+
match="Failed to obtain persisted_size after opening the stream via write_handle",
348+
):
349+
await stream.open()
350+
351+
318352
@pytest.mark.asyncio
319353
@mock.patch(
320354
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
@@ -452,4 +486,4 @@ async def test_requests_done(mock_cls_async_bidi_rpc, mock_client):
452486

453487
# Assert
454488
write_obj_stream.socket_like_rpc.send.assert_called_once_with(None)
455-
write_obj_stream.socket_like_rpc.recv.assert_called_once()
489+
write_obj_stream.socket_like_rpc.recv.assert_called_once()

0 commit comments

Comments
 (0)