diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/benchmark_crc32c.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/benchmark_crc32c.py new file mode 100644 index 000000000000..ff995063bf31 --- /dev/null +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/benchmark_crc32c.py @@ -0,0 +1,107 @@ +import argparse +import os +import statistics +import sys +import time + +try: + import google_crc32c +except ImportError: + print("Error: google_crc32c package is not installed in the python environment.", file=sys.stderr) + sys.exit(1) + + +def parse_size(size_str: str) -> int: + size_str = size_str.strip().upper() + if size_str.endswith("KIB"): + return int(float(size_str[:-3]) * 1024) + elif size_str.endswith("MIB"): + return int(float(size_str[:-3]) * 1024 * 1024) + elif size_str.endswith("GIB"): + return int(float(size_str[:-3]) * 1024 * 1024 * 1024) + elif size_str.endswith("KB"): + return int(float(size_str[:-2]) * 1000) + elif size_str.endswith("MB"): + return int(float(size_str[:-2]) * 1000 * 1000) + elif size_str.endswith("GB"): + return int(float(size_str[:-2]) * 1000 * 1000 * 1000) + elif size_str.endswith("B"): + return int(size_str[:-1]) + else: + try: + return int(size_str) + except ValueError: + raise ValueError(f"Unknown size format: {size_str}") + + +def format_time(seconds: float) -> str: + if seconds < 1e-6: + return f"{seconds * 1e9:.2f} ns" + elif seconds < 1e-3: + return f"{seconds * 1e6:.2f} \u03bcs" + elif seconds < 1.0: + return f"{seconds * 1e3:.2f} ms" + else: + return f"{seconds:.2f} s" + + +def main(): + parser = argparse.ArgumentParser(description="Benchmark google_crc32c.value execution time.") + parser.add_argument( + "--sizes", + type=str, + default="1KiB,100KiB,2MiB", + help="Comma-separated list of sizes (e.g. '1KiB,100KiB,2MiB')" + ) + parser.add_argument( + "--iterations", + type=int, + default=100, + help="Number of iterations for benchmark (default: 100)" + ) + args = parser.parse_args() + + # Ensure google_crc32c uses accelerated C code + impl = getattr(google_crc32c, "implementation", None) + print(f"google_crc32c implementation: {impl}") + if impl != "c": + print(f"Error: google_crc32c is not using the accelerated C code (got '{impl}').", file=sys.stderr) + sys.exit(1) + + sizes_to_test = [] + for s in args.sizes.split(","): + try: + sizes_to_test.append((s.strip(), parse_size(s))) + except ValueError as e: + print(f"Error parsing size '{s}': {e}", file=sys.stderr) + sys.exit(1) + + print(f"Benchmarking google_crc32c.value(data) with {args.iterations} iterations:") + print("-" * 80) + print(f"{'Size (String)':<15} | {'Size (Bytes)':<12} | {'Min':<10} | {'Max':<10} | {'Mean':<10} | {'Median':<10}") + print("-" * 80) + + for size_str, size_bytes in sizes_to_test: + data = os.urandom(size_bytes) + + durations = [] + for _ in range(args.iterations): + start = time.perf_counter() + _ = google_crc32c.value(data) + end = time.perf_counter() + durations.append(end - start) + + min_time = min(durations) + max_time = max(durations) + mean_time = statistics.mean(durations) + median_time = statistics.median(durations) + + print( + f"{size_str:<15} | {size_bytes:<12} | " + f"{format_time(min_time):<10} | {format_time(max_time):<10} | " + f"{format_time(mean_time):<10} | {format_time(median_time):<10}" + ) + + +if __name__ == "__main__": + main() diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/benchmark_mrd_reads.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/benchmark_mrd_reads.py new file mode 100644 index 000000000000..b887d2e0f73b --- /dev/null +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/benchmark_mrd_reads.py @@ -0,0 +1,333 @@ +import argparse +import asyncio +import os +import random +import statistics +import sys +import time +import uuid +import logging + +try: + import google_crc32c +except ImportError: + print("Error: google_crc32c package is not installed in the python environment.", file=sys.stderr) + sys.exit(1) + +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage.asyncio.async_multi_range_downloader import AsyncMultiRangeDownloader +from google.cloud.storage.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter + + +class VoidBuffer: + """A writeable file-like object that discards written data to save memory.""" + def __init__(self): + self.size = 0 + + def write(self, data: bytes) -> int: + n = len(data) + self.size += n + return n + + def tell(self) -> int: + return self.size + + +def parse_size(size_str: str) -> int: + size_str = size_str.strip().upper() + if size_str.endswith("KIB"): + return int(float(size_str[:-3]) * 1024) + elif size_str.endswith("MIB"): + return int(float(size_str[:-3]) * 1024 * 1024) + elif size_str.endswith("GIB"): + return int(float(size_str[:-3]) * 1024 * 1024 * 1024) + elif size_str.endswith("KB"): + return int(float(size_str[:-2]) * 1000) + elif size_str.endswith("MB"): + return int(float(size_str[:-2]) * 1000 * 1000) + elif size_str.endswith("GB"): + return int(float(size_str[:-2]) * 1000 * 1000 * 1000) + elif size_str.endswith("B"): + return int(size_str[:-1]) + else: + try: + return int(size_str) + except ValueError: + raise ValueError(f"Unknown size format: {size_str}") + + +def format_time(seconds: float) -> str: + if seconds < 1e-6: + return f"{seconds * 1e9:.2f} ns" + elif seconds < 1e-3: + return f"{seconds * 1e6:.2f} \u03bcs" + elif seconds < 1.0: + return f"{seconds * 1e3:.2f} ms" + else: + return f"{seconds:.2f} s" + + +async def download_range( + grpc_client: AsyncGrpcClient, + bucket_name: str, + object_name: str, + start_byte: int, + size: int, + enable_checksum: bool, +) -> float: + mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name) + try: + await mrd.open() + output_buffer = VoidBuffer() + start = time.perf_counter() + await mrd.download_ranges( + [(start_byte, size, output_buffer)], + enable_checksum=enable_checksum, + ) + end = time.perf_counter() + return end - start + finally: + if mrd.is_stream_open: + await mrd.close() + + +async def upload_random_object( + grpc_client: AsyncGrpcClient, + bucket_name: str, + object_name: str, + total_size_bytes: int, + chunk_size_bytes: int, +): + logging.debug(f"Uploading a new random object of size {total_size_bytes} bytes to gs://{bucket_name}/{object_name}...") + logging.debug(f"Upload chunk size: {chunk_size_bytes} bytes") + + writer = AsyncAppendableObjectWriter( + client=grpc_client, + bucket_name=bucket_name, + object_name=object_name, + generation=0, + ) + + await writer.open() + + uploaded_bytes = 0 + # Generate 10MiB of random buffer to slice from to avoid CPU urandom overhead + buffer_size = min(10 * 1024 * 1024, total_size_bytes) + random_buffer = os.urandom(buffer_size) + + while uploaded_bytes < total_size_bytes: + bytes_to_write = min(chunk_size_bytes, total_size_bytes - uploaded_bytes) + slice_start = (uploaded_bytes) % (buffer_size - bytes_to_write + 1) + data = random_buffer[slice_start : slice_start + bytes_to_write] + + await writer.append(data) + uploaded_bytes += bytes_to_write + + object_resource = await writer.finalize() + logging.debug(f"Appendable object {object_name} created and finalized. Uploaded {uploaded_bytes} bytes.") + + +async def run_benchmark(): + parser = argparse.ArgumentParser(description="Benchmark GCS Object Range Downloads using MRD.") + parser.add_argument("--bucket", type=str, default="chandrasiri-benchmarks-zb", help="Bucket name") + parser.add_argument("--sizes", type=str, default="1KiB,100KiB,1MiB,16MiB,100MiB,1GiB", help="Sizes to benchmark") + parser.add_argument("--iterations", type=int, default=5, help="Number of iterations per size") + parser.add_argument("--upload-chunk-size", type=str, default="2MiB", help="Chunk size for the upload (default: 2MiB, max: 100MiB)") + parser.add_argument("--debug", action="store_true", help="Print debug/progress logs") + args = parser.parse_args() + + log_level = logging.DEBUG if args.debug else logging.WARNING + logging.basicConfig(level=log_level, format="%(asctime)s [%(levelname)s] %(message)s") + + impl = getattr(google_crc32c, "implementation", None) + logging.info(f"google_crc32c implementation: {impl}") + if impl != "c": + logging.error(f"Error: google_crc32c implementation is '{impl}', expected 'c'") + sys.exit(1) + + sizes_to_test = [] + for s in args.sizes.split(","): + try: + sizes_to_test.append((s.strip(), parse_size(s))) + except ValueError as e: + logging.error(f"Error parsing size '{s}': {e}") + sys.exit(1) + + try: + upload_chunk_size_bytes = parse_size(args.upload_chunk_size) + except ValueError as e: + logging.error(f"Error parsing upload-chunk-size '{args.upload_chunk_size}': {e}") + sys.exit(1) + + if upload_chunk_size_bytes > 100 * 1024 * 1024: + logging.error("Error: max upload-chunk-size is 100MiB") + sys.exit(1) + + grpc_client = AsyncGrpcClient() + + print(f"Benchmarking MRD Reads on gs://{args.bucket}/checksum_benchmarking__ with {args.iterations} iterations:") + print("-" * 150) + print(f"{'Size (String)':<15} | {'Checksum':<10} | {'Size (Bytes)':<12} | {'Min':<12} | {'Max':<12} | {'Mean':<12} | {'Median':<12} | {'Avg Throughput':<15} | {'% Chk-Disabled Change':<22}") + print("-" * 150) + + for size_str, size_bytes in sizes_to_test: + enabled_throughput_full = None + enabled_throughput_minus_1 = None + + for enable_chk in [True, False]: + object_name = f"checksum_benchmarking_{size_str}_{uuid.uuid4().hex[:12]}" + + try: + chunk_size = min(upload_chunk_size_bytes, size_bytes) + await upload_random_object( + grpc_client, + args.bucket, + object_name, + size_bytes, + chunk_size, + ) + except Exception as e: + logging.error(f"Upload failed for size {size_str}: {e}") + sys.exit(1) + + try: + chk_label = "Enabled" if enable_chk else "Disabled" + + # Warmup phase using the uploaded object + logging.info(f"Warming up for 10 seconds using object {object_name} (Checksum {chk_label})...") + warmup_start = time.perf_counter() + warmup_downloads = 0 + warmup_chunk_size = min(10 * 1024 * 1024, size_bytes) + while time.perf_counter() - warmup_start < 10.0: + if size_bytes > warmup_chunk_size: + start_byte = random.randint(0, size_bytes - warmup_chunk_size) + else: + start_byte = 0 + try: + await download_range( + grpc_client, + args.bucket, + object_name, + start_byte, + warmup_chunk_size, + enable_checksum=enable_chk, + ) + warmup_downloads += 1 + except Exception: + pass + logging.info(f"Warmup complete. Completed {warmup_downloads} warmup downloads.") + + durations_full = [] + durations_minus_1 = [] + + for i in range(args.iterations): + # Download entire object + try: + duration = await download_range( + grpc_client, + args.bucket, + object_name, + 0, + size_bytes, + enable_checksum=enable_chk, + ) + durations_full.append(duration) + logging.debug(f" [{size_str} (Full) - Checksum {chk_label}] Iteration {i+1}/{args.iterations}: Done in {format_time(duration)}") + except Exception as e: + logging.error(f" [{size_str} (Full) - Checksum {chk_label}] Iteration {i+1}/{args.iterations}: Failed: {e}") + + # Download entire object - 1 byte + if size_bytes > 1 and enable_chk: + try: + duration = await download_range( + grpc_client, + args.bucket, + object_name, + 0, + size_bytes - 1, + enable_checksum=enable_chk, + ) + durations_minus_1.append(duration) + logging.debug(f" [{size_str} (Full-1) - Checksum {chk_label}] Iteration {i+1}/{args.iterations}: Done in {format_time(duration)}") + except Exception as e: + logging.error(f" [{size_str} (Full-1) - Checksum {chk_label}] Iteration {i+1}/{args.iterations}: Failed: {e}") + + # Reporting for Full + if not durations_full: + print(f"{f'{size_str} (Full)':<15} | {chk_label:<10} | {size_bytes:<12} | {'FAILED':<12} | {'FAILED':<12} | {'FAILED':<12} | {'FAILED':<12} | {'N/A':<15} | {'N/A':<22}") + else: + min_time = min(durations_full) + max_time = max(durations_full) + mean_time = statistics.mean(durations_full) + median_time = statistics.median(durations_full) + avg_throughput = (size_bytes / (1024 * 1024)) / mean_time + + percent_diff_str = "" + if enable_chk: + enabled_throughput_full = avg_throughput + percent_diff_str = "N/A" + else: + if enabled_throughput_full is not None and enabled_throughput_full > 0: + percent_increase = ((avg_throughput - enabled_throughput_full) / enabled_throughput_full) * 100 + percent_diff_str = f"{percent_increase:+.2f}%" + else: + percent_diff_str = "N/A" + + print( + f"{f'{size_str} (Full)':<15} | {chk_label:<10} | {size_bytes:<12} | " + f"{format_time(min_time):<12} | {format_time(max_time):<12} | " + f"{format_time(mean_time):<12} | {format_time(median_time):<12} | " + f"{avg_throughput:.2f} MiB/s | " + f"{percent_diff_str:<22}" + ) + + # Reporting for Full-1 + if size_bytes > 1 and enable_chk: + if not durations_minus_1: + print(f"{f'{size_str} (Full-1)':<15} | {chk_label:<10} | {size_bytes - 1:<12} | {'FAILED':<12} | {'FAILED':<12} | {'FAILED':<12} | {'FAILED':<12} | {'N/A':<15} | {'N/A':<22}") + else: + min_time = min(durations_minus_1) + max_time = max(durations_minus_1) + mean_time = statistics.mean(durations_minus_1) + median_time = statistics.median(durations_minus_1) + avg_throughput = ((size_bytes - 1) / (1024 * 1024)) / mean_time + + percent_diff_str = "" + if enable_chk: + enabled_throughput_minus_1 = avg_throughput + if enabled_throughput_full is not None and enabled_throughput_full > 0: + percent_increase = ((enabled_throughput_minus_1 - enabled_throughput_full) / enabled_throughput_full) * 100 + percent_diff_str = f"{percent_increase:+.2f}%" + else: + percent_diff_str = "N/A" + else: + if enabled_throughput_minus_1 is not None and enabled_throughput_minus_1 > 0: + percent_increase = ((avg_throughput - enabled_throughput_minus_1) / enabled_throughput_minus_1) * 100 + percent_diff_str = f"{percent_increase:+.2f}%" + else: + percent_diff_str = "N/A" + + print( + f"{f'{size_str} (Full-1)':<15} | {chk_label:<10} | {size_bytes - 1:<12} | " + f"{format_time(min_time):<12} | {format_time(max_time):<12} | " + f"{format_time(mean_time):<12} | {format_time(median_time):<12} | " + f"{avg_throughput:.2f} MiB/s | " + f"{percent_diff_str:<22}" + ) + + print("-" * 150) + finally: + try: + logging.info(f"Cleaning up object gs://{args.bucket}/{object_name}...") + await grpc_client.delete_object(args.bucket, object_name) + logging.info(f"Cleanup complete.") + except Exception as e: + logging.warning(f"Warning: failed to delete test object {object_name}: {e}") + + +def main(): + asyncio.run(run_benchmark()) + + +if __name__ == "__main__": + main() diff --git a/packages/google-cloud-storage/tests/perf/microbenchmarks/test_checksum_overhead.py b/packages/google-cloud-storage/tests/perf/microbenchmarks/test_checksum_overhead.py new file mode 100644 index 000000000000..6af26ea04082 --- /dev/null +++ b/packages/google-cloud-storage/tests/perf/microbenchmarks/test_checksum_overhead.py @@ -0,0 +1,206 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import os +import random +import time +import uuid +import pytest + +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage.asyncio.async_multi_range_downloader import AsyncMultiRangeDownloader +from google.cloud.storage.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter + +DEFAULT_BUCKET = os.environ.get("DEFAULT_RAPID_ZONAL_BUCKET", "chandrasiri-gcsfs-zb") + + +class VoidBuffer: + """A writeable file-like object that discards written data to save memory.""" + def __init__(self): + self.size = 0 + + def write(self, data: bytes) -> int: + n = len(data) + self.size += n + return n + + def tell(self) -> int: + return self.size + + +async def download_range( + grpc_client: AsyncGrpcClient, + bucket_name: str, + object_name: str, + start_byte: int, + size: int, + enable_checksum: bool, +): + mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name) + try: + await mrd.open() + output_buffer = VoidBuffer() + await mrd.download_ranges( + [(start_byte, size, output_buffer)], + enable_checksum=enable_checksum, + ) + finally: + if mrd.is_stream_open: + await mrd.close() + + +async def upload_random_object( + grpc_client: AsyncGrpcClient, + bucket_name: str, + object_name: str, + total_size_bytes: int, + chunk_size_bytes: int = 2 * 1024 * 1024, +): + writer = AsyncAppendableObjectWriter( + client=grpc_client, + bucket_name=bucket_name, + object_name=object_name, + generation=0, + ) + await writer.open() + uploaded_bytes = 0 + buffer_size = min(10 * 1024 * 1024, total_size_bytes) + random_buffer = os.urandom(buffer_size) + while uploaded_bytes < total_size_bytes: + bytes_to_write = min(chunk_size_bytes, total_size_bytes - uploaded_bytes) + slice_start = (uploaded_bytes) % (buffer_size - bytes_to_write + 1) + data = random_buffer[slice_start : slice_start + bytes_to_write] + await writer.append(data) + uploaded_bytes += bytes_to_write + await writer.finalize() + + +@pytest.mark.parametrize( + "object_size,download_size", + [ + (1024, 1024), # 1KiB Full + (1024, 1024 - 1), # 1KiB Full-1 + (100 * 1024, 100 * 1024), # 100KiB Full + (100 * 1024, 100 * 1024 - 1), # 100KiB Full-1 + (1024 * 1024, 1024 * 1024), # 1MiB Full + (1024 * 1024, 1024 * 1024 - 1),# 1MiB Full-1 + (16 * 1024 * 1024, 16 * 1024 * 1024), # 16MiB Full + (16 * 1024 * 1024, 16 * 1024 * 1024 - 1), # 16MiB Full-1 + (100 * 1024 * 1024, 100 * 1024 * 1024), # 100MiB Full + (100 * 1024 * 1024, 100 * 1024 * 1024 - 1),# 100MiB Full-1 + (1024 * 1024 * 1024, 1024 * 1024 * 1024), # 1GiB Full + (1024 * 1024 * 1024, 1024 * 1024 * 1024 - 1),# 1GiB Full-1 + ], + ids=[ + "1KiB-Full", "1KiB-Full-1", + "100KiB-Full", "100KiB-Full-1", + "1MiB-Full", "1MiB-Full-1", + "16MiB-Full", "16MiB-Full-1", + "100MiB-Full", "100MiB-Full-1", + "1GiB-Full", "1GiB-Full-1" + ], +) +@pytest.mark.parametrize("enable_checksum", [True, False], ids=["checksum_enabled", "checksum_disabled"]) +def test_checksum_overhead(benchmark, object_size, download_size, enable_checksum): + if not enable_checksum and download_size == object_size - 1: + pytest.skip("Skip Full-1 range download when checksum is disabled") + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + grpc_client = AsyncGrpcClient() + + download_bytes_list = [] + download_elapsed_times = [] + + object_name = f"checksum_benchmarking_{object_size}_{uuid.uuid4().hex[:12]}" + + # 1. upload an object + chunk_size = min(2 * 1024 * 1024, object_size) + loop.run_until_complete( + upload_random_object( + grpc_client, + DEFAULT_BUCKET, + object_name, + object_size, + chunk_size, + ) + ) + + try: + # 2. warmup + warmup_start = time.perf_counter() + warmup_chunk_size = min(10 * 1024 * 1024, object_size) + while time.perf_counter() - warmup_start < 10.0: + if object_size > warmup_chunk_size: + start_byte = random.randint(0, object_size - warmup_chunk_size) + else: + start_byte = 0 + try: + loop.run_until_complete( + download_range( + grpc_client, + DEFAULT_BUCKET, + object_name, + start_byte, + warmup_chunk_size, + enable_checksum, + ) + ) + except Exception: + pass + + # 3. download range (0, download_size) for 5 rounds + def run_download(): + start_time = time.perf_counter() + loop.run_until_complete( + download_range( + grpc_client, + DEFAULT_BUCKET, + object_name, + 0, + download_size, + enable_checksum, + ) + ) + elapsed = time.perf_counter() - start_time + download_bytes_list.append(download_size) + download_elapsed_times.append(elapsed) + + benchmark.pedantic( + target=run_download, + iterations=1, + rounds=5, + ) + + finally: + if download_elapsed_times: + total_bytes = sum(download_bytes_list) + total_time = sum(download_elapsed_times) + throughput_mib_s = (total_bytes / total_time) / (1024 * 1024) + benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}" + print(f"\nAvg Throughput: {throughput_mib_s:.2f} MiB/s") + + # 4. delete the object + try: + loop.run_until_complete(grpc_client.delete_object(DEFAULT_BUCKET, object_name)) + except Exception: + pass + + tasks = asyncio.all_tasks(loop=loop) + for task in tasks: + task.cancel() + if tasks: + loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) + loop.close()