Skip to content

Commit 7079e7f

Browse files
committed
remove DocumentStore
1 parent d2e0a59 commit 7079e7f

7 files changed

Lines changed: 105 additions & 148 deletions

File tree

sdk/python/feast/feature_store.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,49 +1686,53 @@ def _get_online_features(
16861686
return OnlineResponse(online_features_response)
16871687

16881688
@log_exceptions_and_usage
1689-
def get_top_k_document_features(
1689+
def retrieve_online_documents(
16901690
self,
16911691
feature: str,
1692-
document: Union[str, np.ndarray],
1692+
query: Union[str, List[float]],
16931693
top_k: int,
16941694
) -> OnlineResponse:
16951695
"""
1696-
Retrieves the top k cloeses document features.
1696+
Retrieves the top k closest document features.
16971697
16981698
Args:
16991699
feature: The list of document features that should be retrieved from the online document store. These features can be
17001700
specified either as a list of string document feature references or as a feature service. String feature
17011701
references must have format "feature_view:feature", e.g, "document_fv:document_embedding_feature".
1702-
document: The document to retrieve the closest document features for.
1702+
query: The query to retrieve the closest document features for.
17031703
top_k: The number of closest document features to retrieve.
17041704
"""
1705-
return self._get_top_k_document_features(
1705+
return self._retrieve_online_documents(
17061706
feature=feature,
1707-
document=document,
1707+
query=query,
17081708
top_k=top_k,
17091709
)
17101710

1711-
def _get_top_k_document_features(
1711+
def _retrieve_online_documents(
17121712
self,
17131713
feature: str,
1714-
document: Union[str, np.ndarray],
1714+
query: Union[str, List[float]],
17151715
top_k: int,
17161716
):
1717+
if isinstance(query, str):
1718+
raise ValueError(
1719+
"Using embedding functionality is not supported for document retrieval. Please embed the query before calling retrieve_online_documents."
1720+
)
17171721
(
17181722
requested_feature_views,
1719-
requested_on_demand_feature_views,
1723+
_,
17201724
) = self._get_feature_views_to_use(
17211725
features=[feature], allow_cache=True, hide_dummy_entity=False
17221726
)
17231727
requested_feature = (
17241728
feature.split(":")[1] if isinstance(feature, str) else feature
17251729
)
17261730
provider = self._get_provider()
1727-
document_features = self._search_from_document_store(
1731+
document_features = self._retrieve_from_online_store(
17281732
provider,
17291733
requested_feature_views[0],
17301734
requested_feature,
1731-
document,
1735+
query,
17321736
top_k,
17331737
)
17341738
online_features_response = GetOnlineFeaturesResponse(results=[])
@@ -1958,22 +1962,22 @@ def _read_from_online_store(
19581962
read_row_protos.append((event_timestamps, statuses, values))
19591963
return read_row_protos
19601964

1961-
def _search_from_document_store(
1965+
def _retrieve_from_online_store(
19621966
self,
19631967
provider: Provider,
19641968
table: FeatureView,
19651969
requested_feature: str,
1966-
document: Union[str, np.ndarray],
1970+
query: Union[str, List[float]],
19671971
top_k: int,
19681972
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
19691973
"""
19701974
Search and return document features from the online document store.
19711975
"""
1972-
documents = provider.online_search(
1976+
documents = provider.retrieve_online_documents(
19731977
config=self.config,
19741978
table=table,
19751979
requested_feature=requested_feature,
1976-
document=document,
1980+
query=query,
19771981
top_k=top_k,
19781982
)
19791983
# Each row is a set of features for a given entity key. We only need to convert

sdk/python/feast/infra/online_stores/contrib/postgres.py

Lines changed: 58 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@
1414
from feast import Entity
1515
from feast.feature_view import FeatureView
1616
from feast.infra.key_encoding_utils import serialize_entity_key
17-
from feast.infra.online_stores.document_store import (
18-
DocumentStore,
19-
DocumentStoreIndexConfig,
20-
)
2117
from feast.infra.online_stores.online_store import OnlineStore
2218
from feast.infra.utils.postgres.connection_utils import _get_conn, _get_connection_pool
2319
from feast.infra.utils.postgres.postgres_config import ConnectionType, PostgreSQLConfig
@@ -27,6 +23,22 @@
2723
from feast.usage import log_exceptions_and_usage
2824

2925

26+
# Search query template to find the top k items that are closest to the given embedding
27+
# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
28+
SEARCH_QUERY_TEMPLATE = """
29+
SELECT feature_name, value, event_ts FROM {table_name}
30+
WHERE feature_name = '{feature_name}'
31+
ORDER BY value <-> %s
32+
LIMIT %s;
33+
"""
34+
35+
36+
# Create index query template to create a index based on the index type
37+
CREATE_INDEX_QUERY_TEMPLATE = """
38+
CREATE INDEX ON {table_name} USING {index_type} (embedding {embeding_type});
39+
"""
40+
41+
3042
class PostgreSQLOnlineStoreConfig(PostgreSQLConfig):
3143
type: Literal["postgres"] = "postgres"
3244

@@ -256,6 +268,48 @@ def teardown(
256268
logging.exception("Teardown failed")
257269
raise
258270

271+
def retrieve_online_documents(
272+
self,
273+
config: RepoConfig,
274+
table: FeatureView,
275+
requested_feature: str,
276+
embedding: List[float],
277+
top_k: int,
278+
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
279+
"""
280+
281+
Args:
282+
config: Feast configuration object
283+
table: FeatureView object as the table to search
284+
requested_feature: The requested feature as the column to search
285+
embedding: The query embedding to search for
286+
top_k: The number of items to return
287+
Returns:
288+
List of tuples containing the event timestamp and the document feature
289+
290+
"""
291+
292+
# Convert the embedding to a string to be used in postgres vector search
293+
query_embedding_str = f"'[{','.join(str(el) for el in embedding)}]'"
294+
295+
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
296+
with self._get_conn(config) as conn, conn.cursor() as cur:
297+
cur.execute(
298+
SEARCH_QUERY_TEMPLATE.format(
299+
table_name=table, feature_name=requested_feature
300+
),
301+
(query_embedding_str, top_k),
302+
)
303+
rows = cur.fetchall()
304+
305+
for feature_name, value, event_ts in rows:
306+
val = ValueProto()
307+
val.ParseFromString(value)
308+
309+
res = {feature_name: val}
310+
result.append((event_ts, res))
311+
312+
return result
259313

260314
def _table_id(project: str, table: FeatureView) -> str:
261315
return f"{project}_{table.name}"
@@ -278,75 +332,3 @@ def _to_naive_utc(ts: datetime):
278332
return ts
279333
else:
280334
return ts.astimezone(pytz.utc).replace(tzinfo=None)
281-
282-
283-
# Search query template to find the top k items that are closest to the given embedding
284-
# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
285-
SEARCH_QUERY_TEMPLATE = """
286-
SELECT entity_key, feature_name, value, event_ts FROM {table_name}
287-
WHERE feature_name = '{feature_name}'
288-
ORDER BY value <-> %s
289-
LIMIT %s;
290-
"""
291-
292-
# Create index query template to create a index based on the index type
293-
CREATE_INDEX_QUERY_TEMPLATE = """
294-
CREATE INDEX ON {table_name} USING {index_type} (embedding {embeding_type});
295-
"""
296-
297-
298-
class PostgresDocumentStoreConfig(DocumentStoreIndexConfig):
299-
type: Literal["postgres"] = "postgres"
300-
301-
302-
class PostgresDocumentStore(PostgreSQLOnlineStore, DocumentStore):
303-
def online_search(
304-
self,
305-
config: RepoConfig,
306-
table: FeatureView,
307-
requested_feature: str,
308-
embedding: np.ndarray,
309-
top_k: int,
310-
):
311-
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
312-
313-
with self._get_conn(config) as conn, conn.cursor() as cur:
314-
cur.execute(
315-
SEARCH_QUERY_TEMPLATE.format(
316-
table_name=table, feature_name=requested_feature
317-
),
318-
(embedding, top_k),
319-
)
320-
rows = cur.fetchall()
321-
322-
for row in rows:
323-
# The first column is the entity key
324-
entity_key = EntityKeyProto()
325-
entity_key.ParseFromString(row[0])
326-
327-
# The second column is the feature name
328-
feature_name = row[1]
329-
330-
# The third column is the embedding value
331-
val = ValueProto()
332-
val.ParseFromString(row[2])
333-
334-
# The fourth column is the event timestamp
335-
event_ts = row[3]
336-
337-
res = {}
338-
res[feature_name] = val
339-
result.append((event_ts, res))
340-
341-
return result
342-
343-
def create_index(self, config: RepoConfig, table: str):
344-
document_store_config = config.document_store_config
345-
with self._get_conn(config) as conn, conn.cursor() as cur:
346-
cur.execute(
347-
CREATE_INDEX_QUERY_TEMPLATE.format(
348-
table=table,
349-
index_type=document_store_config.index_type,
350-
embeding_type=document_store_config.embedding_type,
351-
)
352-
)

sdk/python/feast/infra/online_stores/document_store.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

sdk/python/feast/infra/online_stores/helpers.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
serialize_entity_key,
99
serialize_entity_key_prefix,
1010
)
11-
from feast.infra.online_stores.document_store import DocumentStore
1211
from feast.infra.online_stores.online_store import OnlineStore
1312
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
1413

@@ -22,15 +21,6 @@ def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
2221
return online_store_class()
2322

2423

25-
def get_document_store_from_config(document_store_config: Any) -> DocumentStore:
26-
"""Creates a document store corresponding to the given online document store config."""
27-
module_name = document_store_config.__module__
28-
qualified_name = type(document_store_config).__name__
29-
class_name = qualified_name.replace("Config", "")
30-
document_store_class = import_class(module_name, class_name, "DocumentStore")
31-
return document_store_class()
32-
33-
3424
def _redis_key(
3525
project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1
3626
) -> bytes:

sdk/python/feast/infra/online_stores/online_store.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,23 @@ def teardown(
134134
entities: Entities whose corresponding infrastructure should be deleted.
135135
"""
136136
pass
137+
138+
def retrieve_online_documents(
139+
self,
140+
config: RepoConfig,
141+
table: FeatureView,
142+
requested_feature: str,
143+
embedding: List[float],
144+
top_k: int,
145+
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
146+
"""
147+
Retrieves online feature values for the specified embeddings.
148+
149+
Args:
150+
config: The config for the current feature store.
151+
table: The feature view whose feature values should be read.
152+
requested_feature: The name of the feature whose embeddings should be used for retrieval.
153+
embedding: The embeddings to use for retrieval.
154+
top_k: The number of nearest neighbors to retrieve.
155+
"""
156+
pass

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from datetime import datetime, timedelta
22
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
33

4-
import numpy as np
54
import pandas as pd
65
import pyarrow as pa
76
from tqdm import tqdm
@@ -20,7 +19,6 @@
2019
from feast.infra.offline_stores.offline_store import RetrievalJob
2120
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
2221
from feast.infra.online_stores.helpers import (
23-
get_document_store_from_config,
2422
get_online_store_from_config,
2523
)
2624
from feast.infra.provider import Provider
@@ -62,14 +60,6 @@ def online_store(self):
6260
)
6361
return self._online_store
6462

65-
@property
66-
def document_store(self):
67-
if not self._document_store:
68-
self._document_store = get_document_store_from_config(
69-
self.repo_config.online_store
70-
)
71-
return self._document_store
72-
7363
@property
7464
def offline_store(self):
7565
if not self._offline_store:
@@ -204,19 +194,19 @@ def online_read(
204194
return result
205195

206196
@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
207-
def online_search(
197+
def retrieve_online_documents(
208198
self,
209199
config: RepoConfig,
210200
table: FeatureView,
211201
requested_feature: str,
212-
embeddings: np.ndarray,
202+
embedding: List[float],
213203
top_k: int,
214204
) -> List:
215205
set_usage_attribute("provider", self.__class__.__name__)
216206
result = []
217-
if self.document_store:
218-
result = self.document_store.online_search(
219-
config, table, requested_feature, embeddings, top_k
207+
if self.online_store:
208+
result = self.online_store.retrieve_online_documents(
209+
config, table, requested_feature, embedding, top_k
220210
)
221211
return result
222212

0 commit comments

Comments
 (0)