From 786af5556023144001f1375f1c6265bd46326b24 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Mon, 17 Nov 2025 18:52:23 +0530 Subject: [PATCH 01/24] feat(zb-experimental): implement "open" for write_object_stream (#1613) feat(zb-experimental): implement "open" for write_object_stream --- .../asyncio/async_write_object_stream.py | 52 +++++- .../asyncio/test_async_write_object_stream.py | 164 +++++++++++++++++- 2 files changed, 211 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index 07263ddd8..037bcd07d 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -95,10 +95,54 @@ async def open(self) -> None: """Opening an object for write , should do it's state lookup to know what's the persisted size is. """ - raise NotImplementedError( - "open() is not implemented yet in _AsyncWriteObjectStream" + if self._is_stream_open: + raise ValueError("Stream is already open") + + # Create a new object or overwrite existing one if generation_number + # is None. This makes it consistent with GCS JSON API behavior. + # Created object type would be Appendable Object. 
+ if self.generation_number is None: + self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( + write_object_spec=_storage_v2.WriteObjectSpec( + resource=_storage_v2.Object( + name=self.object_name, bucket=self._full_bucket_name + ), + appendable=True, + ), + ) + else: + self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( + append_object_spec=_storage_v2.AppendObjectSpec( + bucket=self._full_bucket_name, + object=self.object_name, + generation=self.generation_number, + ), + state_lookup=True, + ) + + self.socket_like_rpc = AsyncBidiRpc( + self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata ) + await self.socket_like_rpc.open() # this is actually 1 send + response = await self.socket_like_rpc.recv() + self._is_stream_open = True + + if not response.resource: + raise ValueError( + "Failed to obtain object resource after opening the stream" + ) + if not response.resource.generation: + raise ValueError( + "Failed to obtain object generation after opening the stream" + ) + self.generation_number = response.resource.generation + + if not response.write_handle: + raise ValueError("Failed to obtain write_handle after opening the stream") + + self.write_handle = response.write_handle + async def close(self) -> None: """Closes the bidi-gRPC connection.""" raise NotImplementedError( @@ -132,3 +176,7 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: raise NotImplementedError( "recv() is not implemented yet in _AsyncWriteObjectStream" ) + + @property + def is_stream_open(self) -> bool: + return self._is_stream_open diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 9834b79c9..f159ba3e7 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -22,6 +22,8 @@ BUCKET = "my-bucket" OBJECT = "my-object" +GENERATION = 12345 +WRITE_HANDLE = b"test-handle" @pytest.fixture @@ -91,13 
+93,169 @@ def test_async_write_object_stream_init_raises_value_error(): @pytest.mark.asyncio -async def test_unimplemented_methods_raise_error(mock_client): - """Test that unimplemented methods raise NotImplementedError.""" +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): + """Test opening a stream for a new object.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises(NotImplementedError): + # Act + await stream.open() + + # Assert + assert stream._is_stream_open + socket_like_rpc.open.assert_called_once() + socket_like_rpc.recv.assert_called_once() + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): + """Test opening a stream for an existing object.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = 
_AsyncWriteObjectStream( + mock_client, BUCKET, OBJECT, generation_number=GENERATION + ) + + # Act + await stream.open() + + # Assert + assert stream._is_stream_open + socket_like_rpc.open.assert_called_once() + socket_like_rpc.recv.assert_called_once() + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_client): + """Test that opening an already open stream raises a ValueError.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + await stream.open() + + # Act & Assert + with pytest.raises(ValueError, match="Stream is already open"): await stream.open() + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_object_resource( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if object_resource is not in the response.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + + mock_reponse = mock.AsyncMock() + type(mock_reponse).resource = mock.PropertyMock(return_value=None) + socket_like_rpc.recv.return_value = mock_reponse + + # Note: Don't use below code as unittest library automatically assigns an + # `AsyncMock` object to an attribute, if not set. 
+ # socket_like_rpc.recv.return_value = mock.AsyncMock( + # return_value=_storage_v2.BidiWriteObjectResponse(resource=None) + # ) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Failed to obtain object resource after opening the stream" + ): + await stream.open() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_generation( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if generation is not in the response.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + + # Configure the mock response object + mock_response = mock.AsyncMock() + type(mock_response.resource).generation = mock.PropertyMock(return_value=None) + socket_like_rpc.recv.return_value = mock_response + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Failed to obtain object generation after opening the stream" + ): + await stream.open() + # assert stream.generation_number is None + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_write_handle( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if write_handle is not in the response.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse( + resource=_storage_v2.Object(generation=GENERATION), write_handle=None + ) + ) + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises(ValueError, match="Failed to obtain write_handle"): + await stream.open() + + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test 
that unimplemented methods raise NotImplementedError.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises(NotImplementedError): await stream.close() From 7a532216a9a3da728bd3d7f196372f8f6528ef75 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Mon, 17 Nov 2025 19:36:21 +0530 Subject: [PATCH 02/24] feat(zb-experimental): implement close (#1614) feat(zb-experimental): implement close --- .../asyncio/async_write_object_stream.py | 7 +- .../asyncio/test_async_write_object_stream.py | 69 +++++++++++++++++-- 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index 037bcd07d..b84dea5a4 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -145,9 +145,10 @@ async def open(self) -> None: async def close(self) -> None: """Closes the bidi-gRPC connection.""" - raise NotImplementedError( - "close() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + await self.socket_like_rpc.close() + self._is_stream_open = False async def send( self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index f159ba3e7..ad83669c3 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -15,6 +15,7 @@ import pytest from unittest import mock +from unittest.mock import AsyncMock from google.cloud.storage._experimental.asyncio.async_write_object_stream import ( _AsyncWriteObjectStream, ) @@ -43,6 +44,27 @@ def mock_client(): return client +async def instantiate_write_obj_stream(mock_client, 
mock_cls_async_bidi_rpc, open=True): + """Helper to create an instance of _AsyncWriteObjectStream and open it by default.""" + socket_like_rpc = AsyncMock() + mock_cls_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = AsyncMock() + socket_like_rpc.close = AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = AsyncMock(return_value=mock_response) + + write_obj_stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + + if open: + await write_obj_stream.open() + + return write_obj_stream + + def test_async_write_object_stream_init(mock_client): """Test the constructor of _AsyncWriteObjectStream.""" stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -228,7 +250,6 @@ async def test_open_raises_error_on_missing_generation( ValueError, match="Failed to obtain object generation after opening the stream" ): await stream.open() - # assert stream.generation_number is None @pytest.mark.asyncio @@ -252,13 +273,49 @@ async def test_open_raises_error_on_missing_write_handle( @pytest.mark.asyncio -async def test_unimplemented_methods_raise_error(mock_client): - """Test that unimplemented methods raise NotImplementedError.""" - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_close(mock_cls_async_bidi_rpc, mock_client): + """Test that close successfully closes the stream.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) - with pytest.raises(NotImplementedError): - await stream.close() + # Act + await write_obj_stream.close() + # Assert + write_obj_stream.socket_like_rpc.close.assert_called_once() + assert not 
write_obj_stream.is_stream_open + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_close_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + """Test that closing a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) + + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.close() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_unimplemented_methods_raise_error(mock_async_bidi_rpc, mock_client): + """Test that unimplemented methods (send, recv) raise NotImplementedError.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) with pytest.raises(NotImplementedError): await stream.send(_storage_v2.BidiWriteObjectRequest()) From 6da118652c97ba11d82c25347080188c7cf7a338 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Mon, 17 Nov 2025 20:43:45 +0530 Subject: [PATCH 03/24] feat(zb-experimental): implement send & recv (#1615) feat(zb-experimental): implement send & recv --- .../asyncio/async_write_object_stream.py | 12 +-- .../asyncio/test_async_write_object_stream.py | 81 +++++++++++++++++-- 2 files changed, 80 insertions(+), 13 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index b84dea5a4..6d1fd5b31 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -160,9 +160,9 @@ async def send( The request message to send. This is typically used to specify the read offset and limit. 
""" - raise NotImplementedError( - "send() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + await self.socket_like_rpc.send(bidi_write_object_request) async def recv(self) -> _storage_v2.BidiWriteObjectResponse: """Receives a response from the stream. @@ -174,9 +174,9 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: :class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`: The response message from the server. """ - raise NotImplementedError( - "recv() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + return await self.socket_like_rpc.recv() @property def is_stream_open(self) -> bool: diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index ad83669c3..7fa2123c5 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -49,6 +49,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope socket_like_rpc = AsyncMock() mock_cls_async_bidi_rpc.return_value = socket_like_rpc socket_like_rpc.open = AsyncMock() + socket_like_rpc.send = AsyncMock() socket_like_rpc.close = AsyncMock() mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) @@ -313,11 +314,77 @@ async def test_close_without_open_should_raise_error( @mock.patch( "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" ) -async def test_unimplemented_methods_raise_error(mock_async_bidi_rpc, mock_client): - """Test that unimplemented methods (send, recv) raise NotImplementedError.""" - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises(NotImplementedError): - await stream.send(_storage_v2.BidiWriteObjectRequest()) +async def test_send(mock_cls_async_bidi_rpc, mock_client): + """Test that send 
calls the underlying rpc's send method.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) + + # Act + bidi_write_object_request = _storage_v2.BidiWriteObjectRequest() + await write_obj_stream.send(bidi_write_object_request) + + # Assert + write_obj_stream.socket_like_rpc.send.assert_called_once_with( + bidi_write_object_request + ) + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_send_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + """Test that sending on a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) + + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.send(_storage_v2.BidiWriteObjectRequest()) + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_recv(mock_cls_async_bidi_rpc, mock_client): + """Test that recv calls the underlying rpc's recv method.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) + bidi_write_object_response = _storage_v2.BidiWriteObjectResponse() + write_obj_stream.socket_like_rpc.recv = AsyncMock( + return_value=bidi_write_object_response + ) + + # Act + response = await write_obj_stream.recv() + + # Assert + write_obj_stream.socket_like_rpc.recv.assert_called_once() + assert response == bidi_write_object_response + - with pytest.raises(NotImplementedError): - await stream.recv() +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_recv_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + 
"""Test that receiving on a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) + + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.recv() From fdd0a50146d7b5f50f04c1871d43329a10938ff5 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Mon, 17 Nov 2025 22:10:26 +0530 Subject: [PATCH 04/24] feat(zb-experimental): Add Async_appendable_object_writer.py (#1616) feat(zb-experimental): Add Async_appendable_object_writer.py --- .../asyncio/async_appendable_object_writer.py | 158 ++++++++++++++++++ .../test_async_appendable_object_writer.py | 115 +++++++++++++ 2 files changed, 273 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py create mode 100644 tests/unit/asyncio/test_async_appendable_object_writer.py diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py new file mode 100644 index 000000000..ca03c2b6d --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -0,0 +1,158 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NOTE: +This is _experimental module for upcoming support for Rapid Storage. 
+(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) + +APIs may not work as intended and are not stable yet. Feature is not +GA(Generally Available) yet, please contact your TAM (Technical Account Manager) +if you want to use these Rapid Storage APIs. + +""" +from typing import Optional +from google.cloud.storage._experimental.asyncio.async_grpc_client import ( + AsyncGrpcClient, +) +from google.cloud.storage._experimental.asyncio.async_write_object_stream import ( + _AsyncWriteObjectStream, +) + + +class AsyncAppendableObjectWriter: + """Class for appending data to a GCS Appendable Object asynchronously.""" + + def __init__( + self, + client: AsyncGrpcClient.grpc_client, + bucket_name: str, + object_name: str, + generation=None, + write_handle=None, + ): + """ + Class for appending data to a GCS Appendable Object. + + Example usage: + + ``` + + from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter + import asyncio + + client = AsyncGrpcClient().grpc_client + bucket_name = "my-bucket" + object_name = "my-appendable-object" + + # instantiate the writer + writer = AsyncAppendableObjectWriter(client, bucket_name, object_name) + # open the writer, (underlying gRPC bidi-stream will be opened) + await writer.open() + + # append data, it can be called multiple times. + await writer.append(b"hello world") + await writer.append(b"some more data") + + # optionally flush data to persist. + await writer.flush() + + # close the gRPC stream. 
+ # Please note closing the program will also close the stream, + # however it's recommended to close the stream if no more data to append + # to clean up gRPC connection (which means CPU/memory/network resources) + await writer.close() + ``` + + :type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client` + :param client: async grpc client to use for making API requests. + + :type bucket_name: str + :param bucket_name: The name of the GCS bucket containing the object. + + :type object_name: str + :param object_name: The name of the GCS Appendable Object to be written. + + :type generation: int + :param generation: (Optional) If present, selects a specific revision of + that object. + If None, a new object is created. + If None and Object already exists then it will be + overwritten. + + :type write_handle: bytes + :param write_handle: (Optional) An existing handle for writing the object. + If provided, opening the bidi-gRPC connection will be faster. 
+ """ + self.client = client + self.bucket_name = bucket_name + self.object_name = object_name + self.write_handle = write_handle + self.generation = generation + + self.write_obj_stream = _AsyncWriteObjectStream( + client=self.client, + bucket_name=self.bucket_name, + object_name=self.object_name, + generation_number=self.generation, + write_handle=self.write_handle, + ) + self._is_stream_open: bool = False + self.offset: Optional[int] = None + self.persisted_size: Optional[int] = None + + async def state_lookup(self): + """Returns the persisted_size.""" + raise NotImplementedError("state_lookup is not implemented yet.") + + async def open(self) -> None: + """Opens the underlying bidi-gRPC stream.""" + raise NotImplementedError("open is not implemented yet.") + + async def append(self, data: bytes): + raise NotImplementedError("append is not implemented yet.") + + async def flush(self) -> int: + """Returns persisted_size""" + raise NotImplementedError("flush is not implemented yet.") + + async def close(self, finalize_on_close=False) -> int: + """Returns persisted_size""" + raise NotImplementedError("close is not implemented yet.") + + async def finalize(self) -> int: + """Returns persisted_size + Note: Once finalized no more data can be appended. + """ + raise NotImplementedError("finalize is not implemented yet.") + + # helper methods. 
+ async def append_from_string(self, data: str): + """ + str data will be encoded to bytes using utf-8 encoding calling + + self.append(data.encode("utf-8")) + """ + raise NotImplementedError("append_from_string is not implemented yet.") + + async def append_from_stream(self, stream_obj): + """ + At a time read a chunk of data (16MiB) from `stream_obj` + and call self.append(chunk) + """ + raise NotImplementedError("append_from_stream is not implemented yet.") + + async def append_from_file(self, file_path: str): + """Create a file object from `file_path` and call append_from_stream(file_obj)""" + raise NotImplementedError("append_from_file is not implemented yet.") diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py new file mode 100644 index 000000000..cf838ac05 --- /dev/null +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -0,0 +1,115 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) + +BUCKET = "test-bucket" +OBJECT = "test-object" +GENERATION = 123 +WRITE_HANDLE = b"test-write-handle" + + +@pytest.fixture +def mock_client(): + """Mock the async gRPC client.""" + return mock.AsyncMock() + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +def test_init(mock_write_object_stream, mock_client): + """Test the constructor of AsyncAppendableObjectWriter.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + + assert writer.client == mock_client + assert writer.bucket_name == BUCKET + assert writer.object_name == OBJECT + assert writer.generation is None + assert writer.write_handle is None + assert not writer._is_stream_open + assert writer.offset is None + assert writer.persisted_size is None + + mock_write_object_stream.assert_called_once_with( + client=mock_client, + bucket_name=BUCKET, + object_name=OBJECT, + generation_number=None, + write_handle=None, + ) + assert writer.write_obj_stream == mock_write_object_stream.return_value + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +def test_init_with_optional_args(mock_write_object_stream, mock_client): + """Test the constructor with optional arguments.""" + writer = AsyncAppendableObjectWriter( + mock_client, + BUCKET, + OBJECT, + generation=GENERATION, + write_handle=WRITE_HANDLE, + ) + + assert writer.generation == GENERATION + assert writer.write_handle == WRITE_HANDLE + + mock_write_object_stream.assert_called_once_with( + client=mock_client, + bucket_name=BUCKET, + object_name=OBJECT, + generation_number=GENERATION, + write_handle=WRITE_HANDLE, + ) + + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that all currently 
unimplemented methods raise NotImplementedError.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + + with pytest.raises(NotImplementedError): + await writer.state_lookup() + + with pytest.raises(NotImplementedError): + await writer.open() + + with pytest.raises(NotImplementedError): + await writer.append(b"data") + + with pytest.raises(NotImplementedError): + await writer.flush() + + with pytest.raises(NotImplementedError): + await writer.close() + + with pytest.raises(NotImplementedError): + await writer.finalize() + + with pytest.raises(NotImplementedError): + await writer.append_from_string("data") + + with pytest.raises(NotImplementedError): + await writer.append_from_stream(mock.Mock()) + + with pytest.raises(NotImplementedError): + await writer.append_from_file("file.txt") From 7d17922e671aa2d975262984ba50bfcfa29ff4a5 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Tue, 18 Nov 2025 12:19:28 +0530 Subject: [PATCH 05/24] feat(zb-experimental): implement state_lookup (#1617) feat(zb-experimental): implement state_lookup --- .../asyncio/async_appendable_object_writer.py | 12 +++++-- .../test_async_appendable_object_writer.py | 32 +++++++++++++++++-- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index ca03c2b6d..be992b641 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -22,6 +22,7 @@ """ from typing import Optional +from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import ( AsyncGrpcClient, ) @@ -112,9 +113,16 @@ def __init__( self.offset: Optional[int] = None self.persisted_size: Optional[int] = None - async def state_lookup(self): + async def state_lookup(self) -> int: 
"""Returns the persisted_size.""" - raise NotImplementedError("state_lookup is not implemented yet.") + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest( + state_lookup=True, + ) + ) + response = await self.write_obj_stream.recv() + self.persisted_size = response.persisted_size + return self.persisted_size async def open(self) -> None: """Opens the underlying bidi-gRPC stream.""" diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index cf838ac05..d3a6d3830 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -18,11 +18,14 @@ from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( AsyncAppendableObjectWriter, ) +from google.cloud import _storage_v2 + BUCKET = "test-bucket" OBJECT = "test-object" GENERATION = 123 WRITE_HANDLE = b"test-write-handle" +PERSISTED_SIZE = 456 @pytest.fixture @@ -82,14 +85,37 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client): ) +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_state_lookup(mock_write_object_stream, mock_client): + """Test state_lookup method.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=PERSISTED_SIZE) + ) + + expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) + + # Act + response = await writer.state_lookup() + + # Assert + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert writer.persisted_size == PERSISTED_SIZE + assert response == PERSISTED_SIZE + + 
@pytest.mark.asyncio async def test_unimplemented_methods_raise_error(mock_client): """Test that all currently unimplemented methods raise NotImplementedError.""" writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises(NotImplementedError): - await writer.state_lookup() - with pytest.raises(NotImplementedError): await writer.open() From ec470a270e189e137c7229cc359367d5a897cdb9 Mon Sep 17 00:00:00 2001 From: Rob Clevenger Date: Tue, 18 Nov 2025 01:28:52 -0800 Subject: [PATCH 06/24] fix(bucket): Move blob fails when the new blob name contains characters that need to be url encoded (#1605) fix(bucket): url encode new_name parameter in move_blob() The move_blob() method was not URL encoding the new_name parameter before passing it to the API call, unlike how the blob encodes its own path. This caused failures when moving blobs to paths with special characters. Added URL encoding for new_name to match the blob path encoding, as both names must fit in the API URL format: "{blob_path}/moveTo/o/{new_name}" Here's an example of what fails: ```python from google.cloud import storage gcs = storage.Client() bucket = gcs.bucket("") blob = bucket.get_blob("test/blob.csv") bucket.move_blob( blob, new_name="test/blob2.csv" ) ``` Fixes #1523 --------- Co-authored-by: Chandra Shekhar Sirimala --- google/cloud/storage/bucket.py | 6 +++++- tests/unit/test_bucket.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index 0d1f9192b..1621f879e 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -41,6 +41,7 @@ from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.acl import BucketACL from google.cloud.storage.acl import DefaultObjectACL +from google.cloud.storage.blob import _quote from google.cloud.storage.blob import Blob from google.cloud.storage.constants import 
_DEFAULT_TIMEOUT from google.cloud.storage.constants import ARCHIVE_STORAGE_CLASS @@ -2360,7 +2361,10 @@ def move_blob( ) new_blob = Blob(bucket=self, name=new_name) - api_path = blob.path + "/moveTo/o/" + new_blob.name + api_path = "{blob_path}/moveTo/o/{new_name}".format( + blob_path=blob.path, new_name=_quote(new_blob.name) + ) + move_result = client._post_resource( api_path, None, diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index 809b572e0..8e4132edf 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -18,6 +18,7 @@ import mock import pytest +from google.cloud.storage.blob import _quote from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED @@ -2320,6 +2321,37 @@ def test_move_blob_w_no_retry_timeout_and_generation_match(self): _target_object=new_blob, ) + def test_move_blob_needs_url_encoding(self): + source_name = "source" + blob_name = "blob-name" + new_name = "new/name" + api_response = {} + client = mock.Mock(spec=["_post_resource"]) + client._post_resource.return_value = api_response + source = self._make_one(client=client, name=source_name) + blob = self._make_blob(source_name, blob_name) + + new_blob = source.move_blob( + blob, new_name, if_generation_match=0, retry=None, timeout=30 + ) + + self.assertIs(new_blob.bucket, source) + self.assertEqual(new_blob.name, new_name) + + expected_path = "/b/{}/o/{}/moveTo/o/{}".format( + source_name, blob_name, _quote(new_name) + ) + expected_data = None + expected_query_params = {"ifGenerationMatch": 0} + client._post_resource.assert_called_once_with( + expected_path, + expected_data, + query_params=expected_query_params, + timeout=30, + retry=None, + _target_object=new_blob, + ) + def test_move_blob_w_user_project(self): source_name = "source" blob_name = "blob-name" From dfe4566e4b087ee217a7d05fc388ab5bfb760f39 Mon Sep 17 
00:00:00 2001 From: Chandra Shekhar Sirimala Date: Tue, 18 Nov 2025 19:49:59 +0530 Subject: [PATCH 07/24] feat(zb-experimental): implement open in writer (#1618) feat(zb-experimental): implement open in writer --------- Co-authored-by: Owl Bot --- .../asyncio/async_appendable_object_writer.py | 12 +++- .../test_async_appendable_object_writer.py | 55 ++++++++++++++++++- tests/unit/test_bucket.py | 2 +- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index be992b641..319c7964b 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -126,7 +126,17 @@ async def state_lookup(self) -> int: async def open(self) -> None: """Opens the underlying bidi-gRPC stream.""" - raise NotImplementedError("open is not implemented yet.") + if self._is_stream_open: + raise ValueError("Underlying bidi-gRPC stream is already open") + + await self.write_obj_stream.open() + self._is_stream_open = True + if self.generation is None: + self.generation = self.write_obj_stream.generation_number + self.write_handle = self.write_obj_stream.write_handle + + # Update self.persisted_size + _ = await self.state_lookup() async def append(self, data: bytes): raise NotImplementedError("append is not implemented yet.") diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index d3a6d3830..67a074a11 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -112,13 +112,62 @@ async def test_state_lookup(mock_write_object_stream, mock_client): @pytest.mark.asyncio -async def test_unimplemented_methods_raise_error(mock_client): - """Test that all currently unimplemented 
methods raise NotImplementedError.""" +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_open_appendable_object_writer(mock_write_object_stream, mock_client): + """Test the open method.""" + # Arrange writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.open = mock.AsyncMock() + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock() - with pytest.raises(NotImplementedError): + mock_state_response = mock.MagicMock() + mock_state_response.persisted_size = 1024 + mock_stream.recv.return_value = mock_state_response + + mock_stream.generation_number = GENERATION + mock_stream.write_handle = WRITE_HANDLE + + # Act + await writer.open() + + # Assert + mock_stream.open.assert_awaited_once() + assert writer._is_stream_open + assert writer.generation == GENERATION + assert writer.write_handle == WRITE_HANDLE + + expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert writer.persisted_size == 1024 + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_open_when_already_open_raises_error( + mock_write_object_stream, mock_client +): + """Test that opening an already open writer raises a ValueError.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True # Manually set to open + + # Act & Assert + with pytest.raises(ValueError, match="Underlying bidi-gRPC stream is already open"): await writer.open() + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that all currently unimplemented methods raise NotImplementedError.""" + writer = 
AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises(NotImplementedError): await writer.append(b"data") diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index 8e4132edf..850e89d04 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -2350,7 +2350,7 @@ def test_move_blob_needs_url_encoding(self): timeout=30, retry=None, _target_object=new_blob, - ) + ) def test_move_blob_w_user_project(self): source_name = "source" From 53ced015a7a584cbf88197424e92538f8f18915c Mon Sep 17 00:00:00 2001 From: Faizal Kassamali Date: Tue, 18 Nov 2025 14:24:35 -0800 Subject: [PATCH 08/24] fix: Skip flaky test (#1622) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This test is causing failures in CI. Skipping until it can be fixed properly. Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-storage/issues/new/choose) before writing your code! 
That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary) Fixes #1611 🦕 --- tests/system/test_bucket.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system/test_bucket.py b/tests/system/test_bucket.py index 602e407cc..de7074a92 100644 --- a/tests/system/test_bucket.py +++ b/tests/system/test_bucket.py @@ -1339,6 +1339,7 @@ def test_bucket_ip_filter_patch(storage_client, buckets_to_delete): assert len(reloaded_filter.vpc_network_sources) == 1 +@pytest.mark.skip(reason="[https://github.com/googleapis/python-storage/issues/1611]") def test_list_buckets_with_ip_filter(storage_client, buckets_to_delete): """Test that listing buckets returns a summarized IP filter.""" bucket_name = _helpers.unique_name("ip-filter-list") From 361235ac2268c6cfa2c2389cb681a1e67b5fce83 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Tue, 18 Nov 2025 20:22:25 -0500 Subject: [PATCH 09/24] chore(librarian): onboard to librarian (#1625) Towards https://github.com/googleapis/librarian/issues/2461 --- .../generator-input/.repo-metadata.json | 18 + .../generator-input/librarian.py | 36 +- .librarian/generator-input/noxfile.py | 408 ++++++++++++++++++ .librarian/generator-input/setup.py | 107 +++++ .librarian/state.yaml | 31 ++ google/cloud/_storage_v2/gapic_version.py | 2 +- noxfile.py | 4 +- .../snippet_metadata_google.storage.v2.json | 2 +- testing/constraints-3.7.txt | 2 +- testing/constraints-3.9.txt | 2 +- 10 files changed, 572 insertions(+), 40 deletions(-) create mode 100644 .librarian/generator-input/.repo-metadata.json rename owlbot.py => .librarian/generator-input/librarian.py (65%) create mode 100644 .librarian/generator-input/noxfile.py create mode 100644 .librarian/generator-input/setup.py create mode 100644 .librarian/state.yaml diff --git 
a/.librarian/generator-input/.repo-metadata.json b/.librarian/generator-input/.repo-metadata.json new file mode 100644 index 000000000..f644429bc --- /dev/null +++ b/.librarian/generator-input/.repo-metadata.json @@ -0,0 +1,18 @@ +{ + "name": "storage", + "name_pretty": "Google Cloud Storage", + "product_documentation": "https://cloud.google.com/storage", + "client_documentation": "https://cloud.google.com/python/docs/reference/storage/latest", + "issue_tracker": "https://issuetracker.google.com/savedsearches/559782", + "release_level": "stable", + "language": "python", + "library_type": "GAPIC_MANUAL", + "repo": "googleapis/python-storage", + "distribution_name": "google-cloud-storage", + "api_id": "storage.googleapis.com", + "requires_billing": true, + "default_version": "v2", + "codeowner_team": "@googleapis/gcs-sdk-team", + "api_shortname": "storage", + "api_description": "is a durable and highly available object storage service. Google Cloud Storage is almost infinitely scalable and guarantees consistency: when a write succeeds, the latest copy of the object will be returned to any GET, globally." 
+} diff --git a/owlbot.py b/.librarian/generator-input/librarian.py similarity index 65% rename from owlbot.py rename to .librarian/generator-input/librarian.py index 67b2369ce..ad0baa2a3 100644 --- a/owlbot.py +++ b/.librarian/generator-input/librarian.py @@ -98,41 +98,11 @@ "noxfile.py", "CONTRIBUTING.rst", "README.rst", - ".kokoro/continuous/continuous.cfg", - ".kokoro/presubmit/system-3.8.cfg", - ".kokoro/presubmit/prerelease-deps.cfg", - ".kokoro/continuous/prerelease-deps.cfg", - ".github/blunderbuss.yml", # blunderbuss assignment to python squad - ".github/workflows", # exclude gh actions as credentials are needed for tests - ".github/release-please.yml", # special support for a python2 branch in this repo + ".kokoro/**", + ".github/**", ], ) -s.replace( - ".kokoro/build.sh", - "export PYTHONUNBUFFERED=1", - """export PYTHONUNBUFFERED=1 - -# Export variable to override api endpoint -export API_ENDPOINT_OVERRIDE - -# Export variable to override api endpoint version -export API_VERSION_OVERRIDE - -# Export dual region locations -export DUAL_REGION_LOC_1 -export DUAL_REGION_LOC_2 - -# Setup universe domain testing needed environment variables. 
-export TEST_UNIVERSE_DOMAIN_CREDENTIAL=$(realpath ${KOKORO_GFILE_DIR}/secret_manager/client-library-test-universe-domain-credential) -export TEST_UNIVERSE_DOMAIN=$(gcloud secrets versions access latest --project cloud-devrel-kokoro-resources --secret=client-library-test-universe-domain) -export TEST_UNIVERSE_PROJECT_ID=$(gcloud secrets versions access latest --project cloud-devrel-kokoro-resources --secret=client-library-test-universe-project-id) -export TEST_UNIVERSE_LOCATION=$(gcloud secrets versions access latest --project cloud-devrel-kokoro-resources --secret=client-library-test-universe-storage-location) - -""") - python.py_samples(skip_readmes=True) -# Use a python runtime which is available in the owlbot post processor here -# https://github.com/googleapis/synthtool/blob/master/docker/owlbot/python/Dockerfile -s.shell.run(["nox", "-s", "blacken-3.10"], hide_output=False) +s.shell.run(["nox", "-s", "blacken"], hide_output=False) diff --git a/.librarian/generator-input/noxfile.py b/.librarian/generator-input/noxfile.py new file mode 100644 index 000000000..16cf97b01 --- /dev/null +++ b/.librarian/generator-input/noxfile.py @@ -0,0 +1,408 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import absolute_import +import os +import pathlib +import re +import shutil + +import nox + + +BLACK_VERSION = "black==23.7.0" +BLACK_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"] + +DEFAULT_PYTHON_VERSION = "3.12" +SYSTEM_TEST_PYTHON_VERSIONS = ["3.12"] +UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] +CONFORMANCE_TEST_PYTHON_VERSIONS = ["3.12"] + +CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() + +# Error if a python version is missing +nox.options.error_on_missing_interpreters = True + +nox.options.sessions = [ + "blacken", + "conftest_retry", + "docfx", + "docs", + "lint", + "lint_setup_py", + "system", + # TODO(https://github.com/googleapis/python-storage/issues/1499): + # Remove or restore testing for Python 3.7/3.8 + "unit-3.9", + "unit-3.10", + "unit-3.11", + "unit-3.12", + "unit-3.13", + # cover must be last to avoid error `No data to report` + "cover", +] + + +@nox.session(python=DEFAULT_PYTHON_VERSION) +def lint(session): + """Run linters. + + Returns a failure if the linters find linting errors or sufficiently + serious code quality issues. + """ + # Pin flake8 to 6.0.0 + # See https://github.com/googleapis/python-storage/issues/1102 + session.install("flake8==6.0.0", BLACK_VERSION) + session.run( + "black", + "--check", + *BLACK_PATHS, + ) + session.run("flake8", "google", "tests") + + +@nox.session(python="3.14") +def blacken(session): + """Run black. + + Format code to uniform standard. 
+ """ + session.install(BLACK_VERSION) + session.run( + "black", + *BLACK_PATHS, + ) + + +@nox.session(python=DEFAULT_PYTHON_VERSION) +def lint_setup_py(session): + """Verify that setup.py is valid (including RST check).""" + session.install("docutils", "pygments", "setuptools>=79.0.1") + session.run("python", "setup.py", "check", "--restructuredtext", "--strict") + + +def default(session, install_extras=True): + constraints_path = str( + CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" + ) + # Install all test dependencies, then install this package in-place. + session.install( + "mock", + "pytest", + "pytest-cov", + "pytest-asyncio", + "brotli", + "grpcio", + "grpcio-status", + "proto-plus", + "grpc-google-iam-v1", + "-c", + constraints_path, + ) + + if install_extras: + session.install("opentelemetry-api", "opentelemetry-sdk") + + session.install("-e", ".", "-c", constraints_path) + + # This dependency is included in setup.py for backwards compatibility only + # and the client library is expected to pass all tests without it. See + # setup.py and README for details. + session.run("pip", "uninstall", "-y", "google-resumable-media") + + # Run py.test against the unit tests. + session.run( + "py.test", + "--quiet", + f"--junitxml=unit_{session.python}_sponge_log.xml", + "--cov=google.cloud.storage", + "--cov=google.cloud", + "--cov=tests.unit", + "--cov-append", + "--cov-config=.coveragerc", + "--cov-report=", + "--cov-fail-under=0", + os.path.join("tests", "unit"), + os.path.join("tests", "resumable_media", "unit"), + *session.posargs, + ) + + +@nox.session(python=UNIT_TEST_PYTHON_VERSIONS) +def unit(session): + """Run the unit test suite.""" + default(session) + + +@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS) +def system(session): + constraints_path = str( + CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" + ) + """Run the system test suite.""" + rerun_count = 0 + + # Check the value of `RUN_SYSTEM_TESTS` env var. 
It defaults to true. + if os.environ.get("RUN_SYSTEM_TESTS", "true") == "false": + session.skip("RUN_SYSTEM_TESTS is set to false, skipping") + # Environment check: Only run tests if the environment variable is set. + if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""): + session.skip( + "Credentials must be set via environment variable GOOGLE_APPLICATION_CREDENTIALS" + ) + # mTLS tests requires pyopenssl. + if os.environ.get("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") == "true": + session.install("pyopenssl") + # Check if endpoint is being overriden for rerun_count + if ( + os.getenv("API_ENDPOINT_OVERRIDE", "https://storage.googleapis.com") + != "https://storage.googleapis.com" + ): + rerun_count = 3 + + # Use pre-release gRPC for system tests. + # TODO: Remove ban of 1.52.0rc1 once grpc/grpc#31885 is resolved. + session.install("--pre", "grpcio!=1.52.0rc1") + + # Install all test dependencies, then install this package into the + # virtualenv's dist-packages. + # 2021-05-06: defer installing 'google-cloud-*' to after this package, + # in order to work around Python 2.7 googolapis-common-protos + # issue. + session.install("mock", "pytest", "pytest-rerunfailures", "-c", constraints_path) + session.install("-e", ".", "-c", constraints_path) + session.install( + "google-cloud-testutils", + "google-cloud-iam", + "google-cloud-pubsub", + "google-cloud-kms", + "brotli", + "-c", + constraints_path, + ) + + # Run py.test against the system tests. 
+ session.run( + "py.test", + "--quiet", + f"--junitxml=system_{session.python}_sponge_log.xml", + "--reruns={}".format(rerun_count), + os.path.join("tests", "system"), + os.path.join("tests", "resumable_media", "system"), + *session.posargs, + ) + + +@nox.session(python=CONFORMANCE_TEST_PYTHON_VERSIONS) +def conftest_retry(session): + """Run the retry conformance test suite.""" + conformance_test_folder_path = os.path.join("tests", "conformance") + conformance_test_folder_exists = os.path.exists(conformance_test_folder_path) + # Environment check: only run tests if found. + if not conformance_test_folder_exists: + session.skip("Conformance tests were not found") + + # Install all test dependencies and pytest plugin to run tests in parallel. + # Then install this package in-place. + session.install("pytest", "pytest-xdist") + session.install("-e", ".") + + # Run #CPU processes in parallel if no test session arguments are passed in. + if session.posargs: + test_cmd = [ + "py.test", + "--quiet", + conformance_test_folder_path, + *session.posargs, + ] + else: + test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path] + + # Run py.test against the conformance tests. + session.run(*test_cmd) + + +@nox.session(python=DEFAULT_PYTHON_VERSION) +def cover(session): + """Run the final coverage report. + + This outputs the coverage report aggregating coverage from the unit + test runs (not system test runs), and then erases coverage data. + """ + session.install("coverage", "pytest-cov") + session.run("coverage", "report", "--show-missing", "--fail-under=99") + + session.run("coverage", "erase") + + +@nox.session(python="3.10") +def docs(session): + """Build the docs for this library.""" + + session.install("-e", ".") + session.install( + # We need to pin to specific versions of the `sphinxcontrib-*` packages + # which still support sphinx 4.x. 
+ # See https://github.com/googleapis/sphinx-docfx-yaml/issues/344 + # and https://github.com/googleapis/sphinx-docfx-yaml/issues/345. + "sphinxcontrib-applehelp==1.0.4", + "sphinxcontrib-devhelp==1.0.2", + "sphinxcontrib-htmlhelp==2.0.1", + "sphinxcontrib-qthelp==1.0.3", + "sphinxcontrib-serializinghtml==1.1.5", + "sphinx==4.5.0", + "alabaster", + "recommonmark", + ) + + shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True) + session.run( + "sphinx-build", + "-W", # warnings as errors + "-T", # show full traceback on exception + "-N", # no colors + "-b", + "html", + "-d", + os.path.join("docs", "_build", "doctrees", ""), + os.path.join("docs", ""), + os.path.join("docs", "_build", "html", ""), + ) + + +@nox.session(python="3.10") +def docfx(session): + """Build the docfx yaml files for this library.""" + + session.install("-e", ".") + session.install("grpcio") + session.install( + # We need to pin to specific versions of the `sphinxcontrib-*` packages + # which still support sphinx 4.x. + # See https://github.com/googleapis/sphinx-docfx-yaml/issues/344 + # and https://github.com/googleapis/sphinx-docfx-yaml/issues/345. 
+ "sphinxcontrib-applehelp==1.0.4", + "sphinxcontrib-devhelp==1.0.2", + "sphinxcontrib-htmlhelp==2.0.1", + "sphinxcontrib-qthelp==1.0.3", + "sphinxcontrib-serializinghtml==1.1.5", + "gcp-sphinx-docfx-yaml", + "alabaster", + "recommonmark", + ) + + shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True) + session.run( + "sphinx-build", + "-T", # show full traceback on exception + "-N", # no colors + "-D", + ( + "extensions=sphinx.ext.autodoc," + "sphinx.ext.autosummary," + "docfx_yaml.extension," + "sphinx.ext.intersphinx," + "sphinx.ext.coverage," + "sphinx.ext.napoleon," + "sphinx.ext.todo," + "sphinx.ext.viewcode," + "recommonmark" + ), + "-b", + "html", + "-d", + os.path.join("docs", "_build", "doctrees", ""), + os.path.join("docs", ""), + os.path.join("docs", "_build", "html", ""), + ) + + +@nox.session(python=UNIT_TEST_PYTHON_VERSIONS[-1]) +@nox.parametrize( + "protobuf_implementation", + ["python", "upb"], +) +def prerelease_deps(session, protobuf_implementation): + """Run all tests with prerelease versions of dependencies installed.""" + + # Install all test dependencies + session.install("mock", "pytest", "pytest-cov", "brotli") + + # Install dependencies needed for system tests + session.install( + "google-cloud-pubsub", + "google-cloud-kms", + "google-cloud-testutils", + "google-cloud-iam", + ) + + # Install all dependencies + session.install("-e", ".[protobuf, tracing]") + + prerel_deps = [ + "google-api-core", + "google-auth", + "google-cloud-core", + "google-crc32c", + "google-resumable-media", + "opentelemetry-api", + "protobuf", + ] + + package_namespaces = { + "google-api-core": "google.api_core", + "google-auth": "google.auth", + "google-cloud-core": "google.cloud.version", + "opentelemetry-api": "opentelemetry.version", + "protobuf": "google.protobuf", + } + + for dep in prerel_deps: + session.install("--pre", "--no-deps", "--upgrade", dep) + print(f"Installed {dep}") + + version_namespace = package_namespaces.get(dep) + + if 
version_namespace: + session.run( + "python", + "-c", + f"import {version_namespace}; print({version_namespace}.__version__)", + ) + # Remaining dependencies + other_deps = [ + "requests", + ] + session.install(*other_deps) + + session.run( + "py.test", + "tests/unit", + env={ + "PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation, + }, + ) + + session.run( + "py.test", + "--verbose", + f"--junitxml=system_{session.python}_sponge_log.xml", + os.path.join("tests", "system"), + *session.posargs, + env={ + "PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation, + }, + ) diff --git a/.librarian/generator-input/setup.py b/.librarian/generator-input/setup.py new file mode 100644 index 000000000..2c4504749 --- /dev/null +++ b/.librarian/generator-input/setup.py @@ -0,0 +1,107 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import os + +import setuptools + + +# Package metadata. + +name = "google-cloud-storage" +description = "Google Cloud Storage API client library" +# Should be one of: +# 'Development Status :: 3 - Alpha' +# 'Development Status :: 4 - Beta' +# 'Development Status :: 5 - Production/Stable' +release_status = "Development Status :: 5 - Production/Stable" +dependencies = [ + "google-auth >= 2.26.1, < 3.0.0", + "google-api-core >= 2.27.0, < 3.0.0", + "google-cloud-core >= 2.4.2, < 3.0.0", + # The dependency "google-resumable-media" is no longer used. 
However, the + # dependency is still included here to accommodate users who may be + # importing exception classes from the google-resumable-media without + # installing it explicitly. See the python-storage README for details on + # exceptions and importing. Users who are not importing + # google-resumable-media classes in their application can safely disregard + # this dependency. + "google-resumable-media >= 2.7.2, < 3.0.0", + "requests >= 2.22.0, < 3.0.0", + "google-crc32c >= 1.1.3, < 2.0.0", +] +extras = { + "protobuf": ["protobuf >= 3.20.2, < 7.0.0"], + "tracing": [ + "opentelemetry-api >= 1.1.0, < 2.0.0", + ], +} + + +# Setup boilerplate below this line. + +package_root = os.path.abspath(os.path.dirname(__file__)) + +version = {} +with open(os.path.join(package_root, "google/cloud/storage/version.py")) as fp: + exec(fp.read(), version) +version = version["__version__"] + +readme_filename = os.path.join(package_root, "README.rst") +with io.open(readme_filename, encoding="utf-8") as readme_file: + readme = readme_file.read() + +# Only include packages under the 'google' namespace. Do not include tests, +# benchmarks, etc. 
+packages = [ + package + for package in setuptools.find_namespace_packages() + if package.startswith("google") +] + + +setuptools.setup( + name=name, + version=version, + description=description, + long_description=readme, + author="Google LLC", + author_email="googleapis-packages@google.com", + license="Apache 2.0", + url="https://github.com/googleapis/python-storage", + classifiers=[ + release_status, + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Operating System :: OS Independent", + "Topic :: Internet", + ], + platforms="Posix; MacOS X; Windows", + packages=packages, + install_requires=dependencies, + extras_require=extras, + python_requires=">=3.7", + include_package_data=True, + zip_safe=False, +) diff --git a/.librarian/state.yaml b/.librarian/state.yaml new file mode 100644 index 000000000..1502e804d --- /dev/null +++ b/.librarian/state.yaml @@ -0,0 +1,31 @@ +image: us-central1-docker.pkg.dev/cloud-sdk-librarian-prod/images-prod/python-librarian-generator@sha256:8e2c32496077054105bd06c54a59d6a6694287bc053588e24debe6da6920ad91 +libraries: + - id: google-cloud-storage + version: 3.6.0 + last_generated_commit: 5400ccce473c439885bd6bf2924fd242271bfcab + apis: + - path: google/storage/v2 + service_config: storage_v2.yaml + source_roots: + - . 
+ preserve_regex: [] + remove_regex: + - ^.flake8 + - ^.pre-commit-config.yaml + - ^.trampolinerc + - ^.repo-metadata.json + - ^LICENSE + - ^MANIFEST.in + - ^SECURITY.md + - ^mypy.ini + - ^noxfile.py + - ^renovate.json + - ^setup.py + - ^docs/summary_overview.md + - ^google/cloud/_storage_v2 + - ^samples/generated_samples + - ^testing/constraints-3.8.txt + - ^testing/constraints-3.1.* + - ^tests/__init__.py + - ^tests/unit/gapic + tag_format: v{version} diff --git a/google/cloud/_storage_v2/gapic_version.py b/google/cloud/_storage_v2/gapic_version.py index 20a9cd975..d69b0530e 100644 --- a/google/cloud/_storage_v2/gapic_version.py +++ b/google/cloud/_storage_v2/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "0.0.0" # {x-release-please-version} +__version__ = "3.6.0" # {x-release-please-version} diff --git a/noxfile.py b/noxfile.py index 451fced3e..16cf97b01 100644 --- a/noxfile.py +++ b/noxfile.py @@ -74,9 +74,7 @@ def lint(session): session.run("flake8", "google", "tests") -# Use a python runtime which is available in the owlbot post processor here -# https://github.com/googleapis/synthtool/blob/master/docker/owlbot/python/Dockerfile -@nox.session(python=["3.10", DEFAULT_PYTHON_VERSION]) +@nox.session(python="3.14") def blacken(session): """Run black. 
diff --git a/samples/generated_samples/snippet_metadata_google.storage.v2.json b/samples/generated_samples/snippet_metadata_google.storage.v2.json index b2448bff2..4af7ef641 100644 --- a/samples/generated_samples/snippet_metadata_google.storage.v2.json +++ b/samples/generated_samples/snippet_metadata_google.storage.v2.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-storage", - "version": "0.0.0" + "version": "3.6.0" }, "snippets": [ { diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 9c17b387b..151762409 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,7 +5,7 @@ # e.g., if setup.py has "google-cloud-foo >= 1.14.0, < 2.0.0", # Then this file should have google-cloud-foo==1.14.0 google-auth==2.26.1 -google-api-core==2.15.0 +google-api-core==2.27.0 google-cloud-core==2.4.2 google-resumable-media==2.7.2 requests==2.22.0 diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index ccf6c1493..f022e9e1c 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -5,7 +5,7 @@ # e.g., if setup.py has "google-cloud-foo >= 1.14.0, < 2.0.0", # Then this file should have google-cloud-foo==1.14.0 google-auth==2.26.1 -google-api-core>=2.15.0 +google-api-core==2.27.0 google-cloud-core==2.4.2 google-resumable-media==2.7.2 requests==2.22.0 From b4134054f57e8767df3d62ee36dcbe3a83e6bfaa Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Wed, 19 Nov 2025 15:58:52 +0530 Subject: [PATCH 10/24] feat(zb-experimental): implement flush, close and finalize (#1619) feat(zb-experimental): implement flush, close and finalize --- .../asyncio/async_appendable_object_writer.py | 44 +++++++-- .../test_async_appendable_object_writer.py | 97 +++++++++++++++++-- 2 files changed, 125 insertions(+), 16 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py 
index 319c7964b..90dabc829 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -114,7 +114,11 @@ def __init__( self.persisted_size: Optional[int] = None async def state_lookup(self) -> int: - """Returns the persisted_size.""" + """Returns the persisted_size + + :rtype: int + :returns: persisted size. + """ await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest( state_lookup=True, @@ -142,18 +146,44 @@ async def append(self, data: bytes): raise NotImplementedError("append is not implemented yet.") async def flush(self) -> int: - """Returns persisted_size""" - raise NotImplementedError("flush is not implemented yet.") + """Flushes the data to the server. + + :rtype: int + :returns: The persisted size after flush. + """ + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest( + flush=True, + state_lookup=True, + ) + ) + response = await self.write_obj_stream.recv() + self.persisted_size = response.persisted_size + self.offset = self.persisted_size + return self.persisted_size async def close(self, finalize_on_close=False) -> int: """Returns persisted_size""" - raise NotImplementedError("close is not implemented yet.") + if finalize_on_close: + await self.finalize() + + await self.write_obj_stream.close() + self._is_stream_open = False + self.offset = None + + async def finalize(self) -> _storage_v2.Object: + """Finalizes the Appendable Object. - async def finalize(self) -> int: - """Returns persisted_size Note: Once finalized no more data can be appended. + + rtype: google.cloud.storage_v2.types.Object + returns: The finalized object resource. """ - raise NotImplementedError("finalize is not implemented yet.") + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest(finish_write=True) + ) + response = await self.write_obj_stream.recv() + self.object_resource = response.resource # helper methods. 
async def append_from_string(self, data: str): diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 67a074a11..18f7a8826 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -171,15 +171,6 @@ async def test_unimplemented_methods_raise_error(mock_client): with pytest.raises(NotImplementedError): await writer.append(b"data") - with pytest.raises(NotImplementedError): - await writer.flush() - - with pytest.raises(NotImplementedError): - await writer.close() - - with pytest.raises(NotImplementedError): - await writer.finalize() - with pytest.raises(NotImplementedError): await writer.append_from_string("data") @@ -188,3 +179,91 @@ async def test_unimplemented_methods_raise_error(mock_client): with pytest.raises(NotImplementedError): await writer.append_from_file("file.txt") + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_flush(mock_write_object_stream, mock_client): + """Test that flush sends the correct request and updates state.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024) + ) + + persisted_size = await writer.flush() + + expected_request = _storage_v2.BidiWriteObjectRequest(flush=True, state_lookup=True) + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert writer.persisted_size == 1024 + assert writer.offset == 1024 + assert persisted_size == 1024 + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def 
test_close_without_finalize(mock_write_object_stream, mock_client): + """Test close without finalizing.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.offset = 1024 + mock_stream = mock_write_object_stream.return_value + mock_stream.close = mock.AsyncMock() + writer.finalize = mock.AsyncMock() + + await writer.close(finalize_on_close=False) + + writer.finalize.assert_not_awaited() + mock_stream.close.assert_awaited_once() + assert not writer._is_stream_open + assert writer.offset is None + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_close_with_finalize(mock_write_object_stream, mock_client): + """Test close with finalizing.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.offset = 1024 + mock_stream = mock_write_object_stream.return_value + mock_stream.close = mock.AsyncMock() + writer.finalize = mock.AsyncMock() + + await writer.close(finalize_on_close=True) + + writer.finalize.assert_awaited_once() + mock_stream.close.assert_awaited_once() + assert not writer._is_stream_open + assert writer.offset is None + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_finalize(mock_write_object_stream, mock_client): + """Test that finalize sends the correct request and updates state.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET) + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) + ) + + await writer.finalize() + + mock_stream.send.assert_awaited_once_with( + 
_storage_v2.BidiWriteObjectRequest(finish_write=True) + ) + mock_stream.recv.assert_awaited_once() + assert writer.object_resource == mock_resource From af2d1f1bf60482bd93795ebfa529a23c20d3a9fa Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Wed, 19 Nov 2025 12:04:18 -0500 Subject: [PATCH 11/24] chore(librarian): remove owlbot configs which was missed in #1625 (#1629) Follow up to #1625 --- .github/.OwlBot.lock.yaml | 17 ----------------- .github/.OwlBot.yaml | 29 ----------------------------- 2 files changed, 46 deletions(-) delete mode 100644 .github/.OwlBot.lock.yaml delete mode 100644 .github/.OwlBot.yaml diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml deleted file mode 100644 index 4a311db02..000000000 --- a/.github/.OwlBot.lock.yaml +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -docker: - image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:543e209e7c1c1ffe720eb4db1a3f045a75099304fb19aa11a47dc717b8aae2a9 -# created: 2025-10-09T14:48:42.914384887Z diff --git a/.github/.OwlBot.yaml b/.github/.OwlBot.yaml deleted file mode 100644 index 67768efb5..000000000 --- a/.github/.OwlBot.yaml +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -docker: - image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - -deep-remove-regex: - - /owl-bot-staging - -# In source, we've used two capturing groups (v2) and (.*) to match the version -# and the directory path. -# In dest, we use $1 to refer to the first capturing group (v2) and $2 to refer -# to the second capturing group (directory path). -deep-copy-regex: - - source: /google/storage/(v2)/storage-v2-py/(.*) - dest: /owl-bot-staging/$1/$2 - -begin-after-commit-hash: 6acf4a0a797f1082027985c55c4b14b60f673dd7 From 46a5728c76a12f6375d95cabce69f8b78490eab2 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal <54775856+Pulkit0110@users.noreply.github.com> Date: Thu, 20 Nov 2025 11:58:56 +0530 Subject: [PATCH 12/24] samples: add samples for partial list bucket (#1627) Add samples for the partial list bucket feature. --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../storage_list_buckets_partial_success.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 samples/snippets/storage_list_buckets_partial_success.py diff --git a/samples/snippets/storage_list_buckets_partial_success.py b/samples/snippets/storage_list_buckets_partial_success.py new file mode 100644 index 000000000..bea4c9ed3 --- /dev/null +++ b/samples/snippets/storage_list_buckets_partial_success.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +# Copyright 2025 Google Inc. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START storage_list_buckets_partial_success] +from google.cloud import storage + + +def list_buckets_with_partial_success(): + """Lists buckets and includes unreachable buckets in the response.""" + + storage_client = storage.Client() + + buckets_iterator = storage_client.list_buckets(return_partial_success=True) + + for page in buckets_iterator.pages: + if page.unreachable: + print("Unreachable locations in this page:") + for location in page.unreachable: + print(location) + + print("Reachable buckets in this page:") + for bucket in page: + print(bucket.name) + + +# [END storage_list_buckets_partial_success] + + +if __name__ == "__main__": + list_buckets_with_partial_success() From 8c25c1b5689fa0e8f3d38b9b03bc9d0267a9a2aa Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Thu, 20 Nov 2025 18:34:55 +0530 Subject: [PATCH 13/24] fix(zb-experimental): pass creds to grpc channel (#1623) fix(zb-experimental): pass creds to grpc channel --- .../asyncio/async_grpc_client.py | 6 +- tests/unit/asyncio/test_async_grpc_client.py | 77 ++++++++++++++++--- 2 files changed, 70 insertions(+), 13 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_grpc_client.py b/google/cloud/storage/_experimental/asyncio/async_grpc_client.py index 75e6f63d2..a5cccca59 100644 --- a/google/cloud/storage/_experimental/asyncio/async_grpc_client.py +++ b/google/cloud/storage/_experimental/asyncio/async_grpc_client.py @@ 
-65,8 +65,10 @@ def _create_async_grpc_client( transport_cls = storage_v2.StorageAsyncClient.get_transport_class( "grpc_asyncio" ) - channel = transport_cls.create_channel(attempt_direct_path=attempt_direct_path) - transport = transport_cls(credentials=credentials, channel=channel) + channel = transport_cls.create_channel( + attempt_direct_path=attempt_direct_path, credentials=credentials + ) + transport = transport_cls(channel=channel) return storage_v2.StorageAsyncClient( transport=transport, diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py index 0e2bf9b50..eb06ab938 100644 --- a/tests/unit/asyncio/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -15,6 +15,7 @@ import unittest from unittest import mock from google.auth import credentials as auth_credentials +from google.auth.credentials import AnonymousCredentials def _make_credentials(spec=None): @@ -38,12 +39,10 @@ def test_constructor_default_options(self, mock_async_storage_client): "grpc_asyncio" ) mock_transport_cls.create_channel.assert_called_once_with( - attempt_direct_path=True + attempt_direct_path=True, credentials=mock_creds ) mock_channel = mock_transport_cls.create_channel.return_value - mock_transport_cls.assert_called_once_with( - credentials=mock_creds, channel=mock_channel - ) + mock_transport_cls.assert_called_once_with(channel=mock_channel) mock_transport = mock_transport_cls.return_value mock_async_storage_client.assert_called_once_with( transport=mock_transport, @@ -64,21 +63,77 @@ def test_constructor_disables_directpath(self, mock_async_storage_client): ) mock_transport_cls.create_channel.assert_called_once_with( - attempt_direct_path=False + attempt_direct_path=False, credentials=mock_creds ) mock_channel = mock_transport_cls.create_channel.return_value - mock_transport_cls.assert_called_once_with( - credentials=mock_creds, channel=mock_channel - ) + 
mock_transport_cls.assert_called_once_with(channel=mock_channel) @mock.patch("google.cloud._storage_v2.StorageAsyncClient") - def test_grpc_client_property(self, mock_async_storage_client): + def test_grpc_client_property(self, mock_grpc_gapic_client): from google.cloud.storage._experimental.asyncio import async_grpc_client + # Arrange + mock_transport_cls = mock.MagicMock() + mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls + channel_sentinel = mock.sentinel.channel + + mock_transport_cls.create_channel.return_value = channel_sentinel + mock_transport_cls.return_value = mock.sentinel.transport + mock_creds = _make_credentials() + mock_client_info = mock.sentinel.client_info + mock_client_options = mock.sentinel.client_options + mock_attempt_direct_path = mock.sentinel.attempt_direct_path + + # Act + client = async_grpc_client.AsyncGrpcClient( + credentials=mock_creds, + client_info=mock_client_info, + client_options=mock_client_options, + attempt_direct_path=mock_attempt_direct_path, + ) + + mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls + + mock_transport_cls.create_channel.return_value = channel_sentinel + mock_transport_instance = mock.sentinel.transport + mock_transport_cls.return_value = mock_transport_instance + + retrieved_client = client.grpc_client + + # Assert + mock_transport_cls.create_channel.assert_called_once_with( + attempt_direct_path=mock_attempt_direct_path, credentials=mock_creds + ) + mock_transport_cls.assere_with(channel=channel_sentinel) + mock_grpc_gapic_client.assert_called_once_with( + transport=mock_transport_instance, + client_info=mock_client_info, + client_options=mock_client_options, + ) + self.assertIs(retrieved_client, mock_grpc_gapic_client.return_value) - client = async_grpc_client.AsyncGrpcClient(credentials=mock_creds) + @mock.patch("google.cloud._storage_v2.StorageAsyncClient") + def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client): + from 
google.cloud.storage._experimental.asyncio import async_grpc_client + # Arrange + mock_transport_cls = mock.MagicMock() + mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls + channel_sentinel = mock.sentinel.channel + + mock_transport_cls.create_channel.return_value = channel_sentinel + mock_transport_cls.return_value = mock.sentinel.transport + + # Act + anonymous_creds = AnonymousCredentials() + client = async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds) retrieved_client = client.grpc_client - self.assertIs(retrieved_client, mock_async_storage_client.return_value) + # Assert + self.assertIs(retrieved_client, mock_grpc_gapic_client.return_value) + + mock_transport_cls.create_channel.assert_called_once_with( + attempt_direct_path=True, credentials=anonymous_creds + ) + mock_transport_cls.assert_called_once_with(channel=channel_sentinel) From 1c6b76f055a5d37fa35197198bfa68902832d1b5 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Fri, 21 Nov 2025 12:00:24 +0530 Subject: [PATCH 14/24] feat(zb-experimental): implement append (#1620) feat(zb-experimental): implement append --- .../asyncio/async_appendable_object_writer.py | 106 ++++++++- .../test_async_appendable_object_writer.py | 225 ++++++++++++++++-- 2 files changed, 310 insertions(+), 21 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 90dabc829..5c3e54d71 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -21,7 +21,7 @@ if you want to use these Rapid Storage APIs. 
""" -from typing import Optional +from typing import Optional, Union from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import ( AsyncGrpcClient, @@ -31,6 +31,10 @@ ) +_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB +_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024 # 16 MiB + + class AsyncAppendableObjectWriter: """Class for appending data to a GCS Appendable Object asynchronously.""" @@ -118,7 +122,13 @@ async def state_lookup(self) -> int: :rtype: int :returns: persisted size. + + :raises ValueError: If the stream is not open (i.e., `open()` has not + been called). """ + if not self._is_stream_open: + raise ValueError("Stream is not open. Call open() before state_lookup().") + await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest( state_lookup=True, @@ -129,7 +139,11 @@ async def state_lookup(self) -> int: return self.persisted_size async def open(self) -> None: - """Opens the underlying bidi-gRPC stream.""" + """Opens the underlying bidi-gRPC stream. + + :raises ValueError: If the stream is already open. + + """ if self._is_stream_open: raise ValueError("Underlying bidi-gRPC stream is already open") @@ -142,15 +156,65 @@ async def open(self) -> None: # Update self.persisted_size _ = await self.state_lookup() - async def append(self, data: bytes): - raise NotImplementedError("append is not implemented yet.") + async def append(self, data: bytes) -> None: + """Appends data to the Appendable object. + + This method sends the provided data to the GCS server in chunks. It + maintains an internal threshold `_MAX_BUFFER_SIZE_BYTES` and will + automatically flush the data to make it visible to readers when that + threshold has reached. + + :type data: bytes + :param data: The bytes to append to the object. + + :rtype: None + + :raises ValueError: If the stream is not open (i.e., `open()` has not + been called). + """ + + if not self._is_stream_open: + raise ValueError("Stream is not open. 
Call open() before append().") + total_bytes = len(data) + if total_bytes == 0: + # TODO: add warning. + return + if self.offset is None: + assert self.persisted_size is not None + self.offset = self.persisted_size + + start_idx = 0 + bytes_to_flush = 0 + while start_idx < total_bytes: + end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes) + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest( + write_offset=self.offset, + checksummed_data=_storage_v2.ChecksummedData( + content=data[start_idx:end_idx] + ), + ) + ) + chunk_size = end_idx - start_idx + self.offset += chunk_size + bytes_to_flush += chunk_size + if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES: + await self.flush() + bytes_to_flush = 0 + start_idx = end_idx async def flush(self) -> int: """Flushes the data to the server. :rtype: int :returns: The persisted size after flush. + + :raises ValueError: If the stream is not open (i.e., `open()` has not + been called). """ + if not self._is_stream_open: + raise ValueError("Stream is not open. Call open() before flush().") + await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest( flush=True, @@ -162,14 +226,34 @@ async def flush(self) -> int: self.offset = self.persisted_size return self.persisted_size - async def close(self, finalize_on_close=False) -> int: - """Returns persisted_size""" + async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]: + """Closes the underlying bidi-gRPC stream. + + :type finalize_on_close: bool + :param finalize_on_close: Finalizes the Appendable Object. No more data + can be appended. + + rtype: Union[int, _storage_v2.Object] + returns: Updated `self.persisted_size` by default after closing the + bidi-gRPC stream. However, if `finalize_on_close=True` is passed, + returns the finalized object resource. + + :raises ValueError: If the stream is not open (i.e., `open()` has not + been called). + + """ + if not self._is_stream_open: + raise ValueError("Stream is not open. 
Call open() before close().") + if finalize_on_close: await self.finalize() + else: + await self.flush() + await self.write_obj_stream.close() - await self.write_obj_stream.close() self._is_stream_open = False self.offset = None + return self.object_resource if finalize_on_close else self.persisted_size async def finalize(self) -> _storage_v2.Object: """Finalizes the Appendable Object. @@ -178,12 +262,20 @@ async def finalize(self) -> _storage_v2.Object: rtype: google.cloud.storage_v2.types.Object returns: The finalized object resource. + + :raises ValueError: If the stream is not open (i.e., `open()` has not + been called). """ + if not self._is_stream_open: + raise ValueError("Stream is not open. Call open() before finalize().") + await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest(finish_write=True) ) response = await self.write_obj_stream.recv() self.object_resource = response.resource + self.persisted_size = self.object_resource.size + return self.object_resource # helper methods. 
async def append_from_string(self, data: str): diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 18f7a8826..902ed5f8b 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -93,6 +93,7 @@ async def test_state_lookup(mock_write_object_stream, mock_client): """Test state_lookup method.""" # Arrange writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() mock_stream.recv = mock.AsyncMock( @@ -111,6 +112,17 @@ async def test_state_lookup(mock_write_object_stream, mock_client): assert response == PERSISTED_SIZE +@pytest.mark.asyncio +async def test_state_lookup_without_open_raises_value_error(mock_client): + """Test that state_lookup raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, + match="Stream is not open. 
Call open\\(\\) before state_lookup\\(\\).", + ): + await writer.state_lookup() + + @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" @@ -168,9 +180,6 @@ async def test_unimplemented_methods_raise_error(mock_client): """Test that all currently unimplemented methods raise NotImplementedError.""" writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) - with pytest.raises(NotImplementedError): - await writer.append(b"data") - with pytest.raises(NotImplementedError): await writer.append_from_string("data") @@ -188,6 +197,7 @@ async def test_unimplemented_methods_raise_error(mock_client): async def test_flush(mock_write_object_stream, mock_client): """Test that flush sends the correct request and updates state.""" writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() mock_stream.recv = mock.AsyncMock( @@ -204,46 +214,79 @@ async def test_flush(mock_write_object_stream, mock_client): assert persisted_size == 1024 +@pytest.mark.asyncio +async def test_flush_without_open_raises_value_error(mock_client): + """Test that flush raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Stream is not open. Call open\\(\\) before flush\\(\\)." 
+ ): + await writer.flush() + + @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" ) -async def test_close_without_finalize(mock_write_object_stream, mock_client): - """Test close without finalizing.""" +async def test_close(mock_write_object_stream, mock_client): writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) writer._is_stream_open = True writer.offset = 1024 mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024) + ) mock_stream.close = mock.AsyncMock() writer.finalize = mock.AsyncMock() - await writer.close(finalize_on_close=False) + persisted_size = await writer.close() writer.finalize.assert_not_awaited() mock_stream.close.assert_awaited_once() - assert not writer._is_stream_open assert writer.offset is None + assert persisted_size == 1024 + assert not writer._is_stream_open + + +@pytest.mark.asyncio +async def test_close_without_open_raises_value_error(mock_client): + """Test that close raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Stream is not open. Call open\\(\\) before close\\(\\)." 
+ ): + await writer.close() @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" ) -async def test_close_with_finalize(mock_write_object_stream, mock_client): +async def test_finalize_on_close(mock_write_object_stream, mock_client): """Test close with finalizing.""" + # Arrange + mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=2048) writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) writer._is_stream_open = True writer.offset = 1024 mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) + ) mock_stream.close = mock.AsyncMock() - writer.finalize = mock.AsyncMock() - await writer.close(finalize_on_close=True) + # Act + result = await writer.close(finalize_on_close=True) - writer.finalize.assert_awaited_once() - mock_stream.close.assert_awaited_once() + # Assert + mock_stream.close.assert_not_awaited() # Based on new implementation assert not writer._is_stream_open assert writer.offset is None + assert writer.object_resource == mock_resource + assert writer.persisted_size == 2048 + assert result == mock_resource @pytest.mark.asyncio @@ -253,17 +296,171 @@ async def test_close_with_finalize(mock_write_object_stream, mock_client): async def test_finalize(mock_write_object_stream, mock_client): """Test that finalize sends the correct request and updates state.""" writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=123) mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET) mock_stream.recv = mock.AsyncMock( return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) ) - await 
writer.finalize() + gcs_object = await writer.finalize() mock_stream.send.assert_awaited_once_with( _storage_v2.BidiWriteObjectRequest(finish_write=True) ) mock_stream.recv.assert_awaited_once() assert writer.object_resource == mock_resource + assert writer.persisted_size == 123 + assert gcs_object == mock_resource + + +@pytest.mark.asyncio +async def test_finalize_without_open_raises_value_error(mock_client): + """Test that finalize raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Stream is not open. Call open\\(\\) before finalize\\(\\)." + ): + await writer.finalize() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_raises_error_if_not_open(mock_write_object_stream, mock_client): + """Test that append raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Stream is not open. Call open\\(\\) before append\\(\\)." 
+ ): + await writer.append(b"some data") + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_with_empty_data(mock_write_object_stream, mock_client): + """Test that append does nothing if data is empty.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + + await writer.append(b"") + + mock_stream.send.assert_not_awaited() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client): + """Test that append sends data in chunks and updates offset.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_CHUNK_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 100 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.flush = mock.AsyncMock() + + data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1) + await writer.append(data) + + assert mock_stream.send.await_count == 2 + first_call = mock_stream.send.await_args_list[0] + second_call = mock_stream.send.await_args_list[1] + + # First chunk + assert first_call[0][0].write_offset == 100 + assert len(first_call[0][0].checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES + + # Second chunk + assert second_call[0][0].write_offset == 100 + _MAX_CHUNK_SIZE_BYTES + assert len(second_call[0][0].checksummed_data.content) == 1 + + assert writer.offset == 100 + len(data) + writer.flush.assert_not_awaited() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) 
+async def test_append_flushes_when_buffer_is_full( + mock_write_object_stream, mock_client +): + """Test that append flushes the stream when the buffer size is reached.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_BUFFER_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 0 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.flush = mock.AsyncMock() + + data = b"a" * _MAX_BUFFER_SIZE_BYTES + await writer.append(data) + + writer.flush.assert_awaited_once() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_handles_large_data(mock_write_object_stream, mock_client): + """Test that append handles data larger than the buffer size.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_BUFFER_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 0 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.flush = mock.AsyncMock() + + data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1) + await writer.append(data) + + assert writer.flush.await_count == 2 + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_data_two_times(mock_write_object_stream, mock_client): + """Test that append sends data correctly when called multiple times.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_CHUNK_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 0 + 
mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.flush = mock.AsyncMock() + + data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10) + await writer.append(data1) + + data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20) + await writer.append(data2) + + total_data_length = len(data1) + len(data2) + assert writer.offset == total_data_length + assert writer.flush.await_count == 0 From fcda23375c52b994f3b57d005f074f7243ade8e7 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Wed, 26 Nov 2025 14:11:37 +0530 Subject: [PATCH 15/24] chore: Add Sys test for move blob where name needs url encoding (#1624) Sys test for move blob. Add System test for bucket.move_blob where blob name needs url encoding. The fix was done in https://github.com/googleapis/python-storage/pull/1605 --- tests/system/test_bucket.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/system/test_bucket.py b/tests/system/test_bucket.py index de7074a92..32806bd4c 100644 --- a/tests/system/test_bucket.py +++ b/tests/system/test_bucket.py @@ -472,6 +472,36 @@ def test_bucket_move_blob_hns( assert source_gen != dest.generation +def test_bucket_move_blob_with_name_needs_encoding( + storage_client, + buckets_to_delete, + blobs_to_delete, +): + payload = b"move_blob_with_name_which_has_a_char_that_needs_url_encoding" + + bucket_name = _helpers.unique_name("move-blob") + bucket_obj = storage_client.bucket(bucket_name) + created = _helpers.retry_429_503(storage_client.create_bucket)(bucket_obj) + buckets_to_delete.append(created) + + source = created.blob("source") + source_gen = source.generation + source.upload_from_string(payload) + blobs_to_delete.append(source) + + dest = created.move_blob( + source, + "dest/dest_file.txt", + if_source_generation_match=source.generation, + if_source_metageneration_match=source.metageneration, + ) + blobs_to_delete.append(dest) + + assert dest.download_as_bytes() == payload + assert dest.generation
is not None + assert source_gen != dest.generation + + def test_bucket_get_blob_with_user_project( storage_client, buckets_to_delete, From 9e6fefdc24a12a9189f7119bc9119e84a061842f Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Fri, 28 Nov 2025 11:36:06 +0530 Subject: [PATCH 16/24] fix: Add simple flush for optimized writes (#1633) fix: Add simple flush for optimized writes --- .../asyncio/async_appendable_object_writer.py | 20 +++++++- .../test_async_appendable_object_writer.py | 48 +++++++++++++++---- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 5c3e54d71..d34c844d5 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -199,10 +199,28 @@ async def append(self, data: bytes) -> None: self.offset += chunk_size bytes_to_flush += chunk_size if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES: - await self.flush() + await self.simple_flush() bytes_to_flush = 0 start_idx = end_idx + async def simple_flush(self) -> None: + """Flushes the data to the server. + Please note: Unlike `flush` it does not do `state_lookup` + + :rtype: None + + :raises ValueError: If the stream is not open (i.e., `open()` has not + been called). + """ + if not self._is_stream_open: + raise ValueError("Stream is not open. Call open() before simple_flush().") + + await self.write_obj_stream.send( + _storage_v2.BidiWriteObjectRequest( + flush=True, + ) + ) + async def flush(self) -> int: """Flushes the data to the server. 
diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 902ed5f8b..a75824f8b 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -224,6 +224,38 @@ async def test_flush_without_open_raises_value_error(mock_client): await writer.flush() +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_simple_flush(mock_write_object_stream, mock_client): + """Test that simple_flush sends a flush-only request.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + + # Act + await writer.simple_flush() + + # Assert + mock_stream.send.assert_awaited_once_with( + _storage_v2.BidiWriteObjectRequest(flush=True) + ) + + +@pytest.mark.asyncio +async def test_simple_flush_without_open_raises_value_error(mock_client): + """Test that simple_flush raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, + match="Stream is not open.
Call open\\(\\) before simple_flush\\(\\).", + ): + await writer.simple_flush() + + @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" @@ -369,7 +401,7 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client writer.persisted_size = 100 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - writer.flush = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1) await writer.append(data) @@ -387,7 +419,7 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client assert len(second_call[0][0].checksummed_data.content) == 1 assert writer.offset == 100 + len(data) - writer.flush.assert_not_awaited() + writer.simple_flush.assert_not_awaited() @pytest.mark.asyncio @@ -407,12 +439,12 @@ async def test_append_flushes_when_buffer_is_full( writer.persisted_size = 0 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - writer.flush = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() data = b"a" * _MAX_BUFFER_SIZE_BYTES await writer.append(data) - writer.flush.assert_awaited_once() + writer.simple_flush.assert_awaited_once() @pytest.mark.asyncio @@ -430,12 +462,12 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client): writer.persisted_size = 0 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - writer.flush = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1) await writer.append(data) - assert writer.flush.await_count == 2 + assert writer.simple_flush.await_count == 2 @pytest.mark.asyncio @@ -453,7 +485,7 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client): writer.persisted_size = 0 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - 
writer.flush = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10) await writer.append(data1) @@ -463,4 +495,4 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client): total_data_length = len(data1) + len(data2) assert writer.offset == total_data_length - assert writer.flush.await_count == 0 + assert writer.simple_flush.await_count == 0 From a1c25073e353c317574f9dbdad19e29566f96d65 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Thu, 4 Dec 2025 23:38:48 +0530 Subject: [PATCH 17/24] chore: fix conformance test failure due to VM's API version mismatch. (#1635) Conformance tests were failing due to Docker API version mismatch in the kokoro's VM and docker client. This PR pins the `DOCKER_API_VERSION` to 1.39 RCA for kokoro failures - https://screenshot.googleplex.com/4zsxoQ8UxqWnTky. `Error response from daemon: client version 1.52 is too new. Maximum supported API version is 1.39` --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index 16cf97b01..b89b9d319 100644 --- a/noxfile.py +++ b/noxfile.py @@ -232,7 +232,7 @@ def conftest_retry(session): test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path] # Run py.test against the conformance tests. - session.run(*test_cmd) + session.run(*test_cmd, env={"DOCKER_API_VERSION": "1.39"}) @nox.session(python=DEFAULT_PYTHON_VERSION) From 6ab8d9240ccade0a7a9570d03713ffb81a9efa86 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Fri, 5 Dec 2025 20:40:41 +0530 Subject: [PATCH 18/24] chore: Add gRPC packages under extra-dependencies (#1640) Add gRPC packages under extra-dependencies Since gRPC in Python SDK is still under `_experimental` directory. Keeping grpc packages under extra-dependencies. These should be moved into mandatory dependencies once gRPC is out of `_experimental` . 
See - b/465352227 --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- setup.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/setup.py b/setup.py index 2c4504749..374a71cf4 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,18 @@ "google-crc32c >= 1.1.3, < 2.0.0", ] extras = { + # TODO: Make these extra dependencies mandatory once gRPC is out of + # experimental in this SDK. More info in b/465352227 + "grpc": [ + "google-api-core[grpc] >= 2.27.0, < 3.0.0", + "grpcio >= 1.33.2, < 2.0.0; python_version < '3.14'", + "grpcio >= 1.75.1, < 2.0.0; python_version >= '3.14'", + "grpcio-status >= 1.76.0, < 2.0.0", + "proto-plus >= 1.22.3, <2.0.0; python_version < '3.13'", + "proto-plus >= 1.25.0, <2.0.0; python_version >= '3.13'", + "protobuf>=3.20.2,<7.0.0,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", + "grpc-google-iam-v1 >= 0.14.0, <1.0.0", + ], "protobuf": ["protobuf >= 3.20.2, < 7.0.0"], "tracing": [ "opentelemetry-api >= 1.1.0, < 2.0.0", From 6ed12b97126ab936b6033a16d5e61ef424e94b37 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Mon, 8 Dec 2025 09:50:49 +0530 Subject: [PATCH 19/24] chore: fix failing system test due to version upgrade of urllib3 (#1651) chore: fix failing system tests due to version upgrade of urllib3. [2.6.0](https://urllib3.readthedocs.io/en/stable/changelog.html#id1) of urllib3 added security fixes for compressed data reads, which caused issues from #1642 to #1649. This PR temporarily mitigates failing system test to unblock other PRs.
Actual fix will be tracked in b/466813444 --- testing/constraints-3.12.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/constraints-3.12.txt b/testing/constraints-3.12.txt index ef1c92fff..4e5f201e2 100644 --- a/testing/constraints-3.12.txt +++ b/testing/constraints-3.12.txt @@ -7,3 +7,4 @@ grpcio proto-plus protobuf grpc-google-iam-v1 +urllib3==2.5.0 \ No newline at end of file From 4c6d549c3f6f14ee73975de8250f9d975dd0fa10 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Mon, 8 Dec 2025 14:47:31 +0530 Subject: [PATCH 20/24] chore: setup system tests for zonal buckets in cloud build (#1639) Setup system tests for zonal buckets in cloud build. --- cloudbuild/run_zonal_tests.sh | 26 ++++++++ cloudbuild/zb-system-tests-cloudbuild.yaml | 70 ++++++++++++++++++++++ tests/system/test_zonal.py | 59 ++++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 cloudbuild/run_zonal_tests.sh create mode 100644 cloudbuild/zb-system-tests-cloudbuild.yaml create mode 100644 tests/system/test_zonal.py diff --git a/cloudbuild/run_zonal_tests.sh b/cloudbuild/run_zonal_tests.sh new file mode 100644 index 000000000..ef94e629b --- /dev/null +++ b/cloudbuild/run_zonal_tests.sh @@ -0,0 +1,26 @@ + +set -euxo pipefail +echo '--- Installing git and cloning repository on VM ---' +sudo apt-get update && sudo apt-get install -y git python3-pip python3-venv + +# Clone the repository and checkout the specific commit from the build trigger. +git clone https://github.com/googleapis/python-storage.git +cd python-storage +git checkout ${COMMIT_SHA} + + +echo '--- Installing Python and dependencies on VM ---' +python3 -m venv env +source env/bin/activate + +echo 'Install testing libraries explicitly, as they are not in setup.py' +pip install --upgrade pip +pip install pytest pytest-timeout pytest-subtests pytest-asyncio +pip install google-cloud-testutils google-cloud-kms +pip install -e . 
+ +echo '--- Setting up environment variables on VM ---' +export ZONAL_BUCKET=${_ZONAL_BUCKET} +export RUN_ZONAL_SYSTEM_TESTS=True +echo '--- Running Zonal tests on VM ---' +pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' tests/system/test_zonal.py diff --git a/cloudbuild/zb-system-tests-cloudbuild.yaml b/cloudbuild/zb-system-tests-cloudbuild.yaml new file mode 100644 index 000000000..be790ebd4 --- /dev/null +++ b/cloudbuild/zb-system-tests-cloudbuild.yaml @@ -0,0 +1,70 @@ +substitutions: + _REGION: "us-central1" + _ZONE: "us-central1-a" + _SHORT_BUILD_ID: ${BUILD_ID:0:8} + +steps: + + # Step 1: Create a GCE VM to run the tests. + # The VM is created in the same zone as the buckets to test rapid storage features. + # It's given the Cloud Storage (devstorage) scopes to allow it to access GCS. + - name: "gcr.io/google.com/cloudsdktool/cloud-sdk" + id: "create-vm" + entrypoint: "gcloud" + args: + - "compute" + - "instances" + - "create" + - "gcsfs-test-vm-${_SHORT_BUILD_ID}" + - "--project=${PROJECT_ID}" + - "--zone=${_ZONE}" + - "--machine-type=e2-medium" + - "--image-family=debian-13" + - "--image-project=debian-cloud" + - "--service-account=${_ZONAL_VM_SERVICE_ACCOUNT}" + - "--scopes=https://www.googleapis.com/auth/devstorage.full_control,https://www.googleapis.com/auth/devstorage.read_only,https://www.googleapis.com/auth/devstorage.read_write" + - "--metadata=enable-oslogin=TRUE" + waitFor: ["-"] + + # Step 2: Run the integration tests inside the newly created VM and cleanup. + # This step uses 'gcloud compute ssh' to execute a remote script. + # The VM is deleted after tests are run, regardless of success. + - name: "gcr.io/google.com/cloudsdktool/cloud-sdk" + id: "run-tests-and-delete-vm" + entrypoint: "bash" + args: + - "-c" + - | + set -e + # Wait for the VM to be fully initialized and SSH to be ready.
+ for i in {1..10}; do + if gcloud compute ssh gcsfs-test-vm-${_SHORT_BUILD_ID} --zone=${_ZONE} --internal-ip --command="echo VM is ready"; then + break + fi + echo "Waiting for VM to become available... (attempt $i/10)" + sleep 15 + done + # copy the script to the VM + gcloud compute scp cloudbuild/run_zonal_tests.sh gcsfs-test-vm-${_SHORT_BUILD_ID}:~ --zone=${_ZONE} --internal-ip + + # Execute the script on the VM via SSH. + # Capture the exit code to ensure cleanup happens before the build fails. + set +e + gcloud compute ssh gcsfs-test-vm-${_SHORT_BUILD_ID} --zone=${_ZONE} --internal-ip --command="COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh" + EXIT_CODE=$? + set -e + + echo "--- Deleting GCE VM ---" + gcloud compute instances delete "gcsfs-test-vm-${_SHORT_BUILD_ID}" --zone=${_ZONE} --quiet + + # Exit with the original exit code from the test script. + exit $$EXIT_CODE + waitFor: + - "create-vm" + +timeout: "3600s" # 60 minutes + +options: + logging: CLOUD_LOGGING_ONLY + pool: + name: "projects/${PROJECT_ID}/locations/us-central1/workerPools/cloud-build-worker-pool" \ No newline at end of file diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py new file mode 100644 index 000000000..909b9ddf1 --- /dev/null +++ b/tests/system/test_zonal.py @@ -0,0 +1,59 @@ +# py standard imports +import os +import uuid +from io import BytesIO + +# python additional imports +import pytest + +# current library imports +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + +pytestmark = pytest.mark.skipif( + os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True", + reason="Zonal system tests need to be explicitly enabled. 
This helps scheduling tests in Kokoro and Cloud Build.", +) + + +# TODO: replace this with a fixture once zonal bucket creation / deletion +# is supported in grpc client or json client. +_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET") + + +@pytest.mark.asyncio +async def test_basic_wrd(storage_client, blobs_to_delete): + bytes_to_upload = b"dummy_bytes_to_write_read_and_delete_appendable_object" + object_name = f"test_basic_wrd-{str(uuid.uuid4())}" + + # Client instantiation; it cannot be part of fixture because + # grpc_client's event loop and event loop of coroutine running it + # (i.e. this test) must be same. + # Note: + # 1. @pytest.mark.asyncio ensures new event loop for each test. + # 2. we can keep the same event loop for entire module but that may + # create issues if tests are run in parallel and one test hogs the event + # loop slowing down other tests. + grpc_client = AsyncGrpcClient().grpc_client + + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(bytes_to_upload) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == len(bytes_to_upload) + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == bytes_to_upload + + # Clean up; use json client (i.e. `storage_client` fixture) to delete.
+ blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) From 4e91c541363f0e583bf9dd1b81a95ff2cb618bac Mon Sep 17 00:00:00 2001 From: agrawalradhika-cell Date: Tue, 9 Dec 2025 00:26:23 +0530 Subject: [PATCH 21/24] feat: Auto enable mTLS when supported certificates are detected (#1637) The Python SDK will use a hybrid approach for mTLS enablement: If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable is set (either true or false or any value), the SDK will respect that setting. This is necessary for test scenarios and users who need to explicitly control mTLS behavior. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable is not set, the SDK will automatically enable mTLS only if it detects Managed Workload Identity (MWID) or X.509 Workforce Identity Federation (WIF) certificate sources. In other cases where the variable is not set, mTLS will remain disabled. --------- Signed-off-by: Radhika Agrawal Co-authored-by: Chandra Shekhar Sirimala --- .../_storage_v2/services/storage/client.py | 46 +++- google/cloud/storage/_helpers.py | 4 - google/cloud/storage/client.py | 14 +- tests/unit/gapic/storage_v2/test_storage.py | 254 ++++++++++++++++-- 4 files changed, 268 insertions(+), 50 deletions(-) diff --git a/google/cloud/_storage_v2/services/storage/client.py b/google/cloud/_storage_v2/services/storage/client.py index 16c76a01f..cdccf3fab 100644 --- a/google/cloud/_storage_v2/services/storage/client.py +++ b/google/cloud/_storage_v2/services/storage/client.py @@ -184,6 +184,34 @@ def _get_default_mtls_endpoint(api_endpoint): _DEFAULT_ENDPOINT_TEMPLATE = "storage.{UNIVERSE_DOMAIN}" _DEFAULT_UNIVERSE = "googleapis.com" + @staticmethod + def _use_client_cert_effective(): + """Returns whether client certificate should be used for mTLS if the + google-auth version supports should_use_client_cert automatic mTLS enablement. + + Alternatively, read from the GOOGLE_API_USE_CLIENT_CERTIFICATE env var. 
+ + Returns: + bool: whether client certificate should be used for mTLS + Raises: + ValueError: (If using a version of google-auth without should_use_client_cert and + GOOGLE_API_USE_CLIENT_CERTIFICATE is set to an unexpected value.) + """ + # check if google-auth version supports should_use_client_cert for automatic mTLS enablement + if hasattr(mtls, "should_use_client_cert"): + return mtls.should_use_client_cert() + else: + # if unsupported, fallback to reading from env var + use_client_cert_str = os.getenv( + "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false" + ).lower() + if use_client_cert_str not in ("true", "false"): + raise ValueError( + "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be" + " either `true` or `false`" + ) + return use_client_cert_str == "true" + @classmethod def from_service_account_info(cls, info: dict, *args, **kwargs): """Creates an instance of this client using the provided credentials @@ -390,12 +418,8 @@ def get_mtls_endpoint_and_cert_source( ) if client_options is None: client_options = client_options_lib.ClientOptions() - use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") + use_client_cert = StorageClient._use_client_cert_effective() use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto") - if use_client_cert not in ("true", "false"): - raise ValueError( - "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" - ) if use_mtls_endpoint not in ("auto", "never", "always"): raise MutualTLSChannelError( "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" @@ -403,7 +427,7 @@ def get_mtls_endpoint_and_cert_source( # Figure out the client cert source to use. 
client_cert_source = None - if use_client_cert == "true": + if use_client_cert: if client_options.client_cert_source: client_cert_source = client_options.client_cert_source elif mtls.has_default_client_cert_source(): @@ -435,20 +459,14 @@ def _read_environment_variables(): google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT is not any of ["auto", "never", "always"]. """ - use_client_cert = os.getenv( - "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false" - ).lower() + use_client_cert = StorageClient._use_client_cert_effective() use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower() universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN") - if use_client_cert not in ("true", "false"): - raise ValueError( - "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" - ) if use_mtls_endpoint not in ("auto", "never", "always"): raise MutualTLSChannelError( "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" ) - return use_client_cert == "true", use_mtls_endpoint, universe_domain_env + return use_client_cert, use_mtls_endpoint, universe_domain_env @staticmethod def _get_client_cert_source(provided_cert_source, use_cert_flag): diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 682f8784d..24f72ad71 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -111,10 +111,6 @@ def _virtual_hosted_style_base_url(url, bucket, trailing_slash=False): return base_url -def _use_client_cert(): - return os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true" - - def _get_environ_project(): return os.getenv( environment_vars.PROJECT, diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index 3764c7a53..85575f067 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -20,11 +20,12 @@ import datetime import functools import json +import os import 
warnings import google.api_core.client_options from google.auth.credentials import AnonymousCredentials - +from google.auth.transport import mtls from google.api_core import page_iterator from google.cloud._helpers import _LocalStack from google.cloud.client import ClientWithProject @@ -35,7 +36,6 @@ from google.cloud.storage._helpers import _get_api_endpoint_override from google.cloud.storage._helpers import _get_environ_project from google.cloud.storage._helpers import _get_storage_emulator_override -from google.cloud.storage._helpers import _use_client_cert from google.cloud.storage._helpers import _virtual_hosted_style_base_url from google.cloud.storage._helpers import _DEFAULT_UNIVERSE_DOMAIN from google.cloud.storage._helpers import _DEFAULT_SCHEME @@ -218,7 +218,15 @@ def __init__( # The final decision of whether to use mTLS takes place in # google-auth-library-python. We peek at the environment variable # here only to issue an exception in case of a conflict. - if _use_client_cert(): + use_client_cert = False + if hasattr(mtls, "should_use_client_cert"): + use_client_cert = mtls.should_use_client_cert() + else: + use_client_cert = ( + os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true" + ) + + if use_client_cert: raise ValueError( 'The "GOOGLE_API_USE_CLIENT_CERTIFICATE" env variable is ' 'set to "true" and a non-default universe domain is ' diff --git a/tests/unit/gapic/storage_v2/test_storage.py b/tests/unit/gapic/storage_v2/test_storage.py index 20b680341..7b6340aa7 100644 --- a/tests/unit/gapic/storage_v2/test_storage.py +++ b/tests/unit/gapic/storage_v2/test_storage.py @@ -148,12 +148,19 @@ def test__read_environment_variables(): with mock.patch.dict( os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} ): - with pytest.raises(ValueError) as excinfo: - StorageClient._read_environment_variables() - assert ( - str(excinfo.value) - == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" - ) + if not 
hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with pytest.raises(ValueError) as excinfo: + StorageClient._read_environment_variables() + assert ( + str(excinfo.value) + == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" + ) + else: + assert StorageClient._read_environment_variables() == ( + False, + "auto", + None, + ) with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "never"}): assert StorageClient._read_environment_variables() == (False, "never", None) @@ -176,6 +183,105 @@ def test__read_environment_variables(): assert StorageClient._read_environment_variables() == (False, "auto", "foo.com") +def test_use_client_cert_effective(): + # Test case 1: Test when `should_use_client_cert` returns True. + # We mock the `should_use_client_cert` function to simulate a scenario where + # the google-auth library supports automatic mTLS and determines that a + # client certificate should be used. + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch( + "google.auth.transport.mtls.should_use_client_cert", return_value=True + ): + assert StorageClient._use_client_cert_effective() is True + + # Test case 2: Test when `should_use_client_cert` returns False. + # We mock the `should_use_client_cert` function to simulate a scenario where + # the google-auth library supports automatic mTLS and determines that a + # client certificate should NOT be used. + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch( + "google.auth.transport.mtls.should_use_client_cert", return_value=False + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 3: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "true". 
+ if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "true"}): + assert StorageClient._use_client_cert_effective() is True + + # Test case 4: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "false". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "false"} + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 5: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "True". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "True"}): + assert StorageClient._use_client_cert_effective() is True + + # Test case 6: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "False". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "False"} + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 7: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "TRUE". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "TRUE"}): + assert StorageClient._use_client_cert_effective() is True + + # Test case 8: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "FALSE". 
+ if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "FALSE"} + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 9: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not set. + # In this case, the method should return False, which is the default value. + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, clear=True): + assert StorageClient._use_client_cert_effective() is False + + # Test case 10: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to an invalid value. + # The method should raise a ValueError as the environment variable must be either + # "true" or "false". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "unsupported"} + ): + with pytest.raises(ValueError): + StorageClient._use_client_cert_effective() + + # Test case 11: Test when `should_use_client_cert` is available and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to an invalid value. + # The method should return False as the environment variable is set to an invalid value. + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "unsupported"} + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 12: Test when `should_use_client_cert` is available and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is unset. Also, + # the GOOGLE_API_CONFIG environment variable is unset. 
+ if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": ""}): + with mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""}): + assert StorageClient._use_client_cert_effective() is False + + def test__get_client_cert_source(): mock_provided_cert_source = mock.Mock() mock_default_cert_source = mock.Mock() @@ -515,17 +621,6 @@ def test_storage_client_client_options(client_class, transport_class, transport_ == "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" ) - # Check the case GOOGLE_API_USE_CLIENT_CERTIFICATE has unsupported value. - with mock.patch.dict( - os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} - ): - with pytest.raises(ValueError) as excinfo: - client = client_class(transport=transport_name) - assert ( - str(excinfo.value) - == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" - ) - # Check the case quota_project_id is provided options = client_options.ClientOptions(quota_project_id="octopus") with mock.patch.object(transport_class, "__init__") as patched: @@ -733,6 +828,119 @@ def test_storage_client_get_mtls_endpoint_and_cert_source(client_class): assert api_endpoint == mock_api_endpoint assert cert_source is None + # Test the case GOOGLE_API_USE_CLIENT_CERTIFICATE is "Unsupported". 
+ with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} + ): + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + mock_client_cert_source = mock.Mock() + mock_api_endpoint = "foo" + options = client_options.ClientOptions( + client_cert_source=mock_client_cert_source, + api_endpoint=mock_api_endpoint, + ) + api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source( + options + ) + assert api_endpoint == mock_api_endpoint + assert cert_source is None + + # Test cases for mTLS enablement when GOOGLE_API_USE_CLIENT_CERTIFICATE is unset. + test_cases = [ + ( + # With workloads present in config, mTLS is enabled. + { + "version": 1, + "cert_configs": { + "workload": { + "cert_path": "path/to/cert/file", + "key_path": "path/to/key/file", + } + }, + }, + mock_client_cert_source, + ), + ( + # With workloads not present in config, mTLS is disabled. + { + "version": 1, + "cert_configs": {}, + }, + None, + ), + ] + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + for config_data, expected_cert_source in test_cases: + env = os.environ.copy() + env.pop("GOOGLE_API_USE_CLIENT_CERTIFICATE", None) + with mock.patch.dict(os.environ, env, clear=True): + config_filename = "mock_certificate_config.json" + config_file_content = json.dumps(config_data) + m = mock.mock_open(read_data=config_file_content) + with mock.patch("builtins.open", m): + with mock.patch.dict( + os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": config_filename} + ): + mock_api_endpoint = "foo" + options = client_options.ClientOptions( + client_cert_source=mock_client_cert_source, + api_endpoint=mock_api_endpoint, + ) + ( + api_endpoint, + cert_source, + ) = client_class.get_mtls_endpoint_and_cert_source(options) + assert api_endpoint == mock_api_endpoint + assert cert_source is expected_cert_source + + # Test cases for mTLS enablement when GOOGLE_API_USE_CLIENT_CERTIFICATE is unset(empty). 
+ test_cases = [ + ( + # With workloads present in config, mTLS is enabled. + { + "version": 1, + "cert_configs": { + "workload": { + "cert_path": "path/to/cert/file", + "key_path": "path/to/key/file", + } + }, + }, + mock_client_cert_source, + ), + ( + # With workloads not present in config, mTLS is disabled. + { + "version": 1, + "cert_configs": {}, + }, + None, + ), + ] + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + for config_data, expected_cert_source in test_cases: + env = os.environ.copy() + env.pop("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") + with mock.patch.dict(os.environ, env, clear=True): + config_filename = "mock_certificate_config.json" + config_file_content = json.dumps(config_data) + m = mock.mock_open(read_data=config_file_content) + with mock.patch("builtins.open", m): + with mock.patch.dict( + os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": config_filename} + ): + mock_api_endpoint = "foo" + options = client_options.ClientOptions( + client_cert_source=mock_client_cert_source, + api_endpoint=mock_api_endpoint, + ) + ( + api_endpoint, + cert_source, + ) = client_class.get_mtls_endpoint_and_cert_source(options) + assert api_endpoint == mock_api_endpoint + assert cert_source is expected_cert_source + # Test the case GOOGLE_API_USE_MTLS_ENDPOINT is "never". with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "never"}): api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source() @@ -783,18 +991,6 @@ def test_storage_client_get_mtls_endpoint_and_cert_source(client_class): == "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" ) - # Check the case GOOGLE_API_USE_CLIENT_CERTIFICATE has unsupported value. 
- with mock.patch.dict( - os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} - ): - with pytest.raises(ValueError) as excinfo: - client_class.get_mtls_endpoint_and_cert_source() - - assert ( - str(excinfo.value) - == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" - ) - @pytest.mark.parametrize("client_class", [StorageClient, StorageAsyncClient]) @mock.patch.object( From ddce7e53a13e6c0487221bb14e88161da7ed9e08 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Tue, 9 Dec 2025 13:32:28 +0530 Subject: [PATCH 22/24] feat: send entire object checksum in the final api call of resumable upload (#1654) feat: send entire object checksum in the final api call of resumable upload fixes b/461994245 --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- google/cloud/storage/_media/_upload.py | 7 ++++ .../system/requests/test_upload.py | 24 ------------ tests/unit/test_blob.py | 37 +++++++++++++++++-- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/google/cloud/storage/_media/_upload.py b/google/cloud/storage/_media/_upload.py index 765716882..4a919d18a 100644 --- a/google/cloud/storage/_media/_upload.py +++ b/google/cloud/storage/_media/_upload.py @@ -688,6 +688,13 @@ def _prepare_request(self): _CONTENT_TYPE_HEADER: self._content_type, _helpers.CONTENT_RANGE_HEADER: content_range, } + if (start_byte + len(payload) == self._total_bytes) and ( + self._checksum_object is not None + ): + local_checksum = _helpers.prepare_checksum_digest( + self._checksum_object.digest() + ) + headers["x-goog-hash"] = f"{self._checksum_type}={local_checksum}" return _PUT, self.resumable_url, payload, headers def _update_checksum(self, start_byte, payload): diff --git a/tests/resumable_media/system/requests/test_upload.py b/tests/resumable_media/system/requests/test_upload.py index dd90aa53b..47f4f6003 100644 --- 
a/tests/resumable_media/system/requests/test_upload.py +++ b/tests/resumable_media/system/requests/test_upload.py @@ -27,7 +27,6 @@ import google.cloud.storage._media.requests as resumable_requests from google.cloud.storage._media import _helpers from .. import utils -from google.cloud.storage._media import _upload from google.cloud.storage.exceptions import InvalidResponse from google.cloud.storage.exceptions import DataCorruption @@ -372,29 +371,6 @@ def test_resumable_upload_with_headers( _resumable_upload_helper(authorized_transport, img_stream, cleanup, headers=headers) -@pytest.mark.parametrize("checksum", ["md5", "crc32c"]) -def test_resumable_upload_with_bad_checksum( - authorized_transport, img_stream, bucket, cleanup, checksum -): - fake_checksum_object = _helpers._get_checksum_object(checksum) - fake_checksum_object.update(b"bad data") - fake_prepared_checksum_digest = _helpers.prepare_checksum_digest( - fake_checksum_object.digest() - ) - with mock.patch.object( - _helpers, "prepare_checksum_digest", return_value=fake_prepared_checksum_digest - ): - with pytest.raises(DataCorruption) as exc_info: - _resumable_upload_helper( - authorized_transport, img_stream, cleanup, checksum=checksum - ) - expected_checksums = {"md5": "1bsd83IYNug8hd+V1ING3Q==", "crc32c": "YQGPxA=="} - expected_message = _upload._UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( - checksum.upper(), fake_prepared_checksum_digest, expected_checksums[checksum] - ) - assert exc_info.value.args[0] == expected_message - - def test_resumable_upload_bad_chunk_size(authorized_transport, img_stream): blob_name = os.path.basename(img_stream.name) # Create the actual upload object. 
diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index f3b6da5d1..cbf53b398 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -3049,7 +3049,14 @@ def test__initiate_resumable_upload_with_client_custom_headers(self): self._initiate_resumable_helper(client=client) def _make_resumable_transport( - self, headers1, headers2, headers3, total_bytes, data_corruption=False + self, + headers1, + headers2, + headers3, + total_bytes, + data_corruption=False, + md5_checksum_value=None, + crc32c_checksum_value=None, ): fake_transport = mock.Mock(spec=["request"]) @@ -3057,7 +3064,7 @@ def _make_resumable_transport( fake_response2 = self._mock_requests_response( http.client.PERMANENT_REDIRECT, headers2 ) - json_body = f'{{"size": "{total_bytes:d}"}}' + json_body = json.dumps({"size": str(total_bytes), "md5Hash": md5_checksum_value, "crc32c": crc32c_checksum_value}) if data_corruption: fake_response3 = DataCorruption(None) else: @@ -3151,6 +3158,9 @@ def _do_resumable_upload_call2( if_metageneration_match=None, if_metageneration_not_match=None, timeout=None, + checksum=None, + crc32c_checksum_value=None, + md5_checksum_value=None, ): # Third mock transport.request() does sends last chunk. content_range = f"bytes {blob.chunk_size:d}-{total_bytes - 1:d}/{total_bytes:d}" @@ -3161,6 +3171,11 @@ def _do_resumable_upload_call2( "content-type": content_type, "content-range": content_range, } + if checksum == "crc32c": + expected_headers["x-goog-hash"] = f"crc32c={crc32c_checksum_value}" + elif checksum == "md5": + expected_headers["x-goog-hash"] = f"md5={md5_checksum_value}" + payload = data[blob.chunk_size :] return mock.call( "PUT", @@ -3181,12 +3196,17 @@ def _do_resumable_helper( timeout=None, data_corruption=False, retry=None, + checksum=None, # None is also a valid value, when user decides to disable checksum validation. ): CHUNK_SIZE = 256 * 1024 USER_AGENT = "testing 1.2.3" content_type = "text/html" # Data to be uploaded. 
data = b"" + (b"A" * CHUNK_SIZE) + b"" + + # Data calcuated offline and entered here. (Unit test best practice). + crc32c_checksum_value = "mQ30hg==" + md5_checksum_value = "wajHeg1f2Q2u9afI6fjPOw==" total_bytes = len(data) if use_size: size = total_bytes @@ -3213,6 +3233,8 @@ def _do_resumable_helper( headers3, total_bytes, data_corruption=data_corruption, + md5_checksum_value=md5_checksum_value, + crc32c_checksum_value=crc32c_checksum_value, ) # Create some mock arguments and call the method under test. @@ -3247,7 +3269,7 @@ def _do_resumable_helper( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, - checksum=None, + checksum=checksum, retry=retry, **timeout_kwarg, ) @@ -3296,6 +3318,9 @@ def _do_resumable_helper( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=expected_timeout, + checksum=checksum, + crc32c_checksum_value=crc32c_checksum_value, + md5_checksum_value=md5_checksum_value, ) self.assertEqual(transport.request.mock_calls, [call0, call1, call2]) @@ -3308,6 +3333,12 @@ def test__do_resumable_upload_no_size(self): def test__do_resumable_upload_with_size(self): self._do_resumable_helper(use_size=True) + def test__do_resumable_upload_with_size_with_crc32c_checksum(self): + self._do_resumable_helper(use_size=True, checksum="crc32c") + + def test__do_resumable_upload_with_size_with_md5_checksum(self): + self._do_resumable_helper(use_size=True, checksum="md5") + def test__do_resumable_upload_with_retry(self): self._do_resumable_helper(retry=DEFAULT_RETRY) From 57405e956a7ca579b20582bf6435cec42743c478 Mon Sep 17 00:00:00 2001 From: Chandra Shekhar Sirimala Date: Tue, 9 Dec 2025 20:37:39 +0530 Subject: [PATCH 23/24] feat: Support urllib3 >= 2.6.0 (#1658) feat: Support urllib3 >= 2.6.0 **Context**: * This library implements custom decoders (`_GzipDecoder`, `_BrotliDecoder`) which inherit from `urllib3.response.ContentDecoder` * Interface of 
`urllib3.response.ContentDecoder` was changed in [2.6.0](https://urllib3.readthedocs.io/en/stable/changelog.html#id1) to fix a security vulnerability in reads of highly compressed data (decompression bombs). Hence we need to change our interfaces as well. **Changes** * Add `max_length` param on decompress method, provide default value of -1 (same as urllib3's decompress) * Provide backwards compatibility (i.e. urllib3 <= 2.5.0) --- .../cloud/storage/_media/requests/download.py | 21 +++++++++++++++---- testing/constraints-3.12.txt | 1 - 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/_media/requests/download.py b/google/cloud/storage/_media/requests/download.py index b8e2758e1..13e049bd3 100644 --- a/google/cloud/storage/_media/requests/download.py +++ b/google/cloud/storage/_media/requests/download.py @@ -711,7 +711,7 @@ def __init__(self, checksum): super().__init__() self._checksum = checksum - def decompress(self, data): + def decompress(self, data, max_length=-1): """Decompress the bytes. Args: @@ -721,7 +721,11 @@ def decompress(self, data): bytes: The decompressed bytes from ``data``. """ self._checksum.update(data) - return super().decompress(data) + try: + return super().decompress(data, max_length=max_length) + except TypeError: + # Fallback for urllib3 < 2.6.0 which lacks `max_length` support. + return super().decompress(data) # urllib3.response.BrotliDecoder might not exist depending on whether brotli is @@ -747,7 +751,7 @@ def __init__(self, checksum): self._decoder = urllib3.response.BrotliDecoder() self._checksum = checksum - def decompress(self, data): + def decompress(self, data, max_length=-1): """Decompress the bytes. Args: @@ -757,10 +761,19 @@ def decompress(self, data): bytes: The decompressed bytes from ``data``. 
""" self._checksum.update(data) - return self._decoder.decompress(data) + try: + return self._decoder.decompress(data, max_length=max_length) + except TypeError: + # Fallback for urllib3 < 2.6.0 which lacks `max_length` support. + return self._decoder.decompress(data) def flush(self): return self._decoder.flush() + @property + def has_unconsumed_tail(self) -> bool: + return self._decoder.has_unconsumed_tail + + else: # pragma: NO COVER _BrotliDecoder = None # type: ignore # pragma: NO COVER diff --git a/testing/constraints-3.12.txt b/testing/constraints-3.12.txt index 4e5f201e2..ef1c92fff 100644 --- a/testing/constraints-3.12.txt +++ b/testing/constraints-3.12.txt @@ -7,4 +7,3 @@ grpcio proto-plus protobuf grpc-google-iam-v1 -urllib3==2.5.0 \ No newline at end of file From 89a947a877f0d8f19be3a2bc45e5dfb8b8a977e5 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Tue, 9 Dec 2025 23:49:23 +0530 Subject: [PATCH 24/24] chore(main): release 3.7.0 (#1621) :robot: I have created a release *beep* *boop* --- ## [3.7.0](https://github.com/googleapis/python-storage/compare/v3.6.0...v3.7.0) (2025-12-09) ### Features * Auto enable mTLS when supported certificates are detected ([#1637](https://github.com/googleapis/python-storage/issues/1637)) ([4e91c54](https://github.com/googleapis/python-storage/commit/4e91c541363f0e583bf9dd1b81a95ff2cb618bac)) * Send entire object checksum in the final api call of resumable upload ([#1654](https://github.com/googleapis/python-storage/issues/1654)) ([ddce7e5](https://github.com/googleapis/python-storage/commit/ddce7e53a13e6c0487221bb14e88161da7ed9e08)) * Support urllib3 >= 2.6.0 ([#1658](https://github.com/googleapis/python-storage/issues/1658)) ([57405e9](https://github.com/googleapis/python-storage/commit/57405e956a7ca579b20582bf6435cec42743c478)) ### Bug Fixes * **bucket:** Move blob fails when the new blob name contains characters that need to be url encoded 
([#1605](https://github.com/googleapis/python-storage/issues/1605)) ([ec470a2](https://github.com/googleapis/python-storage/commit/ec470a270e189e137c7229cc359367d5a897cdb9)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). --------- Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: Chandra Shekhar Sirimala --- CHANGELOG.md | 14 ++++++++++++++ google/cloud/storage/version.py | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ee1c7beb..da1f2149b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ [1]: https://pypi.org/project/google-cloud-storage/#history +## [3.7.0](https://github.com/googleapis/python-storage/compare/v3.6.0...v3.7.0) (2025-12-09) + + +### Features + +* Auto enable mTLS when supported certificates are detected ([#1637](https://github.com/googleapis/python-storage/issues/1637)) ([4e91c54](https://github.com/googleapis/python-storage/commit/4e91c541363f0e583bf9dd1b81a95ff2cb618bac)) +* Send entire object checksum in the final api call of resumable upload ([#1654](https://github.com/googleapis/python-storage/issues/1654)) ([ddce7e5](https://github.com/googleapis/python-storage/commit/ddce7e53a13e6c0487221bb14e88161da7ed9e08)) +* Support urllib3 >= 2.6.0 ([#1658](https://github.com/googleapis/python-storage/issues/1658)) ([57405e9](https://github.com/googleapis/python-storage/commit/57405e956a7ca579b20582bf6435cec42743c478)) + + +### Bug Fixes + +* Fix for [move_blob](https://github.com/googleapis/python-storage/blob/57405e956a7ca579b20582bf6435cec42743c478/google/cloud/storage/bucket.py#L2256) failure when the new blob name contains characters that need to be url encoded ([#1605](https://github.com/googleapis/python-storage/issues/1605)) 
([ec470a2](https://github.com/googleapis/python-storage/commit/ec470a270e189e137c7229cc359367d5a897cdb9)) + ## [3.6.0](https://github.com/googleapis/python-storage/compare/v3.5.0...v3.6.0) (2025-11-17) diff --git a/google/cloud/storage/version.py b/google/cloud/storage/version.py index 102b96095..dc87b3c5b 100644 --- a/google/cloud/storage/version.py +++ b/google/cloud/storage/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "3.6.0" +__version__ = "3.7.0"