Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
713768e
feat: add document store
HaoXuAI Mar 31, 2024
58d5d94
feat: add document store
HaoXuAI Mar 31, 2024
2cd73d1
feat: add document store
HaoXuAI Mar 31, 2024
d2e0a59
feat: add document store
HaoXuAI Mar 31, 2024
7079e7f
remove DocumentStore
HaoXuAI Apr 9, 2024
8c9ee97
format
HaoXuAI Apr 9, 2024
513dd39
Merge branch 'master' into feat-documentstore
HaoXuAI Apr 9, 2024
29d98cd
format
HaoXuAI Apr 9, 2024
11eb97f
format
HaoXuAI Apr 9, 2024
865baf2
format
HaoXuAI Apr 9, 2024
47cd117
format
HaoXuAI Apr 9, 2024
3f9f59f
format
HaoXuAI Apr 9, 2024
7935071
remove unused vars
HaoXuAI Apr 9, 2024
ba39f93
add test
HaoXuAI Apr 11, 2024
cf53c71
add test
HaoXuAI Apr 11, 2024
92046af
format
HaoXuAI Apr 11, 2024
d0acd2d
format
HaoXuAI Apr 11, 2024
cc45f73
format
HaoXuAI Apr 11, 2024
006b5c6
format
HaoXuAI Apr 11, 2024
6e0ba03
format
HaoXuAI Apr 11, 2024
a2302be
fix not implemented issue
HaoXuAI Apr 11, 2024
2e6fc55
fix not implemented issue
HaoXuAI Apr 11, 2024
3cbbf21
fix test
HaoXuAI Apr 11, 2024
ec32764
format
HaoXuAI Apr 11, 2024
e2d8008
format
HaoXuAI Apr 12, 2024
523d20f
format
HaoXuAI Apr 12, 2024
5cd085d
format
HaoXuAI Apr 12, 2024
795699e
format
HaoXuAI Apr 12, 2024
67b007f
format
HaoXuAI Apr 12, 2024
33b46bd
update testcontainer
HaoXuAI Apr 12, 2024
82fe5f1
format
HaoXuAI Apr 12, 2024
0618378
fix postgres integration test
HaoXuAI Apr 12, 2024
7de2016
format
HaoXuAI Apr 12, 2024
92fed1d
fix postgres test
HaoXuAI Apr 14, 2024
d4f2639
fix postgres test
HaoXuAI Apr 14, 2024
396d7de
fix postgres test
HaoXuAI Apr 14, 2024
6c38b92
fix postgres test
HaoXuAI Apr 14, 2024
f763dc9
fix postgres test
HaoXuAI Apr 14, 2024
818c055
format
HaoXuAI Apr 14, 2024
a51b555
format
HaoXuAI Apr 15, 2024
2624b22
format
HaoXuAI Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add document store
  • Loading branch information
HaoXuAI committed Mar 31, 2024
commit 713768e036443ecf404437685a18ee354bd087fd
78 changes: 78 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import pandas as pd
import pyarrow as pa
import numpy as np
from colorama import Fore, Style
from google.protobuf.timestamp_pb2 import Timestamp
from tqdm import tqdm
Expand Down Expand Up @@ -1684,6 +1685,62 @@ def _get_online_features(
)
return OnlineResponse(online_features_response)

@log_exceptions_and_usage
def get_top_k_document_features(self,
feature: Union[str, FeatureService],
document: Union[str, np.ndarray],
top_k: int,
) -> OnlineResponse:
"""
Retrieves the top k closest document features.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Retrieves the top k cloeses document features.
Retrieves the top k closest document features.


Args:
feature: The list of document features that should be retrieved from the online document store. These features can be
specified either as a list of string document feature references or as a feature service. String feature
references must have format "feature_view:feature", e.g, "document_fv:document_embedding_feature".
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
references must have format "feature_view:feature", e.g, "document_fv:document_embedding_feature".
references must have format "feature_view:feature", e.g, "document_fv:document_embeddings".

document: The document to retrieve the closest document features for.
top_k: The number of closest document features to retrieve.
"""
return self._get_top_k_document_features(
feature=feature,
document=document,
top_k=top_k,
)

def _get_top_k_document_features(
    self,
    feature: Union[str, FeatureService],
    document: Union[str, np.ndarray],
    top_k: int,
) -> OnlineResponse:
    """
    Resolve the feature view for ``feature``, search the document store and
    wrap the hits in an ``OnlineResponse``.

    Args:
        feature: Either a string reference of the form "feature_view:feature"
            or a feature service.
        document: The query document handed unchanged to the document store
            search.
        top_k: The number of results requested from the document store.

    Returns:
        An ``OnlineResponse`` built from the rows returned by
        ``_search_from_document_store``.

    Raises:
        ValueError: If a string ``feature`` has no "feature_view:feature"
            separator, or if no feature view could be resolved for it.
    """
    (
        requested_feature_views,
        _requested_on_demand_feature_views,  # resolved but not used for document search
    ) = self._get_feature_views_to_use(
        # NOTE(review): a FeatureService is wrapped in a list here exactly like a
        # string reference — confirm _get_feature_views_to_use accepts that shape.
        features=[feature],
        allow_cache=True,
        hide_dummy_entity=False,
    )
    if not requested_feature_views:
        # Fail with a readable message instead of an IndexError on [0] below.
        raise ValueError(f"No feature view could be resolved for feature {feature!r}")

    if isinstance(feature, str):
        if ":" not in feature:
            # Previously surfaced as a bare IndexError from split(":")[1].
            raise ValueError(
                f"Feature reference {feature!r} must have the format 'feature_view:feature'"
            )
        requested_feature = feature.split(":")[1]
    else:
        # NOTE(review): for a FeatureService the service object itself is passed
        # on where the store-facing helpers are annotated to take a feature name
        # string — confirm this path is actually supported.
        requested_feature = feature

    # Only the first resolved feature view is searched.
    feature_view = requested_feature_views[0]
    provider = self._get_provider()
    document_features = self._search_from_document_store(
        provider,
        feature_view,
        requested_feature,
        document,
        top_k,
    )

    online_features_response = GetOnlineFeaturesResponse(results=[])
    self._populate_response_from_feature_data(
        document_features,
        [],
        online_features_response,
        False,
        requested_feature,
        feature_view,
    )
    return OnlineResponse(online_features_response)

@staticmethod
def _get_columnar_entity_values(
rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]]
Expand Down Expand Up @@ -1900,6 +1957,27 @@ def _read_from_online_store(
read_row_protos.append((event_timestamps, statuses, values))
return read_row_protos

def _search_from_document_store(
    self,
    provider: Provider,
    table: FeatureView,
    requested_feature: str,
    document: Union[str, np.ndarray],
    top_k: int,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
    """
    Run a document search through the provider and hand back its result.

    The store configuration (``self.config``) and every argument are forwarded
    as keyword arguments to ``provider.online_search``; whatever that call
    returns is returned unchanged.
    """
    search_kwargs = dict(
        config=self.config,
        table=table,
        requested_feature=requested_feature,
        document=document,
        top_k=top_k,
    )
    return provider.online_search(**search_kwargs)


@staticmethod
def _populate_response_from_feature_data(
feature_data: Iterable[
Expand Down
123 changes: 99 additions & 24 deletions sdk/python/feast/infra/online_stores/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import numpy as np
import psycopg2
import pytz
from psycopg2 import sql
Expand All @@ -12,8 +13,10 @@

from feast import Entity
from feast.feature_view import FeatureView
from feast.feature import Feature
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.online_stores.document_store import DocumentStore, DocumentStoreIndexConfig
from feast.infra.utils.postgres.connection_utils import _get_conn, _get_connection_pool
from feast.infra.utils.postgres.postgres_config import ConnectionType, PostgreSQLConfig
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -46,13 +49,13 @@ def _get_conn(self, config: RepoConfig):

@log_exceptions_and_usage(online_store="postgres")
def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
project = config.project

Expand Down Expand Up @@ -80,7 +83,7 @@ def online_write_batch(
# Control the batch so that we can update the progress
batch_size = 5000
for i in range(0, len(insert_values), batch_size):
cur_batch = insert_values[i : i + batch_size]
cur_batch = insert_values[i: i + batch_size]
execute_values(
cur,
sql.SQL(
Expand All @@ -104,11 +107,11 @@ def online_write_batch(

@log_exceptions_and_usage(online_store="postgres")
def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

Expand Down Expand Up @@ -175,13 +178,13 @@ def online_read(

@log_exceptions_and_usage(online_store="postgres")
def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
project = config.project
schema_name = config.online_store.db_schema or config.online_store.user
Expand Down Expand Up @@ -236,10 +239,10 @@ def update(
conn.commit()

def teardown(
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
):
project = config.project
try:
Expand Down Expand Up @@ -273,3 +276,75 @@ def _to_naive_utc(ts: datetime):
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


# Search query template to find the top k items that are closest to the given embedding,
# e.g. SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
# ``{table_name}`` is filled in as a quoted SQL identifier. The feature name, the query
# embedding and the limit are bound as query parameters (in that order), so no
# caller-supplied value is ever spliced into the SQL text.
SEARCH_QUERY_TEMPLATE = """
SELECT entity_key, feature_name, value, event_ts FROM {table_name}
WHERE feature_name = %s
ORDER BY value <-> %s
LIMIT %s;
"""

# Create index query template to create an index based on the index type
CREATE_INDEX_QUERY_TEMPLATE = """
CREATE INDEX ON {table_name} USING {index_type} (embedding {embeding_type});
"""


class PostgresDocumentStoreConfig(DocumentStoreIndexConfig):
    """Index configuration for the Postgres document store, tagged ``type="postgres"``."""

    type: Literal["postgres"] = "postgres"


class PostgresDocumentStore(PostgreSQLOnlineStore, DocumentStore):
    """Postgres online store extended with document search and index creation."""

    def online_search(
        self,
        config: RepoConfig,
        table: FeatureView,
        requested_feature: str,
        embedding: np.ndarray,
        top_k: int,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Return up to ``top_k`` rows of ``requested_feature`` ordered by the
        ``<->`` distance between the stored value and ``embedding``.

        Args:
            config: Repo config used to open the Postgres connection.
            table: Feature view whose online table is searched.
            requested_feature: Feature name matched against the ``feature_name`` column.
            embedding: Query embedding bound as the right-hand side of ``<->``.
            top_k: Maximum number of rows to return.

        Returns:
            One ``(event_ts, {feature_name: value})`` tuple per matching row, in
            the order returned by the query.
        """
        # NOTE(review): DocumentStore.online_search declares this parameter as
        # ``embeddings`` — the two signatures should be aligned.
        # NOTE(review): assumes the online table is named "<project>_<feature view
        # name>" — TODO confirm against the table-naming helper used by the write
        # path. The previous code formatted the FeatureView object itself into
        # the SQL text, which cannot name a table.
        table_name = f"{config.project}_{table.name}"
        query = sql.SQL(SEARCH_QUERY_TEMPLATE).format(
            table_name=sql.Identifier(table_name)
        )

        with self._get_conn(config) as conn, conn.cursor() as cur:
            # The feature name is bound as a parameter rather than formatted into
            # the statement, so it cannot alter the query.
            cur.execute(query, (requested_feature, embedding, top_k))
            rows = cur.fetchall()

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        # Columns: entity key, feature name, serialized value, event timestamp.
        for _entity_key, feature_name, value_bytes, event_ts in rows:
            val = ValueProto()
            # psycopg2 hands bytea columns back as memoryview; normalise to bytes
            # before protobuf parsing.
            val.ParseFromString(bytes(value_bytes))
            result.append((event_ts, {feature_name: val}))
        return result

    def create_index(
        self,
        config: RepoConfig,
        index: str,
        index_config: DocumentStoreIndexConfig,
    ):
        """
        Execute ``CREATE_INDEX_QUERY_TEMPLATE`` with ``index`` as the index type
        and ``index_config.embedding_type`` as the type placed after the column.

        Args:
            config: Repo config used to open the Postgres connection; its
                ``project`` is used as the table name.
            index: Index type placed after ``USING``.
            index_config: Supplies ``embedding_type`` for the indexed column.
        """
        # NOTE(review): the table name is the bare project and the indexed column
        # is ``embedding``, whereas online_search orders by the ``value`` column —
        # confirm both are intended.
        statement = CREATE_INDEX_QUERY_TEMPLATE.format(
            table_name=config.project,
            index_type=index,
            embeding_type=index_config.embedding_type,
        )
        with self._get_conn(config) as conn, conn.cursor() as cur:
            cur.execute(statement)
38 changes: 38 additions & 0 deletions sdk/python/feast/infra/online_stores/document_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from abc import abstractmethod
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import numpy as np

from feast.feature_view import FeatureView
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig


class DocumentStoreIndexConfig(FeastConfigBaseModel):
    """Configuration for a document store index."""

    # Type placed after the indexed column when an index is created.
    # Defaults to None so that configs which omit it remain valid.
    embedding_type: Optional[str] = None


class DocumentStore(OnlineStore):
    """Online store interface extended with document search and index creation."""

    # Explicit default so reading ``index`` on a store that never set it yields
    # None instead of raising AttributeError.
    index: Optional[str] = None

    @abstractmethod
    def online_search(
        self,
        config: RepoConfig,
        table: FeatureView,
        requested_feature: str,
        embeddings: np.ndarray,
        top_k: int,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Search ``table`` for ``requested_feature`` using the query ``embeddings``.

        Args:
            config: The repo config for the feature store.
            table: The feature view to search.
            requested_feature: Name of the feature to search on.
            embeddings: The query embedding.
            top_k: The number of results requested.

        Returns:
            A list of ``(event timestamp, {feature name: value})`` tuples.
        """
        raise NotImplementedError(
            "You have to implement this!"
        )

    @abstractmethod
    def create_index(
        self,
        config: RepoConfig,
        index: str,
        index_config: DocumentStoreIndexConfig,
    ):
        """
        Create an index in the underlying store.

        Args:
            config: The repo config for the feature store.
            index: The index to create.
            index_config: Additional index settings such as ``embedding_type``.
        """
        raise NotImplementedError(
            "You have to implement this!"
        )
10 changes: 10 additions & 0 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
serialize_entity_key_prefix,
)
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.online_stores.document_store import DocumentStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto


Expand All @@ -21,6 +22,15 @@ def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
return online_store_class()


def get_document_store_from_config(document_store_config: Any) -> DocumentStore:
    """
    Build the document store that belongs to ``document_store_config``.

    The store class is looked up in the module that defines the config, under
    the config's class name with every "Config" occurrence removed, and is
    instantiated without arguments.
    """
    config_class_name = type(document_store_config).__name__
    store_class = import_class(
        document_store_config.__module__,
        config_class_name.replace("Config", ""),
        "DocumentStore",
    )
    return store_class()


def _redis_key(
project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> bytes:
Expand Down
Loading