Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat: Added HybridOnlineStore for multi-backend online store routing
- Implements HybridOnlineStore, enabling routing of online feature operations to different backends based on a configurable tag (e.g., tribe, team, or project) on the FeatureView.
- Adds support for specifying the routing tag name via the 'routing_tag' field in the online_store config, allowing flexible backend selection.
- Supports multi-tenancy and flexible data management by allowing multiple online store backends in a single Feast deployment.
- added documentation
- fixed linter raised issues

Signed-off-by: r0b0fyi <renukaprasannakumar.badugu@walmart.com>
  • Loading branch information
r0b0fyi committed Jun 22, 2025
commit ba8543b2b6fc8a532858af1c69171882601e4aee
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ class HybridOnlineStoreConfig(FeastConfigBaseModel):
type: The type identifier for the HybridOnlineStore.
online_stores: A list of OnlineStoresWithConfig, each specifying the type and config for an online store backend.
"""
type: Literal[
"HybridOnlineStore", "hybrid_online_store.HybridOnlineStore"] = "hybrid_online_store.HybridOnlineStore"

type: Literal["HybridOnlineStore", "hybrid_online_store.HybridOnlineStore"] = (
"hybrid_online_store.HybridOnlineStore"
)

class OnlineStoresWithConfig(FeastConfigBaseModel):
"""
Expand All @@ -74,11 +76,14 @@ class OnlineStoresWithConfig(FeastConfigBaseModel):
type: Python import path to the online store class.
conf: Dictionary of configuration parameters for the online store.
"""

type: StrictStr # Python import path to the online store class
conf: Dict

online_stores: Optional[List[OnlineStoresWithConfig]]
routing_tag: StrictStr = "tribe" # Configurable tag name for routing, default is 'tribe'
routing_tag: StrictStr = (
"tribe" # Configurable tag name for routing, default is 'tribe'
)


class HybridOnlineStore(OnlineStore):
Expand All @@ -89,6 +94,7 @@ class HybridOnlineStore(OnlineStore):

The backend is selected dynamically at runtime according to the tag value.
"""

def __init__(self):
"""
Initialize the HybridOnlineStore. Online stores are instantiated lazily on first use.
Expand All @@ -106,12 +112,16 @@ def _initialize_online_stores(self, config: RepoConfig):
if self._initialized:
return
self.online_stores = {}
online_stores_cfg = getattr(config.online_store, 'online_stores', [])
online_stores_cfg = getattr(config.online_store, "online_stores", [])
for store_cfg in online_stores_cfg:
config_cls = get_online_config_from_type(store_cfg.type.split('.')[-1].lower())
config_cls = get_online_config_from_type(
store_cfg.type.split(".")[-1].lower()
)
config_instance = config_cls(**store_cfg.conf)
online_store_instance = get_online_store_from_config(config_instance)
self.online_stores[store_cfg.type.split('.')[-1].lower()] = online_store_instance
self.online_stores[store_cfg.type.split(".")[-1].lower()] = (
online_store_instance
)
self._initialized = True

def _get_online_store(self, tribe_tag, config: RepoConfig):
Expand Down Expand Up @@ -139,27 +149,27 @@ def _prepare_repo_conf(self, config: RepoConfig, online_store_type: str):
"""
rconfig = config
for online_store in config.online_store.online_stores:
if online_store.type.split('.')[-1].lower() == online_store_type.lower():
if online_store.type.split(".")[-1].lower() == online_store_type.lower():
rconfig.online_config = online_store.conf
rconfig.online_config["type"] = online_store.type
data = rconfig.__dict__
data['registry'] = data['registry_config']
data['offline_store'] = data['offline_config']
data['online_store'] = data['online_config']
data["registry"] = data["registry_config"]
data["offline_store"] = data["offline_config"]
data["online_store"] = data["online_config"]
return data

def _get_routing_tag_value(self, table: FeatureView, config: RepoConfig):
tag_name = getattr(config.online_store, 'routing_tag', 'tribe')
tag_name = getattr(config.online_store, "routing_tag", "tribe")
return table.tags.get(tag_name)

def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
odata: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
self,
config: RepoConfig,
table: FeatureView,
odata: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
"""
Write a batch of feature rows to the appropriate online store based on the FeatureView's tag.
Expand All @@ -175,29 +185,34 @@ def online_write_batch(
"""
tribe = self._get_routing_tag_value(table, config)
if not tribe:
tag_name = getattr(config.online_store, 'routing_tag', 'tribe')
raise ValueError(f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore.")
tag_name = getattr(config.online_store, "routing_tag", "tribe")
raise ValueError(
f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore."
)
online_store = self._get_online_store(tribe, config)
if online_store:
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
online_store.online_write_batch(config, table, odata, progress)
else:
raise NotImplementedError(
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration.")
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration."
)

@staticmethod
def write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val):
def write_to_table(
created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val
):
"""
(Not implemented) Write a single feature value to the online store table.
"""
pass

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Read feature rows from the appropriate online store based on the FeatureView's tag.
Expand All @@ -215,24 +230,29 @@ def online_read(
"""
tribe = self._get_routing_tag_value(table, config)
if not tribe:
tag_name = getattr(config.online_store, 'routing_tag', 'tribe')
raise ValueError(f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore.")
tag_name = getattr(config.online_store, "routing_tag", "tribe")
raise ValueError(
f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore."
)
online_store = self._get_online_store(tribe, config)
if online_store:
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
return online_store.online_read(config, table, entity_keys, requested_features)
return online_store.online_read(
config, table, entity_keys, requested_features
)
else:
raise NotImplementedError(
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration.")
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration."
)

def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
"""
Update the state of the online stores for the given FeatureViews and Entities.
Expand All @@ -251,22 +271,31 @@ def update(
for table in tables_to_keep:
tribe = self._get_routing_tag_value(table, config)
if not tribe:
tag_name = getattr(config.online_store, 'routing_tag', 'tribe')
raise ValueError(f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore.")
tag_name = getattr(config.online_store, "routing_tag", "tribe")
raise ValueError(
f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore."
)
online_store = self._get_online_store(tribe, config)
if online_store:
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
online_store.update(config, tables_to_delete, tables_to_keep, entities_to_delete,
entities_to_keep, partial)
online_store.update(
config,
tables_to_delete,
tables_to_keep,
entities_to_delete,
entities_to_keep,
partial,
)
else:
raise NotImplementedError(
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration.")
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration."
)

def teardown(
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
):
"""
Teardown all managed online stores for the given FeatureViews and Entities.
Expand All @@ -278,8 +307,8 @@ def teardown(
"""
# Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance
tribes_seen = set()
online_stores_cfg = getattr(config.online_store, 'online_stores', [])
tag_name = getattr(config.online_store, 'routing_tag', 'tribe')
online_stores_cfg = getattr(config.online_store, "online_stores", [])
tag_name = getattr(config.online_store, "routing_tag", "tribe")
for table in tables:
tribe = table.tags.get(tag_name)
if not tribe:
Expand All @@ -293,7 +322,7 @@ def teardown(
continue
tribes_seen.add(key)
# Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility)
if tribe.lower() == store_type.split('.')[-1].lower():
if tribe.lower() == store_type.split(".")[-1].lower():
online_store = self._get_online_store(tribe, config)
if online_store:
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,3 @@
FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(online_store_creator=HybridOnlineStoreCreator),
]

Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,11 @@ def create_online_store(self):
"type": "redis",
"conf": {
"redis_type": "redis",
"connection_string": "localhost:6379"
}
"connection_string": "localhost:6379",
},
},
{
"type": "sqlite",
"conf": {
"path": "/tmp/feast_hybrid_test.db"
}
}
]
{"type": "sqlite", "conf": {"path": "/tmp/feast_hybrid_test.db"}},
],
}

def teardown(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
def sample_entity():
return Entity(name="id", join_keys=["id"], value_type=ValueType.INT64)


@pytest.fixture
def sample_feature_view(sample_entity):
file_source = FileSource(
Expand All @@ -32,6 +33,7 @@ def sample_feature_view(sample_entity):
source=file_source,
)


@pytest.fixture
def sample_repo_config():
# Minimal config for HybridOnlineStore with two backends (mocked for test)
Expand All @@ -43,10 +45,7 @@ def sample_repo_config():
online_stores=[
HybridOnlineStoreConfig.OnlineStoresWithConfig(
type="redis",
conf={
"redis_type": "redis",
"connection_string": "localhost:6379"
},
conf={"redis_type": "redis", "connection_string": "localhost:6379"},
),
HybridOnlineStoreConfig.OnlineStoresWithConfig(
type="sqlite",
Expand All @@ -57,26 +56,32 @@ def sample_repo_config():
offline_store=None,
)


@pytest.mark.usefixtures("sample_entity", "sample_feature_view", "sample_repo_config")
def test_hybrid_online_store_write_and_read(sample_repo_config, sample_feature_view):
with patch("feast.infra.online_stores.redis.RedisOnlineStore.online_write_batch") as mock_write, \
patch("feast.infra.online_stores.redis.RedisOnlineStore.online_read") as mock_read:
with (
patch(
"feast.infra.online_stores.redis.RedisOnlineStore.online_write_batch"
) as mock_write,
patch(
"feast.infra.online_stores.redis.RedisOnlineStore.online_read"
) as mock_read,
):
mock_write.return_value = None
mock_read.return_value = [
(None, {"feature1": Value(int64_val=100)})
]
mock_read.return_value = [(None, {"feature1": Value(int64_val=100)})]
store = HybridOnlineStore()
entity_key = EntityKey(
join_keys=["id"],
entity_values=[Value(int64_val=1)],
)
now = datetime.utcnow()
odata = [
(entity_key, {"feature1": Value(int64_val=100)}, now, None)
]
odata = [(entity_key, {"feature1": Value(int64_val=100)}, now, None)]
# Write to the online store (mocked)
store.online_write_batch(sample_repo_config, sample_feature_view, odata, progress=None)
store.online_write_batch(
sample_repo_config, sample_feature_view, odata, progress=None
)
# Read back (mocked)
result = store.online_read(sample_repo_config, sample_feature_view, [entity_key])
result = store.online_read(
sample_repo_config, sample_feature_view, [entity_key]
)
assert result[0][1]["feature1"].int64_val == 100

Loading