From e33aa20e0d6203440024210f2a439cae4144ce57 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Thu, 9 May 2024 00:02:24 -0700 Subject: [PATCH 01/17] add elasticsearch as online store Signed-off-by: cmuhao --- README.md | 2 +- .../online_stores/contrib/elastichsearch.py | 193 ++++++++++++++++++ setup.py | 4 + 3 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 sdk/python/feast/infra/online_stores/contrib/elastichsearch.py diff --git a/README.md b/README.md index e9b7ff4743..a1e06774da 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@

-
+
[![unit-tests](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml) [![integration-tests-and-build](https://github.com/feast-dev/feast/actions/workflows/master_only.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/master_only.yml) diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py new file mode 100644 index 0000000000..28b18413c5 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py @@ -0,0 +1,193 @@ +from __future__ import absolute_import + +from datetime import datetime +from typing import List, Tuple, Optional, Sequence, Dict, Callable, Any + +from elasticsearch import Elasticsearch, helpers +import pytz +from feast import RepoConfig, FeatureView, Entity +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.infra.key_encoding_utils import serialize_entity_key, get_list_val_str + + +class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for the Elasticsearch online store. + NOTE: The class *must* end with the `OnlineStoreConfig` suffix. 
+ """ + + type: str = "elasticsearch" + + host: Optional[str] = None + user: Optional[str] = None + password: Optional[str] = None + port: Optional[int] = None + index: Optional[str] = None + + +class ElasticsearchOnlineStore(OnlineStore): + _client: Optional[Elasticsearch] = None + + _index: Optional[str] = None + + def _get_client(self, config: RepoConfig) -> Elasticsearch: + online_store_config = config.online_store + assert isinstance(online_store_config, ElasticsearchOnlineStoreConfig) + + if not self._client: + self._client = Elasticsearch( + hosts=[{"host": online_store_config.host or "localhost", "port": online_store_config.port or 9200}], + http_auth=(online_store_config.user, online_store_config.password), + ) + + def create_index(self, config: RepoConfig, table: FeatureView): + pass + + def _bulk_batch_actions(self, batch): + for row in batch: + yield { + "_index": self._index, + "_id": f"{row['entity_key']}_{row['feature_name']}_{row['timestamp']}", + "_source": row + } + + def online_write_batch(self, + config: RepoConfig, + table: FeatureView, + data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]], + progress: Optional[Callable[[int], Any]]) -> None: + insert_values = [] + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + timestamp = _to_naive_utc(timestamp) + if created_ts is not None: + created_ts = _to_naive_utc(created_ts) + for feature_name, value in values.items(): + vector_val = get_list_val_str(value) + insert_values.append({ + "entity_key": entity_key_bin, + "feature_name": feature_name, + "feature_value": value, + "timestamp": timestamp, + "created_ts": created_ts, + "vector_value": vector_val, + }) + + batch_size = config.online_config.batch_size + for i in range(0, len(insert_values), batch_size): + batch = insert_values[i:i + batch_size] + actions = 
self._bulk_batch_actions(batch) + helpers.bulk(self._client, actions) + + def online_read(self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None) -> List[ + Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + if not requested_features: + body = { + "_source": { + "excludes": ["vector_value"] + }, + "query": { + "match": { + "entity_key": entity_keys + } + } + } + else: + body = { + "_source": { + "excludes": ["vector_value"] + }, + "query": { + "bool": { + "must": [ + {"terms": {"entity_key": entity_keys}}, + {"terms": {"feature_name": requested_features}} + ] + } + } + } + self._client.search(index=self._index, body=body) + + def update(self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], partial: bool): + # implement the update method + for table in tables_to_delete: + self._client.delete_by_query(index=table.name) + for table in tables_to_keep: + self.create_index(config, table) + + def teardown(self, config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity]): + pass + + def retrieve_online_documents(self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], top_k: int) -> List[ + Tuple[ + Optional[datetime], + Optional[ValueProto], + Optional[ValueProto], + Optional[ValueProto], + ] + ]: + result: List[ + Tuple[ + Optional[datetime], + Optional[ValueProto], + Optional[ValueProto], + Optional[ValueProto], + ] + ] = [] + reponse = self._client.search( + index=self._index, + knn={ + "field": requested_feature, + "query_vector": embedding, + "k": top_k, + } + ) + rows = reponse["hits"]["hits"][0:top_k] + for row in rows: + ( + entity_key, + feature_value, + timestamp, + created_ts, + vector_value, + ) = row["_source"] + feature_value_proto = ValueProto() + 
feature_value_proto.ParseFromString(feature_value) + + vector_value_proto = ValueProto(string_val=vector_value) + vector_value_proto.ParseFromString(vector_value) + result.append( + ( + timestamp, + feature_value_proto, + None, + vector_value_proto, + ) + ) + return result + + +def _to_naive_utc(ts: datetime): + if ts.tzinfo is None: + return ts + else: + return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/setup.py b/setup.py index 6cc728ee98..b1c3806451 100644 --- a/setup.py +++ b/setup.py @@ -150,6 +150,8 @@ DELTA_REQUIRED = ["deltalake"] +ELASTICSEARCH_REQUIRED = ["elasticsearch>=8"] + CI_REQUIRED = ( [ "build", @@ -211,6 +213,7 @@ + GRPCIO_REQUIRED + DUCKDB_REQUIRED + DELTA_REQUIRED + + ELASTICSEARCH_REQUIRED ) DOCS_REQUIRED = CI_REQUIRED @@ -377,6 +380,7 @@ def run(self): "duckdb": DUCKDB_REQUIRED, "ikv": IKV_REQUIRED, "delta": DELTA_REQUIRED, + "elasticsearch": ELASTICSEARCH_REQUIRED, }, include_package_data=True, license="Apache", From 091e838e5543445fd5847fd7d9f6386efa0e5e71 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Thu, 9 May 2024 00:03:01 -0700 Subject: [PATCH 02/17] add elasticsearch as online store Signed-off-by: cmuhao --- .../online_stores/contrib/elastichsearch.py | 120 ++++++++++-------- 1 file changed, 68 insertions(+), 52 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py index 28b18413c5..5e84c21c14 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py @@ -1,16 +1,17 @@ from __future__ import absolute_import from datetime import datetime -from typing import List, Tuple, Optional, Sequence, Dict, Callable, Any +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple -from elasticsearch import Elasticsearch, helpers import pytz -from feast import RepoConfig, FeatureView, Entity +from elasticsearch import Elasticsearch, helpers 
+ +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel -from feast.infra.key_encoding_utils import serialize_entity_key, get_list_val_str class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel): @@ -39,7 +40,12 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch: if not self._client: self._client = Elasticsearch( - hosts=[{"host": online_store_config.host or "localhost", "port": online_store_config.port or 9200}], + hosts=[ + { + "host": online_store_config.host or "localhost", + "port": online_store_config.port or 9200, + } + ], http_auth=(online_store_config.user, online_store_config.password), ) @@ -51,14 +57,18 @@ def _bulk_batch_actions(self, batch): yield { "_index": self._index, "_id": f"{row['entity_key']}_{row['feature_name']}_{row['timestamp']}", - "_source": row + "_source": row, } - def online_write_batch(self, - config: RepoConfig, - table: FeatureView, - data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]], - progress: Optional[Callable[[int], Any]]) -> None: + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: insert_values = [] for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( @@ -70,74 +80,80 @@ def online_write_batch(self, created_ts = _to_naive_utc(created_ts) for feature_name, value in values.items(): vector_val = get_list_val_str(value) - insert_values.append({ - "entity_key": entity_key_bin, - "feature_name": feature_name, - "feature_value": 
value, - "timestamp": timestamp, - "created_ts": created_ts, - "vector_value": vector_val, - }) + insert_values.append( + { + "entity_key": entity_key_bin, + "feature_name": feature_name, + "feature_value": value, + "timestamp": timestamp, + "created_ts": created_ts, + "vector_value": vector_val, + } + ) batch_size = config.online_config.batch_size for i in range(0, len(insert_values), batch_size): - batch = insert_values[i:i + batch_size] + batch = insert_values[i : i + batch_size] actions = self._bulk_batch_actions(batch) helpers.bulk(self._client, actions) - def online_read(self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None) -> List[ - Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: if not requested_features: body = { - "_source": { - "excludes": ["vector_value"] - }, - "query": { - "match": { - "entity_key": entity_keys - } - } + "_source": {"excludes": ["vector_value"]}, + "query": {"match": {"entity_key": entity_keys}}, } else: body = { - "_source": { - "excludes": ["vector_value"] - }, + "_source": {"excludes": ["vector_value"]}, "query": { "bool": { "must": [ {"terms": {"entity_key": entity_keys}}, - {"terms": {"feature_name": requested_features}} + {"terms": {"feature_name": requested_features}}, ] } - } + }, } self._client.search(index=self._index, body=body) - def update(self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], partial: bool): + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: 
Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): # implement the update method for table in tables_to_delete: self._client.delete_by_query(index=table.name) for table in tables_to_keep: self.create_index(config, table) - def teardown(self, config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity]): + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): pass - def retrieve_online_documents(self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], top_k: int) -> List[ + def retrieve_online_documents( + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + ) -> List[ Tuple[ Optional[datetime], Optional[ValueProto], @@ -159,7 +175,7 @@ def retrieve_online_documents(self, "field": requested_feature, "query_vector": embedding, "k": top_k, - } + }, ) rows = reponse["hits"]["hits"][0:top_k] for row in rows: From de668fb9835cffad35d4d59a39eec10b1e0b892f Mon Sep 17 00:00:00 2001 From: cmuhao Date: Thu, 9 May 2024 00:05:53 -0700 Subject: [PATCH 03/17] format Signed-off-by: cmuhao --- .../infra/online_stores/contrib/elastichsearch.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py index 5e84c21c14..1a775e4059 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py @@ -121,7 +121,18 @@ def online_read( } }, } - self._client.search(index=self._index, body=body) + response = self._client.search(index=self._index, body=body) + results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for hit in response["hits"]["hits"]: + results.append( + ( + hit["_source"]["timestamp"], + { + 
hit["_source"]["feature_name"]: hit["_source"]["feature_value"] + }, + ) + ) + return results def update( self, From d3619f65abbbe4f237e1091dab1838b4dbe0ec0c Mon Sep 17 00:00:00 2001 From: cmuhao Date: Thu, 9 May 2024 00:09:19 -0700 Subject: [PATCH 04/17] format Signed-off-by: cmuhao --- .../online_stores/contrib/elastichsearch.py | 73 ++++++++++--------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py index 1a775e4059..21121306e5 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py @@ -38,7 +38,9 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch: online_store_config = config.online_store assert isinstance(online_store_config, ElasticsearchOnlineStoreConfig) - if not self._client: + if self._client: + return self._client + else: self._client = Elasticsearch( hosts=[ { @@ -48,6 +50,7 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch: ], http_auth=(online_store_config.user, online_store_config.password), ) + return self._client def create_index(self, config: RepoConfig, table: FeatureView): pass @@ -61,13 +64,13 @@ def _bulk_batch_actions(self, batch): } def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: insert_values = [] for entity_key, values, timestamp, created_ts in data: @@ -93,16 +96,16 @@ def online_write_batch( batch_size = config.online_config.batch_size for i in range(0, len(insert_values), batch_size): - batch = insert_values[i : i + 
batch_size] + batch = insert_values[i: i + batch_size] actions = self._bulk_batch_actions(batch) helpers.bulk(self._client, actions) def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: if not requested_features: body = { @@ -121,27 +124,25 @@ def online_read( } }, } - response = self._client.search(index=self._index, body=body) + response = self._get_client(config).search(index=self._index, body=body) results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] for hit in response["hits"]["hits"]: results.append( ( hit["_source"]["timestamp"], - { - hit["_source"]["feature_name"]: hit["_source"]["feature_value"] - }, + {hit["_source"]["feature_name"]: hit["_source"]["feature_value"]}, ) ) return results def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): # implement the update method for table in tables_to_delete: @@ -150,20 +151,20 @@ def update( self.create_index(config, table) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): pass def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, + self, + config: RepoConfig, + 
table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, ) -> List[ Tuple[ Optional[datetime], @@ -180,7 +181,7 @@ def retrieve_online_documents( Optional[ValueProto], ] ] = [] - reponse = self._client.search( + reponse = self._get_client(config).search( index=self._index, knn={ "field": requested_feature, From 7a091385686ac691f56b45e0cd96d553dbebe52b Mon Sep 17 00:00:00 2001 From: cmuhao Date: Thu, 9 May 2024 00:12:32 -0700 Subject: [PATCH 05/17] format Signed-off-by: cmuhao --- .../feast/infra/online_stores/contrib/elastichsearch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py index 21121306e5..5dfd901499 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py @@ -98,7 +98,7 @@ def online_write_batch( for i in range(0, len(insert_values), batch_size): batch = insert_values[i: i + batch_size] actions = self._bulk_batch_actions(batch) - helpers.bulk(self._client, actions) + helpers.bulk(self._get_client(config), actions) def online_read( self, @@ -146,7 +146,7 @@ def update( ): # implement the update method for table in tables_to_delete: - self._client.delete_by_query(index=table.name) + self._get_client(config).delete_by_query(index=table.name) for table in tables_to_keep: self.create_index(config, table) From f33e7698464b33298a43318a7eb5e353b497fdd1 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Thu, 9 May 2024 00:14:40 -0700 Subject: [PATCH 06/17] format Signed-off-by: cmuhao --- .../online_stores/contrib/elastichsearch.py | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py index 5dfd901499..d865fdfc67 100644 --- 
a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py @@ -64,13 +64,13 @@ def _bulk_batch_actions(self, batch): } def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: insert_values = [] for entity_key, values, timestamp, created_ts in data: @@ -96,16 +96,16 @@ def online_write_batch( batch_size = config.online_config.batch_size for i in range(0, len(insert_values), batch_size): - batch = insert_values[i: i + batch_size] + batch = insert_values[i : i + batch_size] actions = self._bulk_batch_actions(batch) helpers.bulk(self._get_client(config), actions) def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: if not requested_features: body = { @@ -136,13 +136,13 @@ def online_read( return results def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): # implement the update method for table in tables_to_delete: @@ -151,20 +151,20 @@ def update( 
self.create_index(config, table) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): pass def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, ) -> List[ Tuple[ Optional[datetime], From 9153983baa92381852ffb1aed7278a39cca85199 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sat, 11 May 2024 17:33:18 -0700 Subject: [PATCH 07/17] fix search Signed-off-by: cmuhao --- .../{elastichsearch.py => elasticsearch.py} | 167 +++++++++++------- .../feast/infra/online_stores/online_store.py | 2 + sdk/python/feast/infra/provider.py | 3 +- sdk/python/feast/repo_config.py | 1 + setup.py | 2 +- 5 files changed, 109 insertions(+), 66 deletions(-) rename sdk/python/feast/infra/online_stores/contrib/{elastichsearch.py => elasticsearch.py} (52%) diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py similarity index 52% rename from sdk/python/feast/infra/online_stores/contrib/elastichsearch.py rename to sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index d865fdfc67..2b34db53c2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -1,10 +1,14 @@ from __future__ import absolute_import +import json +import logging from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +import base64 import pytz from elasticsearch import Elasticsearch, helpers +from pydantic import PositiveInt from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import get_list_val_str, 
serialize_entity_key @@ -14,9 +18,9 @@ from feast.repo_config import FeastConfigBaseModel -class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel): +class ElasticSearchOnlineStoreConfig(FeastConfigBaseModel): """ - Configuration for the Elasticsearch online store. + Configuration for the ElasticSearch online store. NOTE: The class *must* end with the `OnlineStoreConfig` suffix. """ @@ -27,16 +31,25 @@ class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel): password: Optional[str] = None port: Optional[int] = None index: Optional[str] = None + scheme: Optional[str] = "http" + # The number of rows to write in a single batch + write_batch_size: Optional[PositiveInt] = 40 + + # The length of the vector value + vector_len: Optional[int] = 512 + + # The vector similarity metric to use in KNN search + # more details: https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html + similarity: Optional[str] = "cosine" -class ElasticsearchOnlineStore(OnlineStore): - _client: Optional[Elasticsearch] = None - _index: Optional[str] = None +class ElasticSearchOnlineStore(OnlineStore): + _client: Optional[Elasticsearch] = None def _get_client(self, config: RepoConfig) -> Elasticsearch: online_store_config = config.online_store - assert isinstance(online_store_config, ElasticsearchOnlineStoreConfig) + assert isinstance(online_store_config, ElasticSearchOnlineStoreConfig) if self._client: return self._client @@ -46,31 +59,29 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch: { "host": online_store_config.host or "localhost", "port": online_store_config.port or 9200, + "scheme": online_store_config.scheme or "http" } ], - http_auth=(online_store_config.user, online_store_config.password), + basic_auth=(online_store_config.user, online_store_config.password), ) return self._client - def create_index(self, config: RepoConfig, table: FeatureView): - pass - - def _bulk_batch_actions(self, batch): + def _bulk_batch_actions(self, table: FeatureView, 
batch: List[Dict[str, Any]]): for row in batch: yield { - "_index": self._index, + "_index": table.name, "_id": f"{row['entity_key']}_{row['feature_name']}_{row['timestamp']}", "_source": row, } def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: insert_values = [] for entity_key, values, timestamp, created_ts in data: @@ -78,34 +89,36 @@ def online_write_batch( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ) + encoded_entity_key = base64.b64encode(entity_key_bin).decode("utf-8") timestamp = _to_naive_utc(timestamp) if created_ts is not None: created_ts = _to_naive_utc(created_ts) for feature_name, value in values.items(): - vector_val = get_list_val_str(value) + encoded_value = base64.b64encode(value.SerializeToString()).decode("utf-8") + vector_val = json.loads(get_list_val_str(value)) insert_values.append( { - "entity_key": entity_key_bin, + "entity_key": encoded_entity_key, "feature_name": feature_name, - "feature_value": value, + "feature_value": encoded_value, "timestamp": timestamp, "created_ts": created_ts, "vector_value": vector_val, } ) - batch_size = config.online_config.batch_size + batch_size = config.online_config.write_batch_size for i in range(0, len(insert_values), batch_size): - batch = insert_values[i : i + batch_size] - actions = self._bulk_batch_actions(batch) + batch = insert_values[i: i + batch_size] + actions = self._bulk_batch_actions(table, batch) helpers.bulk(self._get_client(config), actions) def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] 
= None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: if not requested_features: body = { @@ -124,7 +137,7 @@ def online_read( } }, } - response = self._get_client(config).search(index=self._index, body=body) + response = self._get_client(config).search(index=table.name, body=body) results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] for hit in response["hits"]["hits"]: results.append( @@ -135,14 +148,32 @@ def online_read( ) return results + def create_index(self, config: RepoConfig, table: FeatureView): + index_mapping = { + "properties": { + "entity_key": {"type": "binary"}, + "feature_name": {"type": "keyword"}, + "feature_value": {"type": "binary"}, + "timestamp": {"type": "date"}, + "created_ts": {"type": "date"}, + "vector_value": { + "type": "dense_vector", + "dims": config.online_config.vector_len, + "index": "true", + "similarity": config.online_config.similarity, + }, + } + } + self._get_client(config).indices.create(index=table.name, mappings=index_mapping) + def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): # implement the update method for table in tables_to_delete: @@ -151,20 +182,28 @@ def update( self.create_index(config, table) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): - pass + project = config.project + try: + for table in 
tables: + self._get_client(config).indices.delete(index=table.name) + except Exception as e: + logging.exception(f"Error deleting index in project {project}: {e}") + raise def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + *args, + **kwargs, ) -> List[ Tuple[ Optional[datetime], @@ -181,34 +220,34 @@ def retrieve_online_documents( Optional[ValueProto], ] ] = [] - reponse = self._get_client(config).search( - index=self._index, + response = self._get_client(config).search( + index=table.name, knn={ - "field": requested_feature, + "field": "vector_value", "query_vector": embedding, "k": top_k, }, ) - rows = reponse["hits"]["hits"][0:top_k] + rows = response["hits"]["hits"][0:top_k] for row in rows: - ( - entity_key, - feature_value, - timestamp, - created_ts, - vector_value, - ) = row["_source"] + entity_key = row["_source"]["entity_key"] + feature_value = row["_source"]["feature_value"] + vector_value = row["_source"]["vector_value"] + timestamp = row["_source"]["timestamp"] + distance = row["_score"] + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") + feature_value_proto = ValueProto() - feature_value_proto.ParseFromString(feature_value) + feature_value_proto.ParseFromString(base64.b64decode(feature_value)) - vector_value_proto = ValueProto(string_val=vector_value) - vector_value_proto.ParseFromString(vector_value) + vector_value_proto = ValueProto(string_val=str(vector_value)) + distance_value_proto = ValueProto(float_val=distance) result.append( ( timestamp, feature_value_proto, - None, vector_value_proto, + distance_value_proto, ) ) return result diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 2a81e37042..9de148a095 100644 --- 
a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -142,6 +142,7 @@ def retrieve_online_documents( requested_feature: str, embedding: List[float], top_k: int, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -154,6 +155,7 @@ def retrieve_online_documents( Retrieves online feature values for the specified embeddings. Args: + distance_metric: distance metric to use for retrieval. config: The config for the current feature store. table: The feature view whose feature values should be read. requested_feature: The name of the feature whose embeddings should be used for retrieval. diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 02fba0c1f6..bc6bcf3fac 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -303,7 +303,7 @@ def retrieve_online_documents( requested_feature: str, query: List[float], top_k: int, - distance_metric: str = "L2", + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], @@ -316,6 +316,7 @@ def retrieve_online_documents( Searches for the top-k most similar documents in the online document store. Args: + distance_metric: distance metric to use for the search. config: The config for the current feature store. table: The feature view whose embeddings should be searched. requested_feature: the requested document feature name. 
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 5e38fd1775..00cbac1908 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -64,6 +64,7 @@ "rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore", "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore", + "elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/setup.py b/setup.py index b1c3806451..7d7c774691 100644 --- a/setup.py +++ b/setup.py @@ -150,7 +150,7 @@ DELTA_REQUIRED = ["deltalake"] -ELASTICSEARCH_REQUIRED = ["elasticsearch>=8"] +ELASTICSEARCH_REQUIRED = ["elasticsearch==8.13.4"] CI_REQUIRED = ( [ From 5a212ba487bc5ff4d17409560335238e94f5fc71 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sat, 11 May 2024 17:35:36 -0700 Subject: [PATCH 08/17] format Signed-off-by: cmuhao --- .../feast/infra/online_stores/contrib/elasticsearch.py | 8 +++++--- sdk/python/feast/infra/online_stores/contrib/postgres.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index 2b34db53c2..4d514c6da4 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -1,10 +1,10 @@ from __future__ import absolute_import +import base64 import json import logging from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple -import base64 import pytz from elasticsearch import Elasticsearch, helpers @@ -51,6 +51,9 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch: online_store_config = config.online_store assert 
isinstance(online_store_config, ElasticSearchOnlineStoreConfig) + user = online_store_config.user if online_store_config.user is not None else '' + password = online_store_config.password if online_store_config.password is not None else '' + if self._client: return self._client else: @@ -62,7 +65,7 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch: "scheme": online_store_config.scheme or "http" } ], - basic_auth=(online_store_config.user, online_store_config.password), + basic_auth=(user, password) ) return self._client @@ -230,7 +233,6 @@ def retrieve_online_documents( ) rows = response["hits"]["hits"][0:top_k] for row in rows: - entity_key = row["_source"]["entity_key"] feature_value = row["_source"]["feature_value"] vector_value = row["_source"]["vector_value"] timestamp = row["_source"]["timestamp"] diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index f2c32fdafd..1043208ab3 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -283,7 +283,7 @@ def retrieve_online_documents( requested_feature: str, embedding: List[float], top_k: int, - distance_metric: str = "L2", + distance_metric: Optional[str] = "L2", ) -> List[ Tuple[ Optional[datetime], From 06890213249dd4db3f99a49f20f56fd4f4a0db0c Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sat, 11 May 2024 17:35:59 -0700 Subject: [PATCH 09/17] format Signed-off-by: cmuhao --- .../online_stores/contrib/elasticsearch.py | 84 ++++++++++--------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index 4d514c6da4..86010e03b9 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -51,8 +51,12 @@ def _get_client(self, config: 
RepoConfig) -> Elasticsearch: online_store_config = config.online_store assert isinstance(online_store_config, ElasticSearchOnlineStoreConfig) - user = online_store_config.user if online_store_config.user is not None else '' - password = online_store_config.password if online_store_config.password is not None else '' + user = online_store_config.user if online_store_config.user is not None else "" + password = ( + online_store_config.password + if online_store_config.password is not None + else "" + ) if self._client: return self._client @@ -62,10 +66,10 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch: { "host": online_store_config.host or "localhost", "port": online_store_config.port or 9200, - "scheme": online_store_config.scheme or "http" + "scheme": online_store_config.scheme or "http", } ], - basic_auth=(user, password) + basic_auth=(user, password), ) return self._client @@ -78,13 +82,13 @@ def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, Any]]): } def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: insert_values = [] for entity_key, values, timestamp, created_ts in data: @@ -97,7 +101,9 @@ def online_write_batch( if created_ts is not None: created_ts = _to_naive_utc(created_ts) for feature_name, value in values.items(): - encoded_value = base64.b64encode(value.SerializeToString()).decode("utf-8") + encoded_value = base64.b64encode(value.SerializeToString()).decode( + "utf-8" + ) vector_val = json.loads(get_list_val_str(value)) insert_values.append( { @@ -112,16 +118,16 @@ def online_write_batch( batch_size = config.online_config.write_batch_size for i in 
range(0, len(insert_values), batch_size): - batch = insert_values[i: i + batch_size] + batch = insert_values[i : i + batch_size] actions = self._bulk_batch_actions(table, batch) helpers.bulk(self._get_client(config), actions) def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: if not requested_features: body = { @@ -167,16 +173,18 @@ def create_index(self, config: RepoConfig, table: FeatureView): }, } } - self._get_client(config).indices.create(index=table.name, mappings=index_mapping) + self._get_client(config).indices.create( + index=table.name, mappings=index_mapping + ) def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): # implement the update method for table in tables_to_delete: @@ -185,10 +193,10 @@ def update( self.create_index(config, table) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): project = config.project try: @@ -199,14 +207,14 @@ def teardown( raise def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - *args, - **kwargs, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: 
List[float], + top_k: int, + *args, + **kwargs, ) -> List[ Tuple[ Optional[datetime], From 1dc08ba340dd6dd41dd7be924c7eb114226b66ce Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sat, 11 May 2024 17:36:20 -0700 Subject: [PATCH 10/17] format Signed-off-by: cmuhao --- .../online_stores/contrib/elasticsearch.py | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index 86010e03b9..d54b9e5dba 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -82,13 +82,13 @@ def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, Any]]): } def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: insert_values = [] for entity_key, values, timestamp, created_ts in data: @@ -118,16 +118,16 @@ def online_write_batch( batch_size = config.online_config.write_batch_size for i in range(0, len(insert_values), batch_size): - batch = insert_values[i : i + batch_size] + batch = insert_values[i: i + batch_size] actions = self._bulk_batch_actions(table, batch) helpers.bulk(self._get_client(config), actions) def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: if not 
requested_features: body = { @@ -178,13 +178,13 @@ def create_index(self, config: RepoConfig, table: FeatureView): ) def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): # implement the update method for table in tables_to_delete: @@ -193,10 +193,10 @@ def update( self.create_index(config, table) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): project = config.project try: @@ -207,14 +207,14 @@ def teardown( raise def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - *args, - **kwargs, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + *args, + **kwargs, ) -> List[ Tuple[ Optional[datetime], From c689a048ad79e5a3d7bdba3881216a859b3c8232 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sat, 11 May 2024 17:36:50 -0700 Subject: [PATCH 11/17] format Signed-off-by: cmuhao --- .../online_stores/contrib/elasticsearch.py | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index d54b9e5dba..86010e03b9 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -82,13 +82,13 @@ def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, 
Any]]): } def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: insert_values = [] for entity_key, values, timestamp, created_ts in data: @@ -118,16 +118,16 @@ def online_write_batch( batch_size = config.online_config.write_batch_size for i in range(0, len(insert_values), batch_size): - batch = insert_values[i: i + batch_size] + batch = insert_values[i : i + batch_size] actions = self._bulk_batch_actions(table, batch) helpers.bulk(self._get_client(config), actions) def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - requested_features: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: if not requested_features: body = { @@ -178,13 +178,13 @@ def create_index(self, config: RepoConfig, table: FeatureView): ) def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ): # implement the update method for table in tables_to_delete: @@ -193,10 +193,10 @@ def update( self.create_index(config, table) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + 
self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], ): project = config.project try: @@ -207,14 +207,14 @@ def teardown( raise def retrieve_online_documents( - self, - config: RepoConfig, - table: FeatureView, - requested_feature: str, - embedding: List[float], - top_k: int, - *args, - **kwargs, + self, + config: RepoConfig, + table: FeatureView, + requested_feature: str, + embedding: List[float], + top_k: int, + *args, + **kwargs, ) -> List[ Tuple[ Optional[datetime], From 999cba1213f2ac1b7f9e021abd30c40046398430 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sat, 11 May 2024 17:50:53 -0700 Subject: [PATCH 12/17] add test Signed-off-by: cmuhao --- Makefile | 19 ++++++++++++ .../elasticsearch_repo_configuration.py | 12 ++++++++ .../universal/online_store/elasticsearch.py | 29 +++++++++++++++++++ 3 files changed, 60 insertions(+) create mode 100644 sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py create mode 100644 sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py diff --git a/Makefile b/Makefile index 18006fe7d1..9b53752218 100644 --- a/Makefile +++ b/Makefile @@ -310,6 +310,25 @@ test-python-universal-cassandra-no-cloud-providers: not test_snowflake" \ sdk/python/tests + test-python-universal-elasticsearch-online: + PYTHONPATH='.' 
\ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.elasticsearch_repo_configuration \ + PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.elasticsearch \ + python -m pytest -n 8 --integration \ + -k "not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_reorder_columns and \ + not test_logged_features_validation and \ + not test_lambda_materialization_consistency and \ + not test_offline_write and \ + not test_push_features_to_offline_store and \ + not gcs_registry and \ + not s3_registry and \ + not test_universal_types and \ + not test_snowflake" \ + sdk/python/tests + test-python-universal: python -m pytest -n 8 --integration sdk/python/tests diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py new file mode 100644 index 0000000000..bbc3ae31ff --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py @@ -0,0 +1,12 @@ +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.elasticsearch import ( + ElasticSearchOnlineStoreCreator +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig( + online_store="elasticsearch", online_store_creator=ElasticSearchOnlineStoreCreator + ), +] diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py new file mode 100644 index 0000000000..5b56faf5ce --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py @@ -0,0 +1,29 @@ +import os +from typing import Dict + +from testcontainers.elasticsearch import ElasticSearchContainer + +from 
tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class ElasticSearchOnlineStoreCreator(OnlineStoreCreator): + + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + self.container = ElasticSearchContainer( + "elasticsearch:8.3.3", + + ).with_exposed_ports(9200) + + def create_online_store(self) -> Dict[str, str]: + self.container.start() + return { + "host": "localhost", + "type": "elasticsearch", + "port": self.container.get_exposed_port(9200) + } + + def teardown(self): + self.container.stop() \ No newline at end of file From e93956d49159d909721127a6d75b142b85346083 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sat, 11 May 2024 17:51:10 -0700 Subject: [PATCH 13/17] add test Signed-off-by: cmuhao --- .../contrib/elasticsearch_repo_configuration.py | 5 +++-- .../feature_repos/universal/online_store/elasticsearch.py | 7 ++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py index bbc3ae31ff..4d1f2c3ca1 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py @@ -2,11 +2,12 @@ IntegrationTestRepoConfig, ) from tests.integration.feature_repos.universal.online_store.elasticsearch import ( - ElasticSearchOnlineStoreCreator + ElasticSearchOnlineStoreCreator, ) FULL_REPO_CONFIGS = [ IntegrationTestRepoConfig( - online_store="elasticsearch", online_store_creator=ElasticSearchOnlineStoreCreator + online_store="elasticsearch", + online_store_creator=ElasticSearchOnlineStoreCreator, ), ] diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py index 5b56faf5ce..42bf5cad58 
100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py @@ -1,4 +1,3 @@ -import os from typing import Dict from testcontainers.elasticsearch import ElasticSearchContainer @@ -9,12 +8,10 @@ class ElasticSearchOnlineStoreCreator(OnlineStoreCreator): - def __init__(self, project_name: str, **kwargs): super().__init__(project_name) self.container = ElasticSearchContainer( "elasticsearch:8.3.3", - ).with_exposed_ports(9200) def create_online_store(self) -> Dict[str, str]: @@ -22,8 +19,8 @@ def create_online_store(self) -> Dict[str, str]: return { "host": "localhost", "type": "elasticsearch", - "port": self.container.get_exposed_port(9200) + "port": self.container.get_exposed_port(9200), } def teardown(self): - self.container.stop() \ No newline at end of file + self.container.stop() From 369cfd412383a1ea1bee38518a24402e3bb3e427 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sun, 12 May 2024 16:29:18 -0700 Subject: [PATCH 14/17] fix e2e test Signed-off-by: cmuhao --- .../feast/infra/online_stores/contrib/elasticsearch.py | 8 ++++---- .../feature_repos/universal/online_store/elasticsearch.py | 4 +++- .../integration/online_store/test_universal_online.py | 2 +- setup.py | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index 86010e03b9..a9132ab7a1 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -34,7 +34,7 @@ class ElasticSearchOnlineStoreConfig(FeastConfigBaseModel): scheme: Optional[str] = "http" # The number of rows to write in a single batch - write_batch_size: Optional[PositiveInt] = 40 + write_batch_size: Optional[int] = 40 # The length of the vector value vector_len: Optional[int] = 512 @@ -116,7 +116,7 
@@ def online_write_batch( } ) - batch_size = config.online_config.write_batch_size + batch_size = config.online_store.write_batch_size for i in range(0, len(insert_values), batch_size): batch = insert_values[i : i + batch_size] actions = self._bulk_batch_actions(table, batch) @@ -167,9 +167,9 @@ def create_index(self, config: RepoConfig, table: FeatureView): "created_ts": {"type": "date"}, "vector_value": { "type": "dense_vector", - "dims": config.online_config.vector_len, + "dims": config.online_store.vector_len, "index": "true", - "similarity": config.online_config.similarity, + "similarity": config.online_store.similarity, }, } } diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py index 42bf5cad58..0e35efc981 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py @@ -11,7 +11,7 @@ class ElasticSearchOnlineStoreCreator(OnlineStoreCreator): def __init__(self, project_name: str, **kwargs): super().__init__(project_name) self.container = ElasticSearchContainer( - "elasticsearch:8.3.3", + "elasticsearch:8.13.4", ).with_exposed_ports(9200) def create_online_store(self) -> Dict[str, str]: @@ -20,6 +20,8 @@ def create_online_store(self) -> Dict[str, str]: "host": "localhost", "type": "elasticsearch", "port": self.container.get_exposed_port(9200), + "vector_len": 2, + "similarity": "cosine" } def teardown(self): diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 5d6462e5e3..9beba4d72b 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -789,7 +789,7 @@ def assert_feature_service_entity_mapping_correctness( 
@pytest.mark.integration -@pytest.mark.universal_online_stores(only=["pgvector"]) +@pytest.mark.universal_online_stores(only=["pgvector", "elasticsearch"]) def test_retrieve_online_documents(environment, fake_document_data): fs = environment.feature_store df, data_source = fake_document_data diff --git a/setup.py b/setup.py index 7d7c774691..9181e64c2f 100644 --- a/setup.py +++ b/setup.py @@ -150,7 +150,7 @@ DELTA_REQUIRED = ["deltalake"] -ELASTICSEARCH_REQUIRED = ["elasticsearch==8.13.4"] +ELASTICSEARCH_REQUIRED = ["elasticsearch>=8.13.0"] CI_REQUIRED = ( [ From ff9612360bf4638c2120209195d25771d5963305 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sun, 12 May 2024 16:29:40 -0700 Subject: [PATCH 15/17] format Signed-off-by: cmuhao --- sdk/python/feast/infra/online_stores/contrib/elasticsearch.py | 1 - .../feature_repos/universal/online_store/elasticsearch.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index a9132ab7a1..4b30e6221c 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -8,7 +8,6 @@ import pytz from elasticsearch import Elasticsearch, helpers -from pydantic import PositiveInt from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py index 0e35efc981..16b8a8dd6b 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py @@ -21,7 +21,7 @@ def create_online_store(self) -> Dict[str, str]: "type": "elasticsearch", "port": 
self.container.get_exposed_port(9200), "vector_len": 2, - "similarity": "cosine" + "similarity": "cosine", } def teardown(self): From d76bce6fb5345a700e73d313b0e2d556671b5db0 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sun, 12 May 2024 17:08:05 -0700 Subject: [PATCH 16/17] fix test Signed-off-by: cmuhao --- docs/reference/online-stores/elasticsearch.md | 76 +++++++++++++++++++ sdk/python/feast/feature_store.py | 6 +- .../feast/infra/passthrough_provider.py | 2 +- sdk/python/tests/foo_provider.py | 2 +- .../universal/online_store/elasticsearch.py | 2 +- 5 files changed, 82 insertions(+), 6 deletions(-) create mode 100644 docs/reference/online-stores/elasticsearch.md diff --git a/docs/reference/online-stores/elasticsearch.md b/docs/reference/online-stores/elasticsearch.md new file mode 100644 index 0000000000..624bbbf065 --- /dev/null +++ b/docs/reference/online-stores/elasticsearch.md @@ -0,0 +1,76 @@ +# ElasticSearch online store (contrib) + +## Description + +The ElasticSearch online store provides support for materializing tabular feature values, as well as embedding feature vectors, into an ElasticSearch index for serving online features. \ +The embedding feature vectors are stored as dense vectors, and can be used for similarity search. More information on dense vectors can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html). + +## Getting started +In order to use this online store, you'll need to run `pip install 'feast[elasticsearch]'`. You can get started by then running `feast init -t elasticsearch`. 
+ +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: elasticsearch + host: ES_HOST + port: ES_PORT + user: ES_USERNAME + password: ES_PASSWORD + vector_len: 512 + write_batch_size: 1000 +``` +{% endcode %} + +The full set of configuration options is available in [ElasticSearchOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStoreConfig). + +## Functionality Matrix + + +| | ElasticSearch | +| :-------------------------------------------------------- | :------- | +| write feature values to the online store | yes | +| read feature values from the online store | yes | +| update infrastructure (e.g. tables) in the online store | yes | +| teardown infrastructure (e.g. tables) in the online store | yes | +| generate a plan of infrastructure changes | no | +| support for on-demand transforms | yes | +| readable by Python SDK | yes | +| readable by Java | no | +| readable by Go | no | +| support for entityless feature views | yes | +| support for concurrent writing to the same key | no | +| support for ttl (time to live) at retrieval | no | +| support for deleting expired data | no | +| collocated by feature view | yes | +| collocated by feature service | no | +| collocated by entity key | no | + +To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix). + +## Retrieving online document vectors + +The ElasticSearch online store supports retrieving the document vectors that are closest to a given query vector (a top-k similarity search). Each match is returned with its timestamp, feature value, document vector, and similarity score. The document vector is a dense vector of floats. 
+ +{% code title="python" %} +```python +from feast import FeatureStore + +feature_store = FeatureStore(repo_path="feature_store.yaml") + +query_vector = [1.0, 2.0, 3.0, 4.0, 5.0] +top_k = 5 + +# Retrieve the top k closest features to the query vector + +feature_values = feature_store.retrieve_online_documents( + feature="my_feature", + query=query_vector, + top_k=top_k +) +``` +{% endcode %} diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f45dbb1bc8..6ef621523a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1740,7 +1740,7 @@ def retrieve_online_documents( feature: str, query: Union[str, List[float]], top_k: int, - distance_metric: str, + distance_metric: Optional[str] = None, ) -> OnlineResponse: """ Retrieves the top k closest document features. Note, embeddings are a subset of features. @@ -1765,7 +1765,7 @@ def _retrieve_online_documents( feature: str, query: Union[str, List[float]], top_k: int, - distance_metric: str = "L2", + distance_metric: Optional[str] = None, ): if isinstance(query, str): raise ValueError( @@ -2030,7 +2030,7 @@ def _retrieve_from_online_store( requested_feature: str, query: List[float], top_k: int, - distance_metric: str, + distance_metric: Optional[str], ) -> List[Tuple[Timestamp, "FieldStatus.ValueType", Value, Value, Value]]: """ Search and return document features from the online document store. 
diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 2f3e30018a..52d9a73dc5 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -196,7 +196,7 @@ def retrieve_online_documents( requested_feature: str, query: List[float], top_k: int, - distance_metric: str, + distance_metric: Optional[str] = None, ) -> List: set_usage_attribute("provider", self.__class__.__name__) result = [] diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index f869d82e11..456ceefb6f 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -111,7 +111,7 @@ def retrieve_online_documents( requested_feature: str, query: List[float], top_k: int, - distance_metric: str, + distance_metric: Optional[str] = None, ) -> List[ Tuple[ Optional[datetime], diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py index 16b8a8dd6b..c62a9009ca 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py @@ -11,7 +11,7 @@ class ElasticSearchOnlineStoreCreator(OnlineStoreCreator): def __init__(self, project_name: str, **kwargs): super().__init__(project_name) self.container = ElasticSearchContainer( - "elasticsearch:8.13.4", + "elasticsearch:8.3.3", ).with_exposed_ports(9200) def create_online_store(self) -> Dict[str, str]: From bdb973e2c48a491d73491d1a559f0edb8ce2012d Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sun, 12 May 2024 17:26:06 -0700 Subject: [PATCH 17/17] fix test Signed-off-by: cmuhao --- docs/reference/alpha-vector-database.md | 2 +- docs/reference/online-stores/elasticsearch.md | 49 +++++++++++++++++++ .../online_stores/contrib/elasticsearch.py | 7 +++ 3 files changed, 57 
insertions(+), 1 deletion(-) diff --git a/docs/reference/alpha-vector-database.md b/docs/reference/alpha-vector-database.md index 3b0c924d84..37d9b9cdf8 100644 --- a/docs/reference/alpha-vector-database.md +++ b/docs/reference/alpha-vector-database.md @@ -10,7 +10,7 @@ Below are supported vector databases and implemented features: | Vector Database | Retrieval | Indexing | |-----------------|-----------|----------| | Pgvector | [x] | [ ] | -| Elasticsearch | [ ] | [ ] | +| Elasticsearch | [x] | [x] | | Milvus | [ ] | [ ] | | Faiss | [ ] | [ ] | diff --git a/docs/reference/online-stores/elasticsearch.md b/docs/reference/online-stores/elasticsearch.md index 624bbbf065..bf6f9a58db 100644 --- a/docs/reference/online-stores/elasticsearch.md +++ b/docs/reference/online-stores/elasticsearch.md @@ -74,3 +74,52 @@ feature_values = feature_store.retrieve_online_documents( ) ``` {% endcode %} + +## Indexing +Currently, the index mapping in the Elasticsearch online store is configured as: + +{% code title="indexing_mapping" %} +```python +"properties": { + "entity_key": {"type": "binary"}, + "feature_name": {"type": "keyword"}, + "feature_value": {"type": "binary"}, + "timestamp": {"type": "date"}, + "created_ts": {"type": "date"}, + "vector_value": { + "type": "dense_vector", + "dims": config.online_store.vector_len, + "index": "true", + "similarity": config.online_store.similarity, + }, +} +``` +{% endcode %} +And the online_read API query is configured as: + +{% code title="online_read_mapping" %} +```python +"query": { + "bool": { + "must": [ + {"terms": {"entity_key": entity_keys}}, + {"terms": {"feature_name": requested_features}}, + ] + } +}, +``` +{% endcode %} + +And the similarity search API query is configured as: + +{% code title="similarity_search_mapping" %} +```python +{ + "field": "vector_value", + "query_vector": embedding_vector, + "k": top_k, +} +``` +{% endcode %} + +These APIs are subject to change in future versions of Feast to improve performance and
usability. \ No newline at end of file diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index 4b30e6221c..429327e651 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -157,6 +157,13 @@ def online_read( return results def create_index(self, config: RepoConfig, table: FeatureView): + """ + Create an index in ElasticSearch for the given table. + TODO: This method can be exposed to users to customize the indexing functionality. + Args: + config: Feast repo configuration object. + table: FeatureView table for which the index needs to be created. + """ index_mapping = { "properties": { "entity_key": {"type": "binary"},