Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
713768e
feat: add document store
HaoXuAI Mar 31, 2024
58d5d94
feat: add document store
HaoXuAI Mar 31, 2024
2cd73d1
feat: add document store
HaoXuAI Mar 31, 2024
d2e0a59
feat: add document store
HaoXuAI Mar 31, 2024
7079e7f
remove DocumentStore
HaoXuAI Apr 9, 2024
8c9ee97
format
HaoXuAI Apr 9, 2024
513dd39
Merge branch 'master' into feat-documentstore
HaoXuAI Apr 9, 2024
29d98cd
format
HaoXuAI Apr 9, 2024
11eb97f
format
HaoXuAI Apr 9, 2024
865baf2
format
HaoXuAI Apr 9, 2024
47cd117
format
HaoXuAI Apr 9, 2024
3f9f59f
format
HaoXuAI Apr 9, 2024
7935071
remove unused vars
HaoXuAI Apr 9, 2024
ba39f93
add test
HaoXuAI Apr 11, 2024
cf53c71
add test
HaoXuAI Apr 11, 2024
92046af
format
HaoXuAI Apr 11, 2024
d0acd2d
format
HaoXuAI Apr 11, 2024
cc45f73
format
HaoXuAI Apr 11, 2024
006b5c6
format
HaoXuAI Apr 11, 2024
6e0ba03
format
HaoXuAI Apr 11, 2024
a2302be
fix not implemented issue
HaoXuAI Apr 11, 2024
2e6fc55
fix not implemented issue
HaoXuAI Apr 11, 2024
3cbbf21
fix test
HaoXuAI Apr 11, 2024
ec32764
format
HaoXuAI Apr 11, 2024
e2d8008
format
HaoXuAI Apr 12, 2024
523d20f
format
HaoXuAI Apr 12, 2024
5cd085d
format
HaoXuAI Apr 12, 2024
795699e
format
HaoXuAI Apr 12, 2024
67b007f
format
HaoXuAI Apr 12, 2024
33b46bd
update testcontainer
HaoXuAI Apr 12, 2024
82fe5f1
format
HaoXuAI Apr 12, 2024
0618378
fix postgres integration test
HaoXuAI Apr 12, 2024
7de2016
format
HaoXuAI Apr 12, 2024
92fed1d
fix postgres test
HaoXuAI Apr 14, 2024
d4f2639
fix postgres test
HaoXuAI Apr 14, 2024
396d7de
fix postgres test
HaoXuAI Apr 14, 2024
6c38b92
fix postgres test
HaoXuAI Apr 14, 2024
f763dc9
fix postgres test
HaoXuAI Apr 14, 2024
818c055
format
HaoXuAI Apr 14, 2024
a51b555
format
HaoXuAI Apr 15, 2024
2624b22
format
HaoXuAI Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
cast,
)

import numpy as np
import pandas as pd
import pyarrow as pa
from colorama import Fore, Style
Expand Down Expand Up @@ -1684,6 +1685,63 @@ def _get_online_features(
)
return OnlineResponse(online_features_response)

@log_exceptions_and_usage
def get_top_k_document_features(
self,
feature: str,
document: Union[str, np.ndarray],
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is document the user input? often referred to as the query?

top_k: int,
) -> OnlineResponse:
"""
Retrieves the top k cloeses document features.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Retrieves the top k cloeses document features.
Retrieves the top k closest document features.


Args:
feature: The list of document features that should be retrieved from the online document store. These features can be
specified either as a list of string document feature references or as a feature service. String feature
references must have format "feature_view:feature", e.g, "document_fv:document_embedding_feature".
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
references must have format "feature_view:feature", e.g, "document_fv:document_embedding_feature".
references must have format "feature_view:feature", e.g, "document_fv:document_embeddings".

document: The document to retrieve the closest document features for.
top_k: The number of closest document features to retrieve.
"""
return self._get_top_k_document_features(
feature=feature,
document=document,
top_k=top_k,
)

def _get_top_k_document_features(
self,
feature: str,
document: Union[str, np.ndarray],
top_k: int,
):
(
requested_feature_views,
requested_on_demand_feature_views,
) = self._get_feature_views_to_use(
features=[feature], allow_cache=True, hide_dummy_entity=False
)
requested_feature = (
feature.split(":")[1] if isinstance(feature, str) else feature
)
provider = self._get_provider()
document_features = self._search_from_document_store(
provider,
requested_feature_views[0],
requested_feature,
document,
top_k,
)
online_features_response = GetOnlineFeaturesResponse(results=[])
self._populate_response_from_feature_data(
document_features,
[],
online_features_response,
False,
requested_feature,
requested_feature_views[0],
)
return OnlineResponse(online_features_response)

@staticmethod
def _get_columnar_entity_values(
rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]]
Expand Down Expand Up @@ -1900,6 +1958,48 @@ def _read_from_online_store(
read_row_protos.append((event_timestamps, statuses, values))
return read_row_protos

def _search_from_document_store(
self,
provider: Provider,
table: FeatureView,
requested_feature: str,
document: Union[str, np.ndarray],
top_k: int,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
"""
Search and return document features from the online document store.
"""
documents = provider.online_search(
config=self.config,
table=table,
requested_feature=requested_feature,
document=document,
top_k=top_k,
)
# Each row is a set of features for a given entity key. We only need to convert
# the data to Protobuf once.
null_value = Value()
read_row_protos = []

for doc in documents:
row_ts_proto = Timestamp()
row_ts, feature_data = doc
# TODO (Ly): reuse whatever timestamp if row_ts is None?
if row_ts is not None:
row_ts_proto.FromDatetime(row_ts)
event_timestamps = [row_ts_proto]
if feature_data is None:
statuses = [FieldStatus.NOT_FOUND]
values = [null_value]
else:
statuses = []
values = []
for feature_name, feature_value in feature_data.items():
statuses.append(FieldStatus.PRESENT)
values.append(feature_value)
read_row_protos.append((event_timestamps, statuses, values))
return read_row_protos

@staticmethod
def _populate_response_from_feature_data(
feature_data: Iterable[
Expand Down
77 changes: 77 additions & 0 deletions sdk/python/feast/infra/online_stores/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import numpy as np
import psycopg2
import pytz
from psycopg2 import sql
Expand All @@ -13,6 +14,10 @@
from feast import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.document_store import (
DocumentStore,
DocumentStoreIndexConfig,
)
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.utils.postgres.connection_utils import _get_conn, _get_connection_pool
from feast.infra.utils.postgres.postgres_config import ConnectionType, PostgreSQLConfig
Expand Down Expand Up @@ -273,3 +278,75 @@ def _to_naive_utc(ts: datetime):
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


# Search query template to find the top k items that are closest to the given embedding
# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
SEARCH_QUERY_TEMPLATE = """
SELECT entity_key, feature_name, value, event_ts FROM {table_name}
WHERE feature_name = '{feature_name}'
ORDER BY value <-> %s
LIMIT %s;
"""

# Create index query template to create a index based on the index type
CREATE_INDEX_QUERY_TEMPLATE = """
CREATE INDEX ON {table_name} USING {index_type} (embedding {embeding_type});
"""


class PostgresDocumentStoreConfig(DocumentStoreIndexConfig):
type: Literal["postgres"] = "postgres"


class PostgresDocumentStore(PostgreSQLOnlineStore, DocumentStore):
def online_search(
self,
config: RepoConfig,
table: FeatureView,
requested_feature: str,
embedding: np.ndarray,
top_k: int,
):
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

with self._get_conn(config) as conn, conn.cursor() as cur:
cur.execute(
SEARCH_QUERY_TEMPLATE.format(
table_name=table, feature_name=requested_feature
),
(embedding, top_k),
)
rows = cur.fetchall()

for row in rows:
# The first column is the entity key
entity_key = EntityKeyProto()
entity_key.ParseFromString(row[0])

# The second column is the feature name
feature_name = row[1]

# The third column is the embedding value
val = ValueProto()
val.ParseFromString(row[2])

# The fourth column is the event timestamp
event_ts = row[3]

res = {}
res[feature_name] = val
result.append((event_ts, res))

return result

def create_index(self, config: RepoConfig, table: str):
document_store_config = config.document_store_config
with self._get_conn(config) as conn, conn.cursor() as cur:
cur.execute(
CREATE_INDEX_QUERY_TEMPLATE.format(
table=table,
index_type=document_store_config.index_type,
embeding_type=document_store_config.embedding_type,
)
)
28 changes: 28 additions & 0 deletions sdk/python/feast/infra/online_stores/document_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from abc import abstractmethod
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import numpy as np

from feast.feature_view import FeatureView
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig


class DocumentStoreIndexConfig(FeastConfigBaseModel):
index_type: Optional[str]
embedding_type: Optional[str]


class DocumentStore(OnlineStore):
@abstractmethod
def online_search(
self,
config: RepoConfig,
table: FeatureView,
requested_feature: str,
embeddings: np.ndarray,
top_k: int,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
raise NotImplementedError("You have to implement this!")
10 changes: 10 additions & 0 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
serialize_entity_key,
serialize_entity_key_prefix,
)
from feast.infra.online_stores.document_store import DocumentStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto

Expand All @@ -21,6 +22,15 @@ def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
return online_store_class()


def get_document_store_from_config(document_store_config: Any) -> DocumentStore:
"""Creates a document store corresponding to the given online document store config."""
module_name = document_store_config.__module__
qualified_name = type(document_store_config).__name__
class_name = qualified_name.replace("Config", "")
document_store_class = import_class(module_name, class_name, "DocumentStore")
return document_store_class()


def _redis_key(
project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> bytes:
Expand Down
32 changes: 31 additions & 1 deletion sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
import pyarrow as pa
from tqdm import tqdm
Expand All @@ -18,7 +19,10 @@
)
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.online_stores.helpers import (
get_document_store_from_config,
get_online_store_from_config,
)
from feast.infra.provider import Provider
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -47,6 +51,7 @@ def __init__(self, config: RepoConfig):
self.repo_config = config
self._offline_store = None
self._online_store = None
self._document_store = None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer necessary

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the issue. One question, though: what if we want to continue using Redis or any other online store for usual features, and use PG vector solely for embedding and search? Do we have the option to use the online store and the document store in the feature_store.yaml, both?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think currently Feast doesn't support multiple online store. but that would be a good feature to add.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could get complicated but agreed it'd be good to add. I could imagine a Redis + another DB layer would be an obvious one.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you again, guys, for these amazing features. Yes, having multiple online stores will make it easier to use the right database layer for the appropriate use case! 🙌

self._batch_engine: Optional[BatchMaterializationEngine] = None

@property
Expand All @@ -57,6 +62,14 @@ def online_store(self):
)
return self._online_store

@property
def document_store(self):
if not self._document_store:
self._document_store = get_document_store_from_config(
self.repo_config.online_store
)
return self._document_store

@property
def offline_store(self):
if not self._offline_store:
Expand Down Expand Up @@ -190,6 +203,23 @@ def online_read(
)
return result

@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def online_search(
self,
config: RepoConfig,
table: FeatureView,
requested_feature: str,
embeddings: np.ndarray,
top_k: int,
) -> List:
set_usage_attribute("provider", self.__class__.__name__)
result = []
if self.document_store:
result = self.document_store.online_search(
config, table, requested_feature, embeddings, top_k
)
return result

def ingest_df(
self,
feature_view: FeatureView,
Expand Down
25 changes: 25 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
import pyarrow
from tqdm import tqdm
Expand Down Expand Up @@ -295,6 +296,30 @@ def get_feature_server_endpoint(self) -> Optional[str]:
"""Returns endpoint for the feature server, if it exists."""
return None

@abstractmethod
def online_search(
self,
config: RepoConfig,
table: FeatureView,
requested_feature: str,
document: Union[str, np.ndarray],
top_k: int,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Searches for the top-k nearest neighbors of the given document in the online document store.

Args:
config: The config for the current feature store.
table: The feature view whose embeddings should be searched.
requested_feature: the requested document feature name.
document: The document to search for.
top_k: The number of nearest neighbors to return.

Returns:
A list of dictionaries, where each dictionary contains the document feature.
"""
pass


def get_provider(config: RepoConfig) -> Provider:
if "." not in config.provider:
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
"mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore",
"rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore",
"hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
# below are supported Online Document Store
"postgresDocument": "feast.infra.online_stores.contrib.postgres.PostgresDocumentStore",
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand Down Expand Up @@ -181,6 +183,9 @@ class RepoConfig(FeastBaseModel):
coerce_tz_aware: Optional[bool] = True
""" If True, coerces entity_df timestamp columns to be timezone aware (to UTC by default). """

document_store_config: Any = Field(None, alias="document_store")
""" DocumentStoreConfig: Document store configuration (optional depending on provider) """

def __init__(self, **data: Any):
super().__init__(**data)

Expand Down