Skip to content

Commit f958f01

Browse files
authored
Add registry refreshing and caching (feast-dev#1431)
* Fail on missing registry and add cache test Signed-off-by: Willem Pienaar <git@willem.co> * Add basic support for caching to Registry Signed-off-by: Willem Pienaar <git@willem.co> * Add additional tests to registry caching Signed-off-by: Willem Pienaar <git@willem.co> * Fix linting errors Signed-off-by: Willem Pienaar <git@willem.co> * Add incremental configuration to metadata store config Signed-off-by: Willem Pienaar <git@willem.co> * Add comment to refresh_registry() Signed-off-by: Willem Pienaar <git@willem.co> * Add comments Signed-off-by: Willem Pienaar <git@willem.co> * Don't allow simultaneous cache loads Signed-off-by: Willem Pienaar <git@willem.co>
1 parent 1247bd9 commit f958f01

File tree

6 files changed

+273
-64
lines changed

6 files changed

+273
-64
lines changed

sdk/python/feast/feature_store.py

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
from collections import defaultdict
15-
from datetime import datetime
15+
from datetime import datetime, timedelta
1616
from pathlib import Path
1717
from typing import Any, Dict, List, Optional, Tuple, Union
1818

@@ -51,6 +51,7 @@ class FeatureStore:
5151

5252
config: RepoConfig
5353
repo_path: Optional[str]
54+
_registry: Registry
5455

5556
def __init__(
5657
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
@@ -72,15 +73,40 @@ def __init__(
7273
),
7374
)
7475

76+
metadata_store_config = self.config.get_metadata_store_config()
77+
self._registry = Registry(
78+
registry_path=metadata_store_config.path,
79+
cache_ttl=timedelta(seconds=metadata_store_config.cache_ttl_seconds),
80+
)
81+
7582
@property
7683
def project(self) -> str:
7784
return self.config.project
7885

7986
def _get_provider(self) -> Provider:
8087
return get_provider(self.config)
8188

82-
def _get_registry(self) -> Registry:
83-
return Registry(self.config.metadata_store)
89+
def refresh_registry(self):
90+
"""Fetches and caches a copy of the feature registry in memory.
91+
92+
Explicitly calling this method allows for direct control of the state of the registry cache. Every time this
93+
method is called the complete registry state will be retrieved from the remote registry store backend
94+
(e.g., GCS, S3), and the cache timer will be reset. If refresh_registry() is run before get_online_features()
95+
is called, then get_online_features() will use the cached registry instead of retrieving (and caching) the
96+
registry itself.
97+
98+
Additionally, the TTL for the registry cache can be set to infinity (by setting it to 0), which means that
99+
refresh_registry() will become the only way to update the cached registry. If the TTL is set to a value
100+
greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be
101+
downloaded synchronously, which may increase latencies if the triggering method is get_online_features().
102+
"""
103+
104+
metadata_store_config = self.config.get_metadata_store_config()
105+
self._registry = Registry(
106+
registry_path=metadata_store_config.path,
107+
cache_ttl=timedelta(seconds=metadata_store_config.cache_ttl_seconds),
108+
)
109+
self._registry.refresh()
84110

85111
def list_entities(self) -> List[Entity]:
86112
"""
@@ -89,7 +115,7 @@ def list_entities(self) -> List[Entity]:
89115
Returns:
90116
List of entities
91117
"""
92-
return self._get_registry().list_entities(self.project)
118+
return self._registry.list_entities(self.project)
93119

94120
def list_feature_views(self) -> List[FeatureView]:
95121
"""
@@ -98,7 +124,7 @@ def list_feature_views(self) -> List[FeatureView]:
98124
Returns:
99125
List of feature views
100126
"""
101-
return self._get_registry().list_feature_views(self.project)
127+
return self._registry.list_feature_views(self.project)
102128

103129
def get_entity(self, name: str) -> Entity:
104130
"""
@@ -111,7 +137,7 @@ def get_entity(self, name: str) -> Entity:
111137
Returns either the specified entity, or raises an exception if
112138
none is found
113139
"""
114-
return self._get_registry().get_entity(name, self.project)
140+
return self._registry.get_entity(name, self.project)
115141

116142
def get_feature_view(self, name: str) -> FeatureView:
117143
"""
@@ -124,7 +150,7 @@ def get_feature_view(self, name: str) -> FeatureView:
124150
Returns either the specified feature view, or raises an exception if
125151
none is found
126152
"""
127-
return self._get_registry().get_feature_view(name, self.project)
153+
return self._registry.get_feature_view(name, self.project)
128154

129155
def delete_feature_view(self, name: str):
130156
"""
@@ -133,7 +159,7 @@ def delete_feature_view(self, name: str):
133159
Args:
134160
name: Name of feature view
135161
"""
136-
return self._get_registry().delete_feature_view(name, self.project)
162+
return self._registry.delete_feature_view(name, self.project)
137163

138164
def apply(self, objects: List[Union[FeatureView, Entity]]):
139165
"""Register objects to metadata store and update related infrastructure.
@@ -166,15 +192,14 @@ def apply(self, objects: List[Union[FeatureView, Entity]]):
166192

167193
# TODO: Add locking
168194
# TODO: Optimize by only making a single call (read/write)
169-
registry = self._get_registry()
170195

171196
views_to_update = []
172197
for ob in objects:
173198
if isinstance(ob, FeatureView):
174-
registry.apply_feature_view(ob, project=self.config.project)
199+
self._registry.apply_feature_view(ob, project=self.config.project)
175200
views_to_update.append(ob)
176201
elif isinstance(ob, Entity):
177-
registry.apply_entity(ob, project=self.config.project)
202+
self._registry.apply_entity(ob, project=self.config.project)
178203
else:
179204
raise ValueError(
180205
f"Unknown object type ({type(ob)}) provided as part of apply() call"
@@ -226,8 +251,9 @@ def get_historical_features(
226251
>>> model.fit(feature_data) # insert your modeling framework here.
227252
"""
228253

229-
registry = self._get_registry()
230-
all_feature_views = registry.list_feature_views(project=self.config.project)
254+
all_feature_views = self._registry.list_feature_views(
255+
project=self.config.project
256+
)
231257
feature_views = _get_requested_feature_views(feature_refs, all_feature_views)
232258
offline_store = get_offline_store_for_retrieval(feature_views)
233259
job = offline_store.get_historical_features(
@@ -263,14 +289,15 @@ def materialize_incremental(
263289
>>> )
264290
"""
265291
feature_views_to_materialize = []
266-
registry = self._get_registry()
267292
if feature_views is None:
268-
feature_views_to_materialize = registry.list_feature_views(
293+
feature_views_to_materialize = self._registry.list_feature_views(
269294
self.config.project
270295
)
271296
else:
272297
for name in feature_views:
273-
feature_view = registry.get_feature_view(name, self.config.project)
298+
feature_view = self._registry.get_feature_view(
299+
name, self.config.project
300+
)
274301
feature_views_to_materialize.append(feature_view)
275302

276303
# TODO paging large loads
@@ -316,14 +343,15 @@ def materialize(
316343
>>> )
317344
"""
318345
feature_views_to_materialize = []
319-
registry = self._get_registry()
320346
if feature_views is None:
321-
feature_views_to_materialize = registry.list_feature_views(
347+
feature_views_to_materialize = self._registry.list_feature_views(
322348
self.config.project
323349
)
324350
else:
325351
for name in feature_views:
326-
feature_view = registry.get_feature_view(name, self.config.project)
352+
feature_view = self._registry.get_feature_view(
353+
name, self.config.project
354+
)
327355
feature_views_to_materialize.append(feature_view)
328356

329357
# TODO paging large loads
@@ -369,6 +397,15 @@ def get_online_features(
369397
) -> OnlineResponse:
370398
"""
371399
Retrieves the latest online feature data.
400+
401+
Note: This method will download the full feature registry the first time it is run. If you are using a
402+
remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL
403+
duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has
404+
passed), then a new registry will be downloaded synchronously by this method. This download may
405+
introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call
406+
refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to
407+
infinity (cache forever).
408+
372409
Args:
373410
feature_refs: List of feature references that will be returned for each entity.
374411
Each feature reference should have the following format:
@@ -416,8 +453,9 @@ def _get_online_features(
416453
entity_keys.append(_entity_row_to_key(row))
417454
result_rows.append(_entity_row_to_field_values(row))
418455

419-
registry = self._get_registry()
420-
all_feature_views = registry.list_feature_views(project=self.config.project)
456+
all_feature_views = self._registry.list_feature_views(
457+
project=self.config.project, allow_cache=True
458+
)
421459

422460
grouped_refs = _group_refs(feature_refs, all_feature_views)
423461
for table, requested_features in grouped_refs:

0 commit comments

Comments
 (0)