11import json
2+ import threading
23import traceback
34import warnings
45from typing import List , Optional
@@ -44,14 +45,37 @@ class MaterializeIncrementalRequest(BaseModel):
4445 feature_views : Optional [List [str ]] = None
4546
4647
47- def get_app (store : "feast.FeatureStore" ):
48+ def get_app (store : "feast.FeatureStore" , registry_ttl_sec : int = 5 ):
4849 proto_json .patch ()
4950
5051 app = FastAPI ()
52+ # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down
53+ registry_proto = None
54+ shutting_down = False
55+ active_timer : Optional [threading .Timer ] = None
5156
5257 async def get_body (request : Request ):
5358 return await request .body ()
5459
60+ def async_refresh ():
61+ store .refresh_registry ()
62+ nonlocal registry_proto
63+ registry_proto = store .registry .proto ()
64+ if shutting_down :
65+ return
66+ nonlocal active_timer
67+ active_timer = threading .Timer (registry_ttl_sec , async_refresh )
68+ active_timer .start ()
69+
70+ @app .on_event ("shutdown" )
71+ def shutdown_event ():
72+ nonlocal shutting_down
73+ shutting_down = True
74+ if active_timer :
75+ active_timer .cancel ()
76+
77+ async_refresh ()
78+
5579 @app .post ("/get-online-features" )
5680 def get_online_features (body = Depends (get_body )):
5781 try :
@@ -180,7 +204,10 @@ def materialize_incremental(body=Depends(get_body)):
180204
181205class FeastServeApplication (gunicorn .app .base .BaseApplication ):
182206 def __init__ (self , store : "feast.FeatureStore" , ** options ):
183- self ._app = get_app (store = store )
207+ self ._app = get_app (
208+ store = store ,
209+ registry_ttl_sec = options .get ("registry_ttl_sec" , 5 ),
210+ )
184211 self ._options = options
185212 super ().__init__ ()
186213
@@ -202,11 +229,13 @@ def start_server(
202229 no_access_log : bool ,
203230 workers : int ,
204231 keep_alive_timeout : int ,
232+ registry_ttl_sec : int = 5 ,
205233):
206234 FeastServeApplication (
207235 store = store ,
208236 bind = f"{ host } :{ port } " ,
209237 accesslog = None if no_access_log else "-" ,
210238 workers = workers ,
211239 keepalive = keep_alive_timeout ,
240+ registry_ttl_sec = registry_ttl_sec ,
212241 ).run ()
0 commit comments