Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
Signed-off-by: cmuhao <sduxuhao@gmail.com>
  • Loading branch information
HaoXuAI committed Sep 10, 2024
commit c24dc7535ecfd36f1342ac01603fde52042871d9
17 changes: 13 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import pyarrow as pa
from colorama import Fore, Style
from google.protobuf.timestamp_pb2 import Timestamp
from protos.feast.types.EntityKey_pb2 import EntityKey
from tqdm import tqdm

from feast import feature_server, flags_helper, ui_server, utils
Expand Down Expand Up @@ -85,7 +86,6 @@
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now
from feast.version import get_version
from protos.feast.types.EntityKey_pb2 import EntityKey

warnings.simplefilter("once", DeprecationWarning)

Expand Down Expand Up @@ -1678,7 +1678,7 @@ def retrieve_online_documents(
data={
"entity_key": entity_key_vals,
requested_feature: document_feature_vals,
"distance": document_feature_distance_vals
"distance": document_feature_distance_vals,
},
)
return OnlineResponse(online_features_response)
Expand All @@ -1691,7 +1691,9 @@ def _retrieve_from_online_store(
query: List[float],
top_k: int,
distance_metric: Optional[str],
) -> List[Tuple[Timestamp, EntityKey, "FieldStatus.ValueType", Value, Value, Value]]:
) -> List[
Tuple[Timestamp, EntityKey, "FieldStatus.ValueType", Value, Value, Value]
]:
"""
Search and return document features from the online document store.
"""
Expand Down Expand Up @@ -1721,7 +1723,14 @@ def _retrieve_from_online_store(
status = FieldStatus.PRESENT

read_row_protos.append(
(row_ts_proto, entity_key, status, feature_val, vector_value, distance_val)
(
row_ts_proto,
entity_key,
status,
feature_val,
vector_value,
distance_val,
)
)
return read_row_protos

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import contextlib
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import duckdb
from typing import Optional, Dict, Any, List, Tuple, Union
from feast import Entity
from infra.key_encoding_utils import serialize_entity_key
from utils import _build_retrieve_online_document_results

from feast.feature_view import FeatureView
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import RepoConfig
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from infra.key_encoding_utils import serialize_entity_key
from utils import _build_retrieve_online_document_results
from feast.repo_config import RepoConfig


class DuckDBOnlineStoreConfig:
Expand All @@ -22,30 +22,28 @@ class DuckDBOnlineStoreConfig:


class DuckDBOnlineStore(OnlineStore):
async def online_read_async(
    self,
    config: RepoConfig,
    table: FeatureView,
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    """Placeholder for the asynchronous read path.

    No asynchronous read is implemented: the coroutine performs no work
    and resolves to ``None`` regardless of its arguments.
    """
    return None

def __init__(self, config: DuckDBOnlineStoreConfig):
    """Remember the store configuration without opening a connection.

    Args:
        config: DuckDB online store settings kept on ``self.config``.
    """
    # The connection starts out unset; _get_conn opens it on first use.
    self.connection = None
    self.config = config

@contextlib.contextmanager
def _get_conn(self, config: RepoConfig) -> Any:
    """Yield the store's DuckDB connection, opening it on first use.

    The connection is created from ``self.config.path`` in read-write mode
    and cached on ``self.connection``; later calls yield the same object.
    The ``config`` argument is accepted but not read here, and the
    connection is left open when the ``with`` block exits.
    """
    conn = self.connection
    if conn is None:
        conn = duckdb.connect(database=self.config.path, read_only=False)
        self.connection = conn
    yield conn

def create_vector_index(self,
config: RepoConfig,
table_name: str,
vector_column: str) -> None:
def create_vector_index(
self, config: RepoConfig, table_name: str, vector_column: str
) -> None:
"""Create an HNSW index for vector similarity search."""
if not config.enable_vector_search:
raise ValueError("Vector search is not enabled in the configuration.")
Expand All @@ -57,47 +55,56 @@ def create_vector_index(self,
)

def online_write_batch(
    self,
    config: RepoConfig,
    table: FeatureView,
    data: List[Tuple[EntityKeyProto, Dict[str, ValueProto]]],
) -> None:
    """Write one row per (entity key, feature) pair into the table for ``table``.

    The table named ``table.name`` is created if it does not exist, with
    columns ``entity_key``, ``feature_name`` and ``value``.

    Args:
        config: Repo configuration, forwarded to ``_get_conn``.
        table: Feature view whose ``name`` is used as the table name.
        data: Pairs of entity key and a mapping from feature name to value.
    """
    insert_values = []
    for entity_key, values in data:
        # The key is stored as the hex text of its serialized form;
        # online_read encodes its lookup keys the same way.
        entity_key_bin = serialize_entity_key(entity_key).hex()
        insert_values.extend(
            (entity_key_bin, feature_name, val.SerializeToString())
            for feature_name, val in values.items()
        )

    with self._get_conn(config) as conn:
        conn.execute(
            f"CREATE TABLE IF NOT EXISTS {table.name} (entity_key BLOB, feature_name TEXT, value BLOB)"
        )
        # DuckDB's executemany rejects an empty list of parameter sets, so
        # an empty batch only ensures the table exists.
        if insert_values:
            conn.executemany(
                f"INSERT INTO {table.name} (entity_key, feature_name, value) VALUES (?, ?, ?)",
                insert_values,
            )

def online_read(
    self,
    config: RepoConfig,
    table: FeatureView,
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[Dict[str, ValueProto]]]]:
    """Read stored feature values for the given entity keys.

    Args:
        config: Repo configuration, forwarded to ``_get_conn``.
        table: Feature view whose ``name`` is used as the table name.
        entity_keys: Entity keys to look up.
        requested_features: Accepted for interface compatibility; not used
            to filter the result.

    Returns:
        A single-element list holding one dict that maps feature name to
        its parsed ``ValueProto``. Rows for all requested keys are merged
        into that one dict, so a feature present for several keys keeps
        only the last row returned.
    """
    # NOTE(review): the annotation promises a list of tuples but a list with
    # one dict is returned; the shape is kept so existing callers still work.
    if not entity_keys:
        # "IN ()" is not valid SQL, so skip the query when there is nothing
        # to look up and return the same shape as an empty result set.
        return [{}]

    keys = [serialize_entity_key(key).hex() for key in entity_keys]
    query = f"SELECT feature_name, value FROM {table.name} WHERE entity_key IN ({','.join(['?'] * len(keys))})"

    with self._get_conn(config) as conn:
        results = conn.execute(query, keys).fetchall()

    features: Dict[str, ValueProto] = {}
    for feature_name, value in results:
        # ParseFromString fills the message in place and does not return
        # it, so parse first and store the message object.
        proto = ValueProto()
        proto.ParseFromString(value)
        features[feature_name] = proto
    return [features]

def retrieve_online_documents(
self,
config: RepoConfig,
table: FeatureView,
requested_feature: str,
embedding: List[float],
top_k: int,
distance_metric: Optional[str] = "L2",
self,
config: RepoConfig,
table: FeatureView,
requested_feature: str,
embedding: List[float],
top_k: int,
distance_metric: Optional[str] = "L2",
) -> List[
Tuple[
Optional[datetime],
Expand All @@ -111,7 +118,9 @@ def retrieve_online_documents(
if not self.config.enable_vector_search:
raise ValueError("Vector search is not enabled in the configuration.")
if config.entity_key_serialization_version < 3:
raise ValueError("Entity key serialization version must be at least 3 for vector search.")
raise ValueError(
"Entity key serialization version must be at least 3 for vector search."
)

result: List[
Tuple[
Expand All @@ -138,28 +147,29 @@ def retrieve_online_documents(
rows = conn.execute(query, (embedding, top_k)).fetchall()
result = _build_retrieve_online_document_results(
rows,
entity_key_serialization_version=config.entity_key_serialization_version
entity_key_serialization_version=config.entity_key_serialization_version,
)

return result

def update(
    self,
    config: RepoConfig,
    tables_to_delete: List[FeatureView],
    tables_to_keep: List[FeatureView],
) -> None:
    """Drop tables for removed feature views and create tables for kept ones.

    Args:
        config: Repo configuration, forwarded to ``_get_conn``.
        tables_to_delete: Feature views whose tables are dropped if present.
        tables_to_keep: Feature views whose tables are created if missing.
    """
    with self._get_conn(config) as conn:
        # Generators keep statement construction lazy, so each statement is
        # built immediately before it runs: all drops first, then creates.
        drop_statements = (
            f"DROP TABLE IF EXISTS {view.name}" for view in tables_to_delete
        )
        create_statements = (
            f"CREATE TABLE IF NOT EXISTS {view.name} (entity_key BLOB, feature_name TEXT, value BLOB)"
            for view in tables_to_keep
        )
        for statements in (drop_statements, create_statements):
            for sql in statements:
                conn.execute(sql)

def teardown(
self,
config: RepoConfig,
tables: List[FeatureView],
self,
config: RepoConfig,
tables: List[FeatureView],
) -> None:
with self._get_conn(config) as conn:
for table in tables:
Expand Down
Loading