7878 FieldStatus ,
7979 GetOnlineFeaturesResponse ,
8080)
81+ from feast .protos .feast .types .EntityKey_pb2 import EntityKey
8182from feast .protos .feast .types .Value_pb2 import RepeatedValue , Value
83+ from feast .protos .feast .types .Value_pb2 import Value as ValueProto
8284from feast .repo_config import RepoConfig , load_repo_config
8385from feast .repo_contents import RepoContents
8486from feast .saved_dataset import SavedDataset , SavedDatasetStorage , ValidationReference
@@ -1666,20 +1668,29 @@ def retrieve_online_documents(
16661668 distance_metric ,
16671669 )
16681670
1669- # TODO Refactor to better way of populating result
1670- # TODO populate entity in the response after returning entity in document_features is supported
16711671 # TODO currently not return the vector value since it is same as feature value, if embedding is supported,
16721672 # the feature value can be raw text before embedded
1673- document_feature_vals = [feature [2 ] for feature in document_features ]
1674- document_feature_distance_vals = [feature [4 ] for feature in document_features ]
1673+ entity_key_vals = [feature [1 ] for feature in document_features ]
1674+ join_key_values : Dict [str , List [ValueProto ]] = {}
1675+ for entity_key_val in entity_key_vals :
1676+ if entity_key_val is not None :
1677+ for join_key , entity_value in zip (
1678+ entity_key_val .join_keys , entity_key_val .entity_values
1679+ ):
1680+ if join_key not in join_key_values :
1681+ join_key_values [join_key ] = []
1682+ join_key_values [join_key ].append (entity_value )
1683+
1684+ document_feature_vals = [feature [4 ] for feature in document_features ]
1685+ document_feature_distance_vals = [feature [5 ] for feature in document_features ]
16751686 online_features_response = GetOnlineFeaturesResponse (results = [])
16761687 utils ._populate_result_rows_from_columnar (
16771688 online_features_response = online_features_response ,
1678- data = {requested_feature : document_feature_vals },
1679- )
1680- utils . _populate_result_rows_from_columnar (
1681- online_features_response = online_features_response ,
1682- data = { "distance" : document_feature_distance_vals },
1689+ data = {
1690+ ** join_key_values ,
1691+ requested_feature : document_feature_vals ,
1692+ "distance" : document_feature_distance_vals ,
1693+ },
16831694 )
16841695 return OnlineResponse (online_features_response )
16851696
@@ -1691,7 +1702,11 @@ def _retrieve_from_online_store(
16911702 query : List [float ],
16921703 top_k : int ,
16931704 distance_metric : Optional [str ],
1694- ) -> List [Tuple [Timestamp , "FieldStatus.ValueType" , Value , Value , Value ]]:
1705+ ) -> List [
1706+ Tuple [
1707+ Timestamp , Optional [EntityKey ], "FieldStatus.ValueType" , Value , Value , Value
1708+ ]
1709+ ]:
16951710 """
16961711 Search and return document features from the online document store.
16971712 """
@@ -1707,7 +1722,7 @@ def _retrieve_from_online_store(
17071722 read_row_protos = []
17081723 row_ts_proto = Timestamp ()
17091724
1710- for row_ts , feature_val , vector_value , distance_val in documents :
1725+ for row_ts , entity_key , feature_val , vector_value , distance_val in documents :
17111726 # Reset timestamp to default or update if row_ts is not None
17121727 if row_ts is not None :
17131728 row_ts_proto .FromDatetime (row_ts )
@@ -1721,7 +1736,14 @@ def _retrieve_from_online_store(
17211736 status = FieldStatus .PRESENT
17221737
17231738 read_row_protos .append (
1724- (row_ts_proto , status , feature_val , vector_value , distance_val )
1739+ (
1740+ row_ts_proto ,
1741+ entity_key ,
1742+ status ,
1743+ feature_val ,
1744+ vector_value ,
1745+ distance_val ,
1746+ )
17251747 )
17261748 return read_row_protos
17271749
0 commit comments