Skip to content

Commit 713768e

Browse files
committed
feat: add document store
1 parent afd52b8 commit 713768e

7 files changed

Lines changed: 341 additions & 83 deletions

File tree

sdk/python/feast/feature_store.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import pandas as pd
3737
import pyarrow as pa
38+
import numpy as np
3839
from colorama import Fore, Style
3940
from google.protobuf.timestamp_pb2 import Timestamp
4041
from tqdm import tqdm
@@ -1684,6 +1685,62 @@ def _get_online_features(
16841685
)
16851686
return OnlineResponse(online_features_response)
16861687

1688+
@log_exceptions_and_usage
def get_top_k_document_features(
    self,
    feature: Union[str, FeatureService],
    document: Union[str, np.ndarray],
    top_k: int,
) -> OnlineResponse:
    """
    Retrieves the top k closest document features.

    Args:
        feature: The document feature that should be retrieved from the online document store. It can be
            specified either as a string document feature reference or as a feature service. A string
            feature reference must have format "feature_view:feature", e.g.
            "document_fv:document_embedding_feature".
        document: The document to retrieve the closest document features for.
        top_k: The number of closest document features to retrieve.

    Returns:
        The OnlineResponse produced by the private retrieval helper.
    """
    # Public entry point only adds the usage/exception logging decorator;
    # the retrieval itself is delegated unchanged.
    return self._get_top_k_document_features(feature, document, top_k)
1709+
1710+
def _get_top_k_document_features(
    self,
    feature: Union[str, FeatureService],
    document: Union[str, np.ndarray],
    top_k: int,
):
    """
    Resolves the feature view for ``feature``, searches the document store and wraps the result.

    Args:
        feature: A "feature_view:feature" string reference or a feature service.
        document: The document to retrieve the closest document features for.
        top_k: The number of closest document features to retrieve.

    Returns:
        An OnlineResponse built from the rows returned by the document store search.

    Raises:
        ValueError: If no feature view is resolved for ``feature``, or if a string
            reference does not contain the "feature_view:feature" separator.
    """
    # Only the regular feature views are used below; the on-demand views that
    # the helper also returns are intentionally discarded.
    requested_feature_views, _ = self._get_feature_views_to_use(
        features=[feature],
        allow_cache=True,
        hide_dummy_entity=False,
    )
    if not requested_feature_views:
        # Previously surfaced as an opaque IndexError from `[0]`.
        raise ValueError(f"No feature view could be resolved for {feature!r}.")
    table = requested_feature_views[0]

    if isinstance(feature, str):
        reference_parts = feature.split(":")
        if len(reference_parts) < 2:
            # Previously surfaced as an opaque IndexError from `split(":")[1]`.
            raise ValueError(
                f"Invalid feature reference {feature!r}: expected the format "
                '"feature_view:feature".'
            )
        requested_feature = reference_parts[1]
    else:
        # NOTE(review): a FeatureService is forwarded as-is although the search
        # helper annotates this argument as `str` - confirm this path is supported.
        requested_feature = feature

    provider = self._get_provider()
    document_features = self._search_from_document_store(
        provider,
        table,
        requested_feature,
        document,
        top_k,
    )

    online_features_response = GetOnlineFeaturesResponse(results=[])
    # NOTE(review): `requested_feature` is a single value here; confirm the
    # populate helper does not expect a collection of feature names.
    self._populate_response_from_feature_data(
        document_features,
        [],
        online_features_response,
        False,
        requested_feature,
        table,
    )
    return OnlineResponse(online_features_response)
1743+
16871744
@staticmethod
16881745
def _get_columnar_entity_values(
16891746
rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]]
@@ -1900,6 +1957,27 @@ def _read_from_online_store(
19001957
read_row_protos.append((event_timestamps, statuses, values))
19011958
return read_row_protos
19021959

1960+
def _search_from_document_store(
    self,
    provider: Provider,
    table: FeatureView,
    requested_feature: str,
    document: Union[str, np.ndarray],
    top_k: int,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
    """
    Search and return document features from the online document store.

    The call is forwarded to ``provider.online_search`` together with this
    store's config; whatever the provider returns is handed back untouched.
    """
    search_kwargs = dict(
        config=self.config,
        table=table,
        requested_feature=requested_feature,
        document=document,
        top_k=top_k,
    )
    return provider.online_search(**search_kwargs)
1979+
1980+
19031981
@staticmethod
19041982
def _populate_response_from_feature_data(
19051983
feature_data: Iterable[

sdk/python/feast/infra/online_stores/contrib/postgres.py

Lines changed: 99 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import datetime
55
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
66

7+
import numpy as np
78
import psycopg2
89
import pytz
910
from psycopg2 import sql
@@ -12,8 +13,10 @@
1213

1314
from feast import Entity
1415
from feast.feature_view import FeatureView
16+
from feast.feature import Feature
1517
from feast.infra.key_encoding_utils import serialize_entity_key
1618
from feast.infra.online_stores.online_store import OnlineStore
19+
from feast.infra.online_stores.document_store import DocumentStore, DocumentStoreIndexConfig
1720
from feast.infra.utils.postgres.connection_utils import _get_conn, _get_connection_pool
1821
from feast.infra.utils.postgres.postgres_config import ConnectionType, PostgreSQLConfig
1922
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -46,13 +49,13 @@ def _get_conn(self, config: RepoConfig):
4649

4750
@log_exceptions_and_usage(online_store="postgres")
4851
def online_write_batch(
49-
self,
50-
config: RepoConfig,
51-
table: FeatureView,
52-
data: List[
53-
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
54-
],
55-
progress: Optional[Callable[[int], Any]],
52+
self,
53+
config: RepoConfig,
54+
table: FeatureView,
55+
data: List[
56+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
57+
],
58+
progress: Optional[Callable[[int], Any]],
5659
) -> None:
5760
project = config.project
5861

@@ -80,7 +83,7 @@ def online_write_batch(
8083
# Control the batch so that we can update the progress
8184
batch_size = 5000
8285
for i in range(0, len(insert_values), batch_size):
83-
cur_batch = insert_values[i : i + batch_size]
86+
cur_batch = insert_values[i: i + batch_size]
8487
execute_values(
8588
cur,
8689
sql.SQL(
@@ -104,11 +107,11 @@ def online_write_batch(
104107

105108
@log_exceptions_and_usage(online_store="postgres")
106109
def online_read(
107-
self,
108-
config: RepoConfig,
109-
table: FeatureView,
110-
entity_keys: List[EntityKeyProto],
111-
requested_features: Optional[List[str]] = None,
110+
self,
111+
config: RepoConfig,
112+
table: FeatureView,
113+
entity_keys: List[EntityKeyProto],
114+
requested_features: Optional[List[str]] = None,
112115
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
113116
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
114117

@@ -175,13 +178,13 @@ def online_read(
175178

176179
@log_exceptions_and_usage(online_store="postgres")
177180
def update(
178-
self,
179-
config: RepoConfig,
180-
tables_to_delete: Sequence[FeatureView],
181-
tables_to_keep: Sequence[FeatureView],
182-
entities_to_delete: Sequence[Entity],
183-
entities_to_keep: Sequence[Entity],
184-
partial: bool,
181+
self,
182+
config: RepoConfig,
183+
tables_to_delete: Sequence[FeatureView],
184+
tables_to_keep: Sequence[FeatureView],
185+
entities_to_delete: Sequence[Entity],
186+
entities_to_keep: Sequence[Entity],
187+
partial: bool,
185188
):
186189
project = config.project
187190
schema_name = config.online_store.db_schema or config.online_store.user
@@ -236,10 +239,10 @@ def update(
236239
conn.commit()
237240

238241
def teardown(
239-
self,
240-
config: RepoConfig,
241-
tables: Sequence[FeatureView],
242-
entities: Sequence[Entity],
242+
self,
243+
config: RepoConfig,
244+
tables: Sequence[FeatureView],
245+
entities: Sequence[Entity],
243246
):
244247
project = config.project
245248
try:
@@ -273,3 +276,75 @@ def _to_naive_utc(ts: datetime):
273276
return ts
274277
else:
275278
return ts.astimezone(pytz.utc).replace(tzinfo=None)
279+
280+
281+
# Search query template to find the top k items that are closest to the given embedding, e.g.
#   SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
# The table name is composed in as a quoted identifier (psycopg2.sql); the
# feature name, the embedding and the limit are bound as query parameters so
# that none of them is ever spliced into the SQL text.
SEARCH_QUERY_TEMPLATE = """
    SELECT entity_key, feature_name, value, event_ts FROM {table_name}
    WHERE feature_name = %s
    ORDER BY value <-> %s
    LIMIT %s;
"""

# Create index query template to create an index based on the index type
CREATE_INDEX_QUERY_TEMPLATE = """
    CREATE INDEX ON {table_name} USING {index_type} (embedding {embeding_type});
"""


class PostgresDocumentStoreConfig(DocumentStoreIndexConfig):
    """Index configuration selecting the Postgres document store."""

    type: Literal["postgres"] = "postgres"


class PostgresDocumentStore(PostgreSQLOnlineStore, DocumentStore):
    """Postgres online store extended with top-k search and index creation."""

    def online_search(
        self,
        config: RepoConfig,
        table: FeatureView,
        requested_feature: str,
        embedding: np.ndarray,
        top_k: int,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Return the ``top_k`` rows of ``requested_feature`` ordered by ``value <-> embedding``.

        Args:
            config: Repo config used to open the database connection.
            table: Feature view whose online table is searched.
            requested_feature: Name of the feature to match in the ``feature_name`` column.
            embedding: Query embedding the stored values are ordered against.
            top_k: Maximum number of rows to return.

        Returns:
            One ``(event_ts, {feature_name: value})`` tuple per matching row, in
            the order returned by the database. The entity key column is
            selected but not yet surfaced in the result.
        """
        # NOTE(review): assumes the online table of a feature view is named
        # "<project>_<feature view name>"; the previous code formatted the
        # FeatureView object itself into the SQL text. Confirm against the
        # table creation code in `update`.
        table_name = f"{config.project}_{table.name}"
        query = sql.SQL(SEARCH_QUERY_TEMPLATE).format(
            table_name=sql.Identifier(table_name)
        )

        with self._get_conn(config) as conn, conn.cursor() as cur:
            # NOTE(review): binding a numpy array presumably needs an adapter
            # registered on the connection (e.g. pgvector's) - TODO confirm.
            cur.execute(query, (requested_feature, embedding, top_k))
            rows = cur.fetchall()

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        for _entity_key, feature_name, value_bytes, event_ts in rows:
            val = ValueProto()
            # psycopg2 hands bytea columns back as memoryview; normalise to bytes.
            val.ParseFromString(bytes(value_bytes))
            result.append((event_ts, {feature_name: val}))
        return result

    def create_index(
        self,
        config: RepoConfig,
        index: str,
        index_config: DocumentStoreIndexConfig,
    ):
        """
        Create an index of type ``index`` via CREATE_INDEX_QUERY_TEMPLATE.

        Args:
            config: Repo config; its ``project`` is substituted as the table name.
            index: Index type placed after ``USING`` in the statement.
            index_config: Supplies the optional ``embedding_type`` that follows
                the indexed column.
        """
        # NOTE(review): the project name is used as the table name and the
        # indexed column is hard-coded to `embedding`, whereas the search
        # query orders by `value` - confirm the intended table and column.
        # NOTE(review): `index` and `embedding_type` are spliced into the
        # statement verbatim, so they must come from trusted configuration.
        statement = CREATE_INDEX_QUERY_TEMPLATE.format(
            table_name=config.project,
            index_type=index,
            # embedding_type is Optional; an unset value must not be rendered
            # as the literal text "None" inside the DDL.
            embeding_type=index_config.embedding_type or "",
        )
        with self._get_conn(config) as conn, conn.cursor() as cur:
            cur.execute(statement)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from abc import abstractmethod
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import numpy as np

from feast.feature_view import FeatureView
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
9+
10+
11+
class DocumentStoreIndexConfig(FeastConfigBaseModel):
    """Index settings shared by document store implementations."""

    # Optional type tag; the Postgres document store substitutes it after the
    # indexed column in its CREATE INDEX statement.
    embedding_type: Optional[str]
13+
14+
15+
class DocumentStore(OnlineStore):
    """Online store interface extended with top-k search and index creation hooks."""

    # Declared as an annotation only; no default value is assigned here.
    index: Optional[str]

    @abstractmethod
    def online_search(
        self,
        config: RepoConfig,
        table: FeatureView,
        requested_feature: str,
        embeddings: np.ndarray,
        top_k: int,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Search ``table`` for the ``top_k`` entries of ``requested_feature`` closest to ``embeddings``.

        Concrete stores must override this; the base implementation always raises.

        Returns:
            A list of ``(timestamp, {feature name: value})`` tuples.
        """
        raise NotImplementedError("You have to implement this!")

    @abstractmethod
    def create_index(
        self,
        config: RepoConfig,
        index: str,
        index_config: DocumentStoreIndexConfig,
    ):
        """
        Create the index named by ``index`` using the settings in ``index_config``.

        Concrete stores must override this; the base implementation always raises.
        """
        raise NotImplementedError("You have to implement this!")

sdk/python/feast/infra/online_stores/helpers.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
serialize_entity_key_prefix,
1010
)
1111
from feast.infra.online_stores.online_store import OnlineStore
12+
from feast.infra.online_stores.document_store import DocumentStore
1213
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
1314

1415

@@ -21,6 +22,15 @@ def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
2122
return online_store_class()
2223

2324

25+
def get_document_store_from_config(document_store_config: Any) -> DocumentStore:
    """Creates a document store corresponding to the given online document store config."""
    # The store class is looked up in the module that defines the config
    # class, under the config class name with every "Config" substring removed.
    config_module = document_store_config.__module__
    store_class_name = type(document_store_config).__name__.replace("Config", "")
    store_class = import_class(config_module, store_class_name, "DocumentStore")
    return store_class()
32+
33+
2434
def _redis_key(
2535
project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1
2636
) -> bytes:

0 commit comments

Comments
 (0)