Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix: Fixed registry cache init
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
  • Loading branch information
ntkathole committed Sep 15, 2025
commit 033dfe2e016d6ba71b1cddf351a6f2fd189c1b47
15 changes: 7 additions & 8 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,16 +435,15 @@ def _refresh_cached_registry_if_necessary(self):
if self.cache_mode == "sync":

def is_cache_expired():
if self.cached_registry_proto == RegistryProto():
if self.cached_registry_proto_ttl.total_seconds() == 0:
return False
else:
return True

# Cache is expired if it's None or creation time is None
if (
self.cached_registry_proto is None
or not hasattr(self, "cached_registry_proto_created")
or self.cached_registry_proto == RegistryProto()
):
return True

# Cache is expired if creation time is None
if (
not hasattr(self, "cached_registry_proto_created")
or self.cached_registry_proto_created is None
):
return True
Expand Down
17 changes: 13 additions & 4 deletions sdk/python/feast/infra/registry/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,6 @@ def _apply_object(
if not self.purge_feast_metadata:
self._set_last_updated_metadata(update_datetime, project)

self.refresh()

def apply_permission(
self, permission: Permission, project: str, commit: bool = True
):
Expand Down Expand Up @@ -494,7 +492,6 @@ def _delete_object(
raise not_found_exception(name, project)
self._set_last_updated_metadata(_utc_now(), project)

self.refresh()
return cursor.rowcount

def delete_permission(self, name: str, project: str, commit: bool = True):
Expand Down Expand Up @@ -1128,6 +1125,18 @@ def process_project(project: Project):
project_name = project.name
last_updated_timestamp = project.last_updated_timestamp

try:
cached_project = self.get_project(project_name, True)
except ProjectObjectNotFoundException:
cached_project = None

allow_cache = False

if cached_project is not None:
allow_cache = (
last_updated_timestamp <= cached_project.last_updated_timestamp
)

r.projects.extend([project.to_proto()])
last_updated_timestamps.append(last_updated_timestamp)

Expand All @@ -1142,7 +1151,7 @@ def process_project(project: Project):
(self.list_validation_references, r.validation_references),
(self.list_permissions, r.permissions),
]:
objs: List[Any] = lister(project_name, allow_cache=False) # type: ignore
objs: List[Any] = lister(project_name, allow_cache) # type: ignore
if objs:
obj_protos = [obj.to_proto() for obj in objs]
for obj_proto in obj_protos:
Expand Down
16 changes: 0 additions & 16 deletions sdk/python/tests/unit/infra/registry/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,6 @@ def test_empty_cache_refresh_with_ttl(registry):
mock_refresh.assert_called_once()


def test_empty_cache_no_refresh_with_infinite_ttl(registry):
"""Test that empty cache is not refreshed when TTL = 0 (infinite)"""
# Set up empty cache with TTL = 0 (infinite)
registry.cached_registry_proto = RegistryProto()
registry.cached_registry_proto_created = datetime.now(timezone.utc)
registry.cached_registry_proto_ttl = timedelta(seconds=0) # TTL = 0 (infinite)

# Mock refresh to check if it's called
with patch.object(
CachingRegistry, "refresh", wraps=registry.refresh
) as mock_refresh:
registry._refresh_cached_registry_if_necessary()
# Should not refresh because TTL = 0 (infinite)
mock_refresh.assert_not_called()


def test_concurrent_cache_refresh_race_condition(registry):
"""Test that concurrent requests don't skip cache refresh when cache is expired"""
import threading
Expand Down
Loading