|
| 1 | +""" |
| 2 | +hybrid_online_store.py |
| 3 | +---------------------- |
| 4 | +
|
| 5 | +This module provides the HybridOnlineStore, a Feast OnlineStore implementation that enables routing online feature operations |
| 6 | +to different online stores based on a tag (e.g., tribe, team, or project) on the FeatureView. This allows a single Feast deployment |
| 7 | +to support multiple online store backends, each configured independently and selected dynamically at runtime. |
| 8 | +
|
| 9 | +Features: |
| 10 | + - Supports multiple online store backends in a single Feast deployment. |
| 11 | + - Routes online reads and writes to the correct backend based on a tag on the FeatureView. |
| 12 | + - Enables multi-tenancy and flexible data management strategies. |
| 13 | + - Designed for extensibility and compatibility with Feast's OnlineStore interface. |
| 14 | +
|
| 15 | +Usage: |
| 16 | + 1. Add a 'tribe' (or similar) tag to your FeatureView. |
| 17 | + 2. Configure multiple online stores in your Feast repo config under 'online_stores'. |
| 18 | + 3. The HybridOnlineStore will route reads and writes to the correct backend based on the tag. |
| 19 | +
|
| 20 | +Example configuration (feature_store.yaml): |
| 21 | +
|
| 22 | + online_store: |
| 23 | + type: hybrid_online_store.HybridOnlineStore |
| 24 | + online_stores: |
| 25 | + - type: feast.infra.online_stores.bigtable.BigtableOnlineStore |
| 26 | + conf: |
| 27 | + ... # bigtable config |
| 28 | + - type: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore |
| 29 | + conf: |
| 30 | + ... # cassandra config |
| 31 | +
|
| 32 | +Example FeatureView: |
| 33 | +
|
| 34 | + tags: |
| 35 | + tribe: bigtable |
| 36 | +
|
| 37 | +The HybridOnlineStore will route requests to the correct online store based on the tribe tag. |
| 38 | +""" |
| 39 | + |
| 40 | +from datetime import datetime |
| 41 | +from typing import Sequence, List, Optional, Tuple, Dict, Callable, Any, Literal |
| 42 | +from importlib import import_module |
| 43 | + |
| 44 | +from feast import RepoConfig, FeatureView, Entity |
| 45 | +from feast.infra.online_stores.helpers import get_online_store_from_config |
| 46 | +from feast.infra.online_stores.online_store import OnlineStore |
| 47 | +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto |
| 48 | +from feast.protos.feast.types.Value_pb2 import Value as ValueProto |
| 49 | +from feast.repo_config import FeastConfigBaseModel, get_online_config_from_type |
| 50 | +from pydantic import StrictStr |
| 51 | + |
| 52 | + |
| 53 | +class HybridOnlineStoreConfig(FeastConfigBaseModel): |
| 54 | + """ |
| 55 | + Configuration for HybridOnlineStore. |
| 56 | +
|
| 57 | + This config allows multiple online stores to be used in a single Feast deployment. Each online store is specified by its type (Python import path) |
| 58 | + and a configuration dictionary. The HybridOnlineStore uses this configuration to instantiate and manage the set of online stores. |
| 59 | +
|
| 60 | + Attributes: |
| 61 | + type: The type identifier for the HybridOnlineStore. |
| 62 | + online_stores: A list of OnlineStoresWithConfig, each specifying the type and config for an online store backend. |
| 63 | + """ |
| 64 | + type: Literal[ |
| 65 | + "HybridOnlineStore", "hybrid_online_store.HybridOnlineStore"] = "hybrid_online_store.HybridOnlineStore" |
| 66 | + |
| 67 | + class OnlineStoresWithConfig(FeastConfigBaseModel): |
| 68 | + """ |
| 69 | + Configuration for a single online store backend. |
| 70 | +
|
| 71 | + Attributes: |
| 72 | + type: Python import path to the online store class. |
| 73 | + conf: Dictionary of configuration parameters for the online store. |
| 74 | + """ |
| 75 | + type: StrictStr # Python import path to the online store class |
| 76 | + conf: Dict |
| 77 | + |
| 78 | + online_stores: Optional[List[OnlineStoresWithConfig]] |
| 79 | + |
| 80 | + |
| 81 | +class HybridOnlineStore(OnlineStore): |
| 82 | + """ |
| 83 | + HybridOnlineStore routes online feature operations to different online store backends |
| 84 | + based on a tag (e.g., 'tribe') on the FeatureView. This enables multi-tenancy and flexible |
| 85 | + backend selection in a single Feast deployment. |
| 86 | +
|
| 87 | + The backend is selected dynamically at runtime according to the tag value. |
| 88 | + """ |
| 89 | + def __init__(self): |
| 90 | + """ |
| 91 | + Initialize the HybridOnlineStore. Online stores are instantiated lazily on first use. |
| 92 | + """ |
| 93 | + self.online_stores = {} |
| 94 | + self._initialized = False |
| 95 | + |
| 96 | + def _initialize_online_stores(self, config: RepoConfig): |
| 97 | + """ |
| 98 | + Lazily instantiate all configured online store backends from the repo config. |
| 99 | +
|
| 100 | + Args: |
| 101 | + config: Feast RepoConfig containing the online_stores configuration. |
| 102 | + """ |
| 103 | + if self._initialized: |
| 104 | + return |
| 105 | + self.online_stores = {} |
| 106 | + online_stores_cfg = getattr(config.online_store, 'online_stores', []) |
| 107 | + for store_cfg in online_stores_cfg: |
| 108 | + config_cls = get_online_config_from_type(store_cfg.type.split('.')[-1].lower()) |
| 109 | + config_instance = config_cls(**store_cfg.conf) |
| 110 | + online_store_instance = get_online_store_from_config(config_instance) |
| 111 | + self.online_stores[store_cfg.type.split('.')[-1].lower()] = online_store_instance |
| 112 | + self._initialized = True |
| 113 | + |
| 114 | + def _get_online_store(self, tribe_tag, config: RepoConfig): |
| 115 | + """ |
| 116 | + Retrieve the online store backend corresponding to the given tag value. |
| 117 | +
|
| 118 | + Args: |
| 119 | + tribe_tag: The tag value (e.g., 'tribe') used to select the backend. |
| 120 | + config: Feast RepoConfig. |
| 121 | + Returns: |
| 122 | + The OnlineStore instance for the given tag, or None if not found. |
| 123 | + """ |
| 124 | + self._initialize_online_stores(config) |
| 125 | + return self.online_stores.get(tribe_tag.lower()) |
| 126 | + |
| 127 | + def _prepare_repo_conf(self, config: RepoConfig, online_store_type: str): |
| 128 | + """ |
| 129 | + Prepare a RepoConfig for the selected online store backend. |
| 130 | +
|
| 131 | + Args: |
| 132 | + config: The original Feast RepoConfig. |
| 133 | + online_store_type: The type of the online store backend to use. |
| 134 | + Returns: |
| 135 | + A dictionary representing the updated RepoConfig for the selected backend. |
| 136 | + """ |
| 137 | + rconfig = config |
| 138 | + for online_store in config.online_store.online_stores: |
| 139 | + if online_store.type.split('.')[-1].lower() == online_store_type.lower(): |
| 140 | + rconfig.online_config = online_store.conf |
| 141 | + rconfig.online_config["type"] = online_store.type |
| 142 | + data = rconfig.__dict__ |
| 143 | + data['registry'] = data['registry_config'] |
| 144 | + data['offline_store'] = data['offline_config'] |
| 145 | + data['online_store'] = data['online_config'] |
| 146 | + return data |
| 147 | + |
| 148 | + def online_write_batch( |
| 149 | + self, |
| 150 | + config: RepoConfig, |
| 151 | + table: FeatureView, |
| 152 | + odata: List[ |
| 153 | + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] |
| 154 | + ], |
| 155 | + progress: Optional[Callable[[int], Any]], |
| 156 | + ) -> None: |
| 157 | + """ |
| 158 | + Write a batch of feature rows to the appropriate online store based on the FeatureView's tag. |
| 159 | +
|
| 160 | + Args: |
| 161 | + config: Feast RepoConfig. |
| 162 | + table: FeatureView to write to. Must have a tag (e.g., 'tribe') to select the backend. |
| 163 | + odata: List of tuples containing entity key, feature values, event timestamp, and created timestamp. |
| 164 | + progress: Optional callback for progress reporting. |
| 165 | + Raises: |
| 166 | + ValueError: If the FeatureView does not have the required tag. |
| 167 | + NotImplementedError: If no online store is found for the tag value. |
| 168 | + """ |
| 169 | + tribe = table.tags.get('tribe') |
| 170 | + if not tribe: |
| 171 | + raise ValueError("FeatureView must have a 'tribe' tag to use HybridOnlineStore.") |
| 172 | + online_store = self._get_online_store(tribe, config) |
| 173 | + if online_store: |
| 174 | + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) |
| 175 | + online_store.online_write_batch(config, table, odata, progress) |
| 176 | + else: |
| 177 | + raise NotImplementedError( |
| 178 | + f"No online store found for tribe tag '{tribe}'. Please check your configuration.") |
| 179 | + |
| 180 | + @staticmethod |
| 181 | + def write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val): |
| 182 | + """ |
| 183 | + (Not implemented) Write a single feature value to the online store table. |
| 184 | + """ |
| 185 | + pass |
| 186 | + |
| 187 | + def online_read( |
| 188 | + self, |
| 189 | + config: RepoConfig, |
| 190 | + table: FeatureView, |
| 191 | + entity_keys: List[EntityKeyProto], |
| 192 | + requested_features: Optional[List[str]] = None, |
| 193 | + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: |
| 194 | + """ |
| 195 | + Read feature rows from the appropriate online store based on the FeatureView's tag. |
| 196 | +
|
| 197 | + Args: |
| 198 | + config: Feast RepoConfig. |
| 199 | + table: FeatureView to read from. Must have a tag (e.g., 'tribe') to select the backend. |
| 200 | + entity_keys: List of entity keys to read. |
| 201 | + requested_features: Optional list of feature names to read. |
| 202 | + Returns: |
| 203 | + List of tuples containing event timestamp and feature values. |
| 204 | + Raises: |
| 205 | + ValueError: If the FeatureView does not have the required tag. |
| 206 | + NotImplementedError: If no online store is found for the tag value. |
| 207 | + """ |
| 208 | + tribe = table.tags.get('tribe') |
| 209 | + if not tribe: |
| 210 | + raise ValueError("FeatureView must have a 'tribe' tag to use HybridOnlineStore.") |
| 211 | + online_store = self._get_online_store(tribe, config) |
| 212 | + if online_store: |
| 213 | + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) |
| 214 | + return online_store.online_read(config, table, entity_keys, requested_features) |
| 215 | + else: |
| 216 | + raise NotImplementedError( |
| 217 | + f"No online store found for tribe tag '{tribe}'. Please check your configuration.") |
| 218 | + |
| 219 | + def update( |
| 220 | + self, |
| 221 | + config: RepoConfig, |
| 222 | + tables_to_delete: Sequence[FeatureView], |
| 223 | + tables_to_keep: Sequence[FeatureView], |
| 224 | + entities_to_delete: Sequence[Entity], |
| 225 | + entities_to_keep: Sequence[Entity], |
| 226 | + partial: bool, |
| 227 | + ): |
| 228 | + """ |
| 229 | + Update the state of the online stores for the given FeatureViews and Entities. |
| 230 | +
|
| 231 | + Args: |
| 232 | + config: Feast RepoConfig. |
| 233 | + tables_to_delete: Sequence of FeatureViews to delete. |
| 234 | + tables_to_keep: Sequence of FeatureViews to keep. |
| 235 | + entities_to_delete: Sequence of Entities to delete. |
| 236 | + entities_to_keep: Sequence of Entities to keep. |
| 237 | + partial: Whether to perform a partial update. |
| 238 | + Raises: |
| 239 | + ValueError: If a FeatureView does not have the required tag. |
| 240 | + NotImplementedError: If no online store is found for a tag value. |
| 241 | + """ |
| 242 | + for table in tables_to_keep: |
| 243 | + tribe = table.tags.get('tribe') |
| 244 | + if not tribe: |
| 245 | + raise ValueError("FeatureView must have a 'tribe' tag to use HybridOnlineStore.") |
| 246 | + online_store = self._get_online_store(tribe, config) |
| 247 | + if online_store: |
| 248 | + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) |
| 249 | + online_store.update(config, tables_to_delete, tables_to_keep, entities_to_delete, |
| 250 | + entities_to_keep, partial) |
| 251 | + else: |
| 252 | + raise NotImplementedError( |
| 253 | + f"No online store found for tribe tag '{tribe}'. Please check your configuration.") |
| 254 | + |
| 255 | + def teardown( |
| 256 | + self, |
| 257 | + config: RepoConfig, |
| 258 | + tables: Sequence[FeatureView], |
| 259 | + entities: Sequence[Entity], |
| 260 | + ): |
| 261 | + """ |
| 262 | + Teardown all managed online stores for the given FeatureViews and Entities. |
| 263 | +
|
| 264 | + Args: |
| 265 | + config: Feast RepoConfig. |
| 266 | + tables: Sequence of FeatureViews to teardown. |
| 267 | + entities: Sequence of Entities to teardown. |
| 268 | + """ |
| 269 | + # Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance |
| 270 | + tribes_seen = set() |
| 271 | + online_stores_cfg = getattr(config.online_store, 'online_stores', []) |
| 272 | + for table in tables: |
| 273 | + tribe = table.tags.get('tribe') |
| 274 | + if not tribe: |
| 275 | + continue |
| 276 | + # Find all store configs matching this tribe (supporting multiple instances of the same type) |
| 277 | + for store_cfg in online_stores_cfg: |
| 278 | + store_type = store_cfg.type |
| 279 | + # Use id(store_cfg.conf) to distinguish different configs of the same type |
| 280 | + key = (tribe, store_type, id(store_cfg.conf)) |
| 281 | + if key in tribes_seen: |
| 282 | + continue |
| 283 | + tribes_seen.add(key) |
| 284 | + # Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility) |
| 285 | + if tribe.lower() == store_type.split('.')[-1].lower(): |
| 286 | + online_store = self._get_online_store(tribe, config) |
| 287 | + if online_store: |
| 288 | + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) |
| 289 | + online_store.teardown(config, tables, entities) |
| 290 | + |
| 291 | +# Example usage in your repo config (feature_store.yaml): |
| 292 | +# |
| 293 | +# online_store: |
| 294 | +# type: hybrid_online_store.HybridOnlineStore |
| 295 | +# online_stores: |
| 296 | +# - type: feast.infra.online_stores.bigtable.BigtableOnlineStore |
| 297 | +# conf: |
| 298 | +# ... # bigtable config |
| 299 | +# - type: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore |
| 300 | +# conf: |
| 301 | +# ... # cassandra config |
| 302 | +# |
| 303 | +# Then, in your FeatureView: |
| 304 | +# tags: |
| 305 | +# tribe: bigtable |
| 306 | +# |
| 307 | +# The HybridOnlineStore will route requests to the correct online store based on the tribe tag. |
0 commit comments