Skip to content

Commit 37337db

Browse files
committed
fix: Skip refresh if already in progress or if lock is already held
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
1 parent 4231274 commit 37337db

File tree

1 file changed

+25
-3
lines changed

1 file changed

+25
-3
lines changed

sdk/python/feast/infra/registry/caching_registry.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str):
3737
)
3838
self.cached_registry_proto = self.proto()
3939
self.cached_registry_proto_created = _utc_now()
40+
self._refreshing = False # Prevents reentrant calls to refresh
4041
if cache_mode == "thread":
4142
self._start_thread_async_refresh(cache_ttl_seconds)
4243
atexit.register(self._exit_handler)
@@ -425,12 +426,26 @@ def list_projects(
425426
return self._list_projects(tags)
426427

427428
def refresh(self, project: Optional[str] = None):
428-
self.cached_registry_proto = self.proto()
429-
self.cached_registry_proto_created = _utc_now()
429+
if self._refreshing: # Skipping refresh if already in progress
430+
return
431+
self._refreshing = True
432+
try:
433+
self.cached_registry_proto = self.proto()
434+
self.cached_registry_proto_created = _utc_now()
435+
logger.info("Registry cache refreshed successfully")
436+
except Exception as e:
437+
logger.error(f"Error while refreshing registry: {e}", exc_info=True)
438+
finally:
439+
self._refreshing = False # Always reset flag
430440

431441
def _refresh_cached_registry_if_necessary(self):
432442
if self.cache_mode == "sync":
433-
with self._refresh_lock:
443+
# Try acquiring the lock without blocking
444+
if not self._refresh_lock.acquire(
445+
blocking=False
446+
): # Skipping refresh if lock is already held by another thread
447+
return
448+
try:
434449
if self.cached_registry_proto == RegistryProto():
435450
# Avoids the need to refresh the registry when cache is not populated yet
436451
# Specially during the __init__ phase
@@ -454,6 +469,13 @@ def _refresh_cached_registry_if_necessary(self):
454469
if expired:
455470
logger.info("Registry cache expired, so refreshing")
456471
self.refresh()
472+
except Exception as e:
473+
logger.error(
474+
f"Error in _refresh_cached_registry_if_necessary: {e}",
475+
exc_info=True,
476+
)
477+
finally:
478+
self._refresh_lock.release() # Always release the lock safely
457479

458480
def _start_thread_async_refresh(self, cache_ttl_seconds):
459481
self.refresh()

0 commit comments

Comments
 (0)