File tree Expand file tree Collapse file tree 2 files changed +19
-4
lines changed
infra/transformation_servers Expand file tree Collapse file tree 2 files changed +19
-4
lines changed Original file line number Diff line number Diff line change @@ -142,8 +142,10 @@ def refresh_registry(self):
142142 downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
143143 """
144144 registry_config = self .config .get_registry_config ()
145- self ._registry = Registry (registry_config , repo_path = self .repo_path )
146- self ._registry .refresh ()
145+ registry = Registry (registry_config , repo_path = self .repo_path )
146+ registry .refresh ()
147+
148+ self ._registry = registry
147149
148150 @log_exceptions_and_usage
149151 def list_entities (self , allow_cache : bool = False ) -> List [Entity ]:
Original file line number Diff line number Diff line change 11import base64
22import os
33import tempfile
4+ import threading
45from pathlib import Path
56
67import yaml
2829# Write registry contents for local registries
2930config_string = config_bytes .decode ("utf-8" )
3031raw_config = yaml .safe_load (config_string )
31- registry_path = raw_config ["registry" ]
32+ registry = raw_config ["registry" ]
33+ registry_path = registry ["path" ] if isinstance (registry , dict ) else registry
3234registry_store_class = get_registry_store_class_from_scheme (registry_path )
33- if registry_store_class == LocalRegistryStore :
35+ if registry_store_class == LocalRegistryStore and not os . path . exists ( registry_path ) :
3436 registry_base64 = os .environ [REGISTRY_ENV_NAME ]
3537 registry_bytes = base64 .b64decode (registry_base64 )
3638 registry_dir = os .path .dirname (registry_path )
4244# Initialize the feature store
4345store = FeatureStore (repo_path = str (repo_path .resolve ()))
4446
47+ if isinstance (registry , dict ) and registry .get ("cache_ttl_seconds" , 0 ) > 0 :
48+ # disable synchronous refresh
49+ store .config .registry .cache_ttl_seconds = 0
50+
51+ # enable asynchronous refresh
52+ def async_refresh ():
53+ store .refresh_registry ()
54+ threading .Timer (registry ["cache_ttl_seconds" ], async_refresh ).start ()
55+
56+ async_refresh ()
57+
4558# Start the feature transformation server
4659port = (
4760 os .environ .get (FEATURE_TRANSFORMATION_SERVER_PORT_ENV_NAME )
You can’t perform that action at this time.
0 commit comments