# File: packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""In-memory LRU cache for bucket metadata supporting App-centric Observability (ACO)."""

import logging
import threading

from google.api_core import exceptions as api_exceptions
from google.cloud.exceptions import NotFound
from google.cloud.storage._lru_cache import LRUCache

logger = logging.getLogger(__name__)


class BucketMetadataCache:
    """Thread-safe LRU cache for storing GCS bucket metadata (project number and location).

    Each entry maps a bucket name to a ``(destination_id, location)`` tuple.
    Cache misses are resolved by daemon background threads. At most one fetch
    and one existence check per bucket are in flight at a time (singleflight),
    which prevents stampedes when many callers miss on the same bucket.

    :param client: Object used by the background workers; ``get_bucket`` and
        ``bucket`` are called on it.
    :param max_size: Capacity handed to the underlying :class:`LRUCache`.
    """

    # Upper bound, in seconds, for every API call made from a background worker.
    _BACKGROUND_TIMEOUT = 10.0

    def __init__(self, client, max_size=10000):
        self._client = client
        self._cache = LRUCache(max_size)
        # Guards the cache and both in-flight sets.
        self._lock = threading.Lock()
        # Bucket names that currently have a worker running for them.
        self._inflight_fetches = set()
        self._inflight_checks = set()

    def get(self, bucket_name):
        """Thread-safely retrieve cached metadata without queueing a fetch.

        :returns: The cached ``(destination_id, location)`` tuple, or ``None``
            when the bucket is not cached.
        """
        with self._lock:
            return self._cache.get(bucket_name)

    def get_or_queue_fetch(self, bucket_name):
        """Retrieve bucket metadata or queue a background fetch on cache miss.

        Returns ``None`` immediately on a cache miss so the caller does not
        block. The first miss for a bucket starts one background fetch;
        further misses while that fetch is running start nothing.

        :returns: The cached ``(destination_id, location)`` tuple, or ``None``.
        """
        with self._lock:
            if bucket_name in self._cache:
                return self._cache.get(bucket_name)
            if bucket_name in self._inflight_fetches:
                # Thundering herd: another caller already missed on this bucket
                # and its fetch is still running, so do not issue the same
                # request again.
                return None
            self._inflight_fetches.add(bucket_name)
            self._start_worker(
                self._fetch_background, bucket_name, self._inflight_fetches
            )
            return None

    def check_and_evict(self, bucket_name):
        """Asynchronously verify if a bucket exists on 404 and evict if deleted.

        Does nothing when the bucket is not cached or a check for it is
        already running.
        """
        with self._lock:
            if bucket_name not in self._cache:
                return
            if bucket_name in self._inflight_checks:
                return
            self._inflight_checks.add(bucket_name)
            self._start_worker(
                self._verify_existence_background,
                bucket_name,
                self._inflight_checks,
            )

    def _start_worker(self, target, bucket_name, inflight):
        """Start a daemon thread running ``target(bucket_name)``.

        Must be called with ``self._lock`` held and with ``bucket_name``
        already added to ``inflight``. If the thread cannot be started, the
        in-flight marker is rolled back so a later call can retry. Without
        the rollback the bucket would stay marked as in flight forever and
        never be fetched or checked again. The failure is logged rather than
        raised because this cache is best-effort and must not break callers.
        """
        try:
            threading.Thread(
                target=target, args=(bucket_name,), daemon=True
            ).start()
        except RuntimeError as exc:
            inflight.discard(bucket_name)
            logger.debug(
                "Could not start background worker for bucket %s: %s",
                bucket_name,
                exc,
            )

    def _verify_existence_background(self, bucket_name):
        """Worker: evict ``bucket_name`` from the cache if the bucket no longer exists."""
        try:
            bucket = self._client.bucket(bucket_name)
            # Bounded like the metadata fetch so a stuck request cannot keep
            # the singleflight marker set for longer than necessary.
            if not bucket.exists(timeout=self._BACKGROUND_TIMEOUT):
                self.evict(bucket_name)
        except Exception as exc:
            # Top-level boundary of a daemon thread: log and give up.
            logger.debug(
                "Background verification for bucket existence failed for %s: %s",
                bucket_name,
                exc,
            )
        finally:
            with self._lock:
                self._inflight_checks.discard(bucket_name)

    def _fetch_background(self, bucket_name):
        """Asynchronously fetch bucket metadata and update the cache."""
        try:
            bucket = self._client.get_bucket(
                bucket_name, timeout=self._BACKGROUND_TIMEOUT
            )
            self.update_from_bucket(bucket)
        except (NotFound, api_exceptions.NotFound):
            self.evict(bucket_name)
        except api_exceptions.Forbidden:
            # On 403 (Forbidden), cache fallback values so later lookups hit
            # the cache instead of re-issuing a request that will keep
            # failing (avoids retry storms). The entry is still subject to
            # normal LRU eviction.
            self.update_cache(
                bucket_name, f"projects/_/buckets/{bucket_name}", "global"
            )
        except Exception as exc:
            # Top-level boundary of a daemon thread: log and give up. Nothing
            # is cached, so the next miss retries.
            logger.debug(
                "Background fetch for bucket metadata failed for %s: %s",
                bucket_name,
                exc,
            )
        finally:
            with self._lock:
                self._inflight_fetches.discard(bucket_name)

    def update_from_bucket(self, bucket):
        """Update cache from a Bucket instance.

        Reads ``name``, ``project_number``, ``location`` and ``location_type``
        from ``bucket``. The location is lower-cased. Multi-region and
        dual-region buckets, and buckets with no location, are stored as
        ``"global"``. A missing project number is stored as ``_`` in the
        destination id. Buckets that are falsy or have no name are ignored.
        """
        if not bucket or not bucket.name:
            return

        project_number = getattr(bucket, "project_number", None)
        location = getattr(bucket, "location", None) or "global"
        location = location.lower()
        location_type = getattr(bucket, "location_type", None) or "region"
        location_type = location_type.lower()

        if location_type in ("multi-region", "dual-region"):
            location = "global"

        if project_number:
            destination_id = f"projects/{project_number}/buckets/{bucket.name}"
        else:
            destination_id = f"projects/_/buckets/{bucket.name}"

        self.update_cache(bucket.name, destination_id, location)

    def update_cache(self, bucket_name, destination_id, location):
        """Thread-safely update or insert a cache entry with bounded size."""
        with self._lock:
            self._cache.put(bucket_name, (destination_id, location))

    def evict(self, bucket_name):
        """Remove a bucket from the cache (e.g., on 404)."""
        with self._lock:
            self._cache.delete(bucket_name)

    def clear(self):
        """Clear all cached metadata and forget all in-flight markers.

        Workers that are already running are not stopped. One that finishes
        after this call may still write its result into the cache.
        """
        with self._lock:
            self._cache.clear()
            self._inflight_fetches.clear()
            self._inflight_checks.clear()


# File: packages/google-cloud-storage/google/cloud/storage/_lru_cache.py
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A Least Recently Used (LRU) cache implementation."""

from collections import OrderedDict
from typing import Generic, Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class LRUCache(Generic[K, V]):
    """Bounded mapping that discards its least recently used entry when full.

    Entries live in an ``OrderedDict`` ordered from least recently used
    (front) to most recently used (back).

    :type capacity: int
    :param capacity: The maximum number of items the cache can hold.
    """

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("Capacity must be greater than 0")
        self._cache: OrderedDict[K, V] = OrderedDict()
        self._capacity = capacity

    @property
    def capacity(self) -> int:
        """Return the capacity of the cache."""
        return self._capacity

    def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
        """Look up ``key`` and mark it as most recently used.

        :type key: Any
        :param key: The key to look up in the cache.

        :type default: Any
        :param default: Value returned when ``key`` is not in the cache.
        """
        try:
            value = self._cache[key]
        except KeyError:
            return default
        # A hit refreshes the entry's recency.
        self._cache.move_to_end(key)
        return value

    def put(self, key: K, value: V) -> None:
        """Store ``value`` under ``key`` as the most recently used entry.

        An existing key is overwritten and moved to the back. When the
        insertion pushes the cache past its capacity, the entry at the front
        (least recently used) is dropped.

        :type key: Any
        :param key: The key to store.

        :type value: Any
        :param value: The value to store.
        """
        entries = self._cache
        entries[key] = value
        # No-op for a brand-new key (already last); refreshes an existing one.
        entries.move_to_end(key)
        if len(entries) > self._capacity:
            entries.popitem(last=False)

    def __len__(self) -> int:
        return len(self._cache)

    def __contains__(self, key: K) -> bool:
        return key in self._cache

    def clear(self) -> None:
        """Clear all items from the cache."""
        self._cache.clear()

    def delete(self, key: K) -> None:
        """Remove an item from the cache if it exists."""
        try:
            del self._cache[key]
        except KeyError:
            pass


# File: packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from unittest import mock

from google.api_core import exceptions as api_exceptions
from google.cloud.exceptions import NotFound
from google.cloud.storage._bucket_metadata_cache import BucketMetadataCache


def _fake_bucket(name, location, location_type, project_number):
    """Build a mock bucket exposing the attributes the cache reads."""
    bucket = mock.Mock()
    # ``name`` must be assigned after construction: ``Mock(name=...)`` names
    # the mock itself instead of creating a ``name`` attribute.
    bucket.name = name
    bucket.location = location
    bucket.location_type = location_type
    bucket.project_number = project_number
    return bucket


class TestBucketMetadataCache(unittest.TestCase):
    """Unit tests for BucketMetadataCache; background threads are never really started."""

    def _run_existence_check(self, exists):
        """Run the existence worker for a cached ``b1`` whose ``exists()`` returns ``exists``."""
        remote = mock.Mock()
        remote.exists.return_value = exists
        client = mock.Mock()
        client.bucket.return_value = remote

        cache = BucketMetadataCache(client)
        cache.update_cache("b1", "dest1", "loc1")
        cache._inflight_checks.add("b1")

        cache._verify_existence_background("b1")
        return cache

    def test_lru_eviction(self):
        with mock.patch("threading.Thread"):
            cache = BucketMetadataCache(mock.Mock(), max_size=3)

            # The fourth insert pushes out b1, the oldest entry.
            for idx in range(1, 5):
                cache.update_cache(f"b{idx}", f"dest{idx}", f"loc{idx}")

            self.assertIsNone(cache.get_or_queue_fetch("b1"))
            for idx in (2, 3, 4):
                self.assertEqual(
                    cache.get_or_queue_fetch(f"b{idx}"),
                    (f"dest{idx}", f"loc{idx}"),
                )

    def test_update_from_bucket(self):
        cache = BucketMetadataCache(mock.Mock())

        scenarios = [
            # Multi-region -> global
            (
                _fake_bucket("b1", "US", "multi-region", 123),
                ("projects/123/buckets/b1", "global"),
            ),
            # Dual-region -> global
            (
                _fake_bucket("b2", "NAM4", "dual-region", 456),
                ("projects/456/buckets/b2", "global"),
            ),
            # Region -> us-east1
            (
                _fake_bucket("b3", "US-EAST1", "region", 789),
                ("projects/789/buckets/b3", "us-east1"),
            ),
            # Missing project number -> _
            (
                _fake_bucket("b4", "eu-west1", "region", None),
                ("projects/_/buckets/b4", "eu-west1"),
            ),
        ]

        for bucket, expected in scenarios:
            with self.subTest(bucket=bucket.name):
                cache.update_from_bucket(bucket)
                self.assertEqual(cache.get_or_queue_fetch(bucket.name), expected)

    def test_get_or_queue_fetch(self):
        with mock.patch("threading.Thread") as mock_thread:
            cache = BucketMetadataCache(mock.Mock())

            # Cache miss -> returns None immediately and spawns thread
            self.assertIsNone(cache.get_or_queue_fetch("my-bucket"))
            mock_thread.assert_called_once()

            # Second immediate lookup -> returns None, does not spawn another thread (singleflight)
            mock_thread.reset_mock()
            self.assertIsNone(cache.get_or_queue_fetch("my-bucket"))
            mock_thread.assert_not_called()

    def test_fetch_background_success(self):
        client = mock.Mock()
        client.get_bucket.return_value = _fake_bucket("b1", "US-WEST1", "region", 999)

        cache = BucketMetadataCache(client)
        cache._inflight_fetches.add("b1")

        cache._fetch_background("b1")

        self.assertEqual(
            cache.get_or_queue_fetch("b1"), ("projects/999/buckets/b1", "us-west1")
        )
        self.assertNotIn("b1", cache._inflight_fetches)

    def test_fetch_background_not_found(self):
        client = mock.Mock()
        client.get_bucket.side_effect = NotFound("Bucket not found")

        cache = BucketMetadataCache(client)
        cache.update_cache("b1", "projects/_/buckets/b1", "global")
        cache._inflight_fetches.add("b1")

        cache._fetch_background("b1")

        self.assertNotIn("b1", cache._cache)
        self.assertNotIn("b1", cache._inflight_fetches)

    def test_fetch_background_forbidden(self):
        client = mock.Mock()
        client.get_bucket.side_effect = api_exceptions.Forbidden("403")

        cache = BucketMetadataCache(client)
        cache._inflight_fetches.add("b1")

        cache._fetch_background("b1")

        self.assertEqual(
            cache.get_or_queue_fetch("b1"), ("projects/_/buckets/b1", "global")
        )
        self.assertNotIn("b1", cache._inflight_fetches)

    def test_clear_and_evict(self):
        with mock.patch("threading.Thread"):
            cache = BucketMetadataCache(mock.Mock())

            cache.update_cache("b1", "dest1", "loc1")
            cache.evict("b1")
            self.assertNotIn("b1", cache._cache)

            cache.update_cache("b2", "dest2", "loc2")
            cache.clear()
            self.assertNotIn("b2", cache._cache)

    def test_check_and_evict_queue(self):
        with mock.patch("threading.Thread") as mock_thread:
            cache = BucketMetadataCache(mock.Mock())
            cache.update_cache("b1", "dest1", "loc1")

            cache.check_and_evict("b1")
            mock_thread.assert_called_once()
            self.assertIn("b1", cache._inflight_checks)

            # Second immediate check -> singleflight
            mock_thread.reset_mock()
            cache.check_and_evict("b1")
            mock_thread.assert_not_called()

    def test_verify_existence_background_exists(self):
        cache = self._run_existence_check(exists=True)

        self.assertIn("b1", cache._cache)
        self.assertNotIn("b1", cache._inflight_checks)

    def test_verify_existence_background_deleted(self):
        cache = self._run_existence_check(exists=False)

        self.assertNotIn("b1", cache._cache)
        self.assertNotIn("b1", cache._inflight_checks)


# File: packages/google-cloud-storage/tests/unit/test__lru_cache.py
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file
# except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from google.cloud.storage._lru_cache import LRUCache


def test_lru_cache_capacity():
    """The capacity is exposed, and non-positive capacities are rejected."""
    assert LRUCache(capacity=3).capacity == 3

    for invalid in (0, -1):
        with pytest.raises(ValueError):
            LRUCache(capacity=invalid)


def test_lru_cache_put_and_get():
    """Stored values can be read back; misses return the default."""
    lru = LRUCache(capacity=2)
    assert lru.get("a") is None
    assert lru.get("a", default="default") == "default"

    for size, (key, value) in enumerate((("a", 1), ("b", 2)), start=1):
        lru.put(key, value)
        assert lru.get(key) == value
        assert len(lru) == size
        assert key in lru


def test_lru_cache_eviction():
    """Reading a key protects it from being the next eviction victim."""
    lru = LRUCache(capacity=2)
    lru.put("a", 1)
    lru.put("b", 2)

    # Touch "a" so that "b" becomes the least recently used entry.
    assert lru.get("a") == 1

    # Inserting "c" overflows the cache and must evict "b".
    lru.put("c", 3)

    assert "b" not in lru
    assert lru.get("b") is None
    assert lru.get("a") == 1
    assert lru.get("c") == 3
    assert len(lru) == 2


def test_lru_cache_update():
    """Overwriting a key refreshes its recency and replaces its value."""
    lru = LRUCache(capacity=2)
    lru.put("a", 1)
    lru.put("b", 2)

    # Rewriting "a" makes it the most recently used entry.
    lru.put("a", 10)

    # Inserting "c" must therefore evict "b".
    lru.put("c", 3)

    assert "b" not in lru
    assert lru.get("a") == 10
    assert lru.get("c") == 3


def test_lru_cache_clear():
    """clear() empties the cache."""
    lru = LRUCache(capacity=2)
    lru.put("a", 1)
    lru.put("b", 2)

    lru.clear()

    assert len(lru) == 0
    for key in ("a", "b"):
        assert key not in lru


def test_lru_cache_delete():
    """delete() removes only the requested key."""
    lru = LRUCache(capacity=2)
    for key, value in (("a", 1), ("b", 2)):
        lru.put(key, value)

    lru.delete("a")

    assert len(lru) == 1
    assert "a" not in lru
    assert lru.get("a") is None
    assert lru.get("b") == 2