1414
1515
1616import grpc
17-
1817from feast .core .CoreService_pb2_grpc import CoreServiceStub
1918from feast .core .CoreService_pb2 import (
19+ GetFeastCoreVersionRequest ,
2020 GetFeatureSetsResponse ,
2121 ApplyFeatureSetRequest ,
2222 GetFeatureSetsRequest ,
2323 ApplyFeatureSetResponse ,
2424)
25- from feast .core .FeatureSet_pb2 import FeatureSetSpec , FeatureSpec , EntitySpec
26- from feast .core .Source_pb2 import Source
25+ from feast .serving .ServingService_pb2 import (
26+ GetFeaturesRequest ,
27+ GetFeastServingVersionRequest ,
28+ GetOnlineFeaturesResponse ,
29+ )
2730from feast .feature_set import FeatureSet , Entity
2831from feast .serving .ServingService_pb2_grpc import ServingServiceStub
29- from google . protobuf import empty_pb2 as empty
32+
3033from typing import List
3134from collections import OrderedDict
3235from typing import Dict
33- from feast .type_map import dtype_to_value_type
3436import os
37+ import pandas as pd
38+ from feast .type_map import (
39+ pandas_value_to_proto_value ,
40+ dtype_to_feast_value_attr ,
41+ FEAST_VALUETYPE_TO_DTYPE ,
42+ )
3543
3644GRPC_CONNECTION_TIMEOUT = 5 # type: int
3745FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str
@@ -42,16 +50,14 @@ class Client:
4250 def __init__ (
4351 self , core_url : str = None , serving_url : str = None , verbose : bool = False
4452 ):
45- self ._feature_sets = [] # type: List [FeatureSet]
53+ self ._feature_sets = OrderedDict () # type: Dict [FeatureSet]
4654 self ._core_url = core_url
4755 self ._serving_url = serving_url
4856 self ._verbose = verbose
4957 self .__core_channel : grpc .Channel = None
5058 self .__serving_channel : grpc .Channel = None
5159 self ._core_service_stub : CoreServiceStub = None
5260 self ._serving_service_stub : ServingServiceStub = None
53- self ._is_serving_connected = False
54- self ._is_core_connected = False
5561
5662 @property
5763 def core_url (self ) -> str :
@@ -83,14 +89,14 @@ def version(self):
8389
8490 try :
8591 core_version = self ._core_service_stub .GetFeastCoreVersion (
86- empty , timeout = GRPC_CONNECTION_TIMEOUT
92+ GetFeastCoreVersionRequest () , timeout = GRPC_CONNECTION_TIMEOUT
8793 ).version
8894 except grpc .FutureCancelledError :
8995 core_version = "not connected"
9096
9197 try :
9298 serving_version = self ._serving_service_stub .GetFeastServingVersion (
93- empty , timeout = GRPC_CONNECTION_TIMEOUT
99+ GetFeastServingVersionRequest () , timeout = GRPC_CONNECTION_TIMEOUT
94100 ).version
95101 except grpc .FutureCancelledError :
96102 serving_version = "not connected"
@@ -100,10 +106,13 @@ def version(self):
100106 "serving" : {"url" : self .serving_url , "version" : serving_version },
101107 }
102108
103- def _connect_core (self ):
109+ def _connect_core (self , skip_if_connected = True ):
104110 """
105111 Connect to Core API
106112 """
113+ if skip_if_connected and self ._core_service_stub :
114+ return
115+
107116 if not self .core_url :
108117 raise ValueError ("Please set Feast Core URL." )
109118
@@ -121,13 +130,15 @@ def _connect_core(self):
121130 )
122131 else :
123132 self ._core_service_stub = CoreServiceStub (self .__core_channel )
124- self ._is_core_connected = True
125133
126- def _connect_serving (self ):
134+ def _connect_serving (self , skip_if_connected = True ):
127135 """
128136 Connect to Serving API
129137 """
130138
139+ if skip_if_connected and self ._serving_service_stub :
140+ return
141+
131142 if not self .serving_url :
132143 raise ValueError ("Please set Feast Serving URL." )
133144
@@ -145,49 +156,194 @@ def _connect_serving(self):
145156 )
146157 else :
147158 self ._serving_service_stub = ServingServiceStub (self .__serving_channel )
148- self ._is_serving_connected = True
149159
150160 def apply (self , resource ):
151161 if isinstance (resource , FeatureSet ):
152162 self ._apply_feature_set (resource )
163+ return
164+ raise Exception ("Could not determine resource type to apply" )
153165
154166 def refresh (self ):
155167 """
156168 Refresh list of Feature Sets from Feast Core
157169 """
158- self ._connect_core ()
170+ self ._connect_core (skip_if_connected = True )
159171
160172 # Get latest Feature Sets from Feast Core
161173 feature_set_protos = self ._core_service_stub .GetFeatureSets (
162174 GetFeatureSetsRequest (filter = GetFeatureSetsRequest .Filter ())
163175 ) # type: GetFeatureSetsResponse
164176
165177 # Store list of Feature Sets
166- self ._feature_sets = [
167- FeatureSet .from_proto (feature_set )
168- for feature_set in feature_set_protos .featureSets
169- ]
178+ for feature_set_proto in feature_set_protos .feature_sets :
179+ feature_set = FeatureSet .from_proto (feature_set_proto )
180+ feature_set ._is_dirty = False
181+ feature_set ._client = self
182+ self ._feature_sets [feature_set .name .strip ()] = feature_set
170183
171184 @property
172185 def feature_sets (self ) -> List [FeatureSet ]:
173186 self .refresh ()
174- return self ._feature_sets
187+ return list ( self ._feature_sets . values ())
175188
176189 @property
177190 def entities (self ) -> Dict [str , Entity ]:
178191 entities_dict = OrderedDict ()
179- for fs in self ._feature_sets :
180- for entityName , entity in fs .entities :
181- entities_dict [entityName ] = entity
192+ for fs in list ( self ._feature_sets . values ()) :
193+ for entity in fs .entities :
194+ entities_dict [entity . name ] = entity
182195 return entities_dict
183196
184197 def _apply_feature_set (self , feature_set : FeatureSet ):
185- self ._connect_core ()
186- apply_feature_set_response = self ._core_service_stub .ApplyFeatureSet (
187- ApplyFeatureSetRequest (featureSet = feature_set .to_proto ()),
198+ self ._connect_core (skip_if_connected = True )
199+ feature_set ._client = self
200+
201+ apply_fs_response = self ._core_service_stub .ApplyFeatureSet (
202+ ApplyFeatureSetRequest (feature_set = feature_set .to_proto ()),
188203 timeout = GRPC_CONNECTION_TIMEOUT ,
189204 ) # type: ApplyFeatureSetResponse
190205
191- feature_set ._version = apply_feature_set_response .featureSet .version
192- feature_set ._is_dirty = False
193- feature_set ._client = self
206+ if apply_fs_response .status == ApplyFeatureSetResponse .Status .ERROR :
207+ raise Exception (
208+ "Error while trying to apply feature set " + feature_set .name
209+ )
210+
211+ # Refresh state from Feast Core to local client
212+ self .refresh ()
213+
214+ # Replace applied feature set with refreshed feature set from Feast Core
215+ deep_update_feature_set (
216+ source = self ._feature_sets [feature_set .name ], target = feature_set
217+ )
218+
219+ def get (
220+ self ,
221+ entity_data : pd .DataFrame ,
222+ feature_ids : List [str ],
223+ join_on : Dict [str , str ] = None ,
224+ ) -> pd .DataFrame :
225+ self ._connect_serving (skip_if_connected = True )
226+
227+ if "datetime" != entity_data .columns [0 ]:
228+ raise ValueError ("The first column in entity_data should be 'datetime'" )
229+
230+ entity_data_field_names = ["datetime" ]
231+ for column in entity_data .columns [1 :]:
232+ if column not in self .entities .keys ():
233+ raise Exception ("Entity " + column + " could not be found" )
234+ entity_data_field_names .append (column )
235+
236+ entity_dataset_rows = []
237+ for row in entity_data .iterrows ():
238+ entity_dataset_row = GetFeaturesRequest .EntityDataSetRow ()
239+ for i in range (len (entity_data .columns [1 :])):
240+ proto_value = pandas_value_to_proto_value (
241+ entity_data [entity_data .columns [i ]].dtype , row [i ]
242+ )
243+ entity_dataset_row .value .append (proto_value )
244+ entity_dataset_rows .append (entity_dataset_row )
245+
246+ feature_set_request = create_feature_set_request_from_feature_strings (
247+ feature_ids
248+ )
249+
250+ get_online_features_response_proto = self ._serving_service_stub .GetOnlineFeatures (
251+ GetFeaturesRequest (
252+ entityDataSet = GetFeaturesRequest .EntityDataSet (
253+ entity_data_set_rows = entity_dataset_rows ,
254+ fieldNames = entity_data_field_names ,
255+ ),
256+ featureSets = feature_set_request ,
257+ )
258+ ) # type: GetOnlineFeaturesResponse
259+
260+ feature_dataframe = feature_data_sets_to_pandas_dataframe (
261+ entity_data_set = entity_data .copy (),
262+ feature_data_sets = list (
263+ get_online_features_response_proto .feature_data_sets
264+ ),
265+ )
266+ return feature_dataframe
267+
268+
269+ def feature_data_sets_to_pandas_dataframe (
270+ entity_data_set : pd .DataFrame ,
271+ feature_data_sets : List [GetOnlineFeaturesResponse .FeatureDataSet ],
272+ ):
273+ feature_data_set_dataframes = []
274+ for feature_data_set in feature_data_sets :
275+ # Validate feature data set length
276+ if len (feature_data_set .feature_rows ) != len (entity_data_set .index ):
277+ raise Exception (
278+ "Feature data set response is of different size "
279+ + str (len (feature_data_set .feature_rows ))
280+ + " than the entity data set request "
281+ + str (len (entity_data_set .index ))
282+ )
283+
284+ # Convert to Pandas DataFrame
285+ feature_data_set_dataframes .append (
286+ feature_data_set_to_pandas_dataframe (feature_data_set )
287+ )
288+
289+ # Join dataframes into a single feature dataframe
290+ dataframe = join_feature_set_dataframes (feature_data_set_dataframes )
291+ return dataframe
292+
293+
294+ def join_feature_set_dataframes (
295+ feature_data_set_dataframes : List [pd .DataFrame ]
296+ ) -> pd .DataFrame :
297+ return (
298+ feature_data_set_dataframes [0 ]
299+ if len (feature_data_set_dataframes ) > 0
300+ else pd .DataFrame
301+ )
302+
303+
304+ def feature_data_set_to_pandas_dataframe (
305+ feature_data_set : GetOnlineFeaturesResponse .FeatureDataSet
306+ ) -> pd .DataFrame :
307+ feature_set_name = feature_data_set .name
308+ dtypes = {}
309+ columns = []
310+ for field in list (feature_data_set .feature_rows [0 ].fields ):
311+ feature_id = feature_set_name + "." + field .name
312+ columns .append (feature_id )
313+ dtypes [feature_id ] = FEAST_VALUETYPE_TO_DTYPE [field .value .WhichOneof ("val" )]
314+
315+ dataframe = pd .DataFrame (columns = columns ).reset_index (drop = True ).astype (dtypes )
316+
317+ for featureRow in list (feature_data_set .feature_rows ):
318+ pandas_row = {}
319+ for field in list (featureRow .fields ):
320+ pandas_row [feature_set_name + "." + field .name ] = getattr (
321+ field .value , field .value .WhichOneof ("val" )
322+ )
323+ dataframe = dataframe .append (pandas_row , ignore_index = True )
324+
325+ return dataframe
326+
327+
328+ def create_feature_set_request_from_feature_strings (
329+ feature_ids : List [str ]
330+ ) -> List [GetFeaturesRequest .FeatureSet ]:
331+ feature_set_request = dict () # type: Dict[str, GetFeaturesRequest.FeatureSet]
332+ for feature_id in feature_ids :
333+ feature_set , feature = feature_id .split ("." )
334+ if feature_set not in feature_set_request :
335+ feature_set_request [feature_set ] = GetFeaturesRequest .FeatureSet (
336+ name = feature_set
337+ )
338+ feature_set_request [feature_set ].feature_names .append (feature )
339+ return list (feature_set_request .values ())
340+
341+
342+ def deep_update_feature_set (source : FeatureSet , target : FeatureSet ):
343+ target ._name = source .name
344+ target ._version = source .version
345+ target ._source = source .source
346+ target ._max_age = source .max_age
347+ target ._features = source .features
348+ target ._entities = source .entities
349+ target ._is_dirty = source ._is_dirty
0 commit comments