Skip to content

Commit 640fd68

Browse files
authored
Asynchronously refresh registry in transformation service (#2060)
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent 8d9b831 commit 640fd68

File tree

2 files changed

+19
-4
lines changed

2 files changed

+19
-4
lines changed

sdk/python/feast/feature_store.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,10 @@ def refresh_registry(self):
142142
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
143143
"""
144144
registry_config = self.config.get_registry_config()
145-
self._registry = Registry(registry_config, repo_path=self.repo_path)
146-
self._registry.refresh()
145+
registry = Registry(registry_config, repo_path=self.repo_path)
146+
registry.refresh()
147+
148+
self._registry = registry
147149

148150
@log_exceptions_and_usage
149151
def list_entities(self, allow_cache: bool = False) -> List[Entity]:

sdk/python/feast/infra/transformation_servers/app.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import base64
22
import os
33
import tempfile
4+
import threading
45
from pathlib import Path
56

67
import yaml
@@ -28,9 +29,10 @@
2829
# Write registry contents for local registries
2930
config_string = config_bytes.decode("utf-8")
3031
raw_config = yaml.safe_load(config_string)
31-
registry_path = raw_config["registry"]
32+
registry = raw_config["registry"]
33+
registry_path = registry["path"] if isinstance(registry, dict) else registry
3234
registry_store_class = get_registry_store_class_from_scheme(registry_path)
33-
if registry_store_class == LocalRegistryStore:
35+
if registry_store_class == LocalRegistryStore and not os.path.exists(registry_path):
3436
registry_base64 = os.environ[REGISTRY_ENV_NAME]
3537
registry_bytes = base64.b64decode(registry_base64)
3638
registry_dir = os.path.dirname(registry_path)
@@ -42,6 +44,17 @@
4244
# Initialize the feature store
4345
store = FeatureStore(repo_path=str(repo_path.resolve()))
4446

47+
if isinstance(registry, dict) and registry.get("cache_ttl_seconds", 0) > 0:
48+
# disable synchronous refresh
49+
store.config.registry.cache_ttl_seconds = 0
50+
51+
# enable asynchronous refresh
52+
def async_refresh():
53+
store.refresh_registry()
54+
threading.Timer(registry["cache_ttl_seconds"], async_refresh).start()
55+
56+
async_refresh()
57+
4558
# Start the feature transformation server
4659
port = (
4760
os.environ.get(FEATURE_TRANSFORMATION_SERVER_PORT_ENV_NAME)

0 commit comments

Comments
 (0)