1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414from collections import defaultdict
15- from datetime import datetime
15+ from datetime import datetime , timedelta
1616from pathlib import Path
1717from typing import Any , Dict , List , Optional , Tuple , Union
1818
@@ -51,6 +51,7 @@ class FeatureStore:
5151
5252 config : RepoConfig
5353 repo_path : Optional [str ]
54+ _registry : Registry
5455
5556 def __init__ (
5657 self , repo_path : Optional [str ] = None , config : Optional [RepoConfig ] = None ,
@@ -72,15 +73,40 @@ def __init__(
7273 ),
7374 )
7475
76+ metadata_store_config = self .config .get_metadata_store_config ()
77+ self ._registry = Registry (
78+ registry_path = metadata_store_config .path ,
79+ cache_ttl = timedelta (seconds = metadata_store_config .cache_ttl_seconds ),
80+ )
81+
7582 @property
7683 def project (self ) -> str :
7784 return self .config .project
7885
7986 def _get_provider (self ) -> Provider :
8087 return get_provider (self .config )
8188
82- def _get_registry (self ) -> Registry :
83- return Registry (self .config .metadata_store )
89+ def refresh_registry (self ):
90+ """Fetches and caches a copy of the feature registry in memory.
91+
92+ Explicitly calling this method allows for direct control of the state of the registry cache. Every time this
93+ method is called the complete registry state will be retrieved from the remote registry store backend
94+ (e.g., GCS, S3), and the cache timer will be reset. If refresh_registry() is run before get_online_features()
95+ is called, then get_online_feature() will use the cached registry instead of retrieving (and caching) the
96+ registry itself.
97+
98+ Additionally, the TTL for the registry cache can be set to infinity (by setting it to 0), which means that
99+ refresh_registry() will become the only way to update the cached registry. If the TTL is set to a value
100+ greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be
101+ downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
102+ """
103+
104+ metadata_store_config = self .config .get_metadata_store_config ()
105+ self ._registry = Registry (
106+ registry_path = metadata_store_config .path ,
107+ cache_ttl = timedelta (seconds = metadata_store_config .cache_ttl_seconds ),
108+ )
109+ self ._registry .refresh ()
84110
85111 def list_entities (self ) -> List [Entity ]:
86112 """
@@ -89,7 +115,7 @@ def list_entities(self) -> List[Entity]:
89115 Returns:
90116 List of entities
91117 """
92- return self ._get_registry () .list_entities (self .project )
118+ return self ._registry .list_entities (self .project )
93119
94120 def list_feature_views (self ) -> List [FeatureView ]:
95121 """
@@ -98,7 +124,7 @@ def list_feature_views(self) -> List[FeatureView]:
98124 Returns:
99125 List of feature views
100126 """
101- return self ._get_registry () .list_feature_views (self .project )
127+ return self ._registry .list_feature_views (self .project )
102128
103129 def get_entity (self , name : str ) -> Entity :
104130 """
@@ -111,7 +137,7 @@ def get_entity(self, name: str) -> Entity:
111137 Returns either the specified entity, or raises an exception if
112138 none is found
113139 """
114- return self ._get_registry () .get_entity (name , self .project )
140+ return self ._registry .get_entity (name , self .project )
115141
116142 def get_feature_view (self , name : str ) -> FeatureView :
117143 """
@@ -124,7 +150,7 @@ def get_feature_view(self, name: str) -> FeatureView:
124150 Returns either the specified feature view, or raises an exception if
125151 none is found
126152 """
127- return self ._get_registry () .get_feature_view (name , self .project )
153+ return self ._registry .get_feature_view (name , self .project )
128154
129155 def delete_feature_view (self , name : str ):
130156 """
@@ -133,7 +159,7 @@ def delete_feature_view(self, name: str):
133159 Args:
134160 name: Name of feature view
135161 """
136- return self ._get_registry () .delete_feature_view (name , self .project )
162+ return self ._registry .delete_feature_view (name , self .project )
137163
138164 def apply (self , objects : List [Union [FeatureView , Entity ]]):
139165 """Register objects to metadata store and update related infrastructure.
@@ -166,15 +192,14 @@ def apply(self, objects: List[Union[FeatureView, Entity]]):
166192
167193 # TODO: Add locking
168194 # TODO: Optimize by only making a single call (read/write)
169- registry = self ._get_registry ()
170195
171196 views_to_update = []
172197 for ob in objects :
173198 if isinstance (ob , FeatureView ):
174- registry .apply_feature_view (ob , project = self .config .project )
199+ self . _registry .apply_feature_view (ob , project = self .config .project )
175200 views_to_update .append (ob )
176201 elif isinstance (ob , Entity ):
177- registry .apply_entity (ob , project = self .config .project )
202+ self . _registry .apply_entity (ob , project = self .config .project )
178203 else :
179204 raise ValueError (
180205 f"Unknown object type ({ type (ob )} ) provided as part of apply() call"
@@ -226,8 +251,9 @@ def get_historical_features(
226251 >>> model.fit(feature_data) # insert your modeling framework here.
227252 """
228253
229- registry = self ._get_registry ()
230- all_feature_views = registry .list_feature_views (project = self .config .project )
254+ all_feature_views = self ._registry .list_feature_views (
255+ project = self .config .project
256+ )
231257 feature_views = _get_requested_feature_views (feature_refs , all_feature_views )
232258 offline_store = get_offline_store_for_retrieval (feature_views )
233259 job = offline_store .get_historical_features (
@@ -263,14 +289,15 @@ def materialize_incremental(
263289 >>> )
264290 """
265291 feature_views_to_materialize = []
266- registry = self ._get_registry ()
267292 if feature_views is None :
268- feature_views_to_materialize = registry .list_feature_views (
293+ feature_views_to_materialize = self . _registry .list_feature_views (
269294 self .config .project
270295 )
271296 else :
272297 for name in feature_views :
273- feature_view = registry .get_feature_view (name , self .config .project )
298+ feature_view = self ._registry .get_feature_view (
299+ name , self .config .project
300+ )
274301 feature_views_to_materialize .append (feature_view )
275302
276303 # TODO paging large loads
@@ -316,14 +343,15 @@ def materialize(
316343 >>> )
317344 """
318345 feature_views_to_materialize = []
319- registry = self ._get_registry ()
320346 if feature_views is None :
321- feature_views_to_materialize = registry .list_feature_views (
347+ feature_views_to_materialize = self . _registry .list_feature_views (
322348 self .config .project
323349 )
324350 else :
325351 for name in feature_views :
326- feature_view = registry .get_feature_view (name , self .config .project )
352+ feature_view = self ._registry .get_feature_view (
353+ name , self .config .project
354+ )
327355 feature_views_to_materialize .append (feature_view )
328356
329357 # TODO paging large loads
@@ -369,6 +397,15 @@ def get_online_features(
369397 ) -> OnlineResponse :
370398 """
371399 Retrieves the latest online feature data.
400+
401+ Note: This method will download the full feature registry the first time it is run. If you are using a
402+ remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL
403+ duration (which can be set to infinitey). If the cached registry is stale (more time than the TTL has
404+ passed), then a new registry will be downloaded synchronously by this method. This download may
405+ introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call
406+ refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to
407+ infinity (cache forever).
408+
372409 Args:
373410 feature_refs: List of feature references that will be returned for each entity.
374411 Each feature reference should have the following format:
@@ -416,8 +453,9 @@ def _get_online_features(
416453 entity_keys .append (_entity_row_to_key (row ))
417454 result_rows .append (_entity_row_to_field_values (row ))
418455
419- registry = self ._get_registry ()
420- all_feature_views = registry .list_feature_views (project = self .config .project )
456+ all_feature_views = self ._registry .list_feature_views (
457+ project = self .config .project , allow_cache = True
458+ )
421459
422460 grouped_refs = _group_refs (feature_refs , all_feature_views )
423461 for table , requested_features in grouped_refs :
0 commit comments