diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 77a642659..1688f0bbd 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -3,9 +3,7 @@ # # For syntax help see: # https://help.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners#codeowners-syntax +* @googleapis/cloud-sdk-python-team @googleapis/gcs-team @googleapis/gcs-fs-team -# @googleapis/yoshi-python @googleapis/gcs-sdk-team are the default owners for changes in this repo -* @googleapis/yoshi-python @googleapis/gcs-sdk-team @googleapis/gcs-fs - -# @googleapis/python-samples-reviewers @googleapis/gcs-sdk-team are the default owners for samples changes -/samples/ @googleapis/python-samples-reviewers @googleapis/gcs-sdk-team +# @googleapis/python-samples-reviewers @googleapis/gcs-team are the default owners for samples changes +/samples/ @googleapis/python-samples-reviewers @googleapis/gcs-team diff --git a/.github/release-please.yml b/.github/release-please.yml deleted file mode 100644 index 8a7214bdd..000000000 --- a/.github/release-please.yml +++ /dev/null @@ -1,6 +0,0 @@ -branches: -- branch: python2 - handleGHRelease: true - releaseType: python -releaseType: python -handleGHRelease: true diff --git a/.github/release-trigger.yml b/.github/release-trigger.yml deleted file mode 100644 index 5980127a4..000000000 --- a/.github/release-trigger.yml +++ /dev/null @@ -1,2 +0,0 @@ -enabled: true -multiScmName: python-storage diff --git a/.github/sync-repo-settings.yaml b/.github/sync-repo-settings.yaml index 19c1d0ba4..073e7d995 100644 --- a/.github/sync-repo-settings.yaml +++ b/.github/sync-repo-settings.yaml @@ -10,5 +10,5 @@ branchProtectionRules: - 'Kokoro' - 'cla/google' - 'Kokoro system-3.14' - - 'Kokoro system-3.9' + - 'Kokoro system-3.10' - 'OwlBot Post Processor' diff --git a/.kokoro/presubmit/system-3.9.cfg b/.kokoro/presubmit/system-3.10.cfg similarity index 91% rename from .kokoro/presubmit/system-3.9.cfg rename to .kokoro/presubmit/system-3.10.cfg index d21467d02..26958ac2a 100644 --- a/.kokoro/presubmit/system-3.9.cfg +++ b/.kokoro/presubmit/system-3.10.cfg @@ -3,7 +3,7 @@ # Only run this nox session. env_vars: { key: "NOX_SESSION" - value: "system-3.9" + value: "system-3.10" } # Credentials needed to test universe domain. diff --git a/.kokoro/trampoline_v2.sh b/.kokoro/trampoline_v2.sh index 35fa52923..d03f92dfc 100755 --- a/.kokoro/trampoline_v2.sh +++ b/.kokoro/trampoline_v2.sh @@ -26,8 +26,8 @@ # To run this script, first download few files from gcs to /dev/shm. # (/dev/shm is passed into the container as KOKORO_GFILE_DIR). # -# gsutil cp gs://cloud-devrel-kokoro-resources/python-docs-samples/secrets_viewer_service_account.json /dev/shm -# gsutil cp gs://cloud-devrel-kokoro-resources/python-docs-samples/automl_secrets.txt /dev/shm +# gcloud storage cp gs://cloud-devrel-kokoro-resources/python-docs-samples/secrets_viewer_service_account.json /dev/shm +# gcloud storage cp gs://cloud-devrel-kokoro-resources/python-docs-samples/automl_secrets.txt /dev/shm # # Then run the script. 
# .kokoro/trampoline_v2.sh diff --git a/.librarian/generator-input/noxfile.py b/.librarian/generator-input/noxfile.py index ca527decd..c9ada0739 100644 --- a/.librarian/generator-input/noxfile.py +++ b/.librarian/generator-input/noxfile.py @@ -27,8 +27,8 @@ BLACK_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"] DEFAULT_PYTHON_VERSION = "3.14" -SYSTEM_TEST_PYTHON_VERSIONS = ["3.9", "3.14"] -UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] +SYSTEM_TEST_PYTHON_VERSIONS = ["3.10", "3.14"] +UNIT_TEST_PYTHON_VERSIONS = ["3.10", "3.11", "3.12", "3.13", "3.14"] CONFORMANCE_TEST_PYTHON_VERSIONS = ["3.12"] CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() @@ -44,9 +44,6 @@ "lint", "lint_setup_py", "system", - # TODO(https://github.com/googleapis/python-storage/issues/1499): - # Remove or restore testing for Python 3.7/3.8 - "unit-3.9", "unit-3.10", "unit-3.11", "unit-3.12", diff --git a/.librarian/generator-input/setup.py b/.librarian/generator-input/setup.py index 89971aa33..294e63892 100644 --- a/.librarian/generator-input/setup.py +++ b/.librarian/generator-input/setup.py @@ -87,9 +87,6 @@ "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", diff --git a/.librarian/state.yaml b/.librarian/state.yaml index 80e2355be..d91f3f9be 100644 --- a/.librarian/state.yaml +++ b/.librarian/state.yaml @@ -1,7 +1,7 @@ image: us-central1-docker.pkg.dev/cloud-sdk-librarian-prod/images-prod/python-librarian-generator@sha256:8e2c32496077054105bd06c54a59d6a6694287bc053588e24debe6da6920ad91 libraries: - id: google-cloud-storage - version: 3.9.0 + version: 3.10.0 last_generated_commit: 5400ccce473c439885bd6bf2924fd242271bfcab apis: - path: google/storage/v2 diff --git a/.repo-metadata.json b/.repo-metadata.json index bd870f959..59ebe7f61 100644 --- a/.repo-metadata.json +++ b/.repo-metadata.json @@ -12,7 +12,7 @@ "api_id": "storage.googleapis.com", "requires_billing": true, "default_version": "v2", - "codeowner_team": "@googleapis/yoshi-python @googleapis/gcs-sdk-team @googleapis/gcs-fs", + "codeowner_team": "@googleapis/cloud-sdk-python-team @googleapis/gcs-team @googleapis/gcs-fs", "api_shortname": "storage", "api_description": "is a durable and highly available object storage service. Google Cloud Storage is almost infinitely scalable and guarantees consistency: when a write succeeds, the latest copy of the object will be returned to any GET, globally." 
} diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c46db115..297ef7a1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,27 @@ [1]: https://pypi.org/project/google-cloud-storage/#history +## [3.10.0](https://github.com/googleapis/python-storage/compare/v3.9.0...v3.10.0) (2026-03-18) + + +### Features + +* [Bucket Encryption Enforcement] add support for bucket encryption enforcement config (#1742) ([2a6e8b00e4e6ff57460373f8e628fd363be47811](https://github.com/googleapis/python-storage/commit/2a6e8b00e4e6ff57460373f8e628fd363be47811)) + +### Perf Improvements + +* [Rapid Buckets Reads] Use raw proto access for read resumption strategy (#1764) ([14cfd61ce35365a409650981239ef742cdf375fb](https://github.com/googleapis/python-storage/commit/14cfd61ce35365a409650981239ef742cdf375fb)) +* [Rapid Buckets Benchmarks] init mp pool & grpc client once, use os.sched_setaffinity (#1751) ([a9eb82c1b9b3c6ae5717d47b76284ed0deb5f769](https://github.com/googleapis/python-storage/commit/a9eb82c1b9b3c6ae5717d47b76284ed0deb5f769)) +* [Rapid Buckets Writes] don't flush at every append, results in bad perf (#1746) ([ab62d728ac7d7be3c4fe9a99d72e35ead310805a](https://github.com/googleapis/python-storage/commit/ab62d728ac7d7be3c4fe9a99d72e35ead310805a)) + + +### Bug Fixes + +* [Windows] skip downloading blobs whose names contain `":"` (e.g. `C:`, `D:`) when the application runs on Windows. (#1774) ([558198823ed51918db9c0137715d1e7f5b593975](https://github.com/googleapis/python-storage/commit/558198823ed51918db9c0137715d1e7f5b593975)) +* [Path Traversal] Prevent path traversal in `download_many_to_path` (#1768) ([700fec3bf7aa37bd5ea4b163cc3f9e8e6892bd5a](https://github.com/googleapis/python-storage/commit/700fec3bf7aa37bd5ea4b163cc3f9e8e6892bd5a)) +* [Rapid Buckets] pass token correctly, '&' instead of ',' (#1756) ([d8dd1e074d2431de9b45e0103181dce749a447a0](https://github.com/googleapis/python-storage/commit/d8dd1e074d2431de9b45e0103181dce749a447a0)) + + ## [3.9.0](https://github.com/googleapis/python-storage/compare/v3.8.0...v3.9.0) (2026-02-02) diff --git a/cloudbuild/run_zonal_tests.sh b/cloudbuild/run_zonal_tests.sh index 22ca8fe4b..2d42ce6d5 100644 --- a/cloudbuild/run_zonal_tests.sh +++ b/cloudbuild/run_zonal_tests.sh @@ -23,6 +23,7 @@ pip install -e . echo '--- Setting up environment variables on VM ---' export ZONAL_BUCKET=${_ZONAL_BUCKET} export RUN_ZONAL_SYSTEM_TESTS=True +export GCE_METADATA_MTLS_MODE=None CURRENT_ULIMIT=$(ulimit -n) echo '--- Running Zonal tests on VM with ulimit set to ---' $CURRENT_ULIMIT pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' tests/system/test_zonal.py diff --git a/cloudbuild/zb-system-tests-cloudbuild.yaml b/cloudbuild/zb-system-tests-cloudbuild.yaml index 562eae175..26daa8ae9 100644 --- a/cloudbuild/zb-system-tests-cloudbuild.yaml +++ b/cloudbuild/zb-system-tests-cloudbuild.yaml @@ -68,7 +68,7 @@ steps: # Execute the script on the VM via SSH. # Capture the exit code to ensure cleanup happens before the build fails.
set +e - gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" + gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n ${_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh" EXIT_CODE=$? set -e diff --git a/google/cloud/_storage_v2/gapic_version.py b/google/cloud/_storage_v2/gapic_version.py index 0d5599e8b..af050a89f 100644 --- a/google/cloud/_storage_v2/gapic_version.py +++ b/google/cloud/_storage_v2/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "3.9.0" # {x-release-please-version} +__version__ = "3.10.0" # {x-release-please-version} diff --git a/google/cloud/storage/asyncio/async_appendable_object_writer.py b/google/cloud/storage/asyncio/async_appendable_object_writer.py index 3ab06f8ba..c65209680 100644 --- a/google/cloud/storage/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/asyncio/async_appendable_object_writer.py @@ -43,7 +43,6 @@ _extract_bidi_writes_redirect_proto, ) - _MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB _DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB _BIDI_WRITE_REDIRECTED_TYPE_URL = ( @@ -211,6 +210,7 @@ def __init__( self.bytes_appended_since_last_flush = 0 self._routing_token: Optional[str] = None self.object_resource: Optional[_storage_v2.Object] = None + self._flush_count = 0 async def state_lookup(self) -> int: """Returns the persisted_size @@ -288,8 +288,7 @@ async def _do_open(): await self.write_obj_stream.close() except Exception as e: logger.warning( - "Error closing previous write stream during open retry. Got exception: ", - {e}, + f"Error closing previous write stream during open retry.
Got exception: {e}" ) self.write_obj_stream = None self._is_stream_open = False @@ -309,7 +308,7 @@ async def _do_open(): ) await self.write_obj_stream.open( - metadata=current_metadata if metadata else None + metadata=current_metadata if current_metadata else None ) if self.write_obj_stream.generation_number: @@ -318,6 +317,8 @@ async def _do_open(): self.write_handle = self.write_obj_stream.write_handle if self.write_obj_stream.persisted_size is not None: self.persisted_size = self.write_obj_stream.persisted_size + # set offset while opening + self.offset = self.persisted_size self._is_stream_open = True self._routing_token = None @@ -380,8 +381,6 @@ async def generator(): logger.info( f"Re-opening the stream with attempt_count: {attempt_count}" ) - if self.write_obj_stream and self.write_obj_stream.is_stream_open: - await self.write_obj_stream.close() current_metadata = list(metadata) if metadata else [] if write_state.routing_token: @@ -403,28 +402,31 @@ async def generator(): write_state.user_buffer.seek(write_state.persisted_size) write_state.bytes_sent = write_state.persisted_size write_state.bytes_since_last_flush = 0 + self.bytes_appended_since_last_flush = 0 requests = strategy.generate_requests(state) - num_requests = len(requests) - for i, chunk_req in enumerate(requests): - if i == num_requests - 1: - chunk_req.state_lookup = True - chunk_req.flush = True + for chunk_req in requests: await self.write_obj_stream.send(chunk_req) - - resp = await self.write_obj_stream.recv() - if resp: - if resp.persisted_size is not None: - self.persisted_size = resp.persisted_size - state["write_state"].persisted_size = resp.persisted_size - self.offset = self.persisted_size - if resp.write_handle: - self.write_handle = resp.write_handle - state["write_state"].write_handle = resp.write_handle - self.bytes_appended_since_last_flush = 0 - - yield resp + if chunk_req.flush: + self._flush_count += 1 + + resp = None + if chunk_req.state_lookup: + # TODO: if there's an error, recv() will raise it, + # and it will be handled by `recover_state_on_failure` + resp = await self.write_obj_stream.recv() + + if resp: + if resp.persisted_size is not None: + self.persisted_size = resp.persisted_size + state["write_state"].persisted_size = resp.persisted_size + self.offset = self.persisted_size + if resp.write_handle: + self.write_handle = resp.write_handle + state["write_state"].write_handle = resp.write_handle + + yield resp return generator() @@ -432,7 +434,8 @@ async def generator(): write_state = _WriteState(_MAX_CHUNK_SIZE_BYTES, buffer, self.flush_interval) write_state.write_handle = self.write_handle write_state.persisted_size = self.persisted_size - write_state.bytes_sent = self.persisted_size + # offset is set during the `open()` call. + write_state.bytes_sent = self.offset or 0 write_state.bytes_since_last_flush = self.bytes_appended_since_last_flush retry_manager = _BidiStreamRetryManager( @@ -442,11 +445,8 @@ async def generator(): await retry_manager.execute({"write_state": write_state}, retry_policy) # Sync local markers - self.write_obj_stream.persisted_size = write_state.persisted_size - self.write_obj_stream.write_handle = write_state.write_handle self.bytes_appended_since_last_flush = write_state.bytes_since_last_flush - self.persisted_size = write_state.persisted_size - self.offset = write_state.persisted_size + self.offset = write_state.bytes_sent async def simple_flush(self) -> None: """Flushes the data to the server.
@@ -516,7 +516,6 @@ async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object] await self.write_obj_stream.close() self._is_stream_open = False - self.offset = None return self.persisted_size async def finalize(self) -> _storage_v2.Object: diff --git a/google/cloud/storage/asyncio/async_grpc_client.py b/google/cloud/storage/asyncio/async_grpc_client.py index 640e7fe38..88566b246 100644 --- a/google/cloud/storage/asyncio/async_grpc_client.py +++ b/google/cloud/storage/asyncio/async_grpc_client.py @@ -19,6 +19,8 @@ DEFAULT_CLIENT_INFO, ) from google.cloud.storage import __version__ +import grpc +from google.auth import credentials as auth_credentials class AsyncGrpcClient: @@ -52,6 +54,12 @@ def __init__( *, attempt_direct_path=True, ): + if isinstance(credentials, auth_credentials.AnonymousCredentials): + self._grpc_client = self._create_anonymous_client( + client_options, credentials + ) + return + if client_info is None: client_info = DEFAULT_CLIENT_INFO client_info.client_library_version = __version__ @@ -68,6 +76,21 @@ def __init__( attempt_direct_path=attempt_direct_path, ) + def _create_anonymous_client(self, client_options, credentials): + channel = grpc.aio.insecure_channel(client_options.api_endpoint) + transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( + channel=channel, credentials=credentials + ) + return storage_v2.StorageAsyncClient(transport=transport) + + @classmethod + def _create_insecure_grpc_client(cls, client_options): + return cls( + credentials=auth_credentials.AnonymousCredentials(), + client_options=client_options, + attempt_direct_path=False, + ) + def _create_async_grpc_client( self, credentials=None, diff --git a/google/cloud/storage/asyncio/async_multi_range_downloader.py b/google/cloud/storage/asyncio/async_multi_range_downloader.py index 8ad4d319f..51afd255b 100644 --- a/google/cloud/storage/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/asyncio/async_multi_range_downloader.py @@ -41,7 +41,6 @@ from google.cloud import _storage_v2 from google.cloud.storage._helpers import generate_random_56_bit_integer - _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 _BIDI_READ_REDIRECTED_TYPE_URL = ( "type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError" @@ -208,6 +207,10 @@ def __init__( "Cannot set both 'generation' and 'generation_number'. " "Use 'generation' for new code." ) + logger.warning( + "'generation_number' is deprecated and will be removed in a future " + "major release. Please use 'generation' instead." 
+ ) + generation = kwargs.pop("generation_number") raise_if_no_fast_crc32c() @@ -224,6 +227,7 @@ def __init__( self._read_id_to_download_ranges_id = {} self._download_ranges_id_to_pending_read_ids = {} self.persisted_size: Optional[int] = None # updated after opening the stream + self._open_retries: int = 0 async def __aenter__(self): """Opens the underlying bidi-gRPC connection to read from the object.""" @@ -237,6 +241,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def _on_open_error(self, exc): """Extracts routing token and read handle on redirect error during open.""" + logger.warning(f"Error occurred while opening MRD: {exc}") routing_token, read_handle = _handle_redirect(exc) if routing_token: self._routing_token = routing_token @@ -253,13 +258,18 @@ async def open( raise ValueError("Underlying bidi-gRPC stream is already open") if retry_policy is None: + def on_error_wrapper(exc): + self._open_retries += 1 + self._on_open_error(exc) + retry_policy = AsyncRetry( - predicate=_is_read_retryable, on_error=self._on_open_error + predicate=_is_read_retryable, on_error=on_error_wrapper ) else: original_on_error = retry_policy._on_error def combined_on_error(exc): + self._open_retries += 1 self._on_open_error(exc) if original_on_error: original_on_error(exc) @@ -421,7 +431,7 @@ async def generator(): if attempt_count > 1: logger.info( - f"Resuming download (attempt {attempt_count - 1}) for {len(requests)} ranges." + f"Resuming download (attempt {attempt_count}) for {len(requests)} ranges." ) async with lock: @@ -442,11 +452,7 @@ async def generator(): logger.info( f"Re-opening stream with routing token: {current_token}" ) - # Close existing stream if any - if self.read_obj_str and self.read_obj_str.is_stream_open: - await self.read_obj_str.close() - # Re-initialize stream self.read_obj_str = _AsyncReadObjectStream( client=self.client.grpc_client, bucket_name=self.bucket_name, diff --git a/google/cloud/storage/asyncio/async_read_object_stream.py b/google/cloud/storage/asyncio/async_read_object_stream.py index d456f16cc..bde6c1651 100644 --- a/google/cloud/storage/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/asyncio/async_read_object_stream.py @@ -115,7 +115,7 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: other_metadata.append((key, value)) current_metadata = other_metadata - current_metadata.append(("x-goog-request-params", ",".join(request_params))) + current_metadata.append(("x-goog-request-params", "&".join(request_params))) self.socket_like_rpc = AsyncBidiRpc( self.rpc, diff --git a/google/cloud/storage/asyncio/async_write_object_stream.py b/google/cloud/storage/asyncio/async_write_object_stream.py index 319f394dd..4729cfd20 100644 --- a/google/cloud/storage/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/asyncio/async_write_object_stream.py @@ -145,7 +145,7 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None: else: final_metadata.append((key, value)) - final_metadata.append(("x-goog-request-params", ",".join(request_param_values))) + final_metadata.append(("x-goog-request-params", "&".join(request_param_values))) self.socket_like_rpc = AsyncBidiRpc( self.rpc, @@ -190,7 +190,10 @@ async def requests_done(self): _utils.update_write_handle_if_exists(self, first_resp) if first_resp != grpc.aio.EOF: - self.persisted_size = first_resp.persisted_size + # This persisted_size may not be up to date; also, if the response + # doesn't have persisted_size, reading it would throw an error.
+ if hasattr(first_resp, "persisted_size"): + self.persisted_size = first_resp.persisted_size second_resp = await self.socket_like_rpc.recv() assert second_resp == grpc.aio.EOF diff --git a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py index 23bffb63d..947ee74c1 100644 --- a/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py +++ b/google/cloud/storage/asyncio/retry/bidi_stream_retry_manager.py @@ -58,7 +58,7 @@ async def attempt(): return except Exception as e: if retry_policy._predicate(e): - logger.info( + logger.warning( f"Bidi stream operation failed: {e}. Attempting state recovery and retry." ) await self._strategy.recover_state_on_failure(e, state) diff --git a/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py b/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py index 468954332..e7003c105 100644 --- a/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py +++ b/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py @@ -81,23 +81,28 @@ def update_state_from_response( self, response: storage_v2.BidiReadObjectResponse, state: Dict[str, Any] ) -> None: """Processes a server response, performs integrity checks, and updates state.""" + proto = getattr(response, "_pb", response) # Capture read_handle if provided. - if response.read_handle: - state["read_handle"] = response.read_handle + if proto.HasField("read_handle"): + state["read_handle"] = storage_v2.BidiReadHandle( + handle=proto.read_handle.handle + ) download_states = state["download_states"] - for object_data_range in response.object_data_ranges: + for object_data_range in proto.object_data_ranges: # Ignore empty ranges or ranges for IDs not in our state # (e.g., from a previously cancelled request on the same stream). - if not object_data_range.read_range: + if not object_data_range.HasField("read_range"): logger.warning( "Received response with missing read_range field; ignoring." ) continue - read_id = object_data_range.read_range.read_id + read_range_pb = object_data_range.read_range + read_id = read_range_pb.read_id + if read_id not in download_states: logger.warning( f"Received data for unknown or stale read_id {read_id}; ignoring." @@ -107,7 +112,8 @@ def update_state_from_response( read_state = download_states[read_id] # Offset Verification - chunk_offset = object_data_range.read_range.read_offset + # We must validate data before updating state or writing to buffer. + chunk_offset = read_range_pb.read_offset if chunk_offset != read_state.next_expected_offset: raise DataCorruption( response, @@ -116,11 +122,11 @@ def update_state_from_response( ) # Checksum Verification - # We must validate data before updating state or writing to buffer. 
- data = object_data_range.checksummed_data.content - server_checksum = object_data_range.checksummed_data.crc32c + checksummed_data = object_data_range.checksummed_data + data = checksummed_data.content - if server_checksum is not None: + if checksummed_data.HasField("crc32c"): + server_checksum = checksummed_data.crc32c client_checksum = int.from_bytes(Checksum(data).digest(), "big") if server_checksum != client_checksum: raise DataCorruption( diff --git a/google/cloud/storage/asyncio/retry/writes_resumption_strategy.py b/google/cloud/storage/asyncio/retry/writes_resumption_strategy.py index b98b9b2e7..f7dcd1e17 100644 --- a/google/cloud/storage/asyncio/retry/writes_resumption_strategy.py +++ b/google/cloud/storage/asyncio/retry/writes_resumption_strategy.py @@ -47,6 +47,11 @@ def __init__( self.chunk_size = chunk_size self.user_buffer = user_buffer self.persisted_size: int = 0 + # Bytes sent to the server, possibly not yet persisted, + # i.e. latest object size = persisted_size + some additional bytes. + # Note: these bytes have been sent by the client and may have been received by the server, + # but may not have been persisted yet (they may sit in an in-memory buffer on the server side). + # This variable mirrors the `offset` attribute on the `AppendableObjectWriter` instance. self.bytes_sent: int = 0 self.bytes_since_last_flush: int = 0 self.flush_interval: int = flush_interval @@ -91,7 +96,7 @@ def generate_requests( if write_state.bytes_since_last_flush >= write_state.flush_interval: request.flush = True - # reset counter after marking flush + request.state_lookup = True write_state.bytes_since_last_flush = 0 requests.append(request) diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index 1621f879e..b4001e09d 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -43,7 +43,11 @@ from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.blob import _quote from google.cloud.storage.blob import Blob -from google.cloud.storage.constants import _DEFAULT_TIMEOUT +from google.cloud.storage.constants import ( _DEFAULT_TIMEOUT, ENFORCEMENT_MODE_FULLY_RESTRICTED, ENFORCEMENT_MODE_NOT_RESTRICTED, ) from google.cloud.storage.constants import ARCHIVE_STORAGE_CLASS from google.cloud.storage.constants import COLDLINE_STORAGE_CLASS from google.cloud.storage.constants import DUAL_REGION_LOCATION_TYPE @@ -65,7 +69,6 @@ from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED - _UBLA_BPO_ENABLED_MESSAGE = ( "Pass only one of 'uniform_bucket_level_access_enabled' / " "'bucket_policy_only_enabled' to 'IAMConfiguration'." ) @@ -2538,6 +2541,25 @@ def cors(self, entries): :rtype: bool or ``NoneType`` """ + @property + def encryption(self): + """Retrieve encryption configuration for this bucket. + + :rtype: :class:`BucketEncryption` + :returns: an instance for managing the bucket's encryption configuration. + """ + info = self._properties.get("encryption", {}) + return BucketEncryption.from_api_repr(info, self) + + @encryption.setter + def encryption(self, value): + """Set encryption configuration for this bucket. + + :type value: :class:`BucketEncryption` or dict + :param value: The encryption configuration. + """ + self._patch_property("encryption", value) + @property def default_kms_key_name(self): """Retrieve / set default KMS encryption key for objects in the bucket.
@@ -3965,6 +3987,247 @@ def ip_filter(self, value): self._patch_property(_IP_FILTER_PROPERTY, value) +class EncryptionEnforcementConfig(dict): + """Map a bucket's encryption enforcement configuration. + + :type restriction_mode: str + :param restriction_mode: + (Optional) The restriction mode for the encryption type. + When set to ``FullyRestricted``, the bucket will only allow objects encrypted with the encryption type corresponding to this configuration. + When set to ``NotRestricted``, the bucket will allow objects encrypted with any encryption type. + + :type effective_time: :class:`datetime.datetime` + :param effective_time: + (Output only) The time when the encryption enforcement configuration became effective. + """ + + def __init__(self, restriction_mode=None): + data = {} + if restriction_mode is not None: + # Validate input against allowed constants + allowed = ( + ENFORCEMENT_MODE_FULLY_RESTRICTED, + ENFORCEMENT_MODE_NOT_RESTRICTED, + ) + if restriction_mode not in allowed: + raise ValueError( + f"Invalid restriction_mode: {restriction_mode}. " + f"Must be one of {allowed}" + ) + data["restrictionMode"] = restriction_mode + + super().__init__(data) + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct instance from resource. + + :type resource: dict + :param resource: mapping as returned from API call. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: Instance created from resource. + """ + instance = cls() + instance.update(resource) + return instance + + @property + def restriction_mode(self): + """Get the restriction mode. + + :rtype: str or ``NoneType`` + :returns: The restriction mode or ``None`` if the property is not set. + """ + return self.get("restrictionMode") + + @restriction_mode.setter + def restriction_mode(self, value): + """Set the restriction mode. + + :type value: str + :param value: The restriction mode. + """ + self["restrictionMode"] = value + + @property + def effective_time(self): + """Get the effective time. + + :rtype: datetime.datetime or ``NoneType`` + :returns: point in time at which the configuration is effective, + or ``None`` if the property is not set. + """ + timestamp = self.get("effectiveTime") + if timestamp is not None: + return _rfc3339_nanos_to_datetime(timestamp) + + +class BucketEncryption(dict): + """Map a bucket's encryption configuration. + + :type bucket: :class:`Bucket` + :param bucket: Bucket for which this instance is the policy. + + :type default_kms_key_name: str + :param default_kms_key_name: + (Optional) Resource name of KMS key used to encrypt bucket's content. + + :type google_managed_encryption_enforcement_config: :class:`EncryptionEnforcementConfig` + :param google_managed_encryption_enforcement_config: + (Optional) Encryption enforcement configuration for Google managed encryption. + + :type customer_managed_encryption_enforcement_config: :class:`EncryptionEnforcementConfig` + :param customer_managed_encryption_enforcement_config: + (Optional) Encryption enforcement configuration for Customer managed encryption. + + :type customer_supplied_encryption_enforcement_config: :class:`EncryptionEnforcementConfig` + :param customer_supplied_encryption_enforcement_config: + (Optional) Encryption enforcement configuration for Customer supplied encryption.
 + """ + + def __init__( + self, + bucket, + default_kms_key_name=None, + google_managed_encryption_enforcement_config=None, + customer_managed_encryption_enforcement_config=None, + customer_supplied_encryption_enforcement_config=None, + ): + data = {} + if default_kms_key_name is not None: + data["defaultKmsKeyName"] = default_kms_key_name + + if google_managed_encryption_enforcement_config is not None: + data["googleManagedEncryptionEnforcementConfig"] = ( + google_managed_encryption_enforcement_config + ) + + if customer_managed_encryption_enforcement_config is not None: + data["customerManagedEncryptionEnforcementConfig"] = ( + customer_managed_encryption_enforcement_config + ) + + if customer_supplied_encryption_enforcement_config is not None: + data["customerSuppliedEncryptionEnforcementConfig"] = ( + customer_supplied_encryption_enforcement_config + ) + + super().__init__(data) + self._bucket = bucket + + @classmethod + def from_api_repr(cls, resource, bucket): + """Factory: construct instance from resource. + + :type resource: dict + :param resource: mapping as returned from API call. + + :type bucket: :class:`Bucket` + :param bucket: Bucket for which this instance is the policy. + + :rtype: :class:`BucketEncryption` + :returns: Instance created from resource. + """ + instance = cls(bucket) + instance.update(resource) + return instance + + @property + def bucket(self): + """Bucket for which this instance is the policy. + + :rtype: :class:`Bucket` + :returns: the instance's bucket. + """ + return self._bucket + + @property + def default_kms_key_name(self): + """Retrieve default KMS encryption key for objects in the bucket. + + :rtype: str or ``NoneType`` + :returns: Default KMS encryption key, or ``None`` if not set. + """ + return self.get("defaultKmsKeyName") + + @default_kms_key_name.setter + def default_kms_key_name(self, value): + """Set default KMS encryption key for objects in the bucket. + + :type value: str or None + :param value: new KMS key name (None to clear any existing key). + """ + self["defaultKmsKeyName"] = value + self.bucket._patch_property("encryption", self) + + @property + def google_managed_encryption_enforcement_config(self): + """Retrieve the encryption enforcement configuration for Google managed encryption. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: The configuration instance. + """ + data = self.get("googleManagedEncryptionEnforcementConfig") + if data: + return EncryptionEnforcementConfig.from_api_repr(data) + return None + + @google_managed_encryption_enforcement_config.setter + def google_managed_encryption_enforcement_config(self, value): + """Set the encryption enforcement configuration for Google managed encryption. + + :type value: :class:`EncryptionEnforcementConfig` or dict + :param value: The configuration instance or dictionary. + """ + self["googleManagedEncryptionEnforcementConfig"] = value + self.bucket._patch_property("encryption", self) + + @property + def customer_managed_encryption_enforcement_config(self): + """Retrieve the encryption enforcement configuration for Customer managed encryption. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: The configuration instance.
+ """ + data = self.get("customerManagedEncryptionEnforcementConfig") + if data: + return EncryptionEnforcementConfig.from_api_repr(data) + return None + + @customer_managed_encryption_enforcement_config.setter + def customer_managed_encryption_enforcement_config(self, value): + """Set the encryption enforcement configuration for Customer managed encryption. + + :type value: :class:`EncryptionEnforcementConfig` or dict + :param value: The configuration instance or dictionary. + """ + self["customerManagedEncryptionEnforcementConfig"] = value + self.bucket._patch_property("encryption", self) + + @property + def customer_supplied_encryption_enforcement_config(self): + """Retrieve the encryption enforcement configuration for Customer supplied encryption. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: The configuration instance. + """ + data = self.get("customerSuppliedEncryptionEnforcementConfig") + if data: + return EncryptionEnforcementConfig.from_api_repr(data) + return None + + @customer_supplied_encryption_enforcement_config.setter + def customer_supplied_encryption_enforcement_config(self, value): + """Set the encryption enforcement configuration for Customer supplied encryption. + + :type value: :class:`EncryptionEnforcementConfig` or dict + :param value: The configuration instance or dictionary. + """ + self["customerSuppliedEncryptionEnforcementConfig"] = value + self.bucket._patch_property("encryption", self) + + class SoftDeletePolicy(dict): """Map a bucket's soft delete policy. diff --git a/google/cloud/storage/constants.py b/google/cloud/storage/constants.py index eba0a19df..c6c1b63c4 100644 --- a/google/cloud/storage/constants.py +++ b/google/cloud/storage/constants.py @@ -137,3 +137,9 @@ See: https://cloud.google.com/storage/docs/managing-turbo-replication """ + +ENFORCEMENT_MODE_FULLY_RESTRICTED = "FullyRestricted" +"""Bucket encryption restriction mode where encryption is fully restricted.""" + +ENFORCEMENT_MODE_NOT_RESTRICTED = "NotRestricted" +"""Bucket encryption restriction mode where encryption is not restricted.""" diff --git a/google/cloud/storage/exceptions.py b/google/cloud/storage/exceptions.py index 4eb05cef7..12f69071b 100644 --- a/google/cloud/storage/exceptions.py +++ b/google/cloud/storage/exceptions.py @@ -33,6 +33,12 @@ DataCorruptionDynamicParent = Exception +class InvalidPathError(Exception): + """Raised when the provided path string is malformed.""" + + pass + + class InvalidResponse(InvalidResponseDynamicParent): """Error class for responses which are not in the correct state. 
diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index a34419020..7f4173690 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -25,6 +25,7 @@ import struct import base64 import functools +from pathlib import Path from google.api_core import exceptions from google.cloud.storage import Client @@ -38,7 +39,7 @@ from google.cloud.storage._media.requests.upload import XMLMPUContainer from google.cloud.storage._media.requests.upload import XMLMPUPart -from google.cloud.storage.exceptions import DataCorruption +from google.cloud.storage.exceptions import DataCorruption, InvalidPathError TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024 DEFAULT_MAX_WORKERS = 8 @@ -231,9 +232,11 @@ def upload_many( executor.submit( _call_method_on_maybe_pickled_blob, _pickle_client(blob) if needs_pickling else blob, - "_handle_filename_and_upload" - if isinstance(path_or_file, str) - else "_prep_and_do_upload", + ( + "_handle_filename_and_upload" + if isinstance(path_or_file, str) + else "_prep_and_do_upload" + ), path_or_file, **upload_kwargs, ) @@ -259,6 +262,18 @@ def upload_many( return results +def _resolve_path(target_dir, blob_path): + if os.name == "nt" and ":" in blob_path: + raise InvalidPathError(f"{blob_path} cannot be downloaded into {target_dir}") + target_dir = Path(target_dir) + blob_path = Path(blob_path) + # blob_path.anchor will be '/' if `blob_path` is an absolute path, else it'll be empty. + # This is useful to concatenate target_dir = /local/target and blob_path = + # /usr/local/mybin into /local/target/usr/local/mybin + concatenated_path = target_dir / blob_path.relative_to(blob_path.anchor) + return concatenated_path.resolve() + + @_deprecate_threads_param def download_many( blob_file_pairs, @@ -384,9 +399,11 @@ def download_many( executor.submit( _call_method_on_maybe_pickled_blob, _pickle_client(blob) if needs_pickling else blob, - "_handle_filename_and_download" - if isinstance(path_or_file, str) - else "_prep_and_do_download", + ( + "_handle_filename_and_download" + if isinstance(path_or_file, str) + else "_prep_and_do_download" + ), path_or_file, **download_kwargs, ) @@ -618,7 +635,8 @@ def download_many_to_path( """Download many files concurrently by their blob names. The destination files are automatically created, with paths based on the - source blob_names and the destination_directory. + source `blob_names` and the `destination_directory`. + The destination files are not automatically deleted if their downloads fail, so please check the return value of this function for any exceptions, or @@ -629,6 +647,50 @@ def download_many_to_path( "images/icon.jpg" will be downloaded to a file named "/home/myuser/icon.jpg". + + Note 1: if the path after combining `blob_name` and `destination_directory` + resolves outside `destination_directory`, a warning will be issued and + that particular blob will NOT be downloaded. This may happen in scenarios + where `blob_name` contains "../". + + For example, + consider `destination_directory` is "downloads/gcs_blobs" and + `blob_name` is '../hello.blob'. This blob will not be downloaded + because the final resolved path would be "downloads/hello.blob" + + + To give further examples, the following blobs will not be downloaded because + they escape the "destination_directory": + + "../../local/target", # skips download + "../escape.txt", # skips download + "go/four/levels/deep/../../../../../somefile1", # skips download + "go/four/levels/deep/../some_dir/../../../../../invalid/path1" # skips download + + however, the following blobs will be downloaded because the final resolved + path is still a child of the given destination_directory: + + "data/../sibling.txt", + "dir/./file.txt", + "go/four/levels/deep/../somefile2", + "go/four/levels/deep/../some_dir/valid/path1", + "go/four/levels/deep/../some_dir/../../../../valid/path2", + + It is advised to use other APIs, such as `transfer_manager.download_many`, + `Blob.download_to_filename`, or `Blob.download_to_file`, to download such blobs. + + + Note 2: + The resolved download path will always be relative to the user-provided + `destination_directory`. For example, + + a `blob_name` "/etc/passwd" will be downloaded into + "destination_directory/etc/passwd" instead of "/etc/passwd". + Similarly, + "/tmp/my_fav_blob" downloads to "destination_directory/tmp/my_fav_blob" + + + :type bucket: :class:`google.cloud.storage.bucket.Bucket` :param bucket: The bucket which contains the blobs to be downloaded @@ -646,9 +708,8 @@ :type destination_directory: str :param destination_directory: - A string that will be prepended (with os.path.join()) to each blob_name - in the input list, in order to determine the destination path for that - blob. + A string that will be prepended to each blob_name in the input list, in + order to determine the destination path for that blob. For instance, if the destination_directory string is "/tmp/img" and a blob_name is "0001.jpg", with an empty blob_name_prefix, then the source blob "0001.jpg" will be downloaded to the file "/tmp/img/0001.jpg". This parameter can be an empty string. - Note that this parameter allows directory traversal (e.g. "/", "../") - and is not intended for unsanitized end user input. + Note that directory traversal components (e.g. "/", "../") are allowed + as long as the final resolved path is inside "destination_directory". + See examples above. :type blob_name_prefix: str :param blob_name_prefix: @@ -745,32 +807,65 @@ :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. - :rtype: list + :rtype: List[None|Exception|UserWarning] :returns: A list of results corresponding to, in order, each item in the - input list. If an exception was received, it will be the result - for that operation. Otherwise, the return value from the successful - download method is used (which will be None). + input list. If an exception was received or a download was skipped + (e.g., due to existing file or path traversal), it will be the result + for that operation (as an Exception or UserWarning, respectively). + Otherwise, the result will be None for a successful download. """ + results = [None] * len(blob_names) blob_file_pairs = [] + indices_to_process = [] - for blob_name in blob_names: + for i, blob_name in enumerate(blob_names): full_blob_name = blob_name_prefix + blob_name - path = os.path.join(destination_directory, blob_name) + try: + resolved_path = _resolve_path(destination_directory, blob_name) + except InvalidPathError as e: + msg = f"The blob {blob_name} will **NOT** be downloaded. {e}" + warnings.warn(msg) + results[i] = UserWarning(msg) + continue + if not resolved_path.parent.is_relative_to( + Path(destination_directory).resolve() + ): + msg = ( + f"The blob {blob_name} will **NOT** be downloaded. " + f"The resolved destination_directory - {resolved_path.parent} - is either invalid or " + f"escapes the user-provided {Path(destination_directory).resolve()}. Please download this file separately using `download_to_filename`" + ) + warnings.warn(msg) + results[i] = UserWarning(msg) + continue + + resolved_path = str(resolved_path) + if skip_if_exists and os.path.isfile(resolved_path): + msg = f"The blob {blob_name} is skipped because the destination file already exists" + results[i] = UserWarning(msg) + continue + if create_directories: - directory, _ = os.path.split(path) + directory, _ = os.path.split(resolved_path) os.makedirs(directory, exist_ok=True) - blob_file_pairs.append((bucket.blob(full_blob_name), path)) + blob_file_pairs.append((bucket.blob(full_blob_name), resolved_path)) + indices_to_process.append(i) - return download_many( + many_results = download_many( blob_file_pairs, download_kwargs=download_kwargs, deadline=deadline, raise_exception=raise_exception, worker_type=worker_type, max_workers=max_workers, - skip_if_exists=skip_if_exists, + skip_if_exists=False, # skip_if_exists is handled in the loop above ) + for meta_index, result in zip(indices_to_process, many_results): + results[meta_index] = result + + return results + def download_chunks_concurrently( blob, diff --git a/google/cloud/storage/version.py b/google/cloud/storage/version.py index 0bc275357..b674396b2 100644 --- a/google/cloud/storage/version.py +++ b/google/cloud/storage/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "3.9.0" +__version__ = "3.10.0" diff --git a/noxfile.py b/noxfile.py index 1cef2a75f..6bce85327 100644 --- a/noxfile.py +++ b/noxfile.py @@ -17,7 +17,6 @@ from __future__ import absolute_import import os import pathlib -import re import shutil import nox @@ -27,9 +26,8 @@ BLACK_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"] DEFAULT_PYTHON_VERSION = "3.14" -SYSTEM_TEST_PYTHON_VERSIONS = ["3.9", "3.14"] +SYSTEM_TEST_PYTHON_VERSIONS = ["3.10", "3.14"] UNIT_TEST_PYTHON_VERSIONS = [ - "3.9", "3.10", "3.11", "3.12", @@ -51,9 +49,6 @@ "lint", "lint_setup_py", "system", - # TODO(https://github.com/googleapis/python-storage/issues/1499): - # Remove or restore testing for Python 3.7/3.8 - "unit-3.9", "unit-3.10", "unit-3.11", "unit-3.12", @@ -241,6 +236,7 @@ def conftest_retry(session): session.install( "pytest", "pytest-xdist", + "pytest-asyncio", "grpcio", "grpcio-status", "grpc-google-iam-v1", @@ -252,13 +248,15 @@ def conftest_retry(session): # Run #CPU processes in parallel if no test session arguments are passed in. if session.posargs: test_cmd = [ - "py.test", - "--quiet", + "pytest", + "-vv", + "-s", + # "--quiet", conformance_test_folder_path, *session.posargs, ] else: - test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path] + test_cmd = ["pytest", "-vv", "-s", "-n", "auto", conformance_test_folder_path] # Run py.test against the conformance tests.
session.run(*test_cmd, env={"DOCKER_API_VERSION": "1.39"}) diff --git a/samples/generated_samples/snippet_metadata_google.storage.v2.json b/samples/generated_samples/snippet_metadata_google.storage.v2.json index 1889f0c5d..45582dd31 100644 --- a/samples/generated_samples/snippet_metadata_google.storage.v2.json +++ b/samples/generated_samples/snippet_metadata_google.storage.v2.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-storage", - "version": "3.9.0" + "version": "3.10.0" }, "snippets": [ { diff --git a/samples/snippets/hmac_samples_test.py b/samples/snippets/hmac_samples_test.py index 988b40305..fbc2e292d 100644 --- a/samples/snippets/hmac_samples_test.py +++ b/samples/snippets/hmac_samples_test.py @@ -17,7 +17,6 @@ set in order to run. """ - import os import google.api_core.exceptions @@ -51,9 +50,18 @@ def new_hmac_key(): NOTE: Due to the module scope, test order in this file is significant """ - hmac_key, secret = STORAGE_CLIENT.create_hmac_key( - service_account_email=SERVICE_ACCOUNT_EMAIL, project_id=PROJECT_ID - ) + try: + hmac_key, secret = STORAGE_CLIENT.create_hmac_key( + service_account_email=SERVICE_ACCOUNT_EMAIL, project_id=PROJECT_ID + ) + except google.api_core.exceptions.PreconditionFailed as e: + # Check if the failure is due to the Organization Policy constraint + if "constraints/iam.disableServiceAccountKeyCreation" in str(e): + pytest.skip( + "Temporary skip: HMAC key creation is disabled by organization policy " + "on project python-docs-samples-tests. See b/493225655." + ) + raise yield hmac_key # Re-fetch the key metadata in case state has changed during the test. hmac_key = STORAGE_CLIENT.get_hmac_key_metadata( @@ -77,9 +85,16 @@ def test_list_keys(capsys, new_hmac_key): def test_create_key(capsys): - hmac_key = storage_create_hmac_key.create_key( - PROJECT_ID, SERVICE_ACCOUNT_EMAIL - ) + try: + hmac_key = storage_create_hmac_key.create_key(PROJECT_ID, SERVICE_ACCOUNT_EMAIL) + except google.api_core.exceptions.PreconditionFailed as e: + if "constraints/iam.disableServiceAccountKeyCreation" in str(e): + pytest.skip( + "Temporary skip: HMAC key creation is disabled by organization policy " + "on project python-docs-samples-tests. See b/493225655." + ) + raise + hmac_key.state = "INACTIVE" hmac_key.update() hmac_key.delete() diff --git a/samples/snippets/notification_polling.py b/samples/snippets/notification_polling.py index 2ee6789c3..1359c9cfa 100644 --- a/samples/snippets/notification_polling.py +++ b/samples/snippets/notification_polling.py @@ -32,10 +32,10 @@ https://console.cloud.google.com/flows/enableapi?apiid=pubsub 3. Create a Google Cloud Storage bucket: - $ gsutil mb gs://testbucket + $ gcloud storage buckets create gs://testbucket 4. Create a Cloud Pub/Sub topic and publish bucket notifications there: - $ gsutil notification create -f json -t testtopic gs://testbucket + $ gcloud storage buckets notifications create gs://testbucket --topic=testtopic --payload-format=json 5. Create a subscription for your new topic: $ gcloud pubsub subscriptions create testsubscription --topic=testtopic diff --git a/samples/snippets/storage_transfer_manager_download_many.py b/samples/snippets/storage_transfer_manager_download_many.py index 02cb9b887..447d0869c 100644 --- a/samples/snippets/storage_transfer_manager_download_many.py +++ b/samples/snippets/storage_transfer_manager_download_many.py @@ -12,9 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+# Example usage: +# python samples/snippets/storage_transfer_manager_download_many.py \ +# --bucket_name \ +# --blobs \ +# --destination_directory \ +# --blob_name_prefix + + # [START storage_transfer_manager_download_many] def download_many_blobs_with_transfer_manager( - bucket_name, blob_names, destination_directory="", workers=8 + bucket_name, blob_names, destination_directory="", blob_name_prefix="", workers=8 ): """Download blobs in a list by name, concurrently in a process pool. @@ -36,11 +44,11 @@ def download_many_blobs_with_transfer_manager( # blob_names = ["myblob", "myblob2"] # The directory on your computer to which to download all of the files. This - # string is prepended (with os.path.join()) to the name of each blob to form - # the full path. Relative paths and absolute paths are both accepted. An - # empty string means "the current working directory". Note that this - # parameter allows accepts directory traversal ("../" etc.) and is not - # intended for unsanitized end user input. + # string is prepended to the name of each blob to form the full path using + # pathlib. Relative paths and absolute paths are both accepted. An empty + # string means "the current working directory". Note that this parameter + # will NOT allow files to escape the destination_directory and will skip + # downloads that attempt directory traversal outside of it. # destination_directory = "" # The maximum number of processes to use for the operation. The performance @@ -56,15 +64,63 @@ def download_many_blobs_with_transfer_manager( bucket = storage_client.bucket(bucket_name) results = transfer_manager.download_many_to_path( - bucket, blob_names, destination_directory=destination_directory, max_workers=workers + bucket, + blob_names, + destination_directory=destination_directory, + blob_name_prefix=blob_name_prefix, + max_workers=workers, ) for name, result in zip(blob_names, results): - # The results list is either `None` or an exception for each blob in + # The results list is either `None`, an exception, or a warning for each blob in # the input list, in order. - - if isinstance(result, Exception): + if isinstance(result, UserWarning): + print("Skipped download for {} due to warning: {}".format(name, result)) + elif isinstance(result, Exception): print("Failed to download {} due to exception: {}".format(name, result)) else: - print("Downloaded {} to {}.".format(name, destination_directory + name)) + print( + "Downloaded {} inside {} directory.".format(name, destination_directory) + ) + + # [END storage_transfer_manager_download_many] + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser( + description="Download blobs in a list by name, concurrently in a process pool." 
+ ) + parser.add_argument( + "--bucket_name", required=True, help="The name of your GCS bucket" + ) + parser.add_argument( + "--blobs", + nargs="+", + required=True, + help="The list of blob names to download", + ) + parser.add_argument( + "--destination_directory", + default="", + help="The directory on your computer to which to download all of the files", + ) + parser.add_argument( + "--blob_name_prefix", + default="", + help="A string that will be prepended to each blob_name to determine the source blob name", + ) + parser.add_argument( + "--workers", type=int, default=8, help="The maximum number of processes to use" + ) + + args = parser.parse_args() + + download_many_blobs_with_transfer_manager( + bucket_name=args.bucket_name, + blob_names=args.blobs, + destination_directory=args.destination_directory, + blob_name_prefix=args.blob_name_prefix, + workers=args.workers, + ) diff --git a/samples/snippets/zonal_buckets/storage_read_appendable_object_tail.py b/samples/snippets/zonal_buckets/storage_read_appendable_object_tail.py index 9e4dcd738..624898066 100644 --- a/samples/snippets/zonal_buckets/storage_read_appendable_object_tail.py +++ b/samples/snippets/zonal_buckets/storage_read_appendable_object_tail.py @@ -28,7 +28,7 @@ AsyncMultiRangeDownloader, ) -BYTES_TO_APPEND = b"fav_bytes." +BYTES_TO_APPEND = b"fav_bytes." * 100 * 1024 * 1024 NUM_BYTES_TO_APPEND_EVERY_SECOND = len(BYTES_TO_APPEND) @@ -37,14 +37,16 @@ async def appender(writer: AsyncAppendableObjectWriter, duration: int): """Appends 10 bytes to the object every second for a given duration.""" print("Appender started.") bytes_appended = 0 - for i in range(duration): + start_time = time.monotonic() + # Run the appender for the specified duration. + while time.monotonic() - start_time < duration: await writer.append(BYTES_TO_APPEND) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] bytes_appended += NUM_BYTES_TO_APPEND_EVERY_SECOND print( f"[{now}] Appended {NUM_BYTES_TO_APPEND_EVERY_SECOND} new bytes. Total appended: {bytes_appended} bytes." ) - await asyncio.sleep(1) + await asyncio.sleep(0.1) print("Appender finished.") @@ -67,9 +69,7 @@ async def tailer( bytes_downloaded = output_buffer.getbuffer().nbytes if bytes_downloaded > 0: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - print( - f"[{now}] Tailer read {bytes_downloaded} new bytes: {output_buffer.getvalue()}" - ) + print(f"[{now}] Tailer read {bytes_downloaded} new bytes: ") start_byte += bytes_downloaded await asyncio.sleep(0.1) # Poll for new data every 0.1 seconds. 
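A small standalone sketch of the path-resolution rule that the transfer_manager changes above implement and the download sample relies on (the `resolve` helper and paths here are illustrative, not library API; POSIX paths are assumed, since on Windows the real `_resolve_path` rejects names containing ":"):

from pathlib import Path

def resolve(target_dir: str, blob_name: str) -> Path:
    # Mirror _resolve_path: strip the anchor so absolute blob names land
    # under target_dir instead of at the filesystem root.
    target = Path(target_dir)
    blob = Path(blob_name)
    return (target / blob.relative_to(blob.anchor)).resolve()

base = Path("downloads/gcs_blobs").resolve()
for name in ["data/../sibling.txt", "../escape.txt", "/etc/passwd"]:
    resolved = resolve("downloads/gcs_blobs", name)
    # download_many_to_path skips a blob when its resolved parent escapes
    # the destination directory.
    skipped = not resolved.parent.is_relative_to(base)
    print(f"{name!r} -> {resolved} skipped={skipped}")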
diff --git a/setup.py b/setup.py index 02cd11140..69fc2899e 100644 --- a/setup.py +++ b/setup.py @@ -123,9 +123,6 @@ "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", @@ -138,7 +135,7 @@ packages=packages, install_requires=dependencies, extras_require=extras, - python_requires=">=3.7", + python_requires=">=3.10", include_package_data=True, zip_safe=False, ) diff --git a/tests/conformance/_utils.py b/tests/conformance/_utils.py new file mode 100644 index 000000000..496b1c9e0 --- /dev/null +++ b/tests/conformance/_utils.py @@ -0,0 +1,34 @@ +import time +import requests +import traceback + +def start_grpc_server(grpc_endpoint, http_endpoint): + """Starts the testbench gRPC server if it's not already running. + + This is essentially the equivalent of: + + `curl -s --retry 5 --retry-max-time 40 "http://localhost:9000/start_grpc?port=8888"` + """ + start_time = time.time() + max_time = 40 + retries = 5 + port = grpc_endpoint.split(":")[-1] + url = f"{http_endpoint}/start_grpc?port={port}" + + for i in range(retries): + try: + response = requests.get(url, timeout=10) + if response.status_code == 200: + return + except requests.exceptions.RequestException: + print("Failed to create grpc server", traceback.format_exc()) + # Re-raise only once all retries are exhausted. + if i == retries - 1: + raise + + elapsed_time = time.time() - start_time + if elapsed_time >= max_time: + raise RuntimeError("Failed to start gRPC server within the time limit.") + + # backoff + time.sleep(1) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 4157182cb..efb9671a3 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -1,23 +1,68 @@ -import asyncio import io +import subprocess +import time +import traceback +import urllib import uuid + import grpc +import pytest import requests - -from google.api_core import exceptions +from google.api_core import client_options, exceptions from google.auth import credentials as auth_credentials + from google.cloud import _storage_v2 as storage_v2 +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage.asyncio.async_multi_range_downloader import \ AsyncMultiRangeDownloader +from tests.conformance._utils import start_grpc_server -from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, +# --- Configuration --- + + +TEST_BENCH_ENDPOINT = ( + "http://localhost:9001" # 9000 in VM is taken by test_conformance.py ) +_PORT = urllib.parse.urlsplit(TEST_BENCH_ENDPOINT).port +_GRPC_PORT = 8888 -# --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench.
-GRPC_ENDPOINT = "localhost:8888" -HTTP_ENDPOINT = "http://localhost:9000" +GRPC_ENDPOINT = f"localhost:{_GRPC_PORT}" CONTENT_LENGTH = 1024 * 10 # 10 KB +_DEFAULT_IMAGE_NAME = "gcr.io/cloud-devrel-public-resources/storage-testbench" +_DEFAULT_IMAGE_TAG = "latest" +_DOCKER_IMAGE = f"{_DEFAULT_IMAGE_NAME}:{_DEFAULT_IMAGE_TAG}" +_PULL_CMD = ["docker", "pull", _DOCKER_IMAGE] +_RUN_CMD = [ + "docker", + "run", + "--name", + "bidi_reads_container", + "--rm", + "-d", + "-p", + f"{_PORT}:9000", + "-p", + f"{_GRPC_PORT}:{_GRPC_PORT}", + _DOCKER_IMAGE, +] +_DOCKER_STOP_CMD = [ + "docker", + "stop", + "bidi_reads_container", +] + + +@pytest.fixture(scope="module") +def testbench(): + subprocess.run(_PULL_CMD) + proc = subprocess.Popen(_RUN_CMD) + time.sleep(10) + yield GRPC_ENDPOINT, TEST_BENCH_ENDPOINT + subprocess.run(_DOCKER_STOP_CMD) + proc.kill() + def _is_retriable(exc): """Predicate for identifying retriable errors.""" @@ -32,9 +77,7 @@ def _is_retriable(exc): ) -async def run_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario -): +async def run_test_scenario(http_client, bucket_name, object_name, scenario): """Runs a single fault-injection test scenario.""" print(f"\n--- RUNNING SCENARIO: {scenario['name']} ---") @@ -45,13 +88,18 @@ async def run_test_scenario( "instructions": {scenario["method"]: [scenario["instruction"]]}, "transport": "GRPC", } - resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) + resp = http_client.post( + f"{TEST_BENCH_ENDPOINT}/retry_test", json=retry_test_config + ) resp.raise_for_status() retry_test_id = resp.json()["id"] # 2. Set up downloader and metadata for fault injection. + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, bucket_name, object_name + grpc_client, bucket_name, object_name ) fault_injection_metadata = (("x-retry-test-id", retry_test_id),) @@ -79,11 +127,17 @@ async def run_test_scenario( finally: # 4. Clean up the Retry Test resource. if retry_test_id: - http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") + http_client.delete(f"{TEST_BENCH_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_reads(testbench): """Main function to set up resources and run all test scenarios.""" + grpc_endpoint, test_bench_endpoint = testbench + print("starting grpc server", grpc_endpoint, test_bench_endpoint) + start_grpc_server( + grpc_endpoint, test_bench_endpoint + ) # Ensure the testbench gRPC server is running before this test executes. 
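+    # The channel/transport built below are only used for test setup and
+    # cleanup (bucket and object management); each scenario creates its own
+    # insecure gRPC client against the testbench.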
channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( @@ -97,42 +151,12 @@ async def main(): # Define all test scenarios test_scenarios = [ - { - "name": "Retry on Service Unavailable (503)", - "method": "storage.objects.get", - "instruction": "return-503", - "expected_error": None, - }, - { - "name": "Retry on 500", - "method": "storage.objects.get", - "instruction": "return-500", - "expected_error": None, - }, - { - "name": "Retry on 504", - "method": "storage.objects.get", - "instruction": "return-504", - "expected_error": None, - }, - { - "name": "Retry on 429", - "method": "storage.objects.get", - "instruction": "return-429", - "expected_error": None, - }, { "name": "Smarter Resumption: Retry 503 after partial data", "method": "storage.objects.get", "instruction": "return-broken-stream-after-2K", "expected_error": None, }, - { - "name": "Retry on BidiReadObjectRedirectedError", - "method": "storage.objects.get", - "instruction": "redirect-send-handle-and-token-tokenval", # Testbench instruction for redirect - "expected_error": None, - }, ] try: @@ -161,9 +185,7 @@ async def write_req_gen(): # Run all defined test scenarios. for scenario in test_scenarios: - await run_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario - ) + await run_test_scenario(http_client, bucket_name, object_name, scenario) # Define and run test scenarios specifically for the open() method open_test_scenarios = [ @@ -185,16 +207,33 @@ async def write_req_gen(): "instruction": "return-401", "expected_error": exceptions.Unauthorized, }, + { + "name": "Retry on 500", + "method": "storage.objects.get", + "instruction": "return-500", + "expected_error": None, + }, + { + "name": "Retry on 504", + "method": "storage.objects.get", + "instruction": "return-504", + "expected_error": None, + }, + { + "name": "Retry on 429", + "method": "storage.objects.get", + "instruction": "return-429", + "expected_error": None, + }, ] for scenario in open_test_scenarios: await run_open_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario + http_client, bucket_name, object_name, scenario ) - except Exception: - import traceback - - traceback.print_exc() + except Exception as e: + print(f"Test failed with error: {e}. Traceback: {traceback.format_exc()}") + raise e finally: # Clean up the test bucket. try: @@ -211,11 +250,9 @@ async def write_req_gen(): print(f"Warning: Cleanup failed: {e}") -async def run_open_test_scenario( - gapic_client, http_client, bucket_name, object_name, scenario -): +async def run_open_test_scenario(http_client, bucket_name, object_name, scenario): """Runs a fault-injection test scenario specifically for the open() method.""" - print(f"\n--- RUNNING SCENARIO: {scenario['name']} ---") + print(f"\n--- RUNNING OPEN SCENARIO: {scenario['name']} ---") retry_test_id = None try: @@ -224,18 +261,22 @@ async def run_open_test_scenario( "instructions": {scenario["method"]: [scenario["instruction"]]}, "transport": "GRPC", } - resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) + resp = http_client.post( + f"{TEST_BENCH_ENDPOINT}/retry_test", json=retry_test_config + ) resp.raise_for_status() retry_test_id = resp.json()["id"] - print(f"Retry Test created with ID: {retry_test_id}") # 2. Set up metadata for fault injection. fault_injection_metadata = (("x-retry-test-id", retry_test_id),) # 3. 
Execute the open (via create_mrd) and assert the outcome. try: + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, + grpc_client, bucket_name, object_name, metadata=fault_injection_metadata, @@ -259,8 +300,4 @@ async def run_open_test_scenario( finally: # 4. Clean up the Retry Test resource. if retry_test_id: - http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") - - -if __name__ == "__main__": - asyncio.run(main()) + http_client.delete(f"{TEST_BENCH_ENDPOINT}/retry_test/{retry_test_id}") diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py index 90dfaf5f8..852f43bfc 100644 --- a/tests/conformance/test_bidi_writes.py +++ b/tests/conformance/test_bidi_writes.py @@ -1,22 +1,67 @@ -import asyncio +import subprocess +import time +import urllib import uuid + import grpc +import pytest import requests - -from google.api_core import exceptions +from google.api_core import client_options, exceptions +from google.api_core.retry_async import AsyncRetry from google.auth import credentials as auth_credentials + from google.cloud import _storage_v2 as storage_v2 +from google.cloud.storage.asyncio.async_appendable_object_writer import \ + AsyncAppendableObjectWriter +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from tests.conformance._utils import start_grpc_server -from google.api_core.retry_async import AsyncRetry -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( - AsyncAppendableObjectWriter, +# --- Configuration --- +TEST_BENCH_ENDPOINT = ( + "http://localhost:9002" # 9000 in VM is taken by test_conformance.py, 9001 by reads ) +_PORT = urllib.parse.urlsplit(TEST_BENCH_ENDPOINT).port +_GRPC_PORT = 8888 -# --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. -GRPC_ENDPOINT = "localhost:8888" -HTTP_ENDPOINT = "http://localhost:9000" -CONTENT = b"A" * 1024 * 10 # 10 KB +GRPC_ENDPOINT = f"localhost:{_GRPC_PORT}" +HTTP_ENDPOINT = TEST_BENCH_ENDPOINT + +_DEFAULT_IMAGE_NAME = "gcr.io/cloud-devrel-public-resources/storage-testbench" +_DEFAULT_IMAGE_TAG = "latest" +_DOCKER_IMAGE = f"{_DEFAULT_IMAGE_NAME}:{_DEFAULT_IMAGE_TAG}" +_PULL_CMD = ["docker", "pull", _DOCKER_IMAGE] +_RUN_CMD = [ + "docker", + "run", + "--name", + "bidi_writes_container", + "--rm", + "-d", + "-p", + f"{_PORT}:9000", + "-p", + f"{_GRPC_PORT}:8888", + _DOCKER_IMAGE, +] +_DOCKER_STOP_CMD = [ + "docker", + "stop", + "bidi_writes_container", +] + + +@pytest.fixture(scope="module") +def testbench(): + subprocess.run(_PULL_CMD) + proc = subprocess.Popen(_RUN_CMD) + time.sleep(10) + yield GRPC_ENDPOINT, HTTP_ENDPOINT + subprocess.run(_DOCKER_STOP_CMD) + proc.kill() + + +CONTENT = b"A" * 1024 * 1024 * 10 # 10 MiB def _is_retryable(exc): @@ -70,10 +115,14 @@ def on_retry_error(exc): retry_test_id = resp.json()["id"] # 2. Set up writer and metadata for fault injection. 
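+    # _create_insecure_grpc_client is an internal test-only helper that points
+    # the async client at the local testbench over a plaintext channel.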
+ grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) writer = AsyncAppendableObjectWriter( - gapic_client, + grpc_client, bucket_name, object_name, + writer_options={"FLUSH_INTERVAL_BYTES": 2 * 1024 * 1024}, ) fault_injection_metadata = (("x-retry-test-id", retry_test_id),) @@ -121,21 +170,20 @@ def on_retry_error(exc): ): raise - if not use_default: - assert ( - retry_count == 0 - ), f"Retry was incorrectly triggered for non-retriable error in {scenario['name']}!" - print(f"Success: caught expected exception for {scenario['name']}: {e}") - finally: # 5. Clean up the Retry Test resource. if retry_test_id: http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_writes(testbench): """Main function to set up resources and run all test scenarios.""" - channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) + grpc_endpoint, http_endpoint = testbench + start_grpc_server( + grpc_endpoint, http_endpoint + ) # Ensure the testbench gRPC server is running before this test executes. + channel = grpc.aio.insecure_channel(grpc_endpoint) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( channel=channel, @@ -173,10 +221,11 @@ async def main(): "instruction": "return-429", "expected_error": None, }, + # TODO: b/490280918 { "name": "Smarter Resumption: Retry 503 after partial data", "method": "storage.objects.insert", - "instruction": "return-503-after-2K", + "instruction": "return-503-after-3072K", # 3072 KiB == 3 MiB "expected_error": None, }, { @@ -185,40 +234,6 @@ async def main(): "instruction": "redirect-send-handle-and-token-tokenval", "expected_error": None, }, - { - "name": "Fail on 401", - "method": "storage.objects.insert", - "instruction": "return-401", - "expected_error": exceptions.Unauthorized, - }, - { - "name": "Default Policy: Retry on 503", - "method": "storage.objects.insert", - "instruction": "return-503", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Retry on 503", - "method": "storage.objects.insert", - "instruction": "return-500", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Retry on BidiWriteObjectRedirectedError", - "method": "storage.objects.insert", - "instruction": "redirect-send-handle-and-token-tokenval", - "expected_error": None, - "use_default_policy": True, - }, - { - "name": "Default Policy: Smarter Ressumption", - "method": "storage.objects.insert", - "instruction": "return-503-after-2K", - "expected_error": None, - "use_default_policy": True, - }, ] try: @@ -261,7 +276,3 @@ async def main(): await gapic_client.delete_bucket(request=delete_bucket_req) except Exception as e: print(f"Warning: Cleanup failed: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tests/perf/microbenchmarks/_utils.py b/tests/perf/microbenchmarks/_utils.py index ff29b8783..9e5609500 100644 --- a/tests/perf/microbenchmarks/_utils.py +++ b/tests/perf/microbenchmarks/_utils.py @@ -11,10 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Any, List
+from typing import Any, List, Optional
 import statistics
 import io
 import os
+import socket
+import psutil
+
+_C4_STANDARD_192_NIC = "ens3"  # can be fetched via `ip link show`
 
 
 def publish_benchmark_extra_info(
@@ -22,6 +26,8 @@
     params: Any,
     benchmark_group: str = "read",
     true_times: List[float] = [],
+    download_bytes_list: Optional[List[int]] = None,
+    duration: Optional[int] = None,
 ) -> None:
     """
     Helper function to publish benchmark parameters to the extra_info property.
@@ -41,13 +47,24 @@
     benchmark.extra_info["processes"] = params.num_processes
     benchmark.group = benchmark_group
 
-    object_size = params.file_size_bytes
-    num_files = params.num_files
-    total_uploaded_mib = object_size / (1024 * 1024) * num_files
-    min_throughput = total_uploaded_mib / benchmark.stats["max"]
-    max_throughput = total_uploaded_mib / benchmark.stats["min"]
-    mean_throughput = total_uploaded_mib / benchmark.stats["mean"]
-    median_throughput = total_uploaded_mib / benchmark.stats["median"]
+    if download_bytes_list is not None:
+        assert (
+            duration is not None
+        ), "Duration must be provided when download_bytes_list is provided."
+        throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
+        min_throughput = min(throughputs_list)
+        max_throughput = max(throughputs_list)
+        mean_throughput = statistics.mean(throughputs_list)
+        median_throughput = statistics.median(throughputs_list)
+
+    else:
+        object_size = params.file_size_bytes
+        num_files = params.num_files
+        total_uploaded_mib = object_size / (1024 * 1024) * num_files
+        min_throughput = total_uploaded_mib / benchmark.stats["max"]
+        max_throughput = total_uploaded_mib / benchmark.stats["min"]
+        mean_throughput = total_uploaded_mib / benchmark.stats["mean"]
+        median_throughput = total_uploaded_mib / benchmark.stats["median"]
 
     benchmark.extra_info["throughput_MiB_s_min"] = min_throughput
     benchmark.extra_info["throughput_MiB_s_max"] = max_throughput
@@ -165,3 +182,79 @@ def seek(self, offset, whence=io.SEEK_SET):
         # Clamp position to valid range [0, size]
         self._pos = max(0, min(new_pos, self._size))
         return self._pos
+
+
+def get_nic_pci(nic):
+    """Gets the PCI address of a network interface."""
+    return os.path.basename(os.readlink(f"/sys/class/net/{nic}/device"))
+
+
+def get_irqs_for_pci(pci):
+    """Gets the IRQs associated with a PCI address."""
+    irqs = []
+    with open("/proc/interrupts") as f:
+        for line in f:
+            if pci in line:
+                irq = line.split(":")[0].strip()
+                irqs.append(irq)
+    return irqs
+
+
+def get_affinity(irq):
+    """Gets the CPU affinity of an IRQ."""
+    path = f"/proc/irq/{irq}/smp_affinity_list"
+    try:
+        with open(path) as f:
+            return f.read().strip()
+    except FileNotFoundError:
+        return "N/A"
+
+
+def get_primary_interface_name():
+    """Returns the name of the interface that carries the default route."""
+    primary_ip = None
+
+    # 1. Determine the local IP used for internet access.
+    # We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet.
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        # connect() to a public IP (Google DNS) to force route resolution.
+        s.connect(("8.8.8.8", 80))
+        primary_ip = s.getsockname()[0]
+    except Exception:
+        # Fallback if no internet.
+        return None
+    finally:
+        s.close()
+
+    # 2. Match that IP to an interface name using psutil.
+    if primary_ip:
+        interfaces = psutil.net_if_addrs()
+        for name, addresses in interfaces.items():
+            for addr in addresses:
+                # Check if this interface has the IP we found.
+                if addr.address == primary_ip:
+                    return name
+    return None
+
+
+def get_irq_affinity():
+    """Gets the set of CPUs that service the primary NIC's IRQs."""
+    nic = get_primary_interface_name()
+    if not nic:
+        nic = _C4_STANDARD_192_NIC
+
+    pci = get_nic_pci(nic)
+    irqs = get_irqs_for_pci(pci)
+    cpus = set()
+    for irq in irqs:
+        affinity_str = get_affinity(irq)
+        if affinity_str != "N/A":
+            for part in affinity_str.split(","):
+                if "-" in part:
+                    # Expand CPU ranges such as "0-3".
+                    start, end = part.split("-")
+                    cpus.update(range(int(start), int(end) + 1))
+                else:
+                    cpus.add(int(part))
+    return cpus
diff --git a/tests/perf/microbenchmarks/time_based/conftest.py b/tests/perf/microbenchmarks/time_based/conftest.py
new file mode 100644
index 000000000..5c0c787f0
--- /dev/null
+++ b/tests/perf/microbenchmarks/time_based/conftest.py
@@ -0,0 +1,21 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+
+@pytest.fixture
+def workload_params(request):
+    params = request.param
+    files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
+    return params, files_names
diff --git a/tests/perf/microbenchmarks/time_based/reads/config.py b/tests/perf/microbenchmarks/time_based/reads/config.py
new file mode 100644
index 000000000..737bb3b84
--- /dev/null
+++ b/tests/perf/microbenchmarks/time_based/reads/config.py
@@ -0,0 +1,108 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
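+"""Builds the time-based read benchmark parameter matrix from config.yaml."""
+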
+import itertools
+import os
+from typing import Dict, List
+
+import yaml
+
+try:
+    from tests.perf.microbenchmarks.time_based.reads.parameters import (
+        TimeBasedReadParameters,
+    )
+except ModuleNotFoundError:
+    from reads.parameters import TimeBasedReadParameters
+
+
+def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
+    """Generates a dictionary of benchmark parameters for time-based read operations."""
+    params: Dict[str, List[TimeBasedReadParameters]] = {}
+    config_path = os.path.join(os.path.dirname(__file__), "config.yaml")
+    with open(config_path, "r") as f:
+        config = yaml.safe_load(f)
+
+    common_params = config["common"]
+    bucket_types = common_params["bucket_types"]
+    file_sizes_mib = common_params["file_sizes_mib"]
+    chunk_sizes_kib = common_params["chunk_sizes_kib"]
+    num_ranges = common_params["num_ranges"]
+    rounds = common_params["rounds"]
+    duration = common_params["duration"]
+    warmup_duration = common_params["warmup_duration"]
+
+    bucket_map = {
+        "zonal": os.environ.get(
+            "DEFAULT_RAPID_ZONAL_BUCKET",
+            config["defaults"]["DEFAULT_RAPID_ZONAL_BUCKET"],
+        ),
+        "regional": os.environ.get(
+            "DEFAULT_STANDARD_BUCKET", config["defaults"]["DEFAULT_STANDARD_BUCKET"]
+        ),
+    }
+
+    for workload in config["workload"]:
+        workload_name = workload["name"]
+        params[workload_name] = []
+        pattern = workload["pattern"]
+        processes = workload["processes"]
+        coros = workload["coros"]
+
+        # Create a product of all parameter combinations
+        product = itertools.product(
+            bucket_types,
+            file_sizes_mib,
+            chunk_sizes_kib,
+            num_ranges,
+            processes,
+            coros,
+        )
+
+        for (
+            bucket_type,
+            file_size_mib,
+            chunk_size_kib,
+            num_ranges_val,
+            num_processes,
+            num_coros,
+        ) in product:
+            file_size_bytes = file_size_mib * 1024 * 1024
+            chunk_size_bytes = chunk_size_kib * 1024
+            bucket_name = bucket_map[bucket_type]
+
+            num_files = num_processes * num_coros
+
+            # Create a descriptive name for the parameter set
+            name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
+
+            params[workload_name].append(
+                TimeBasedReadParameters(
+                    name=name,
+                    workload_name=workload_name,
+                    pattern=pattern,
+                    bucket_name=bucket_name,
+                    bucket_type=bucket_type,
+                    num_coros=num_coros,
+                    num_processes=num_processes,
+                    num_files=num_files,
+                    rounds=rounds,
+                    chunk_size_bytes=chunk_size_bytes,
+                    file_size_bytes=file_size_bytes,
+                    duration=duration,
+                    warmup_duration=warmup_duration,
+                    num_ranges=num_ranges_val,
+                )
+            )
+    return params
diff --git a/tests/perf/microbenchmarks/time_based/reads/config.yaml b/tests/perf/microbenchmarks/time_based/reads/config.yaml
new file mode 100644
index 000000000..e739bfd2f
--- /dev/null
+++ b/tests/perf/microbenchmarks/time_based/reads/config.yaml
@@ -0,0 +1,28 @@
+common:
+  bucket_types:
+    - "regional"
+    - "zonal"
+  file_sizes_mib:
+    - 10240  # 10 GiB
+  chunk_sizes_kib: [64]  # 64 KiB
+  num_ranges: [1]
+  rounds: 1
+  duration: 30  # seconds
+  warmup_duration: 5  # seconds
+
+workload:
+  ############# multi process multi coroutine #########
+  - name: "read_seq_multi_process"
+    pattern: "seq"
+    coros: [1]
+    processes: [96]
+
+
+  - name: "read_rand_multi_process"
+    pattern: "rand"
+    coros: [1]
+    processes: [1]
+
+defaults:
+  DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb"
+  DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"
\ No newline at end of file
diff --git a/tests/perf/microbenchmarks/time_based/reads/parameters.py b/tests/perf/microbenchmarks/time_based/reads/parameters.py
new file mode 100644
index 000000000..6ed2da210
--- /dev/null
+++ b/tests/perf/microbenchmarks/time_based/reads/parameters.py
@@ -0,0 +1,23 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from dataclasses import dataclass
+from tests.perf.microbenchmarks.parameters import IOBenchmarkParameters
+
+
+@dataclass
+class TimeBasedReadParameters(IOBenchmarkParameters):
+    pattern: str
+    duration: int
+    warmup_duration: int
+    num_ranges: int
diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py
new file mode 100644
index 000000000..17e6d48fd
--- /dev/null
+++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py
@@ -0,0 +1,227 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Microbenchmarks for time-based Google Cloud Storage read operations."""
+
+import time
+import asyncio
+import random
+import logging
+import os
+import multiprocessing
+
+import pytest
+
+from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
+from google.cloud.storage.asyncio.async_multi_range_downloader import (
+    AsyncMultiRangeDownloader,
+)
+from tests.perf.microbenchmarks._utils import (
+    get_irq_affinity,
+    publish_benchmark_extra_info,
+)
+from tests.perf.microbenchmarks.conftest import (
+    publish_resource_metrics,
+)
+from io import BytesIO
+import tests.perf.microbenchmarks.time_based.reads.config as config
+
+all_params = config._get_params()
+
+
+async def create_client():
+    """Initializes the async gRPC client inside the worker's event loop."""
+    return AsyncGrpcClient()
+
+
+# --- Global Variables for Worker Process ---
+worker_loop = None
+worker_client = None
+worker_json_client = None
+
+
+# TODO: b/479135274 close clients properly.
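+# Workers pin themselves off the CPUs that service the primary NIC's IRQs (see
+# get_irq_affinity) so benchmark work does not compete with interrupt handling.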
+def _worker_init(bucket_type): + """Initializes a persistent event loop and client for each worker process.""" + cpu_affinity = get_irq_affinity() + if cpu_affinity: + os.sched_setaffinity( + 0, {i for i in range(0, os.cpu_count()) if i not in cpu_affinity} + ) + + global worker_loop, worker_client, worker_json_client + if bucket_type == "zonal": + worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(worker_loop) + worker_client = worker_loop.run_until_complete(create_client()) + else: # regional + from google.cloud import storage + + worker_json_client = storage.Client() + + +def _download_time_based_json(client, filename, params): + """Performs time-based downloads using the JSON API.""" + total_bytes_downloaded = 0 + bucket = client.bucket(params.bucket_name) + blob = bucket.blob(filename) + + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + bytes_in_iteration = 0 + # For JSON, we can't batch ranges like gRPC, so we download one by one + for _ in range(params.num_ranges): + if params.pattern == "rand": + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + + data = blob.download_as_bytes( + start=offset, end=offset + params.chunk_size_bytes - 1 + ) + bytes_in_iteration += len(data) + + if params.pattern == "seq": + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 + + assert bytes_in_iteration == params.chunk_size_bytes * params.num_ranges + + if not is_warming_up: + total_bytes_downloaded += bytes_in_iteration + + return total_bytes_downloaded + + +async def _download_time_based_async(client, filename, params): + total_bytes_downloaded = 0 + + mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename) + await mrd.open() + + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + ranges = [] + if params.pattern == "rand": + for _ in range(params.num_ranges): + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + else: # seq + for _ in range(params.num_ranges): + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 # Reset offset if end of file is reached + + await mrd.download_ranges(ranges) + + bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges) + assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges + + if not is_warming_up: + total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges + + await mrd.close() + return total_bytes_downloaded + + +def _download_files_worker(process_idx, filename, params, bucket_type): + if bucket_type == "zonal": + return worker_loop.run_until_complete( + _download_time_based_async(worker_client, filename, params) + ) + 
else: # regional + return _download_time_based_json(worker_json_client, filename, params) + + +def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type): + args = [(i, files_names[i], params, bucket_type) for i in range(len(files_names))] + + results = pool.starmap(_download_files_worker, args) + return sum(results) + + +@pytest.mark.parametrize( + "workload_params", + all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"], + indirect=True, + ids=lambda p: p.name, +) +def test_downloads_multi_proc_multi_coro( + benchmark, storage_client, monitor, workload_params +): + params, files_names = workload_params + logging.info(f"num files: {len(files_names)}") + + ctx = multiprocessing.get_context("spawn") + pool = ctx.Pool( + processes=params.num_processes, + initializer=_worker_init, + initargs=(params.bucket_type,), + ) + + download_bytes_list = [] + + def target_wrapper(*args, **kwargs): + download_bytes_list.append(download_files_mp_mc_wrapper(pool, *args, **kwargs)) + return + + try: + with monitor() as m: + benchmark.pedantic( + target=target_wrapper, + iterations=1, + rounds=params.rounds, + args=(files_names, params, params.bucket_type), + ) + finally: + pool.close() + pool.join() + total_bytes_downloaded = sum(download_bytes_list) + throughput_mib_s = ( + total_bytes_downloaded / params.duration / params.rounds + ) / (1024 * 1024) + benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}" + print( + f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s" + ) + publish_benchmark_extra_info( + benchmark, + params, + download_bytes_list=download_bytes_list, + duration=params.duration, + ) + publish_resource_metrics(benchmark, m) diff --git a/tests/perf/microbenchmarks/writes/config.py b/tests/perf/microbenchmarks/writes/config.py index d823260f9..3f34cc789 100644 --- a/tests/perf/microbenchmarks/writes/config.py +++ b/tests/perf/microbenchmarks/writes/config.py @@ -86,7 +86,7 @@ def get_write_params() -> Dict[str, List[WriteParameters]]: num_files = num_processes * num_coros # Create a descriptive name for the parameter set - name = f"{workload_name}_{bucket_type}_{num_processes}p_{num_coros}c" + name = f"{workload_name}_{bucket_type}_{num_processes}p_{num_coros}c_{chunk_size_mib}csize" params[workload_name].append( WriteParameters( diff --git a/tests/perf/microbenchmarks/writes/test_writes.py b/tests/perf/microbenchmarks/writes/test_writes.py index 02a0f5e4f..f130a3f0b 100644 --- a/tests/perf/microbenchmarks/writes/test_writes.py +++ b/tests/perf/microbenchmarks/writes/test_writes.py @@ -38,10 +38,10 @@ from tests.perf.microbenchmarks._utils import ( publish_benchmark_extra_info, RandomBytesIO, + get_irq_affinity, ) from tests.perf.microbenchmarks.conftest import publish_resource_metrics import tests.perf.microbenchmarks.writes.config as config -from google.cloud import storage # Get write parameters all_params = config.get_write_params() @@ -82,7 +82,9 @@ async def upload_chunks_using_grpc_async(client, filename, other_params): uploaded_bytes += bytes_to_upload await writer.close() - assert uploaded_bytes == upload_size + # print('writer flush count', writer._flush_count) + + assert writer.offset == upload_size end_time = time.monotonic_ns() elapsed_time = end_time - start_time @@ -316,10 +318,34 @@ def target_wrapper(*args, **kwargs): ) +# --- Global Variables for Worker Process --- +worker_loop = None +worker_client = None +worker_json_client = None + + +def _worker_init(bucket_type): + """Initializes a persistent 
event loop and client for each worker process.""" + cpu_affinity = get_irq_affinity() + if cpu_affinity: + os.sched_setaffinity( + 0, {i for i in range(1, os.cpu_count()) if i not in cpu_affinity} + ) + global worker_loop, worker_client, worker_json_client + if bucket_type == "zonal": + worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(worker_loop) + worker_client = worker_loop.run_until_complete(create_client()) + else: # regional + from google.cloud import storage + + worker_json_client = storage.Client() + + def _upload_files_worker(files_to_upload, other_params, bucket_type): """A worker function for multi-processing uploads. - Initializes a client and calls the appropriate multi-coroutine upload function. + Calls the appropriate multi-coroutine upload function using the global client. This function is intended to be called in a separate process. Args: @@ -331,41 +357,28 @@ def _upload_files_worker(files_to_upload, other_params, bucket_type): float: The maximum latency from the uploads performed by this worker. """ if bucket_type == "zonal": - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - client = loop.run_until_complete(create_client()) - try: - result = upload_files_using_grpc_multi_coro( - loop, client, files_to_upload, other_params - ) - finally: - # cleanup loop - tasks = asyncio.all_tasks(loop=loop) - for task in tasks: - task.cancel() - loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) - loop.close() - return result + return upload_files_using_grpc_multi_coro( + worker_loop, worker_client, files_to_upload, other_params + ) else: # regional - json_client = storage.Client() return upload_files_using_json_multi_threaded( - None, json_client, files_to_upload, other_params + None, worker_json_client, files_to_upload, other_params ) -def upload_files_mp_mc_wrapper(files_names, params): +def upload_files_mp_mc_wrapper(pool, files_names, params): """Wrapper for multi-process, multi-coroutine uploads. Distributes files among a pool of processes and calls the worker function. Args: + pool: The multiprocessing pool. files_names (list): The full list of filenames to upload. - params: An object containing benchmark parameters (num_processes, num_coros). + params: An object containing benchmark parameters (num_coros). Returns: float: The maximum latency observed across all processes. 
""" - num_processes = params.num_processes num_coros = params.num_coros filenames_per_process = [ @@ -381,9 +394,7 @@ def upload_files_mp_mc_wrapper(files_names, params): for filenames in filenames_per_process ] - ctx = multiprocessing.get_context("spawn") - with ctx.Pool(processes=num_processes) as pool: - results = pool.starmap(_upload_files_worker, args) + results = pool.starmap(_upload_files_worker, args) return max(results) @@ -412,6 +423,12 @@ def target_wrapper(*args, **kwargs): output_times.append(result) return output_times + ctx = multiprocessing.get_context("spawn") + pool = ctx.Pool( + processes=params.num_processes, + initializer=_worker_init, + initargs=(params.bucket_type,), + ) try: with monitor() as m: output_times = benchmark.pedantic( @@ -419,11 +436,14 @@ def target_wrapper(*args, **kwargs): iterations=1, rounds=params.rounds, args=( + pool, files_names, params, ), ) finally: + pool.close() + pool.join() publish_benchmark_extra_info( benchmark, params, benchmark_group="write", true_times=output_times ) diff --git a/tests/system/test_bucket.py b/tests/system/test_bucket.py index 32806bd4c..cbd9d1880 100644 --- a/tests/system/test_bucket.py +++ b/tests/system/test_bucket.py @@ -22,6 +22,11 @@ PublicNetworkSource, VpcNetworkSource, ) +from google.cloud.storage.bucket import EncryptionEnforcementConfig +from google.cloud.storage.constants import ( + ENFORCEMENT_MODE_FULLY_RESTRICTED, + ENFORCEMENT_MODE_NOT_RESTRICTED, +) def test_bucket_create_w_alt_storage_class(storage_client, buckets_to_delete): @@ -1398,3 +1403,99 @@ def test_list_buckets_with_ip_filter(storage_client, buckets_to_delete): # Check that the summarized filter does not include full details. assert summarized_filter.public_network_source is None assert summarized_filter.vpc_network_sources == [] + + +def test_create_bucket_with_encryption_enforcement(storage_client, buckets_to_delete): + bucket_name = _helpers.unique_name("enforce-on-create") + + # Initialize the bucket object locally + bucket = storage_client.bucket(bucket_name) + + # Define and set the enforcement config + enforcement = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED + ) + # Set the config on the local object + bucket.encryption.google_managed_encryption_enforcement_config = enforcement + + # storage_client.create_bucket(bucket) sends the config in the initial POST + created_bucket = storage_client.create_bucket(bucket) + buckets_to_delete.append(created_bucket) + + # Verify backend persistence and server-generated effective_time + config = created_bucket.encryption.google_managed_encryption_enforcement_config + assert config.restriction_mode == ENFORCEMENT_MODE_FULLY_RESTRICTED + assert isinstance(config.effective_time, datetime.datetime) + + +def test_bucket_encryption_enforcement_config(storage_client, buckets_to_delete): + bucket_name = _helpers.unique_name("encryption-enforcement") + bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name) + buckets_to_delete.append(bucket) + + # 1. 
Set initial enforcement configuration + # Testing both Google-managed and Customer-managed configurations + google_config = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED + ) + customer_config = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_NOT_RESTRICTED + ) + + bucket.encryption.google_managed_encryption_enforcement_config = google_config + bucket.encryption.customer_managed_encryption_enforcement_config = customer_config + + # Patch sends the 'encryption' dict to the server + bucket.patch() + + # 2. Reload and Verify backend persistence + bucket.reload() + + # Verify Google Managed Config and the presence of effective_time + reloaded_google = bucket.encryption.google_managed_encryption_enforcement_config + assert reloaded_google.restriction_mode == ENFORCEMENT_MODE_FULLY_RESTRICTED + assert isinstance(reloaded_google.effective_time, datetime.datetime) + + # Verify Customer Managed Config + reloaded_customer = bucket.encryption.customer_managed_encryption_enforcement_config + assert reloaded_customer.restriction_mode == ENFORCEMENT_MODE_NOT_RESTRICTED + assert reloaded_customer.effective_time is None + + # 3. Test updating an existing config + update_google_config = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_NOT_RESTRICTED + ) + bucket.encryption.google_managed_encryption_enforcement_config = ( + update_google_config + ) + bucket.patch() + assert ( + bucket.encryption.google_managed_encryption_enforcement_config.restriction_mode + == ENFORCEMENT_MODE_NOT_RESTRICTED + ) + + +def test_delete_bucket_encryption_enforcement_config(storage_client, buckets_to_delete): + bucket_name = _helpers.unique_name("delete-encryption-config") + + # Create a bucket with an initial restricted config + enforcement = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED + ) + bucket = storage_client.bucket(bucket_name) + bucket.encryption.google_managed_encryption_enforcement_config = enforcement + bucket = storage_client.create_bucket(bucket) + buckets_to_delete.append(bucket) + + # Verify it exists first + assert bucket.encryption.google_managed_encryption_enforcement_config is not None + + # DELETE: Set the specific enforcement config to None + bucket.encryption.google_managed_encryption_enforcement_config = None + + # patch() sends the null value to the server to clear the field + bucket.patch() + + # Reload and verify the field is gone + bucket.reload() + assert bucket.encryption.google_managed_encryption_enforcement_config is None diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 7a257e960..6bb0e03fd 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -121,6 +121,233 @@ def test_upload_many_from_filenames_with_attributes( assert blob.cache_control == "no-cache" +@pytest.mark.parametrize( + "blobname", + [ + "../../local/target", # skips download + "../escape.txt", # skips download + "go/four/levels/deep/../../../../../somefile1", # skips download + "go/four/levels/deep/../some_dir/../../../../../invalid/path1", # skips download + ], +) +def test_download_many_to_path_skips_download( + shared_bucket, file_data, blobs_to_delete, blobname +): + """ + Test downloading blobs with traversal skipped + """ + # Setup + BLOBNAMES = [blobname] + + FILE_BLOB_PAIRS = [ + ( + file_data["simple"]["path"], + shared_bucket.blob("folder_traversal/" + blob_name), + ) + for blob_name in BLOBNAMES + ] + + results = 
transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + deadline=DEADLINE, + ) + for result in results: + assert result is None + + blobs = list(shared_bucket.list_blobs(prefix="folder_traversal/")) + blobs_to_delete.extend(blobs) + + # We expect 1 blob uploaded for this test parametrization + assert len(list(b for b in blobs if b.name == "folder_traversal/" + blobname)) == 1 + + # Actual Test + with tempfile.TemporaryDirectory() as tempdir: + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + results = transfer_manager.download_many_to_path( + shared_bucket, + BLOBNAMES, + destination_directory=tempdir, + blob_name_prefix="folder_traversal/", + deadline=DEADLINE, + create_directories=True, + ) + + path_traversal_warnings = [ + warning + for warning in w + if str(warning.message).startswith("The blob ") + and "will **NOT** be downloaded. The resolved destination_directory" + in str(warning.message) + ] + assert len(path_traversal_warnings) == 1, "---".join( + [str(warning.message) for warning in w] + ) + + # 1 total - 1 skipped = 1 result (containing Warning) + assert len(results) == 1 + assert isinstance(results[0], UserWarning) + + +@pytest.mark.parametrize( + "blobname", + [ + "simple_blob", + "data/file.txt", + "data/../sibling.txt", + "/etc/passwd", + "/local/usr/a.txt", + "dir/./file.txt", + "go/four/levels/deep/../somefile2", + "go/four/levels/deep/../some_dir/valid/path1", + "go/four/levels/deep/../some_dir/../../../../valid/path2", + ], +) +def test_download_many_to_path_downloads_within_dest_dir( + shared_bucket, file_data, blobs_to_delete, blobname +): + """ + Test downloading blobs with valid traversal + """ + # Setup + BLOBNAMES = [blobname] + + FILE_BLOB_PAIRS = [ + ( + file_data["simple"]["path"], + shared_bucket.blob("folder_traversal/" + blob_name), + ) + for blob_name in BLOBNAMES + ] + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + deadline=DEADLINE, + ) + for result in results: + assert result is None + + blobs = list(shared_bucket.list_blobs(prefix="folder_traversal/")) + blobs_to_delete.extend(blobs) + + assert len(list(b for b in blobs if b.name == "folder_traversal/" + blobname)) == 1 + + # Actual Test + with tempfile.TemporaryDirectory() as tempdir: + results = transfer_manager.download_many_to_path( + shared_bucket, + BLOBNAMES, + destination_directory=tempdir, + blob_name_prefix="folder_traversal/", + deadline=DEADLINE, + create_directories=True, + ) + + assert len(results) == 1 + for result in results: + assert result is None + + # Verify the file exists and contents match + from google.cloud.storage.transfer_manager import _resolve_path + from pathlib import Path + + expected_file_path = Path(_resolve_path(tempdir, blobname)) + assert expected_file_path.is_file() + + with open(file_data["simple"]["path"], "rb") as source_file: + source_contents = source_file.read() + + with expected_file_path.open("rb") as downloaded_file: + downloaded_contents = downloaded_file.read() + + assert downloaded_contents == source_contents + + + +def test_download_many_to_path_mixed_results( + shared_bucket, file_data, blobs_to_delete +): + """ + Test download_many_to_path with successful downloads, skip_if_exists skips, and path traversal skips. 
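+    Expected: two successful downloads, one skip_if_exists warning, and one
+    path-traversal warning.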
+ """ + PREFIX = "mixed_results/" + BLOBNAMES = [ + "success1.txt", + "success2.txt", + "exists.txt", + "../escape.txt" + ] + + FILE_BLOB_PAIRS = [ + ( + file_data["simple"]["path"], + shared_bucket.blob(PREFIX + name), + ) + for name in BLOBNAMES + ] + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + deadline=DEADLINE, + ) + for result in results: + assert result is None + + blobs = list(shared_bucket.list_blobs(prefix=PREFIX)) + blobs_to_delete.extend(blobs) + assert len(blobs) == 4 + + # Actual Test + with tempfile.TemporaryDirectory() as tempdir: + existing_file_path = os.path.join(tempdir, "exists.txt") + with open(existing_file_path, "w") as f: + f.write("already here") + + import warnings + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + results = transfer_manager.download_many_to_path( + shared_bucket, + BLOBNAMES, + destination_directory=tempdir, + blob_name_prefix=PREFIX, + deadline=DEADLINE, + create_directories=True, + skip_if_exists=True, + ) + + assert len(results) == 4 + + path_traversal_warnings = [ + warning + for warning in w + if str(warning.message).startswith("The blob ") + and "will **NOT** be downloaded. The resolved destination_directory" + in str(warning.message) + ] + assert len(path_traversal_warnings) == 1, "---".join( + [str(warning.message) for warning in w] + ) + + assert results[0] is None + assert results[1] is None + assert isinstance(results[2], UserWarning) + assert "skipped because destination file already exists" in str(results[2]) + assert isinstance(results[3], UserWarning) + assert "will **NOT** be downloaded" in str(results[3]) + + assert os.path.exists(os.path.join(tempdir, "success1.txt")) + assert os.path.exists(os.path.join(tempdir, "success2.txt")) + + with open(existing_file_path, "r") as f: + assert f.read() == "already here" + + def test_download_many(listable_bucket): blobs = list(listable_bucket.list_blobs()) with tempfile.TemporaryDirectory() as tempdir: diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index fd5d841c3..01d9a7c01 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -31,6 +31,7 @@ # TODO: replace this with a fixture once zonal bucket creation / deletion # is supported in grpc client or json client client. 
_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET") +_CROSS_REGION_BUCKET = os.getenv("CROSS_REGION_BUCKET") _BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object" @@ -82,6 +83,51 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]: return a + step, a + 2 * step +@pytest.mark.parametrize( + "object_size", + [ + 256, # less than _chunk size + 10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES + 20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE + ], +) +def test_basic_wrd_x_region( + storage_client, + blobs_to_delete, + object_size, + event_loop, + grpc_client, +): + object_name = f"test_basic_wrd-{str(uuid.uuid4())}" + + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + + writer = AsyncAppendableObjectWriter(grpc_client, _CROSS_REGION_BUCKET, object_name) + await writer.open() + await writer.append(object_data) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + buffer = BytesIO() + mrd = AsyncMultiRangeDownloader(grpc_client, _CROSS_REGION_BUCKET, object_name) + async with mrd: + assert mrd._open_retries == 1 + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + assert mrd.persisted_size == object_size + + assert buffer.getvalue() == object_data + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_CROSS_REGION_BUCKET).blob(object_name)) + del writer + gc.collect() + + event_loop.run_until_complete(_run()) + @pytest.mark.parametrize( "object_size", [ @@ -437,63 +483,6 @@ async def _read_and_verify(expected_content, generation=None): event_loop.run_until_complete(_run()) -def test_append_flushes_and_state_lookup( - storage_client, blobs_to_delete, event_loop, grpc_client -): - """ - System test for AsyncAppendableObjectWriter, verifying flushing behavior - for both small and large appends. - """ - object_name = f"test-append-flush-varied-size-{uuid.uuid4()}" - - async def _run(): - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - - # Schedule for cleanup - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - - # --- Part 1: Test with small data --- - small_data = b"small data" - - await writer.open() - assert writer._is_stream_open - - await writer.append(small_data) - persisted_size = await writer.state_lookup() - assert persisted_size == len(small_data) - - # --- Part 2: Test with large data --- - large_data = os.urandom(38 * 1024 * 1024) - - # Append data larger than the default flush interval (16 MiB). - # This should trigger the interval-based flushing logic. - await writer.append(large_data) - - # Verify the total data has been persisted. - total_size = len(small_data) + len(large_data) - persisted_size = await writer.state_lookup() - assert persisted_size == total_size - - # --- Part 3: Finalize and verify --- - final_object = await writer.close(finalize_on_close=True) - - assert not writer._is_stream_open - assert final_object.size == total_size - - # Verify the full content of the object. 
- full_data = small_data + large_data - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - content = buffer.getvalue() - assert content == full_data - - event_loop.run_until_complete(_run()) - - def test_open_with_generation_zero( storage_client, blobs_to_delete, event_loop, grpc_client ): @@ -601,56 +590,3 @@ async def _run(): gc.collect() event_loop.run_until_complete(_run()) - -def test_get_object_after_appendable_write( - grpc_clients, - grpc_client_direct, - event_loop, - storage_client, - blobs_to_delete, -): - """Test getting object metadata after writing with AsyncAppendableObjectWriter. - - This test: - 1. Creates a test object using AsyncAppendableObjectWriter - 2. Appends content to the object (without finalizing) - 3. Closes the write stream - 4. Fetches the object metadata using AsyncGrpcClient.get_object() - 5. Verifies the object size matches the written data - """ - - async def _run(): - grpc_client = grpc_client_direct - object_name = f"test-get-object-{uuid.uuid4().hex}" - test_data = b"Some test data bytes." - expected_size = len(test_data) - - writer = AsyncAppendableObjectWriter( - grpc_client, - _ZONAL_BUCKET, - object_name, - ) - - await writer.open() - await writer.append(test_data) - await writer.close(finalize_on_close=False) - - obj = await grpc_client.get_object( - bucket_name=_ZONAL_BUCKET, - object_name=object_name, - ) - - # Assert - assert obj is not None - assert obj.name == object_name - assert obj.bucket == f"projects/_/buckets/{_ZONAL_BUCKET}" - assert obj.size == expected_size, ( - f"Expected object size {expected_size}, got {obj.size}" - ) - - # Cleanup - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - gc.collect() - - event_loop.run_until_complete(_run()) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 02fb3238d..c19d6f4ad 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -35,11 +35,15 @@ WRITE_HANDLE = b"test-write-handle" PERSISTED_SIZE = 456 EIGHT_MIB = 8 * 1024 * 1024 +DATA_LESS_THAN_FLUSH_INTERVAL = ( + b"test-data" # 9 bytes, less than default flush interval +) class TestIsWriteRetryable: """Exhaustive tests for retry predicate logic.""" + # TODO: remove `mock_appendable_writer` param. def test_standard_transient_errors(self, mock_appendable_writer): for exc in [ exceptions.InternalServerError("500"), @@ -110,7 +114,6 @@ def mock_appendable_writer(): yield { "mock_client": mock_client, - "mock_stream_cls": mock_stream_cls, "mock_stream": mock_stream, } @@ -215,46 +218,113 @@ def test_on_open_error_redirection(self, mock_appendable_writer): assert writer.write_handle.handle == b"h1" assert writer.generation == 777 + @pytest.mark.asyncio + async def test_open_closes_existing_stream(self, mock_appendable_writer): + """Verify proper cleanup of existing stream on re-open.""" + writer = self._make_one(mock_appendable_writer["mock_client"]) + # We simulate a state where write_obj_stream exists but we are opening (e.g. 
retry or stale) + writer._is_stream_open = False + # Set an existing stream + old_stream = mock.AsyncMock() + old_stream.is_stream_open = True + writer.write_obj_stream = old_stream + + # Mock the creation of NEW stream to avoid overwriting our old_stream reference too early if we needed it, + # but here we just want to verify old_stream.close() is called. + + # Act + await writer.open() + + # Assert + old_stream.close.assert_awaited_once() + assert writer.write_obj_stream != old_stream + assert writer._is_stream_open + + @pytest.mark.asyncio + async def test_open_logs_warning_on_close_error(self, mock_appendable_writer): + """Verify logging when closing existing stream fails.""" + writer = self._make_one(mock_appendable_writer["mock_client"]) + old_stream = mock.AsyncMock() + old_stream.is_stream_open = True + old_stream.close.side_effect = ValueError("close failed") + writer.write_obj_stream = old_stream + writer._is_stream_open = False + + with mock.patch( + "google.cloud.storage.asyncio.async_appendable_object_writer.logger" + ) as mock_logger: + await writer.open() + + mock_logger.warning.assert_called_once() + args, _ = mock_logger.warning.call_args + assert "Error closing previous write stream" in args[0] + assert "close failed" in args[0] + # ------------------------------------------------------------------------- # Append Tests # ------------------------------------------------------------------------- @pytest.mark.asyncio - async def test_append_basic_success(self, mock_appendable_writer): + async def test_append_data_less_than_flush_interval(self, mock_appendable_writer): """Verify append orchestrates manager and drives the internal generator.""" writer = self._make_one(mock_appendable_writer["mock_client"]) writer._is_stream_open = True + writer.persisted_size = 0 writer.write_obj_stream = mock_appendable_writer["mock_stream"] + writer.write_obj_stream.send = AsyncMock() + + data_len = len(DATA_LESS_THAN_FLUSH_INTERVAL) + await writer.append(DATA_LESS_THAN_FLUSH_INTERVAL) + + assert writer.offset == data_len + assert writer.bytes_appended_since_last_flush == data_len + + @pytest.mark.parametrize( + "data_len", + [ + _DEFAULT_FLUSH_INTERVAL_BYTES - 1, + _DEFAULT_FLUSH_INTERVAL_BYTES, + _DEFAULT_FLUSH_INTERVAL_BYTES + 1, + ], + ) + @pytest.mark.asyncio + async def test_append(self, data_len, mock_appendable_writer): + """Verify append orchestrates manager and drives the internal generator.""" + # Arrange + writer = self._make_one(mock_appendable_writer["mock_client"]) + writer._is_stream_open = True writer.persisted_size = 0 + writer.write_obj_stream = mock_appendable_writer["mock_stream"] + writer.write_obj_stream.send = AsyncMock() - data = b"test-data" + # if data > flush interval then we expect 1 flush & 1 state_lookup going through + # which means we will do 1 recv() call. 
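+        # The mocked recv() below simulates the server acknowledging exactly one
+        # flush interval's worth of persisted bytes.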
+ writer.write_obj_stream.recv = AsyncMock( + return_value=storage_type.BidiWriteObjectResponse( + persisted_size=_DEFAULT_FLUSH_INTERVAL_BYTES + ) + ) + data = b"a" * data_len - with mock.patch( - "google.cloud.storage.asyncio.async_appendable_object_writer._BidiStreamRetryManager" - ) as MockManager: - - async def mock_execute(state, policy): - factory = MockManager.call_args[0][1] - dummy_reqs = [storage_type.BidiWriteObjectRequest()] - gen = factory(dummy_reqs, state) - - mock_appendable_writer["mock_stream"].recv.side_effect = [ - storage_type.BidiWriteObjectResponse( - persisted_size=len(data), - write_handle=storage_type.BidiWriteHandle(handle=b"h2"), - ), - None, - ] - async for _ in gen: - pass - - MockManager.return_value.execute.side_effect = mock_execute - await writer.append(data) - - assert writer.persisted_size == len(data) - sent_req = mock_appendable_writer["mock_stream"].send.call_args[0][0] - assert sent_req.state_lookup - assert sent_req.flush + # Act + await writer.append(data) + + # Assert + expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES + assert writer.offset == data_len + assert ( + writer.bytes_appended_since_last_flush + == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert ( + writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert writer.write_obj_stream.send.await_count == -( + -data_len // _MAX_CHUNK_SIZE_BYTES + ) # Ceiling division for number of chunks + assert ( + writer.write_obj_stream.recv.await_count == expected_recv_count + ) # Expect 1 recv per flush interval @pytest.mark.asyncio async def test_append_recovery_reopens_stream(self, mock_appendable_writer): @@ -297,7 +367,7 @@ async def mock_execute(state, policy): MockManager.return_value.execute.side_effect = mock_execute await writer.append(b"0123456789") - mock_appendable_writer["mock_stream"].close.assert_awaited() + # mock_appendable_writer["mock_stream"].close.assert_awaited() # Removed because open() is mocked mock_writer_open.assert_awaited() assert writer.persisted_size == 5 diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py index f193acb60..06cb232d5 100644 --- a/tests/unit/asyncio/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -19,6 +19,7 @@ from google.api_core import client_info as client_info_lib from google.cloud.storage.asyncio import async_grpc_client from google.cloud.storage import __version__ +from google.api_core import client_options def _make_credentials(spec=None): @@ -157,36 +158,31 @@ def test_grpc_client_property(self, mock_grpc_gapic_client): assert retrieved_client is mock_grpc_gapic_client.return_value @mock.patch("google.cloud._storage_v2.StorageAsyncClient") - def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client): + @mock.patch( + "google.cloud.storage.asyncio.async_grpc_client.grpc.aio.insecure_channel" + ) + def test_grpc_client_with_anon_creds( + self, mock_insecure_channel, mock_async_storage_client + ): # Arrange - mock_transport_cls = mock.MagicMock() - mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls - channel_sentinel = mock.sentinel.channel - - mock_transport_cls.create_channel.return_value = channel_sentinel - mock_transport_cls.return_value = mock.sentinel.transport + mock_channel = mock.MagicMock() + mock_insecure_channel.return_value = mock_channel # Act - anonymous_creds = AnonymousCredentials() - client = async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds) - 
+        assert writer.write_obj_stream.send.await_count == -(
+            -data_len // _MAX_CHUNK_SIZE_BYTES
+        )  # Ceiling division for number of chunks
+        assert (
+            writer.write_obj_stream.recv.await_count == expected_recv_count
+        )  # Expect 1 recv per flush interval

     @pytest.mark.asyncio
     async def test_append_recovery_reopens_stream(self, mock_appendable_writer):
@@ -297,7 +367,7 @@ async def mock_execute(state, policy):
             MockManager.return_value.execute.side_effect = mock_execute
             await writer.append(b"0123456789")
 
-        mock_appendable_writer["mock_stream"].close.assert_awaited()
+        # close() on the old stream is not asserted here because open() is mocked.
         mock_writer_open.assert_awaited()
         assert writer.persisted_size == 5
diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py
index f193acb60..06cb232d5 100644
--- a/tests/unit/asyncio/test_async_grpc_client.py
+++ b/tests/unit/asyncio/test_async_grpc_client.py
@@ -19,6 +19,7 @@
 from google.api_core import client_info as client_info_lib
 from google.cloud.storage.asyncio import async_grpc_client
 from google.cloud.storage import __version__
+from google.api_core import client_options
 
 
 def _make_credentials(spec=None):
@@ -157,36 +158,31 @@ def test_grpc_client_property(self, mock_grpc_gapic_client):
         assert retrieved_client is mock_grpc_gapic_client.return_value
 
     @mock.patch("google.cloud._storage_v2.StorageAsyncClient")
-    def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client):
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_grpc_client.grpc.aio.insecure_channel"
+    )
+    def test_grpc_client_with_anon_creds(
+        self, mock_insecure_channel, mock_async_storage_client
+    ):
         # Arrange
-        mock_transport_cls = mock.MagicMock()
-        mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls
-        channel_sentinel = mock.sentinel.channel
-
-        mock_transport_cls.create_channel.return_value = channel_sentinel
-        mock_transport_cls.return_value = mock.sentinel.transport
+        mock_channel = mock.MagicMock()
+        mock_insecure_channel.return_value = mock_channel
 
         # Act
-        anonymous_creds = AnonymousCredentials()
-        client = async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds)
-
-        retrieved_client = client.grpc_client
+        client = async_grpc_client.AsyncGrpcClient(
+            client_options=client_options.ClientOptions(
+                api_endpoint="my-grpc-endpoint"
+            ),
+            credentials=AnonymousCredentials(),
+        )
 
         # Assert
-        assert retrieved_client is mock_grpc_gapic_client.return_value
-
-        kwargs = mock_grpc_gapic_client.call_args.kwargs
-        client_info = kwargs["client_info"]
-        agent_version = f"gcloud-python/{__version__}"
-        assert agent_version in client_info.user_agent
-        primary_user_agent = client_info.to_user_agent()
-        expected_options = (("grpc.primary_user_agent", primary_user_agent),)
+        assert client.grpc_client is mock_async_storage_client.return_value
+        mock_insecure_channel.assert_called_once_with("my-grpc-endpoint")
 
-        mock_transport_cls.create_channel.assert_called_once_with(
-            attempt_direct_path=True,
-            credentials=anonymous_creds,
-            options=expected_options,
-        )
-        mock_transport_cls.assert_called_once_with(channel=channel_sentinel)
+        kwargs = mock_async_storage_client.call_args.kwargs
+        transport = kwargs["transport"]
+        assert isinstance(transport._credentials, AnonymousCredentials)
 
     @mock.patch("google.cloud._storage_v2.StorageAsyncClient")
     def test_user_agent_with_custom_client_info(self, mock_async_storage_client):
@@ -221,9 +217,7 @@ async def test_delete_object(self, mock_async_storage_client):
         mock_gapic_client = mock.AsyncMock()
         mock_async_storage_client.return_value = mock_gapic_client
 
-        client = async_grpc_client.AsyncGrpcClient(
-            credentials=_make_credentials(spec=AnonymousCredentials)
-        )
+        client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials())
 
         bucket_name = "bucket"
         object_name = "object"
@@ -264,9 +258,7 @@ async def test_get_object(self, mock_async_storage_client):
         mock_gapic_client = mock.AsyncMock()
         mock_async_storage_client.return_value = mock_gapic_client
 
-        client = async_grpc_client.AsyncGrpcClient(
-            credentials=_make_credentials(spec=AnonymousCredentials)
-        )
+        client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials())
 
         bucket_name = "bucket"
         object_name = "object"
@@ -293,9 +285,7 @@ async def test_get_object_with_all_parameters(self, mock_async_storage_client):
         mock_gapic_client = mock.AsyncMock()
         mock_async_storage_client.return_value = mock_gapic_client
 
-        client = async_grpc_client.AsyncGrpcClient(
-            credentials=_make_credentials(spec=AnonymousCredentials)
-        )
+        client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials())
 
         bucket_name = "bucket"
         object_name = "object"
diff --git a/tests/unit/asyncio/test_async_multi_range_downloader.py b/tests/unit/asyncio/test_async_multi_range_downloader.py
index c912dc995..f813c8a39 100644
--- a/tests/unit/asyncio/test_async_multi_range_downloader.py
+++ b/tests/unit/asyncio/test_async_multi_range_downloader.py
@@ -27,7 +27,6 @@
 from io import BytesIO
 from google.cloud.storage.exceptions import DataCorruption
 
-
 _TEST_BUCKET_NAME = "test-bucket"
 _TEST_OBJECT_NAME = "test-object"
 _TEST_OBJECT_SIZE = 1024 * 1024  # 1 MiB
@@ -93,6 +92,7 @@ async def test_create_mrd(self, mock_cls_async_read_object_stream):
         assert mrd.read_handle == _TEST_READ_HANDLE
         assert mrd.persisted_size == _TEST_OBJECT_SIZE
         assert mrd.is_stream_open
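+        # A fresh MRD should start with a zeroed open-retry counter.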
+        assert mrd._open_retries == 0
 
     @mock.patch(
         "google.cloud.storage.asyncio.async_multi_range_downloader.generate_random_56_bit_integer"
     )
@@ -407,7 +407,7 @@ async def close_side_effect():
     )
     @pytest.mark.asyncio
     async def test_create_mrd_with_generation_number(
-        self, mock_cls_async_read_object_stream
+        self, mock_cls_async_read_object_stream, caplog
     ):
         # Arrange
         mock_client = mock.MagicMock()
@@ -430,6 +430,7 @@ async def test_create_mrd_with_generation_number(
 
         # Assert
         assert mrd.generation == _TEST_GENERATION_NUMBER
+        assert "'generation_number' is deprecated" in caplog.text
 
     @pytest.mark.asyncio
     async def test_create_mrd_with_both_generation_and_generation_number(self):
@@ -445,3 +446,133 @@ async def test_create_mrd_with_both_generation_and_generation_number(self):
             generation=_TEST_GENERATION_NUMBER,
             generation_number=_TEST_GENERATION_NUMBER,
         )
+
+    @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.AsyncRetry")
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
+    )
+    @pytest.mark.asyncio
+    async def test_open_retries_increment(
+        self, mock_cls_async_read_object_stream, mock_async_retry
+    ):
+        # Arrange
+        # Make AsyncRetry return a pass-through decorator so the decorated
+        # open() coroutine can be awaited directly.
+        mock_policy = mock.MagicMock()
+        mock_policy.side_effect = lambda f: f
+        mock_async_retry.return_value = mock_policy
+
+        # _make_mock_mrd exercises create_mrd -> open once; its result is
+        # discarded, and a fresh, unopened MRD is built below to test the
+        # retry wiring in isolation.
+        mrd, _ = await self._make_mock_mrd(mock_cls_async_read_object_stream)
+
+        mock_client = mock.MagicMock()
+        mock_client.grpc_client = mock.AsyncMock()
+        mrd = AsyncMultiRangeDownloader(
+            mock_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
+        )
+        # Mock the underlying read stream
+        mock_stream = mock_cls_async_read_object_stream.return_value
+        mock_stream.open = AsyncMock()
+
+        # Act: open() constructs the AsyncRetry policy; capture the on_error
+        # callback passed to it.
+        await mrd.open()
+
+        # Assert
+        # Check that AsyncRetry received an on_error callback
+        call_args = mock_async_retry.call_args
+        assert call_args is not None
+        _, kwargs = call_args
+        on_error = kwargs.get("on_error")
+        assert on_error is not None
+
+        # Simulate an error to trigger the increment
+        assert mrd._open_retries == 0
+        on_error(ValueError("test"))
+        assert mrd._open_retries == 1
+
+    @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.logger")
+    @pytest.mark.asyncio
+    async def test_on_open_error_logs_warning(self, mock_logger):
+        # Arrange
+        mock_client = mock.MagicMock()
+        mrd = AsyncMultiRangeDownloader(
+            mock_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
+        )
+        exc = ValueError("test error")
+
+        # Act
+        mrd._on_open_error(exc)
+
+        # Assert
+        mock_logger.warning.assert_called_once_with(
+            f"Error occurred while opening MRD: {exc}"
+        )
+
+    @mock.patch("google.cloud.storage.asyncio.async_multi_range_downloader.logger")
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader.generate_random_56_bit_integer"
+    )
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
+    )
+    @pytest.mark.asyncio
+    async def test_download_ranges_resumption_logging(
+        self, mock_cls_async_read_object_stream, mock_random_int, mock_logger
+    ):
+        # Arrange
+        mock_mrd, _ = await self._make_mock_mrd(mock_cls_async_read_object_stream)
+
+        mock_mrd.read_obj_str.send = AsyncMock()
+        mock_mrd.read_obj_str.recv = AsyncMock()
+
+        from google.api_core import exceptions as core_exceptions
+
+        retryable_exc = core_exceptions.ServiceUnavailable("Retry me")
+
+        # Mock send() to raise a retryable error once, then succeed
+        mock_mrd.read_obj_str.send.side_effect = [
+            retryable_exc,
+            None,  # Success on second try
+        ]
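+
+        # The failed first send() should trigger the resumption path, which
+        # re-issues the outstanding range and logs the resumption attempt
+        # (asserted at the end of this test).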
+        # Mock recv() for the second try
+        mock_mrd.read_obj_str.recv.side_effect = [
+            _storage_v2.BidiReadObjectResponse(
+                object_data_ranges=[
+                    _storage_v2.ObjectRangeData(
+                        checksummed_data=_storage_v2.ChecksummedData(
+                            content=b"data", crc32c=123
+                        ),
+                        range_end=True,
+                        read_range=_storage_v2.ReadRange(
+                            read_offset=0, read_length=4, read_id=123
+                        ),
+                    )
+                ]
+            ),
+            None,
+        ]
+
+        mock_random_int.return_value = 123
+
+        # Act
+        buffer = BytesIO()
+        # The fake crc32c above (123) cannot match the real checksum of
+        # b"data", so patch Checksum where the integrity check imports it
+        # (google.cloud.storage.asyncio.retry.reads_resumption_strategy) and
+        # make its digest match the mocked value.
+        with mock.patch(
+            "google.cloud.storage.asyncio.retry.reads_resumption_strategy.Checksum"
+        ) as mock_chk:
+            mock_chk.return_value.digest.return_value = (123).to_bytes(4, "big")
+
+            await mock_mrd.download_ranges([(0, 4, buffer)])
+
+        # Assert
+        mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.")
diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py
index 850e89d04..98fc50ac8 100644
--- a/tests/unit/test_bucket.py
+++ b/tests/unit/test_bucket.py
@@ -2733,6 +2733,41 @@ def test_cors_setter(self):
         self.assertEqual(bucket.cors, [CORS_ENTRY])
         self.assertTrue("cors" in bucket._changes)
 
+    def test_encryption_getter(self):
+        from google.cloud.storage.bucket import BucketEncryption
+
+        NAME = "name"
+        KMS_RESOURCE = (
+            "projects/test-project-123/"
+            "locations/us/"
+            "keyRings/test-ring/"
+            "cryptoKeys/test-key"
+        )
+        ENCRYPTION_CONFIG = {"defaultKmsKeyName": KMS_RESOURCE}
+        bucket = self._make_one(name=NAME)
+        self.assertIsNone(bucket.encryption.default_kms_key_name)
+        bucket._properties["encryption"] = ENCRYPTION_CONFIG
+        encryption = bucket.encryption
+        self.assertIsInstance(encryption, BucketEncryption)
+        self.assertEqual(encryption.default_kms_key_name, KMS_RESOURCE)
+
+    def test_encryption_setter(self):
+        from google.cloud.storage.bucket import BucketEncryption
+
+        NAME = "name"
+        KMS_RESOURCE = (
+            "projects/test-project-123/"
+            "locations/us/"
+            "keyRings/test-ring/"
+            "cryptoKeys/test-key"
+        )
+        ENCRYPTION_CONFIG = {"defaultKmsKeyName": KMS_RESOURCE}
+        bucket = self._make_one(name=NAME)
+        encryption = BucketEncryption(bucket, default_kms_key_name=KMS_RESOURCE)
+        bucket.encryption = encryption
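+        # The setter should both serialize the config into _properties and
+        # record "encryption" in _changes so it is sent on the next patch.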
+        self.assertEqual(bucket._properties["encryption"], ENCRYPTION_CONFIG)
+        self.assertTrue("encryption" in bucket._changes)
+
     def test_default_kms_key_name_getter(self):
         NAME = "name"
         KMS_RESOURCE = (
@@ -4722,3 +4757,127 @@ def test_it(self):
         self.assertEqual(notification._topic_name, topic)
         self.assertEqual(notification._topic_project, project)
         self.assertEqual(notification._properties, item)
+
+
+class Test_EncryptionEnforcementConfig(unittest.TestCase):
+    @staticmethod
+    def _get_target_class():
+        from google.cloud.storage.bucket import EncryptionEnforcementConfig
+
+        return EncryptionEnforcementConfig
+
+    def _make_one(self, **kw):
+        return self._get_target_class()(**kw)
+
+    def test_ctor(self):
+        from google.cloud.storage.constants import ENFORCEMENT_MODE_FULLY_RESTRICTED
+
+        config = self._make_one(restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED)
+
+        self.assertEqual(config.restriction_mode, ENFORCEMENT_MODE_FULLY_RESTRICTED)
+        self.assertIsNone(config.effective_time)
+
+    def test_from_api_repr(self):
+        from google.cloud._helpers import _datetime_to_rfc3339
+        from google.cloud.storage.constants import ENFORCEMENT_MODE_NOT_RESTRICTED
+
+        now = _NOW(_UTC)
+        resource = {
+            "restrictionMode": ENFORCEMENT_MODE_NOT_RESTRICTED,
+            "effectiveTime": _datetime_to_rfc3339(now),
+        }
+        klass = self._get_target_class()
+        config = klass.from_api_repr(resource)
+        self.assertEqual(config.restriction_mode, ENFORCEMENT_MODE_NOT_RESTRICTED)
+        self.assertEqual(config.effective_time, now)
+
+    def test_restriction_mode_setter(self):
+        config = self._make_one()
+        self.assertIsNone(config.restriction_mode)
+        config.restriction_mode = "FULLY_RESTRICTED"
+        self.assertEqual(config.restriction_mode, "FULLY_RESTRICTED")
+        self.assertEqual(config["restrictionMode"], "FULLY_RESTRICTED")
+
+
+class Test_BucketEncryption(unittest.TestCase):
+    @staticmethod
+    def _get_target_class():
+        from google.cloud.storage.bucket import BucketEncryption
+
+        return BucketEncryption
+
+    def _make_one(self, bucket, **kw):
+        return self._get_target_class()(bucket, **kw)
+
+    @staticmethod
+    def _make_bucket():
+        from google.cloud.storage.bucket import Bucket
+
+        return mock.create_autospec(Bucket, instance=True)
+
+    def test_ctor_defaults(self):
+        bucket = self._make_bucket()
+        encryption = self._make_one(bucket)
+        self.assertIs(encryption.bucket, bucket)
+        self.assertIsNone(encryption.default_kms_key_name)
+        # Check that the config itself is None, not its sub-property
+        self.assertIsNone(encryption.google_managed_encryption_enforcement_config)
+        self.assertIsNone(encryption.customer_managed_encryption_enforcement_config)
+        self.assertIsNone(encryption.customer_supplied_encryption_enforcement_config)
+
+    def test_ctor_explicit(self):
+        from google.cloud.storage.bucket import EncryptionEnforcementConfig
+
+        bucket = self._make_bucket()
+        kms_key = "key"
+        google_config = EncryptionEnforcementConfig("FullyRestricted")
+        encryption = self._make_one(
+            bucket,
+            default_kms_key_name=kms_key,
+            google_managed_encryption_enforcement_config=google_config,
+        )
+        self.assertEqual(encryption.default_kms_key_name, kms_key)
+        self.assertEqual(
+            encryption.google_managed_encryption_enforcement_config.restriction_mode,
+            "FullyRestricted",
+        )
+
+    def test_from_api_repr(self):
+        klass = self._get_target_class()
+        bucket = self._make_bucket()
+        resource = {
+            "defaultKmsKeyName": "key",
+            "googleManagedEncryptionEnforcementConfig": {
+                "restrictionMode": "FullyRestricted"
+            },
+        }
+        encryption = klass.from_api_repr(resource, bucket)
+        self.assertEqual(encryption.default_kms_key_name, "key")
+        self.assertEqual(
+            encryption.google_managed_encryption_enforcement_config.restriction_mode,
+            "FullyRestricted",
+        )
+
+    def test_setters_trigger_patch(self):
+        from google.cloud.storage.bucket import EncryptionEnforcementConfig
+
+        bucket = self._make_bucket()
+        encryption = self._make_one(bucket)
+
+        encryption.default_kms_key_name = "new-key"
+        config = EncryptionEnforcementConfig("NotRestricted")
+        encryption.google_managed_encryption_enforcement_config = config
+        encryption.customer_managed_encryption_enforcement_config = config
+        encryption.customer_supplied_encryption_enforcement_config = config
+
+        self.assertEqual(bucket._patch_property.call_count, 4)
+        bucket._patch_property.assert_called_with("encryption", encryption)
+
+    def test_bucket_encryption_getters_handle_none(self):
+        bucket = self._make_bucket()
+        encryption = self._get_target_class()(bucket)
+        encryption["googleManagedEncryptionEnforcementConfig"] = None
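+
+        # An explicit null from the API should surface as None rather than an
+        # empty EncryptionEnforcementConfig.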
+        config = encryption.google_managed_encryption_enforcement_config
+        self.assertIsNone(config)
diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py
index 151cd2877..90c5c478a 100644
--- a/tests/unit/test_transfer_manager.py
+++ b/tests/unit/test_transfer_manager.py
@@ -513,6 +513,57 @@ def test_upload_many_from_filenames_additional_properties():
         assert getattr(blob, attrib) == value
 
 
+def test__resolve_path_raises_invalid_path_error_on_windows():
+    from google.cloud.storage.transfer_manager import _resolve_path, InvalidPathError
+
+    with mock.patch("os.name", "nt"):
+        with pytest.raises(InvalidPathError) as exc_info:
+            _resolve_path("C:\\target", "C:\\target\\file.txt")
+        assert "cannot be downloaded into" in str(exc_info.value)
+
+    # Verify that the same path does NOT raise on non-Windows platforms
+    with mock.patch("os.name", "posix"):
+        _resolve_path("/target", "C:\\target\\file.txt")
+
+
+def test_download_many_to_path_raises_invalid_path_error():
+    bucket = mock.Mock()
+
+    BLOBNAMES = ["C:\\target\\file.txt"]
+    PATH_ROOT = "mypath/"
+    BLOB_NAME_PREFIX = "myprefix/"
+    DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"}
+    MAX_WORKERS = 7
+    DEADLINE = 10
+    WORKER_TYPE = transfer_manager.THREAD
+
+    with mock.patch("os.name", "nt"):
+        import warnings
+
+        with warnings.catch_warnings(record=True) as w:
+            warnings.simplefilter("always")
+            results = transfer_manager.download_many_to_path(
+                bucket,
+                BLOBNAMES,
+                destination_directory=PATH_ROOT,
+                blob_name_prefix=BLOB_NAME_PREFIX,
+                download_kwargs=DOWNLOAD_KWARGS,
+                deadline=DEADLINE,
+                create_directories=False,
+                raise_exception=True,
+                max_workers=MAX_WORKERS,
+                worker_type=WORKER_TYPE,
+                skip_if_exists=True,
+            )
+
+    assert len(w) == 1
+    assert "will **NOT** be downloaded" in str(w[0].message)
+    assert len(results) == 1
+    assert isinstance(results[0], UserWarning)
+
+
 def test_download_many_to_path():
     bucket = mock.Mock()
 
@@ -525,13 +576,15 @@ def test_download_many_to_path():
     WORKER_TYPE = transfer_manager.THREAD
 
     EXPECTED_BLOB_FILE_PAIRS = [
-        (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES
+        (mock.ANY, os.path.join(os.getcwd(), PATH_ROOT, blobname))
+        for blobname in BLOBNAMES
     ]
 
     with mock.patch(
-        "google.cloud.storage.transfer_manager.download_many"
+        "google.cloud.storage.transfer_manager.download_many",
+        return_value=[FAKE_RESULT] * len(BLOBNAMES),
     ) as mock_download_many:
-        transfer_manager.download_many_to_path(
+        results = transfer_manager.download_many_to_path(
             bucket,
             BLOBNAMES,
             destination_directory=PATH_ROOT,
@@ -552,11 +605,194 @@ def test_download_many_to_path():
         raise_exception=True,
         max_workers=MAX_WORKERS,
         worker_type=WORKER_TYPE,
-        skip_if_exists=True,
+        skip_if_exists=False,
     )
+    assert results == [FAKE_RESULT] * len(BLOBNAMES)
     for blobname in BLOBNAMES:
         bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname)
 
 
+def test_download_many_to_path_with_skip_if_exists():
+    bucket = mock.Mock()
+
+    BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"]
+    PATH_ROOT = "mypath/"
+    BLOB_NAME_PREFIX = "myprefix/"
+    DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"}
+    MAX_WORKERS = 7
+    DEADLINE = 10
+    WORKER_TYPE = transfer_manager.THREAD
+
+    from google.cloud.storage.transfer_manager import _resolve_path
+
+    existing_file = str(_resolve_path(PATH_ROOT, "file_a.txt"))
+
+    def isfile_side_effect(path):
+        return path == existing_file
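+
+    # Only file_a.txt "exists" on disk, so it should be skipped while the
+    # other two blobs are passed through to download_many.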
+    EXPECTED_BLOB_FILE_PAIRS = [
+        (mock.ANY, str(_resolve_path(PATH_ROOT, "file_b.txt"))),
+        (mock.ANY, str(_resolve_path(PATH_ROOT, "dir_a/file_c.txt"))),
+    ]
+
+    with mock.patch("os.path.isfile", side_effect=isfile_side_effect):
+        with mock.patch(
+            "google.cloud.storage.transfer_manager.download_many",
+            return_value=[FAKE_RESULT, FAKE_RESULT],
+        ) as mock_download_many:
+            results = transfer_manager.download_many_to_path(
+                bucket,
+                BLOBNAMES,
+                destination_directory=PATH_ROOT,
+                blob_name_prefix=BLOB_NAME_PREFIX,
+                download_kwargs=DOWNLOAD_KWARGS,
+                deadline=DEADLINE,
+                create_directories=False,
+                raise_exception=True,
+                max_workers=MAX_WORKERS,
+                worker_type=WORKER_TYPE,
+                skip_if_exists=True,
+            )
+
+    mock_download_many.assert_called_once_with(
+        EXPECTED_BLOB_FILE_PAIRS,
+        download_kwargs=DOWNLOAD_KWARGS,
+        deadline=DEADLINE,
+        raise_exception=True,
+        max_workers=MAX_WORKERS,
+        worker_type=WORKER_TYPE,
+        skip_if_exists=False,
+    )
+
+    assert len(results) == 3
+    assert isinstance(results[0], UserWarning)
+    assert (
+        str(results[0])
+        == "The blob file_a.txt is skipped because destination file already exists"
+    )
+    assert results[1] == FAKE_RESULT
+    assert results[2] == FAKE_RESULT
+
+
+@pytest.mark.parametrize(
+    "blobname",
+    [
+        "../../local/target",
+        "../mypath",
+        "../escape.txt",
+        "go/four/levels/deep/../../../../../somefile1",
+        "go/four/levels/deep/../some_dir/../../../../../invalid/path1",
+    ],
+)
+def test_download_many_to_path_skips_download(blobname):
+    bucket = mock.Mock()
+    BLOBNAMES = [blobname]
+
+    PATH_ROOT = "mypath/"
+    BLOB_NAME_PREFIX = "myprefix/"
+    DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"}
+    MAX_WORKERS = 7
+    DEADLINE = 10
+    WORKER_TYPE = transfer_manager.THREAD
+
+    import warnings
+
+    with warnings.catch_warnings(record=True) as w:
+        warnings.simplefilter("always")
+        with mock.patch(
+            "google.cloud.storage.transfer_manager.download_many",
+            return_value=[],
+        ) as mock_download_many:
+            results = transfer_manager.download_many_to_path(
+                bucket,
+                BLOBNAMES,
+                destination_directory=PATH_ROOT,
+                blob_name_prefix=BLOB_NAME_PREFIX,
+                download_kwargs=DOWNLOAD_KWARGS,
+                deadline=DEADLINE,
+                create_directories=False,
+                raise_exception=True,
+                max_workers=MAX_WORKERS,
+                worker_type=WORKER_TYPE,
+                skip_if_exists=True,
+            )
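+
+    # Filter for the path-traversal warning specifically; other warnings may
+    # also be captured in w.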
+    path_traversal_warnings = [
+        warning
+        for warning in w
+        if str(warning.message).startswith("The blob ")
+        and "will **NOT** be downloaded. The resolved destination_directory"
+        in str(warning.message)
+    ]
+    assert len(path_traversal_warnings) == 1, "---".join(
+        [str(warning.message) for warning in w]
+    )
+
+    mock_download_many.assert_called_once_with(
+        [],
+        download_kwargs=DOWNLOAD_KWARGS,
+        deadline=DEADLINE,
+        raise_exception=True,
+        max_workers=MAX_WORKERS,
+        worker_type=WORKER_TYPE,
+        skip_if_exists=False,
+    )
+    assert len(results) == 1
+    assert isinstance(results[0], UserWarning)
+
+
+@pytest.mark.parametrize(
+    "blobname",
+    [
+        "simple_blob",
+        "data/file.txt",
+        "data/../sibling.txt",
+        "/etc/passwd",
+        "/local/usr/a.txt",
+        "dir/./file.txt",
+        "go/four/levels/deep/../somefile2",
+        "go/four/levels/deep/../some_dir/valid/path1",
+        "go/four/levels/deep/../some_dir/../../../../valid/path2",
+    ],
+)
+def test_download_many_to_path_downloads_within_dest_dir(blobname):
+    bucket = mock.Mock()
+    BLOBNAMES = [blobname]
+
+    PATH_ROOT = "mypath/"
+    BLOB_NAME_PREFIX = "myprefix/"
+    DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"}
+    MAX_WORKERS = 7
+    DEADLINE = 10
+    WORKER_TYPE = transfer_manager.THREAD
+
+    from google.cloud.storage.transfer_manager import _resolve_path
+
+    EXPECTED_BLOB_FILE_PAIRS = [(mock.ANY, str(_resolve_path(PATH_ROOT, blobname)))]
+
+    with mock.patch(
+        "google.cloud.storage.transfer_manager.download_many",
+        return_value=[FAKE_RESULT],
+    ) as mock_download_many:
+        results = transfer_manager.download_many_to_path(
+            bucket,
+            BLOBNAMES,
+            destination_directory=PATH_ROOT,
+            blob_name_prefix=BLOB_NAME_PREFIX,
+            download_kwargs=DOWNLOAD_KWARGS,
+            deadline=DEADLINE,
+            create_directories=False,
+            raise_exception=True,
+            max_workers=MAX_WORKERS,
+            worker_type=WORKER_TYPE,
+            skip_if_exists=True,
+        )
+
+    mock_download_many.assert_called_once_with(
+        EXPECTED_BLOB_FILE_PAIRS,
+        download_kwargs=DOWNLOAD_KWARGS,
+        deadline=DEADLINE,
+        raise_exception=True,
+        max_workers=MAX_WORKERS,
+        worker_type=WORKER_TYPE,
+        skip_if_exists=False,
+    )
+    assert results == [FAKE_RESULT]
+    bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname)
+
 
 def test_download_many_to_path_creates_directories():
     bucket = mock.Mock()