-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Enable Vector database and retrieve_online_documents API #4061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
713768e
58d5d94
2cd73d1
d2e0a59
7079e7f
8c9ee97
513dd39
29d98cd
11eb97f
865baf2
47cd117
3f9f59f
7935071
ba39f93
cf53c71
92046af
d0acd2d
cc45f73
006b5c6
6e0ba03
a2302be
2e6fc55
3cbbf21
ec32764
e2d8008
523d20f
5cd085d
795699e
67b007f
33b46bd
82fe5f1
0618378
7de2016
92fed1d
d4f2639
396d7de
6c38b92
f763dc9
818c055
a51b555
2624b22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |||||
| cast, | ||||||
| ) | ||||||
|
|
||||||
| import numpy as np | ||||||
| import pandas as pd | ||||||
| import pyarrow as pa | ||||||
| from colorama import Fore, Style | ||||||
|
|
@@ -1684,6 +1685,63 @@ def _get_online_features( | |||||
| ) | ||||||
| return OnlineResponse(online_features_response) | ||||||
|
|
||||||
| @log_exceptions_and_usage | ||||||
| def get_top_k_document_features( | ||||||
| self, | ||||||
| feature: str, | ||||||
| document: Union[str, np.ndarray], | ||||||
| top_k: int, | ||||||
| ) -> OnlineResponse: | ||||||
| """ | ||||||
| Retrieves the top k closest document features. | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| Args: | ||||||
| feature: The document feature to retrieve from the online document store, given as a single string | ||||||
| feature reference. The reference must have the format "feature_view:feature", e.g., | ||||||
| "document_fv:document_embedding_feature". | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| document: The document to retrieve the closest document features for. | ||||||
| top_k: The number of closest document features to retrieve. | ||||||
| """ | ||||||
| return self._get_top_k_document_features( | ||||||
| feature=feature, | ||||||
| document=document, | ||||||
| top_k=top_k, | ||||||
| ) | ||||||
|
|
||||||
def _get_top_k_document_features(
    self,
    feature: str,
    document: Union[str, np.ndarray],
    top_k: int,
):
    """
    Resolve the feature view for ``feature``, search the document store, and
    wrap the search results in an ``OnlineResponse``.

    Args:
        feature: Feature reference of the form "feature_view:feature".
        document: The document forwarded to the document-store search.
        top_k: The number of results requested from the search.

    Returns:
        An ``OnlineResponse`` built from the populated
        ``GetOnlineFeaturesResponse``.
    """
    feature_views, _on_demand_feature_views = self._get_feature_views_to_use(
        features=[feature], allow_cache=True, hide_dummy_entity=False
    )

    # Keep only the part after the colon ("feature_view:feature" -> "feature").
    if isinstance(feature, str):
        requested_feature = feature.split(":")[1]
    else:
        requested_feature = feature

    provider = self._get_provider()
    # Only the first resolved feature view is used for both search and response.
    feature_view = feature_views[0]
    document_features = self._search_from_document_store(
        provider, feature_view, requested_feature, document, top_k
    )

    response = GetOnlineFeaturesResponse(results=[])
    self._populate_response_from_feature_data(
        document_features,
        [],
        response,
        False,
        requested_feature,
        feature_view,
    )
    return OnlineResponse(response)
|
|
||||||
| @staticmethod | ||||||
| def _get_columnar_entity_values( | ||||||
| rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]] | ||||||
|
|
@@ -1900,6 +1958,48 @@ def _read_from_online_store( | |||||
| read_row_protos.append((event_timestamps, statuses, values)) | ||||||
| return read_row_protos | ||||||
|
|
||||||
def _search_from_document_store(
    self,
    provider: Provider,
    table: FeatureView,
    requested_feature: str,
    document: Union[str, np.ndarray],
    top_k: int,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
    """
    Search the online document store and return the matching document features.

    Args:
        provider: Provider whose ``online_search`` performs the lookup.
        table: Feature view passed through to the provider search.
        requested_feature: Name of the feature to search on.
        document: The query forwarded to the provider search.
        top_k: The number of results requested from the provider.

    Returns:
        One ``(event_timestamps, statuses, values)`` tuple per document
        returned by the provider, in the order the provider returned them.
    """
    # Arguments are passed positionally on purpose: the online_search
    # implementations name the fourth parameter ``embeddings``, so calling with
    # ``document=...`` as a keyword raises TypeError.
    # NOTE(review): those implementations annotate the argument as np.ndarray,
    # while ``document`` may also be a str here -- confirm the intended type.
    documents = provider.online_search(
        self.config, table, requested_feature, document, top_k
    )

    # A single empty Value is shared by every row that has no feature data.
    null_value = Value()
    read_row_protos = []

    for row_ts, feature_data in documents:
        row_ts_proto = Timestamp()
        # TODO (Ly): reuse whatever timestamp if row_ts is None?
        if row_ts is not None:
            row_ts_proto.FromDatetime(row_ts)

        if feature_data is None:
            statuses = [FieldStatus.NOT_FOUND]
            values = [null_value]
        else:
            # Feature names are not needed here; only the values are returned.
            values = list(feature_data.values())
            statuses = [FieldStatus.PRESENT] * len(values)

        read_row_protos.append(([row_ts_proto], statuses, values))
    return read_row_protos
|
|
||||||
| @staticmethod | ||||||
| def _populate_response_from_feature_data( | ||||||
| feature_data: Iterable[ | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| from abc import abstractmethod | ||
| from datetime import datetime | ||
| from typing import Dict, List, Optional, Tuple | ||
|
|
||
| import numpy as np | ||
|
|
||
| from feast.feature_view import FeatureView | ||
| from feast.infra.online_stores.online_store import OnlineStore | ||
| from feast.protos.feast.types.Value_pb2 import Value as ValueProto | ||
| from feast.repo_config import FeastConfigBaseModel, RepoConfig | ||
|
|
||
|
|
||
class DocumentStoreIndexConfig(FeastConfigBaseModel):
    """Configuration model describing a document store index."""

    # NOTE(review): presumably selects the kind of index to build -- the
    # accepted values are not visible here; confirm against the store
    # implementation.
    index_type: Optional[str]

    # NOTE(review): presumably identifies the embedding representation -- the
    # accepted values are not visible here; confirm against the store
    # implementation.
    embedding_type: Optional[str]
|
|
||
|
|
||
class DocumentStore(OnlineStore):
    """Online store interface extended with a top-k search operation."""

    @abstractmethod
    def online_search(
        self,
        config: RepoConfig,
        table: FeatureView,
        requested_feature: str,
        embeddings: np.ndarray,
        top_k: int,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Search the store for entries related to ``embeddings``.

        Args:
            config: The repository configuration.
            table: The feature view to search in.
            requested_feature: Name of the feature to search on.
            embeddings: The query array (presumably the query embedding --
                confirm against implementations).
            top_k: The number of results to return.

        Returns:
            A list of ``(timestamp, features)`` pairs; per the annotation either
            element may be ``None``.

        Raises:
            NotImplementedError: Always, in this base definition; subclasses
                must override it.
        """
        raise NotImplementedError("You have to implement this!")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| from datetime import datetime, timedelta | ||
| from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union | ||
|
|
||
| import numpy as np | ||
| import pandas as pd | ||
| import pyarrow as pa | ||
| from tqdm import tqdm | ||
|
|
@@ -18,7 +19,10 @@ | |
| ) | ||
| from feast.infra.offline_stores.offline_store import RetrievalJob | ||
| from feast.infra.offline_stores.offline_utils import get_offline_store_from_config | ||
| from feast.infra.online_stores.helpers import get_online_store_from_config | ||
| from feast.infra.online_stores.helpers import ( | ||
| get_document_store_from_config, | ||
| get_online_store_from_config, | ||
| ) | ||
| from feast.infra.provider import Provider | ||
| from feast.infra.registry.base_registry import BaseRegistry | ||
| from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto | ||
|
|
@@ -47,6 +51,7 @@ def __init__(self, config: RepoConfig): | |
| self.repo_config = config | ||
| self._offline_store = None | ||
| self._online_store = None | ||
| self._document_store = None | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is no longer necessary There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for addressing the issue. One question, though: what if we want to continue using Redis or any other online store for usual features, and use PG vector solely for embedding and search? Do we have the option to use the online store and the document store in the feature_store.yaml, both?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think Feast currently doesn't support multiple online stores, but that would be a good feature to add.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That could get complicated but agreed it'd be good to add. I could imagine a Redis + another DB layer would be an obvious one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you again, guys, for these amazing features. Yes, having multiple online stores will make it easier to use the right database layer for the appropriate use case! 🙌 |
||
| self._batch_engine: Optional[BatchMaterializationEngine] = None | ||
|
|
||
| @property | ||
|
|
@@ -57,6 +62,14 @@ def online_store(self): | |
| ) | ||
| return self._online_store | ||
|
|
||
@property
def document_store(self):
    """
    Return the document store, creating it on first access.

    The store is built from ``self.repo_config.online_store`` and cached on
    ``self._document_store`` for subsequent accesses.
    """
    store = self._document_store
    if not store:
        store = get_document_store_from_config(self.repo_config.online_store)
        self._document_store = store
    return store
|
|
||
| @property | ||
| def offline_store(self): | ||
| if not self._offline_store: | ||
|
|
@@ -190,6 +203,23 @@ def online_read( | |
| ) | ||
| return result | ||
|
|
||
@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def online_search(
    self,
    config: RepoConfig,
    table: FeatureView,
    requested_feature: str,
    embeddings: np.ndarray,
    top_k: int,
) -> List:
    """
    Delegate a top-k search to the configured document store.

    Args:
        config: The repository configuration.
        table: The feature view to search in.
        requested_feature: Name of the feature to search on.
        embeddings: The query array forwarded to the document store.
        top_k: The number of results to return.

    Returns:
        Whatever the document store's ``online_search`` returns, or an empty
        list when no document store is available.
    """
    set_usage_attribute("provider", self.__class__.__name__)

    store = self.document_store
    if not store:
        # No document store configured: report no matches instead of failing.
        return []
    return store.online_search(config, table, requested_feature, embeddings, top_k)
|
|
||
| def ingest_df( | ||
| self, | ||
| feature_view: FeatureView, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is `document` the user input, i.e., what is often referred to as the query?