Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
713768e
feat: add document store
HaoXuAI Mar 31, 2024
58d5d94
feat: add document store
HaoXuAI Mar 31, 2024
2cd73d1
feat: add document store
HaoXuAI Mar 31, 2024
d2e0a59
feat: add document store
HaoXuAI Mar 31, 2024
7079e7f
remove DocumentStore
HaoXuAI Apr 9, 2024
8c9ee97
format
HaoXuAI Apr 9, 2024
513dd39
Merge branch 'master' into feat-documentstore
HaoXuAI Apr 9, 2024
29d98cd
format
HaoXuAI Apr 9, 2024
11eb97f
format
HaoXuAI Apr 9, 2024
865baf2
format
HaoXuAI Apr 9, 2024
47cd117
format
HaoXuAI Apr 9, 2024
3f9f59f
format
HaoXuAI Apr 9, 2024
7935071
remove unused vars
HaoXuAI Apr 9, 2024
ba39f93
add test
HaoXuAI Apr 11, 2024
cf53c71
add test
HaoXuAI Apr 11, 2024
92046af
format
HaoXuAI Apr 11, 2024
d0acd2d
format
HaoXuAI Apr 11, 2024
cc45f73
format
HaoXuAI Apr 11, 2024
006b5c6
format
HaoXuAI Apr 11, 2024
6e0ba03
format
HaoXuAI Apr 11, 2024
a2302be
fix not implemented issue
HaoXuAI Apr 11, 2024
2e6fc55
fix not implemented issue
HaoXuAI Apr 11, 2024
3cbbf21
fix test
HaoXuAI Apr 11, 2024
ec32764
format
HaoXuAI Apr 11, 2024
e2d8008
format
HaoXuAI Apr 12, 2024
523d20f
format
HaoXuAI Apr 12, 2024
5cd085d
format
HaoXuAI Apr 12, 2024
795699e
format
HaoXuAI Apr 12, 2024
67b007f
format
HaoXuAI Apr 12, 2024
33b46bd
update testcontainer
HaoXuAI Apr 12, 2024
82fe5f1
format
HaoXuAI Apr 12, 2024
0618378
fix postgres integration test
HaoXuAI Apr 12, 2024
7de2016
format
HaoXuAI Apr 12, 2024
92fed1d
fix postgres test
HaoXuAI Apr 14, 2024
d4f2639
fix postgres test
HaoXuAI Apr 14, 2024
396d7de
fix postgres test
HaoXuAI Apr 14, 2024
6c38b92
fix postgres test
HaoXuAI Apr 14, 2024
f763dc9
fix postgres test
HaoXuAI Apr 14, 2024
818c055
format
HaoXuAI Apr 14, 2024
a51b555
format
HaoXuAI Apr 15, 2024
2624b22
format
HaoXuAI Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ test-python-universal-postgres-offline:
test-python-universal-postgres-online:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.postgres_repo_configuration \
PYTEST_PLUGINS=sdk.python.feast.infra.offline_stores.contrib.postgres_offline_store.tests \
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this now, was this the right choice?

PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.postgres \
python -m pytest -n 8 --integration \
-k "not test_universal_cli and \
not test_go_feature_server and \
Expand Down
100 changes: 100 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,67 @@ def _get_online_features(
)
return OnlineResponse(online_features_response)

@log_exceptions_and_usage
def retrieve_online_documents(
    self,
    feature: str,
    query: Union[str, List[float]],
    top_k: int,
) -> OnlineResponse:
    """
    Retrieves the top k closest document features. Note, embeddings are a subset of features.

    Args:
        feature: A single document feature reference to retrieve from the online
            document store, in the format "feature_view:feature",
            e.g. "document_fv:document_embeddings".
        query: The query embedding (a list of floats) to retrieve the closest
            document features for. Raw strings are rejected downstream; embed
            the query before calling this method.
        top_k: The number of closest document features to retrieve.

    Returns:
        An OnlineResponse containing the top_k closest document features.
    """
    # Thin public wrapper; validation and retrieval happen in the private helper.
    return self._retrieve_online_documents(
        feature=feature,
        query=query,
        top_k=top_k,
    )

def _retrieve_online_documents(
    self,
    feature: str,
    query: Union[str, List[float]],
    top_k: int,
):
    """Resolve the feature reference, run the vector search, and build the response.

    Raises:
        ValueError: If the query is a raw string, the feature reference is
            malformed, or no feature view matches the reference.
    """
    if isinstance(query, str):
        raise ValueError(
            "Using embedding functionality is not supported for document retrieval. Please embed the query before calling retrieve_online_documents."
        )
    (
        requested_feature_views,
        _,
    ) = self._get_feature_views_to_use(
        features=[feature], allow_cache=True, hide_dummy_entity=False
    )
    # Fail with a clear message instead of an IndexError below when the
    # reference does not resolve to any registered feature view.
    if not requested_feature_views:
        raise ValueError(f"Feature view not found for feature reference '{feature}'.")
    # "feature_view:feature" -> "feature"; guard against malformed references
    # rather than letting split(":")[1] raise a bare IndexError.
    if isinstance(feature, str):
        reference_parts = feature.split(":")
        if len(reference_parts) != 2:
            raise ValueError(
                f"Feature reference '{feature}' must have the format 'feature_view:feature'."
            )
        requested_feature = reference_parts[1]
    else:
        requested_feature = feature
    provider = self._get_provider()
    document_features = self._retrieve_from_online_store(
        provider,
        requested_feature_views[0],
        requested_feature,
        query,
        top_k,
    )
    online_features_response = GetOnlineFeaturesResponse(results=[])
    self._populate_response_from_feature_data(
        document_features,
        [],
        online_features_response,
        False,
        requested_feature,
        requested_feature_views[0],
    )
    return OnlineResponse(online_features_response)

@staticmethod
def _get_columnar_entity_values(
rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]]
Expand Down Expand Up @@ -1906,6 +1967,45 @@ def _read_from_online_store(
read_row_protos.append((event_timestamps, statuses, values))
return read_row_protos

def _retrieve_from_online_store(
    self,
    provider: Provider,
    table: FeatureView,
    requested_feature: str,
    query: List[float],
    top_k: int,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
    """
    Search the online document store via the provider and convert each hit
    into (event_timestamps, statuses, values) proto triples.
    """
    documents = provider.retrieve_online_documents(
        config=self.config,
        table=table,
        requested_feature=requested_feature,
        query=query,
        top_k=top_k,
    )
    # One shared empty Value proto reused for every missing feature.
    missing_value = Value()
    proto_rows = []

    for event_ts, feature_value in documents:
        ts_proto = Timestamp()
        # TODO (Ly): reuse whatever timestamp if row_ts is None?
        if event_ts is not None:
            ts_proto.FromDatetime(event_ts)
        if feature_value is None:
            status, value = FieldStatus.NOT_FOUND, missing_value
        else:
            status, value = FieldStatus.PRESENT, feature_value
        proto_rows.append(([ts_proto], [status], [value]))
    return proto_rows

@staticmethod
def _populate_response_from_feature_data(
feature_data: Iterable[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from testcontainers.core.waiting_utils import wait_for_logs

from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import (
PostgreSQLOfflineStoreConfig,
PostgreSQLSource,
Expand Down Expand Up @@ -57,6 +58,9 @@ def postgres_container():


class PostgreSQLDataSourceCreator(DataSourceCreator, OnlineStoreCreator):
def create_logged_features_destination(self) -> LoggingDestination:
    # Feature logging is not exercised for the Postgres creator; returning
    # None (despite the declared return type) is deliberate — hence the ignore.
    return None  # type: ignore

def __init__(
self, project_name: str, fixture_request: pytest.FixtureRequest, **kwargs
):
Expand Down
51 changes: 51 additions & 0 deletions sdk/python/feast/infra/online_stores/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@
from feast.repo_config import RepoConfig
from feast.usage import log_exceptions_and_usage

# Search query template to find the top k items that are closest to the given embedding
# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
# "<->" is the pgvector L2-distance operator; the embedding and LIMIT are bound
# parameters (%s), while table_name/feature_name are interpolated via .format()
# — callers must ensure those come from trusted config, not user input.
SEARCH_QUERY_TEMPLATE = """
SELECT feature_name, value, event_ts FROM {table_name}
WHERE feature_name = '{feature_name}'
ORDER BY value <-> %s
LIMIT %s;
"""


class PostgreSQLOnlineStoreConfig(PostgreSQLConfig):
type: Literal["postgres"] = "postgres"
Expand Down Expand Up @@ -251,6 +260,48 @@ def teardown(
logging.exception("Teardown failed")
raise

def retrieve_online_documents(
    self,
    config: RepoConfig,
    table: FeatureView,
    requested_feature: str,
    embedding: List[float],
    top_k: int,
) -> List[Tuple[Optional[datetime], Optional[ValueProto]]]:
    """
    Retrieve the top_k rows whose stored embedding is closest to the query
    embedding, using the pgvector "<->" distance operator.

    Args:
        config: Feast configuration object
        table: FeatureView object as the table to search
        requested_feature: The requested feature as the column to search
        embedding: The query embedding to search for
        top_k: The number of items to return
    Returns:
        List of tuples containing the event timestamp and the document feature
    """

    # pgvector accepts a text value of the form "[1.0,2.0,...]" and casts it to
    # a vector. Do NOT wrap it in extra quotes: it is passed as a bound
    # parameter, so the driver quotes it already — embedded quotes would make
    # the literal invalid.
    query_embedding_str = f"[{','.join(str(el) for el in embedding)}]"

    # Online tables are named "<project>_<feature_view_name>". Interpolating
    # the FeatureView object itself would insert its repr, not the table name.
    table_name = _table_id(config.project, table)

    result: List[Tuple[Optional[datetime], Optional[ValueProto]]] = []
    with self._get_conn(config) as conn, conn.cursor() as cur:
        cur.execute(
            SEARCH_QUERY_TEMPLATE.format(
                table_name=table_name, feature_name=requested_feature
            ),
            (query_embedding_str, top_k),
        )
        rows = cur.fetchall()

    for _feature_name, value, event_ts in rows:
        val = ValueProto()
        # Stored values are serialized Value protos.
        val.ParseFromString(value)
        result.append((event_ts, val))

    return result


def _table_id(project: str, table: FeatureView) -> str:
    """Return the online-store table name for *table* within *project*."""
    return "{}_{}".format(project, table.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
)

FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(online_store_creator=PostgreSQLDataSourceCreator),
IntegrationTestRepoConfig(
online_store="postgres", online_store_creator=PostgreSQLDataSourceCreator
),
]
27 changes: 27 additions & 0 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,30 @@ def teardown(
entities: Entities whose corresponding infrastructure should be deleted.
"""
pass

def retrieve_online_documents(
    self,
    config: RepoConfig,
    table: FeatureView,
    requested_feature: str,
    embedding: List[float],
    top_k: int,
) -> List[Tuple[Optional[datetime], Optional[ValueProto]]]:
    """
    Retrieves online feature values for the specified embeddings.

    Args:
        config: The config for the current feature store.
        table: The feature view whose feature values should be read.
        requested_feature: The name of the feature whose embeddings should be used for retrieval.
        embedding: The embeddings to use for retrieval.
        top_k: The number of nearest neighbors to retrieve.

    Returns:
        A list of the top k closest documents to the specified embedding. Each
        item is a tuple of the event timestamp for the row and the document
        feature value (matching the declared return type), either of which may
        be None.
    """
    # Message names document retrieval specifically: a store may support plain
    # online reads while still lacking vector search.
    raise NotImplementedError(
        f"Online store {self.__class__.__name__} does not support online document retrieval"
    )
17 changes: 17 additions & 0 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ def online_read(
)
return result

@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def retrieve_online_documents(
    self,
    config: RepoConfig,
    table: FeatureView,
    requested_feature: str,
    embedding: List[float],
    top_k: int,
) -> List:
    """Delegate document retrieval to the configured online store, if any."""
    set_usage_attribute("provider", self.__class__.__name__)
    # Guard clause: no online store configured means nothing to search.
    if not self.online_store:
        return []
    return self.online_store.retrieve_online_documents(
        config, table, requested_feature, embedding, top_k
    )

def ingest_df(
self,
feature_view: FeatureView,
Expand Down
24 changes: 24 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,30 @@ def get_feature_server_endpoint(self) -> Optional[str]:
"""Returns endpoint for the feature server, if it exists."""
return None

@abstractmethod
def retrieve_online_documents(
    self,
    config: RepoConfig,
    table: FeatureView,
    requested_feature: str,
    query: List[float],
    top_k: int,
) -> List[Tuple[Optional[datetime], Optional[ValueProto]]]:
    """
    Searches for the top-k nearest neighbors of the given document in the online document store.

    Args:
        config: The config for the current feature store.
        table: The feature view whose embeddings should be searched.
        requested_feature: the requested document feature name.
        query: The query embedding to search for.
        top_k: The number of nearest neighbors to return.

    Returns:
        A list of (event_timestamp, feature_value) tuples, one per retrieved
        document, matching the declared return type; either element may be
        None when unavailable.
    """
    pass


def get_provider(config: RepoConfig) -> Provider:
if "." not in config.provider:
Expand Down
12 changes: 12 additions & 0 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,15 @@ def fake_ingest_data():
"created": [pd.Timestamp(datetime.utcnow()).round("ms")],
}
return pd.DataFrame(data)


@pytest.fixture
def fake_ingest_document_data():
    """Fake document data to ingest into the feature store.

    All columns must be the same length, otherwise pd.DataFrame raises
    "All arrays must be of the same length" — the original mixed a length-1
    "driver_id" with a length-2 "doc". Two rows are provided so a top-k
    retrieval can return multiple documents.
    """
    # NOTE(review): "doc" was possibly intended as one embedding [[4, 5]];
    # confirm against the document feature view's schema.
    now = pd.Timestamp(datetime.utcnow()).round("ms")
    data = {
        "driver_id": [1, 2],
        "doc": [4, 5],
        "event_timestamp": [now, now],
        "created": [now, now],
    }
    return pd.DataFrame(data)
10 changes: 10 additions & 0 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,13 @@ def retrieve_feature_service_logs(
registry: BaseRegistry,
) -> RetrievalJob:
return RetrievalJob()

def retrieve_online_documents(
    self,
    config: RepoConfig,
    table: FeatureView,
    requested_feature: str,
    query: List[float],
    top_k: int,
) -> List[Tuple[Optional[datetime], Optional[ValueProto]]]:
    """Stub implementation: FooProvider retrieves no documents."""
    no_documents: List[Tuple[Optional[datetime], Optional[ValueProto]]] = []
    return no_documents
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Dict

from testcontainers.postgres import PostgresContainer

from tests.integration.feature_repos.universal.online_store_creator import (
OnlineStoreCreator,
)


class PostgresOnlieStoreCreator(OnlineStoreCreator):
    # NOTE(review): "Onlie" looks like a typo for "Online", but the class name
    # is part of the public interface, so it is kept as-is.

    def __init__(self, project_name: str, **kwargs):
        """Configure (but do not start) a disposable Postgres test container."""
        super().__init__(project_name)
        container = PostgresContainer("postgres:latest", platform="linux/amd64")
        container = container.with_exposed_ports(5432)
        container = container.with_env("POSTGRES_USER", "root")
        container = container.with_env("POSTGRES_PASSWORD", "test")
        container = container.with_env("POSTGRES_DB", "test")
        self.container = container

    def create_online_store(self) -> Dict[str, str]:
        """Start the container and return online-store config pointing at it."""
        self.container.start()
        # NOTE(review): no "host" key is returned — presumably a localhost
        # default applies; verify against the postgres online-store config.
        return {
            "type": "postgres",
            "user": "root",
            "password": "test",
            "database": "test",
            "port": self.container.get_exposed_port(5432),
        }

    def teardown(self):
        """Stop and dispose of the Postgres container."""
        self.container.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -785,3 +785,26 @@ def assert_feature_service_entity_mapping_correctness(
entity_rows=entity_rows,
full_feature_names=full_feature_names,
)


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["postgres"])
def test_retrieve_online_documents(
    environment, universal_data_sources, fake_ingest_document_data
):
    """Ingest document rows and retrieve the closest ones from the online store."""
    fs = environment.feature_store
    entities, datasets, data_sources = universal_data_sources
    driver_hourly_stats = create_driver_hourly_stats_feature_view(data_sources.driver)
    driver_entity = driver()

    # Register Feature View and Entity
    fs.apply([driver_hourly_stats, driver_entity])

    # directly ingest data into the Online Store
    fs.write_to_online_store("document_fv", fake_ingest_document_data)

    # retrieve the online documents. The query must be an embedding (list of
    # floats): FeatureStore._retrieve_online_documents raises ValueError for a
    # raw string such as "[1, 2]".
    documents = fs.retrieve_online_documents(
        feature="document_fv:doc", query=[1.0, 2.0], top_k=5
    )
    assert len(documents) == 2