88
99from feast import Entity
1010from feast .feature_view import FeatureView
11- from feast .infra .key_encoding_utils import serialize_entity_key
11+ from feast .infra .online_stores . helpers import compute_entity_id
1212from feast .infra .online_stores .online_store import OnlineStore
1313from feast .infra .utils .hbase_utils import HbaseConstants , HbaseUtils
1414from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -104,14 +104,15 @@ def online_write_batch(
104104
105105 hbase = HbaseUtils (self ._get_conn (config ))
106106 project = config .project
107- table_name = _table_id (project , table )
107+ table_name = self . _table_id (project , table )
108108
109109 b = hbase .batch (table_name )
110110 for entity_key , values , timestamp , created_ts in data :
111- row_key = serialize_entity_key (
111+ row_key = self . _hbase_row_key (
112112 entity_key ,
113- entity_key_serialization_version = config .entity_key_serialization_version ,
114- ).hex ()
113+ feature_view_name = table .name ,
114+ config = config ,
115+ )
115116 values_dict = {}
116117 for feature_name , val in values .items ():
117118 values_dict [
@@ -133,6 +134,9 @@ def online_write_batch(
133134 b .put (row_key , values_dict )
134135 b .send ()
135136
137+ if progress :
138+ progress (len (data ))
139+
136140 @log_exceptions_and_usage (online_store = "hbase" )
137141 def online_read (
138142 self ,
@@ -152,15 +156,16 @@ def online_read(
152156 """
153157 hbase = HbaseUtils (self ._get_conn (config ))
154158 project = config .project
155- table_name = _table_id (project , table )
159+ table_name = self . _table_id (project , table )
156160
157161 result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
158162
159163 row_keys = [
160- serialize_entity_key (
164+ self . _hbase_row_key (
161165 entity_key ,
162- entity_key_serialization_version = config .entity_key_serialization_version ,
163- ).hex ()
166+ feature_view_name = table .name ,
167+ config = config ,
168+ )
164169 for entity_key in entity_keys
165170 ]
166171 rows = hbase .rows (table_name , row_keys = row_keys )
@@ -206,12 +211,12 @@ def update(
206211
207212 # We don't create any special state for the entites in this implementation.
208213 for table in tables_to_keep :
209- table_name = _table_id (project , table )
214+ table_name = self . _table_id (project , table )
210215 if not hbase .check_if_table_exist (table_name ):
211216 hbase .create_table_with_default_cf (table_name )
212217
213218 for table in tables_to_delete :
214- table_name = _table_id (project , table )
219+ table_name = self . _table_id (project , table )
215220 hbase .delete_table (table_name )
216221
217222 def teardown (
@@ -231,16 +236,43 @@ def teardown(
231236 project = config .project
232237
233238 for table in tables :
234- table_name = _table_id (project , table )
239+ table_name = self . _table_id (project , table )
235240 hbase .delete_table (table_name )
236241
242+ def _hbase_row_key (
243+ self ,
244+ entity_key : EntityKeyProto ,
245+ feature_view_name : str ,
246+ config : RepoConfig ,
247+ ) -> bytes :
248+ """
249+ Computes the HBase row key for a given entity key and feature view name.
237250
238- def _table_id (project : str , table : FeatureView ) -> str :
239- """
240- Returns table name given the project_name and the feature_view.
251+ Args:
252+ entity_key (EntityKeyProto): The entity key to compute the row key for.
253+ feature_view_name (str): The name of the feature view to compute the row key for.
254+ config (RepoConfig): The configuration for the Feast repository.
241255
242- Args:
243- project: Name of the feast project.
244- table: Feast FeatureView.
245- """
246- return f"{ project } _{ table .name } "
256+ Returns:
257+ bytes: The HBase row key for the given entity key and feature view name.
258+ """
259+ entity_id = compute_entity_id (
260+ entity_key ,
261+ entity_key_serialization_version = config .entity_key_serialization_version ,
262+ )
263+ # Even though `entity_id` uniquely identifies an entity, we use the same table
264+ # for multiple feature_views with the same set of entities.
265+ # To uniquely identify the row for a feature_view, we suffix the name of the feature_view itself.
266+ # This also ensures that features for entities from various feature_views are
267+ # colocated.
268+ return f"{ entity_id } #{ feature_view_name } " .encode ()
269+
270+ def _table_id (self , project : str , table : FeatureView ) -> str :
271+ """
272+ Returns table name given the project_name and the feature_view.
273+
274+ Args:
275+ project: Name of the feast project.
276+ table: Feast FeatureView.
277+ """
278+ return f"{ project } :{ table .name } "
0 commit comments