Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,7 @@
"filename": "sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py",
"hashed_secret": "95433727ea51026e1e0dc8deadaabd4a3baaaaf4",
"is_verified": false,
"line_number": 19
"line_number": 21
}
],
"sdk/python/tests/universal/feature_repos/universal/online_store/singlestore.py": [
Expand Down
6 changes: 3 additions & 3 deletions infra/scripts/compile-templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def find_repo(path):
# Template README.md
############################
roadmap_path = repo_root / "docs" / "roadmap.md"
with open(roadmap_path, "r") as f:
with open(roadmap_path, "r", encoding="utf-8") as f:
# skip first lines since it has the title
roadmap_contents_lines = f.readlines()[2:]

# Join back again
roadmap_contents = "".join(roadmap_contents_lines)

template_path = repo_root / "infra" / "templates" / "README.md.jinja2"
with open(template_path) as f:
with open(template_path, encoding="utf-8") as f:
template = Template(f.read())

# Compile template
Expand All @@ -49,5 +49,5 @@ def find_repo(path):
)

readme_path = repo_root / "README.md"
with open(readme_path, "w") as f:
with open(readme_path, "w", encoding="utf-8") as f:
f.write(readme_md)
2 changes: 1 addition & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError):
def __init__(self, store_name: str, version: int):
super().__init__(
f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. "
f"Currently only SQLite, PostgreSQL, and MySQL support version-qualified feature references. "
f"Currently only SQLite, PostgreSQL, MySQL, and SingleStore support version-qualified feature references. "
)


Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,9 @@ def teardown(self):

entities = self.list_entities()

self._get_provider().teardown_infra(self.project, tables, entities)
self._get_provider().teardown_infra(
self.project, tables, entities, registry=self.registry
)
self.registry.teardown()

def get_historical_features(
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/online_stores/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from feast.feature_view import DUMMY_ENTITY_NAME
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -306,6 +307,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry: Optional[BaseRegistry] = None,
):
# Because of historical reasons, Feast calls them tables. We use this alias for
# readability.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the database.
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the DynamoDB Online Store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
project = config.project
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
self._index = None
self._in_memory_store.teardown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
online_store_config = config.online_store
if not isinstance(online_store_config, HazelcastOnlineStoreConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Delete tables from the Hbase Online Store.
Expand Down
27 changes: 19 additions & 8 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import struct
from datetime import datetime, timezone
from typing import Any, List
from typing import Any, List, Optional

import mmh3

from feast.feature_view import FeatureView
from feast.importer import import_class
from feast.infra.key_encoding_utils import (
serialize_entity_key,
Expand Down Expand Up @@ -72,13 +73,23 @@ def _to_naive_utc(ts: datetime) -> datetime:
return ts.astimezone(tz=timezone.utc).replace(tzinfo=None)


def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str:
"""Build the online-store table name, appending a version suffix when versioning is enabled."""
def online_store_table_id(
project: str,
table: FeatureView,
enable_versioning: bool = False,
version: Optional[int] = None,
) -> str:
name = table.name
if enable_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
resolved_version = version
if resolved_version is None:
resolved_version = getattr(table.projection, "version_tag", None)
if resolved_version is None:
resolved_version = getattr(table, "current_version_number", None)
if resolved_version is not None and resolved_version > 0:
name = f"{table.name}_v{resolved_version}"
return f"{project}_{name}"


def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str:
return online_store_table_id(project, table, enable_versioning)
Original file line number Diff line number Diff line change
Expand Up @@ -294,34 +294,32 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Teardown all managed online stores for the given FeatureViews and Entities.
"""Teardown all managed online stores for the given FeatureViews and Entities."""

Args:
config: Feast RepoConfig.
tables: Sequence of FeatureViews to teardown.
entities: Sequence of Entities to teardown.
"""
# Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance
tribes_seen = set()
online_stores_cfg = getattr(config.online_store, "online_stores", [])
tag_name = getattr(config.online_store, "routing_tag", "tribe")
self._initialize_online_stores(config)
tables_by_tribe: Dict[str, List[FeatureView]] = {}
for table in tables:
tribe = table.tags.get(tag_name)
tribe = self._get_routing_tag_value(table, config)
if not tribe:
continue
# Find all store configs matching this tribe (supporting multiple instances of the same type)
for store_cfg in online_stores_cfg:
store_type = store_cfg.type
# Use id(store_cfg.conf) to distinguish different configs of the same type
key = (tribe, store_type, id(store_cfg.conf))
if key in tribes_seen:
continue
tribes_seen.add(key)
# Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility)
if tribe.lower() == store_type.split(".")[-1].lower():
online_store = self._get_online_store(tribe, config)
if online_store:
config = RepoConfig(**self._prepare_repo_conf(config, tribe))
online_store.teardown(config, tables, entities)
tag_name = getattr(config.online_store, "routing_tag", "tribe")
raise ValueError(
f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore."
)
tables_by_tribe.setdefault(tribe, []).append(table)

for tribe, tribe_tables in tables_by_tribe.items():
online_store = self._get_online_store(tribe, config)
if not online_store:
raise NotImplementedError(
f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration."
)

tribe_config = RepoConfig(**self._prepare_repo_conf(config, tribe))
online_store.teardown(
tribe_config,
tribe_tables,
entities,
registry=registry,
)
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
self.client = self._connect(config)
for table in tables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry=None,
):
"""
Drop the backing collection and close the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.helpers import compute_table_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
Expand Down Expand Up @@ -43,6 +44,10 @@ class MySQLOnlineStore(OnlineStore):

_conn: Optional[Connection] = None

@property
def supports_versioned_online_reads(self) -> bool:
return True

def _get_conn(self, config: RepoConfig) -> Connection:
online_store_config = config.online_store
assert isinstance(online_store_config, MySQLOnlineStoreConfig)
Expand Down Expand Up @@ -78,7 +83,7 @@ def online_write_batch(
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=3,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
Expand All @@ -100,14 +105,14 @@ def online_write_batch(
if progress:
progress(1)
else:
batch_size = config.online_store.bacth_size
batch_size = config.online_store.batch_size
if not batch_size or batch_size < 2:
raise ValueError("Batch size must be at least 2")
insert_values = []
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=2,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
Expand Down Expand Up @@ -223,7 +228,7 @@ def online_read(
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=3,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()

cur.execute(
Expand Down Expand Up @@ -296,6 +301,7 @@ def teardown(
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
registry: Optional[BaseRegistry] = None,
) -> None:
conn = self._get_conn(config)
cur = conn.cursor()
Expand Down
Loading
Loading