1- import json
21import sys
32import threading
43import time
54import traceback
65from contextlib import asynccontextmanager
7- from typing import List , Optional
6+ from typing import Any , Dict , List , Optional
87
98import pandas as pd
109import psutil
@@ -69,6 +68,13 @@ class MaterializeIncrementalRequest(BaseModel):
6968 feature_views : Optional [List [str ]] = None
7069
7170
71+ class GetOnlineFeaturesRequest (BaseModel ):
72+ entities : Dict [str , List [Any ]]
73+ feature_service : Optional [str ] = None
74+ features : Optional [List [str ]] = None
75+ full_feature_names : bool = False
76+
77+
7278def get_app (
7379 store : "feast.FeatureStore" ,
7480 registry_ttl_sec : int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL ,
@@ -108,33 +114,26 @@ async def lifespan(app: FastAPI):
108114
109115 app = FastAPI (lifespan = lifespan )
110116
111- async def get_body (request : Request ):
112- return await request .body ()
113-
114117 @app .post (
115118 "/get-online-features" ,
116119 dependencies = [Depends (inject_user_details )],
117120 )
118- async def get_online_features (body = Depends (get_body )):
119- body = json .loads (body )
120- full_feature_names = body .get ("full_feature_names" , False )
121- entity_rows = body ["entities" ]
121+ async def get_online_features (request : GetOnlineFeaturesRequest ) -> Dict [str , Any ]:
122122 # Initialize parameters for FeatureStore.get_online_features(...) call
123- if " feature_service" in body :
123+ if request . feature_service :
124124 feature_service = store .get_feature_service (
125- body [ " feature_service" ] , allow_cache = True
125+ request . feature_service , allow_cache = True
126126 )
127127 assert_permissions (
128128 resource = feature_service , actions = [AuthzedAction .READ_ONLINE ]
129129 )
130- features = feature_service
130+ features = feature_service # type: ignore
131131 else :
132- features = body ["features" ]
133132 all_feature_views , all_on_demand_feature_views = (
134133 utils ._get_feature_views_to_use (
135134 store .registry ,
136135 store .project ,
137- features ,
136+ request . features ,
138137 allow_cache = True ,
139138 hide_dummy_entity = False ,
140139 )
@@ -147,18 +146,19 @@ async def get_online_features(body=Depends(get_body)):
147146 assert_permissions (
148147 resource = od_feature_view , actions = [AuthzedAction .READ_ONLINE ]
149148 )
149+ features = request .features # type: ignore
150150
151151 read_params = dict (
152152 features = features ,
153- entity_rows = entity_rows ,
154- full_feature_names = full_feature_names ,
153+ entity_rows = request . entities ,
154+ full_feature_names = request . full_feature_names ,
155155 )
156156
157157 if store ._get_provider ().async_supported .online .read :
158- response = await store .get_online_features_async (** read_params )
158+ response = await store .get_online_features_async (** read_params ) # type: ignore
159159 else :
160160 response = await run_in_threadpool (
161- lambda : store .get_online_features (** read_params )
161+ lambda : store .get_online_features (** read_params ) # type: ignore
162162 )
163163
164164 # Convert the Protobuf object to JSON and return it
@@ -167,8 +167,7 @@ async def get_online_features(body=Depends(get_body)):
167167 )
168168
169169 @app .post ("/push" , dependencies = [Depends (inject_user_details )])
170- async def push (body = Depends (get_body )):
171- request = PushFeaturesRequest (** json .loads (body ))
170+ async def push (request : PushFeaturesRequest ) -> None :
172171 df = pd .DataFrame (request .df )
173172 actions = []
174173 if request .to == "offline" :
@@ -220,17 +219,16 @@ async def push(body=Depends(get_body)):
220219 store .push (** push_params )
221220
222221 @app .post ("/write-to-online-store" , dependencies = [Depends (inject_user_details )])
223- def write_to_online_store (body = Depends (get_body )):
224- request = WriteToFeatureStoreRequest (** json .loads (body ))
222+ def write_to_online_store (request : WriteToFeatureStoreRequest ) -> None :
225223 df = pd .DataFrame (request .df )
226224 feature_view_name = request .feature_view_name
227225 allow_registry_cache = request .allow_registry_cache
228226 try :
229- feature_view = store .get_stream_feature_view (
227+ feature_view = store .get_stream_feature_view ( # type: ignore
230228 feature_view_name , allow_registry_cache = allow_registry_cache
231229 )
232230 except FeatureViewNotFoundException :
233- feature_view = store .get_feature_view (
231+ feature_view = store .get_feature_view ( # type: ignore
234232 feature_view_name , allow_registry_cache = allow_registry_cache
235233 )
236234
@@ -250,11 +248,12 @@ async def health():
250248 )
251249
252250 @app .post ("/materialize" , dependencies = [Depends (inject_user_details )])
253- def materialize (body = Depends ( get_body )) :
254- request = MaterializeRequest ( ** json . loads ( body ))
255- for feature_view in request . feature_views :
251+ def materialize (request : MaterializeRequest ) -> None :
252+ for feature_view in request . feature_views or []:
253+ # TODO: receives a str for resource but isn't in the Union. is str actually allowed?
256254 assert_permissions (
257- resource = feature_view , actions = [AuthzedAction .WRITE_ONLINE ]
255+ resource = feature_view , # type: ignore
256+ actions = [AuthzedAction .WRITE_ONLINE ],
258257 )
259258 store .materialize (
260259 utils .make_tzaware (parser .parse (request .start_ts )),
@@ -263,11 +262,12 @@ def materialize(body=Depends(get_body)):
263262 )
264263
265264 @app .post ("/materialize-incremental" , dependencies = [Depends (inject_user_details )])
266- def materialize_incremental (body = Depends ( get_body )) :
267- request = MaterializeIncrementalRequest ( ** json . loads ( body ))
268- for feature_view in request . feature_views :
265+ def materialize_incremental (request : MaterializeIncrementalRequest ) -> None :
266+ for feature_view in request . feature_views or []:
267+ # TODO: receives a str for resource but isn't in the Union. is str actually allowed?
269268 assert_permissions (
270- resource = feature_view , actions = [AuthzedAction .WRITE_ONLINE ]
269+ resource = feature_view , # type: ignore
270+ actions = [AuthzedAction .WRITE_ONLINE ],
271271 )
272272 store .materialize_incremental (
273273 utils .make_tzaware (parser .parse (request .end_ts )), request .feature_views
0 commit comments