Skip to content

Commit 7d188d8

Browse files
author
r0b0fyi
committed
feast: Added HybridOnlineStore for multi-backend online store routing
Signed-off-by: r0b0fyi <renukaprasannakumar.badugu@walmart.com>
1 parent ff2df51 commit 7d188d8

File tree

4 files changed

+439
-0
lines changed

4 files changed

+439
-0
lines changed
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
"""
2+
hybrid_online_store.py
3+
----------------------
4+
5+
This module provides the HybridOnlineStore, a Feast OnlineStore implementation that enables routing online feature operations
6+
to different online stores based on a tag (e.g., tribe, team, or project) on the FeatureView. This allows a single Feast deployment
7+
to support multiple online store backends, each configured independently and selected dynamically at runtime.
8+
9+
Features:
10+
- Supports multiple online store backends in a single Feast deployment.
11+
- Routes online reads and writes to the correct backend based on a tag on the FeatureView.
12+
- Enables multi-tenancy and flexible data management strategies.
13+
- Designed for extensibility and compatibility with Feast's OnlineStore interface.
14+
15+
Usage:
16+
1. Add a 'tribe' (or similar) tag to your FeatureView.
17+
2. Configure multiple online stores in your Feast repo config under 'online_stores'.
18+
3. The HybridOnlineStore will route reads and writes to the correct backend based on the tag.
19+
20+
Example configuration (feature_store.yaml):
21+
22+
online_store:
23+
type: hybrid_online_store.HybridOnlineStore
24+
online_stores:
25+
- type: feast.infra.online_stores.bigtable.BigtableOnlineStore
26+
conf:
27+
... # bigtable config
28+
- type: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore
29+
conf:
30+
... # cassandra config
31+
32+
Example FeatureView:
33+
34+
tags:
35+
tribe: bigtable
36+
37+
The HybridOnlineStore will route requests to the correct online store based on the tribe tag.
38+
"""
39+
40+
from datetime import datetime
41+
from typing import Sequence, List, Optional, Tuple, Dict, Callable, Any, Literal
42+
from importlib import import_module
43+
44+
from feast import RepoConfig, FeatureView, Entity
45+
from feast.infra.online_stores.helpers import get_online_store_from_config
46+
from feast.infra.online_stores.online_store import OnlineStore
47+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
48+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
49+
from feast.repo_config import FeastConfigBaseModel, get_online_config_from_type
50+
from pydantic import StrictStr
51+
52+
53+
class HybridOnlineStoreConfig(FeastConfigBaseModel):
54+
"""
55+
Configuration for HybridOnlineStore.
56+
57+
This config allows multiple online stores to be used in a single Feast deployment. Each online store is specified by its type (Python import path)
58+
and a configuration dictionary. The HybridOnlineStore uses this configuration to instantiate and manage the set of online stores.
59+
60+
Attributes:
61+
type: The type identifier for the HybridOnlineStore.
62+
online_stores: A list of OnlineStoresWithConfig, each specifying the type and config for an online store backend.
63+
"""
64+
type: Literal[
65+
"HybridOnlineStore", "hybrid_online_store.HybridOnlineStore"] = "hybrid_online_store.HybridOnlineStore"
66+
67+
class OnlineStoresWithConfig(FeastConfigBaseModel):
68+
"""
69+
Configuration for a single online store backend.
70+
71+
Attributes:
72+
type: Python import path to the online store class.
73+
conf: Dictionary of configuration parameters for the online store.
74+
"""
75+
type: StrictStr # Python import path to the online store class
76+
conf: Dict
77+
78+
online_stores: Optional[List[OnlineStoresWithConfig]]
79+
80+
81+
class HybridOnlineStore(OnlineStore):
82+
"""
83+
HybridOnlineStore routes online feature operations to different online store backends
84+
based on a tag (e.g., 'tribe') on the FeatureView. This enables multi-tenancy and flexible
85+
backend selection in a single Feast deployment.
86+
87+
The backend is selected dynamically at runtime according to the tag value.
88+
"""
89+
def __init__(self):
90+
"""
91+
Initialize the HybridOnlineStore. Online stores are instantiated lazily on first use.
92+
"""
93+
self.online_stores = {}
94+
self._initialized = False
95+
96+
def _initialize_online_stores(self, config: RepoConfig):
97+
"""
98+
Lazily instantiate all configured online store backends from the repo config.
99+
100+
Args:
101+
config: Feast RepoConfig containing the online_stores configuration.
102+
"""
103+
if self._initialized:
104+
return
105+
self.online_stores = {}
106+
online_stores_cfg = getattr(config.online_store, 'online_stores', [])
107+
for store_cfg in online_stores_cfg:
108+
config_cls = get_online_config_from_type(store_cfg.type.split('.')[-1].lower())
109+
config_instance = config_cls(**store_cfg.conf)
110+
online_store_instance = get_online_store_from_config(config_instance)
111+
self.online_stores[store_cfg.type.split('.')[-1].lower()] = online_store_instance
112+
self._initialized = True
113+
114+
def _get_online_store(self, tribe_tag, config: RepoConfig):
115+
"""
116+
Retrieve the online store backend corresponding to the given tag value.
117+
118+
Args:
119+
tribe_tag: The tag value (e.g., 'tribe') used to select the backend.
120+
config: Feast RepoConfig.
121+
Returns:
122+
The OnlineStore instance for the given tag, or None if not found.
123+
"""
124+
self._initialize_online_stores(config)
125+
return self.online_stores.get(tribe_tag.lower())
126+
127+
def _prepare_repo_conf(self, config: RepoConfig, online_store_type: str):
128+
"""
129+
Prepare a RepoConfig for the selected online store backend.
130+
131+
Args:
132+
config: The original Feast RepoConfig.
133+
online_store_type: The type of the online store backend to use.
134+
Returns:
135+
A dictionary representing the updated RepoConfig for the selected backend.
136+
"""
137+
rconfig = config
138+
for online_store in config.online_store.online_stores:
139+
if online_store.type.split('.')[-1].lower() == online_store_type.lower():
140+
rconfig.online_config = online_store.conf
141+
rconfig.online_config["type"] = online_store.type
142+
data = rconfig.__dict__
143+
data['registry'] = data['registry_config']
144+
data['offline_store'] = data['offline_config']
145+
data['online_store'] = data['online_config']
146+
return data
147+
148+
def online_write_batch(
149+
self,
150+
config: RepoConfig,
151+
table: FeatureView,
152+
odata: List[
153+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
154+
],
155+
progress: Optional[Callable[[int], Any]],
156+
) -> None:
157+
"""
158+
Write a batch of feature rows to the appropriate online store based on the FeatureView's tag.
159+
160+
Args:
161+
config: Feast RepoConfig.
162+
table: FeatureView to write to. Must have a tag (e.g., 'tribe') to select the backend.
163+
odata: List of tuples containing entity key, feature values, event timestamp, and created timestamp.
164+
progress: Optional callback for progress reporting.
165+
Raises:
166+
ValueError: If the FeatureView does not have the required tag.
167+
NotImplementedError: If no online store is found for the tag value.
168+
"""
169+
tribe = table.tags.get('tribe')
170+
if not tribe:
171+
raise ValueError("FeatureView must have a 'tribe' tag to use HybridOnlineStore.")
172+
online_store = self._get_online_store(tribe, config)
173+
if online_store:
174+
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
175+
online_store.online_write_batch(config, table, odata, progress)
176+
else:
177+
raise NotImplementedError(
178+
f"No online store found for tribe tag '{tribe}'. Please check your configuration.")
179+
180+
@staticmethod
181+
def write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val):
182+
"""
183+
(Not implemented) Write a single feature value to the online store table.
184+
"""
185+
pass
186+
187+
def online_read(
188+
self,
189+
config: RepoConfig,
190+
table: FeatureView,
191+
entity_keys: List[EntityKeyProto],
192+
requested_features: Optional[List[str]] = None,
193+
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
194+
"""
195+
Read feature rows from the appropriate online store based on the FeatureView's tag.
196+
197+
Args:
198+
config: Feast RepoConfig.
199+
table: FeatureView to read from. Must have a tag (e.g., 'tribe') to select the backend.
200+
entity_keys: List of entity keys to read.
201+
requested_features: Optional list of feature names to read.
202+
Returns:
203+
List of tuples containing event timestamp and feature values.
204+
Raises:
205+
ValueError: If the FeatureView does not have the required tag.
206+
NotImplementedError: If no online store is found for the tag value.
207+
"""
208+
tribe = table.tags.get('tribe')
209+
if not tribe:
210+
raise ValueError("FeatureView must have a 'tribe' tag to use HybridOnlineStore.")
211+
online_store = self._get_online_store(tribe, config)
212+
if online_store:
213+
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
214+
return online_store.online_read(config, table, entity_keys, requested_features)
215+
else:
216+
raise NotImplementedError(
217+
f"No online store found for tribe tag '{tribe}'. Please check your configuration.")
218+
219+
def update(
220+
self,
221+
config: RepoConfig,
222+
tables_to_delete: Sequence[FeatureView],
223+
tables_to_keep: Sequence[FeatureView],
224+
entities_to_delete: Sequence[Entity],
225+
entities_to_keep: Sequence[Entity],
226+
partial: bool,
227+
):
228+
"""
229+
Update the state of the online stores for the given FeatureViews and Entities.
230+
231+
Args:
232+
config: Feast RepoConfig.
233+
tables_to_delete: Sequence of FeatureViews to delete.
234+
tables_to_keep: Sequence of FeatureViews to keep.
235+
entities_to_delete: Sequence of Entities to delete.
236+
entities_to_keep: Sequence of Entities to keep.
237+
partial: Whether to perform a partial update.
238+
Raises:
239+
ValueError: If a FeatureView does not have the required tag.
240+
NotImplementedError: If no online store is found for a tag value.
241+
"""
242+
for table in tables_to_keep:
243+
tribe = table.tags.get('tribe')
244+
if not tribe:
245+
raise ValueError("FeatureView must have a 'tribe' tag to use HybridOnlineStore.")
246+
online_store = self._get_online_store(tribe, config)
247+
if online_store:
248+
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
249+
online_store.update(config, tables_to_delete, tables_to_keep, entities_to_delete,
250+
entities_to_keep, partial)
251+
else:
252+
raise NotImplementedError(
253+
f"No online store found for tribe tag '{tribe}'. Please check your configuration.")
254+
255+
def teardown(
256+
self,
257+
config: RepoConfig,
258+
tables: Sequence[FeatureView],
259+
entities: Sequence[Entity],
260+
):
261+
"""
262+
Teardown all managed online stores for the given FeatureViews and Entities.
263+
264+
Args:
265+
config: Feast RepoConfig.
266+
tables: Sequence of FeatureViews to teardown.
267+
entities: Sequence of Entities to teardown.
268+
"""
269+
# Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance
270+
tribes_seen = set()
271+
online_stores_cfg = getattr(config.online_store, 'online_stores', [])
272+
for table in tables:
273+
tribe = table.tags.get('tribe')
274+
if not tribe:
275+
continue
276+
# Find all store configs matching this tribe (supporting multiple instances of the same type)
277+
for store_cfg in online_stores_cfg:
278+
store_type = store_cfg.type
279+
# Use id(store_cfg.conf) to distinguish different configs of the same type
280+
key = (tribe, store_type, id(store_cfg.conf))
281+
if key in tribes_seen:
282+
continue
283+
tribes_seen.add(key)
284+
# Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility)
285+
if tribe.lower() == store_type.split('.')[-1].lower():
286+
online_store = self._get_online_store(tribe, config)
287+
if online_store:
288+
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
289+
online_store.teardown(config, tables, entities)
290+
291+
# Example usage in your repo config (feature_store.yaml):
292+
#
293+
# online_store:
294+
# type: hybrid_online_store.HybridOnlineStore
295+
# online_stores:
296+
# - type: feast.infra.online_stores.bigtable.BigtableOnlineStore
297+
# conf:
298+
# ... # bigtable config
299+
# - type: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore
300+
# conf:
301+
# ... # cassandra config
302+
#
303+
# Then, in your FeatureView:
304+
# tags:
305+
# tribe: bigtable
306+
#
307+
# The HybridOnlineStore will route requests to the correct online store based on the tribe tag.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Copyright 2025 The Feast Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# This file provides integration test repo configuration for HybridOnlineStore.
16+
# It enables running integration tests with multiple online store backends.
17+
# Update this file if you add more backends or change test setup.
18+
19+
from tests.integration.feature_repos.integration_test_repo_config import (
20+
IntegrationTestRepoConfig,
21+
)
22+
from tests.integration.feature_repos.universal.online_store.hybrid_online_store import HybridOnlineStoreCreator
23+
24+
FULL_REPO_CONFIGS = [
25+
IntegrationTestRepoConfig(online_store_creator=HybridOnlineStoreCreator),
26+
]
27+
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from sdk.python.tests.integration.feature_repos.universal.online_store_creator import OnlineStoreCreator
2+
3+
4+
class HybridOnlineStoreCreator(OnlineStoreCreator):
5+
def create_online_store(self):
6+
# Use Redis and SQLite as two backends for demonstration/testing, but mock Redis config for unit tests
7+
return {
8+
"type": "hybrid_online_store.HybridOnlineStore",
9+
"online_stores": [
10+
{
11+
"type": "redis",
12+
"conf": {
13+
"redis_type": "redis",
14+
"connection_string": "localhost:6379"
15+
}
16+
},
17+
{
18+
"type": "sqlite",
19+
"conf": {
20+
"path": "/tmp/feast_hybrid_test.db"
21+
}
22+
}
23+
]
24+
}
25+
26+
def teardown(self):
27+
# Implement any resource cleanup if needed (e.g., remove test DB files)
28+
pass

0 commit comments

Comments
 (0)