@@ -1150,27 +1150,24 @@ def _get_online_features(
11501150 [DUMMY_ENTITY_VAL ] * num_rows , DUMMY_ENTITY .value_type
11511151 )
11521152
1153- # Initialize the set of EntityKeyProtos once and reuse them for each FeatureView
1154- # to avoid initialization overhead.
1155- entity_keys = [EntityKeyProto () for _ in range (num_rows )]
11561153 provider = self ._get_provider ()
11571154 for table , requested_features in grouped_refs :
11581155 # Get the correct set of entity values with the correct join keys.
1159- table_entity_values = self ._get_table_entity_values (
1160- table , entity_name_to_join_key_map , join_key_values ,
1156+ table_entity_values , idxs = self ._get_unique_entities (
1157+ table , join_key_values , entity_name_to_join_key_map ,
11611158 )
11621159
1163- # Set the EntityKeyProtos inplace .
1164- self ._set_table_entity_keys (
1165- table_entity_values , entity_keys ,
1160+ # Fetch feature data for the minimum set of Entities .
1161+ feature_data = self ._read_from_online_store (
1162+ table_entity_values , provider , requested_features , table ,
11661163 )
11671164
11681165 # Populate the result_rows with the Features from the OnlineStore inplace.
1169- self ._populate_result_rows_from_feature_view (
1166+ self ._populate_response_from_feature_data (
1167+ feature_data ,
1168+ idxs ,
11701169 online_features_response ,
1171- entity_keys ,
11721170 full_feature_names ,
1173- provider ,
11741171 requested_features ,
11751172 table ,
11761173 )
@@ -1255,22 +1252,6 @@ def _get_table_entity_values(
12551252 }
12561253 return entity_values
12571254
1258- @staticmethod
1259- def _set_table_entity_keys (
1260- entity_values : Dict [str , List [Value ]], entity_keys : List [EntityKeyProto ],
1261- ):
1262- """
1263- This method sets the a list of EntityKeyProtos inplace.
1264- """
1265- keys = entity_values .keys ()
1266- # Columar to rowise (dict keys and values are guaranteed to have the same order).
1267- rowise_values = zip (* entity_values .values ())
1268- for entity_key in entity_keys :
1269- # Make sure entity_keys are empty before setting.
1270- entity_key .Clear ()
1271- entity_key .join_keys .extend (keys )
1272- entity_key .entity_values .extend (next (rowise_values ))
1273-
12741255 @staticmethod
12751256 def _populate_result_rows_from_columnar (
12761257 online_features_response : GetOnlineFeaturesResponse ,
@@ -1323,21 +1304,134 @@ def ensure_request_data_values_exist(
13231304 feature_names = missing_features
13241305 )
13251306
1326- def _populate_result_rows_from_feature_view (
1307+ def _get_unique_entities (
13271308 self ,
1328- online_features_response : GetOnlineFeaturesResponse ,
1329- entity_keys : List [EntityKeyProto ],
1330- full_feature_names : bool ,
1309+ table : FeatureView ,
1310+ join_key_values : Dict [str , List [Value ]],
1311+ entity_name_to_join_key_map : Dict [str , str ],
1312+ ) -> Tuple [Tuple [Dict [str , Value ], ...], Tuple [List [int ], ...]]:
1313+ """ Return the set of unique composite Entities for a Feature View and the indexes at which they appear.
1314+
1315+ This method allows us to query the OnlineStore for data we need only once
1316+ rather than requesting and processing data for the same combination of
1317+ Entities multiple times.
1318+ """
1319+ # Get the correct set of entity values with the correct join keys.
1320+ table_entity_values = self ._get_table_entity_values (
1321+ table , entity_name_to_join_key_map , join_key_values ,
1322+ )
1323+
1324+ # Convert back to rowise.
1325+ keys = table_entity_values .keys ()
1326+ # Sort the rowise data to allow for grouping but keep original index. This lambda is
1327+ # sufficient as Entity types cannot be complex (ie. lists).
1328+ rowise = list (enumerate (zip (* table_entity_values .values ())))
1329+ rowise .sort (
1330+ key = lambda row : tuple (getattr (x , x .WhichOneof ("val" )) for x in row [1 ])
1331+ )
1332+
1333+ # Identify unique entities and the indexes at which they occur.
1334+ unique_entities : Tuple [Dict [str , Value ], ...]
1335+ indexes : Tuple [List [int ], ...]
1336+ unique_entities , indexes = tuple (
1337+ zip (
1338+ * [
1339+ (dict (zip (keys , k )), [_ [0 ] for _ in g ])
1340+ for k , g in itertools .groupby (rowise , key = lambda x : x [1 ])
1341+ ]
1342+ )
1343+ )
1344+ return unique_entities , indexes
1345+
1346+ def _read_from_online_store (
1347+ self ,
1348+ entity_rows : Iterable [Mapping [str , Value ]],
13311349 provider : Provider ,
13321350 requested_features : List [str ],
13331351 table : FeatureView ,
1334- ):
1352+ ) -> List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
1353+ """ Read and process data from the OnlineStore for a given FeatureView.
1354+
1355+ This method guarentees that the order of the data in each element of the
1356+ List returned is the same as the order of `requested_features`.
1357+
1358+ This method assumes that `provider.online_read` returns data for each
1359+ combination of Entities in `entity_rows` in the same order as they
1360+ are provided.
1361+ """
1362+ # Instantiate one EntityKeyProto per Entity.
1363+ entity_key_protos = [
1364+ EntityKeyProto (join_keys = row .keys (), entity_values = row .values ())
1365+ for row in entity_rows
1366+ ]
1367+
1368+ # Fetch data for Entities.
13351369 read_rows = provider .online_read (
13361370 config = self .config ,
13371371 table = table ,
1338- entity_keys = entity_keys ,
1372+ entity_keys = entity_key_protos ,
13391373 requested_features = requested_features ,
13401374 )
1375+
1376+ # Each row is a set of features for a given entity key. We only need to convert
1377+ # the data to Protobuf once.
1378+ row_ts_proto = Timestamp ()
1379+ null_value = Value ()
1380+ read_row_protos = []
1381+ for read_row in read_rows :
1382+ row_ts , feature_data = read_row
1383+ if row_ts is not None :
1384+ row_ts_proto .FromDatetime (row_ts )
1385+ event_timestamps = [row_ts_proto ] * len (requested_features )
1386+ if feature_data is None :
1387+ statuses = [FieldStatus .NOT_FOUND ] * len (requested_features )
1388+ values = [null_value ] * len (requested_features )
1389+ else :
1390+ statuses = []
1391+ values = []
1392+ for feature_name in requested_features :
1393+ # Make sure order of data is the same as requested_features.
1394+ if feature_name not in feature_data :
1395+ statuses .append (FieldStatus .NOT_FOUND )
1396+ values .append (null_value )
1397+ else :
1398+ statuses .append (FieldStatus .PRESENT )
1399+ values .append (feature_data [feature_name ])
1400+ read_row_protos .append ((event_timestamps , statuses , values ))
1401+ return read_row_protos
1402+
1403+ @staticmethod
1404+ def _populate_response_from_feature_data (
1405+ feature_data : Iterable [
1406+ Tuple [
1407+ Iterable [Timestamp ], Iterable ["FieldStatus.ValueType" ], Iterable [Value ]
1408+ ]
1409+ ],
1410+ indexes : Iterable [Iterable [int ]],
1411+ online_features_response : GetOnlineFeaturesResponse ,
1412+ full_feature_names : bool ,
1413+ requested_features : Iterable [str ],
1414+ table : FeatureView ,
1415+ ):
1416+ """ Populate the GetOnlineFeaturesReponse with feature data.
1417+
1418+ This method assumes that `_read_from_online_store` returns data for each
1419+ combination of Entities in `entity_rows` in the same order as they
1420+ are provided.
1421+
1422+ Args:
1423+ feature_data: A list of data in Protobuf form which was retrieved from the OnlineStore.
1424+ indexes: A list of indexes which should be the same length as `feature_data`. Each list
1425+ of indexes corresponds to a set of result rows in `online_features_response`.
1426+ online_features_response: The object to populate.
1427+ full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
1428+ changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
1429+ "customer_fv__daily_transactions").
1430+ requested_features: The names of the features in `feature_data`. This should be ordered in the same way as the
1431+ data in `feature_data`.
1432+ table: The FeatureView that `feature_data` was retrieved from.
1433+ """
1434+ # Add the feature names to the response.
13411435 requested_feature_refs = [
13421436 f"{ table .projection .name_to_use ()} __{ feature_name } "
13431437 if full_feature_names
@@ -1347,28 +1441,16 @@ def _populate_result_rows_from_feature_view(
13471441 online_features_response .metadata .feature_names .val .extend (
13481442 requested_feature_refs
13491443 )
1350- # Each row is a set of features for a given entity key
1351- for row_idx , read_row in enumerate (read_rows ):
1352- row_ts , feature_data = read_row
1353- result_row = online_features_response .results [row_idx ]
1354- row_ts_proto = Timestamp ()
1355- if row_ts is not None :
1356- row_ts_proto .FromDatetime (row_ts )
1357- result_row .event_timestamps .extend ([row_ts_proto ] * len (requested_features ))
13581444
1359- if feature_data is None :
1360- result_row .statuses .extend (
1361- [FieldStatus .NOT_FOUND ] * len (requested_features )
1362- )
1363- result_row .values .extend ([Value ()] * len (requested_features ))
1364- else :
1365- for feature_name in requested_features :
1366- if feature_name not in feature_data :
1367- result_row .statuses .append (FieldStatus .NOT_FOUND )
1368- result_row .values .append (Value ())
1369- else :
1370- result_row .statuses .append (FieldStatus .PRESENT )
1371- result_row .values .append (feature_data [feature_name ])
1445+ # Populate the result with data fetched from the OnlineStore
1446+ # which is guarenteed to be aligned with `requested_features`.
1447+ for feature_row , dest_idxs in zip (feature_data , indexes ):
1448+ event_timestamps , statuses , values = feature_row
1449+ for dest_idx in dest_idxs :
1450+ result_row = online_features_response .results [dest_idx ]
1451+ result_row .event_timestamps .extend (event_timestamps )
1452+ result_row .statuses .extend (statuses )
1453+ result_row .values .extend (values )
13721454
13731455 @staticmethod
13741456 def _augment_response_with_on_demand_transforms (
0 commit comments