From e33aa20e0d6203440024210f2a439cae4144ce57 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Thu, 9 May 2024 00:02:24 -0700
Subject: [PATCH 01/17] add elasticsearch as online store
Signed-off-by: cmuhao
---
README.md | 2 +-
.../online_stores/contrib/elastichsearch.py | 193 ++++++++++++++++++
setup.py | 4 +
3 files changed, 198 insertions(+), 1 deletion(-)
create mode 100644 sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
diff --git a/README.md b/README.md
index e9b7ff4743..a1e06774da 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
-
+
[](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml)
[](https://github.com/feast-dev/feast/actions/workflows/master_only.yml)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
new file mode 100644
index 0000000000..28b18413c5
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
@@ -0,0 +1,193 @@
+from __future__ import absolute_import
+
+from datetime import datetime
+from typing import List, Tuple, Optional, Sequence, Dict, Callable, Any
+
+from elasticsearch import Elasticsearch, helpers
+import pytz
+from feast import RepoConfig, FeatureView, Entity
+from feast.infra.online_stores.online_store import OnlineStore
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.repo_config import FeastConfigBaseModel
+from feast.infra.key_encoding_utils import serialize_entity_key, get_list_val_str
+
+
+class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
+ """
+ Configuration for the Elasticsearch online store.
+ NOTE: The class *must* end with the `OnlineStoreConfig` suffix.
+ """
+
+ type: str = "elasticsearch"
+
+ host: Optional[str] = None
+ user: Optional[str] = None
+ password: Optional[str] = None
+ port: Optional[int] = None
+ index: Optional[str] = None
+
+
+class ElasticsearchOnlineStore(OnlineStore):
+ _client: Optional[Elasticsearch] = None
+
+ _index: Optional[str] = None
+
+ def _get_client(self, config: RepoConfig) -> Elasticsearch:
+ online_store_config = config.online_store
+ assert isinstance(online_store_config, ElasticsearchOnlineStoreConfig)
+
+ if not self._client:
+ self._client = Elasticsearch(
+ hosts=[{"host": online_store_config.host or "localhost", "port": online_store_config.port or 9200}],
+ http_auth=(online_store_config.user, online_store_config.password),
+ )
+
+ def create_index(self, config: RepoConfig, table: FeatureView):
+ pass
+
+ def _bulk_batch_actions(self, batch):
+ for row in batch:
+ yield {
+ "_index": self._index,
+ "_id": f"{row['entity_key']}_{row['feature_name']}_{row['timestamp']}",
+ "_source": row
+ }
+
+ def online_write_batch(self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]],
+ progress: Optional[Callable[[int], Any]]) -> None:
+ insert_values = []
+ for entity_key, values, timestamp, created_ts in data:
+ entity_key_bin = serialize_entity_key(
+ entity_key,
+ entity_key_serialization_version=config.entity_key_serialization_version,
+ )
+ timestamp = _to_naive_utc(timestamp)
+ if created_ts is not None:
+ created_ts = _to_naive_utc(created_ts)
+ for feature_name, value in values.items():
+ vector_val = get_list_val_str(value)
+ insert_values.append({
+ "entity_key": entity_key_bin,
+ "feature_name": feature_name,
+ "feature_value": value,
+ "timestamp": timestamp,
+ "created_ts": created_ts,
+ "vector_value": vector_val,
+ })
+
+ batch_size = config.online_config.batch_size
+ for i in range(0, len(insert_values), batch_size):
+ batch = insert_values[i:i + batch_size]
+ actions = self._bulk_batch_actions(batch)
+ helpers.bulk(self._client, actions)
+
+ def online_read(self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None) -> List[
+ Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+ if not requested_features:
+ body = {
+ "_source": {
+ "excludes": ["vector_value"]
+ },
+ "query": {
+ "match": {
+ "entity_key": entity_keys
+ }
+ }
+ }
+ else:
+ body = {
+ "_source": {
+ "excludes": ["vector_value"]
+ },
+ "query": {
+ "bool": {
+ "must": [
+ {"terms": {"entity_key": entity_keys}},
+ {"terms": {"feature_name": requested_features}}
+ ]
+ }
+ }
+ }
+ self._client.search(index=self._index, body=body)
+
+ def update(self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity], partial: bool):
+ # implement the update method
+ for table in tables_to_delete:
+ self._client.delete_by_query(index=table.name)
+ for table in tables_to_keep:
+ self.create_index(config, table)
+
+ def teardown(self, config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity]):
+ pass
+
+ def retrieve_online_documents(self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float], top_k: int) -> List[
+ Tuple[
+ Optional[datetime],
+ Optional[ValueProto],
+ Optional[ValueProto],
+ Optional[ValueProto],
+ ]
+ ]:
+ result: List[
+ Tuple[
+ Optional[datetime],
+ Optional[ValueProto],
+ Optional[ValueProto],
+ Optional[ValueProto],
+ ]
+ ] = []
+ reponse = self._client.search(
+ index=self._index,
+ knn={
+ "field": requested_feature,
+ "query_vector": embedding,
+ "k": top_k,
+ }
+ )
+ rows = reponse["hits"]["hits"][0:top_k]
+ for row in rows:
+ (
+ entity_key,
+ feature_value,
+ timestamp,
+ created_ts,
+ vector_value,
+ ) = row["_source"]
+ feature_value_proto = ValueProto()
+ feature_value_proto.ParseFromString(feature_value)
+
+ vector_value_proto = ValueProto(string_val=vector_value)
+ vector_value_proto.ParseFromString(vector_value)
+ result.append(
+ (
+ timestamp,
+ feature_value_proto,
+ None,
+ vector_value_proto,
+ )
+ )
+ return result
+
+
+def _to_naive_utc(ts: datetime):
+ if ts.tzinfo is None:
+ return ts
+ else:
+ return ts.astimezone(pytz.utc).replace(tzinfo=None)
diff --git a/setup.py b/setup.py
index 6cc728ee98..b1c3806451 100644
--- a/setup.py
+++ b/setup.py
@@ -150,6 +150,8 @@
DELTA_REQUIRED = ["deltalake"]
+ELASTICSEARCH_REQUIRED = ["elasticsearch>=8"]
+
CI_REQUIRED = (
[
"build",
@@ -211,6 +213,7 @@
+ GRPCIO_REQUIRED
+ DUCKDB_REQUIRED
+ DELTA_REQUIRED
+ + ELASTICSEARCH_REQUIRED
)
DOCS_REQUIRED = CI_REQUIRED
@@ -377,6 +380,7 @@ def run(self):
"duckdb": DUCKDB_REQUIRED,
"ikv": IKV_REQUIRED,
"delta": DELTA_REQUIRED,
+ "elasticsearch": ELASTICSEARCH_REQUIRED,
},
include_package_data=True,
license="Apache",
From 091e838e5543445fd5847fd7d9f6386efa0e5e71 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Thu, 9 May 2024 00:03:01 -0700
Subject: [PATCH 02/17] add elasticsearch as online store
Signed-off-by: cmuhao
---
.../online_stores/contrib/elastichsearch.py | 120 ++++++++++--------
1 file changed, 68 insertions(+), 52 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
index 28b18413c5..5e84c21c14 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
@@ -1,16 +1,17 @@
from __future__ import absolute_import
from datetime import datetime
-from typing import List, Tuple, Optional, Sequence, Dict, Callable, Any
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
-from elasticsearch import Elasticsearch, helpers
import pytz
-from feast import RepoConfig, FeatureView, Entity
+from elasticsearch import Elasticsearch, helpers
+
+from feast import Entity, FeatureView, RepoConfig
+from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
-from feast.infra.key_encoding_utils import serialize_entity_key, get_list_val_str
class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
@@ -39,7 +40,12 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
if not self._client:
self._client = Elasticsearch(
- hosts=[{"host": online_store_config.host or "localhost", "port": online_store_config.port or 9200}],
+ hosts=[
+ {
+ "host": online_store_config.host or "localhost",
+ "port": online_store_config.port or 9200,
+ }
+ ],
http_auth=(online_store_config.user, online_store_config.password),
)
@@ -51,14 +57,18 @@ def _bulk_batch_actions(self, batch):
yield {
"_index": self._index,
"_id": f"{row['entity_key']}_{row['feature_name']}_{row['timestamp']}",
- "_source": row
+ "_source": row,
}
- def online_write_batch(self,
- config: RepoConfig,
- table: FeatureView,
- data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]],
- progress: Optional[Callable[[int], Any]]) -> None:
+ def online_write_batch(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[
+ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+ ],
+ progress: Optional[Callable[[int], Any]],
+ ) -> None:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
@@ -70,74 +80,80 @@ def online_write_batch(self,
created_ts = _to_naive_utc(created_ts)
for feature_name, value in values.items():
vector_val = get_list_val_str(value)
- insert_values.append({
- "entity_key": entity_key_bin,
- "feature_name": feature_name,
- "feature_value": value,
- "timestamp": timestamp,
- "created_ts": created_ts,
- "vector_value": vector_val,
- })
+ insert_values.append(
+ {
+ "entity_key": entity_key_bin,
+ "feature_name": feature_name,
+ "feature_value": value,
+ "timestamp": timestamp,
+ "created_ts": created_ts,
+ "vector_value": vector_val,
+ }
+ )
batch_size = config.online_config.batch_size
for i in range(0, len(insert_values), batch_size):
- batch = insert_values[i:i + batch_size]
+ batch = insert_values[i : i + batch_size]
actions = self._bulk_batch_actions(batch)
helpers.bulk(self._client, actions)
- def online_read(self,
- config: RepoConfig,
- table: FeatureView,
- entity_keys: List[EntityKeyProto],
- requested_features: Optional[List[str]] = None) -> List[
- Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+ def online_read(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
+ ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if not requested_features:
body = {
- "_source": {
- "excludes": ["vector_value"]
- },
- "query": {
- "match": {
- "entity_key": entity_keys
- }
- }
+ "_source": {"excludes": ["vector_value"]},
+ "query": {"match": {"entity_key": entity_keys}},
}
else:
body = {
- "_source": {
- "excludes": ["vector_value"]
- },
+ "_source": {"excludes": ["vector_value"]},
"query": {
"bool": {
"must": [
{"terms": {"entity_key": entity_keys}},
- {"terms": {"feature_name": requested_features}}
+ {"terms": {"feature_name": requested_features}},
]
}
- }
+ },
}
self._client.search(index=self._index, body=body)
- def update(self,
- config: RepoConfig,
- tables_to_delete: Sequence[FeatureView],
- tables_to_keep: Sequence[FeatureView],
- entities_to_delete: Sequence[Entity],
- entities_to_keep: Sequence[Entity], partial: bool):
+ def update(
+ self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity],
+ partial: bool,
+ ):
# implement the update method
for table in tables_to_delete:
self._client.delete_by_query(index=table.name)
for table in tables_to_keep:
self.create_index(config, table)
- def teardown(self, config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity]):
+ def teardown(
+ self,
+ config: RepoConfig,
+ tables: Sequence[FeatureView],
+ entities: Sequence[Entity],
+ ):
pass
- def retrieve_online_documents(self,
- config: RepoConfig,
- table: FeatureView,
- requested_feature: str,
- embedding: List[float], top_k: int) -> List[
+ def retrieve_online_documents(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float],
+ top_k: int,
+ ) -> List[
Tuple[
Optional[datetime],
Optional[ValueProto],
@@ -159,7 +175,7 @@ def retrieve_online_documents(self,
"field": requested_feature,
"query_vector": embedding,
"k": top_k,
- }
+ },
)
rows = reponse["hits"]["hits"][0:top_k]
for row in rows:
From de668fb9835cffad35d4d59a39eec10b1e0b892f Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Thu, 9 May 2024 00:05:53 -0700
Subject: [PATCH 03/17] format
Signed-off-by: cmuhao
---
.../infra/online_stores/contrib/elastichsearch.py | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
index 5e84c21c14..1a775e4059 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
@@ -121,7 +121,18 @@ def online_read(
}
},
}
- self._client.search(index=self._index, body=body)
+ response = self._client.search(index=self._index, body=body)
+ results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+ for hit in response["hits"]["hits"]:
+ results.append(
+ (
+ hit["_source"]["timestamp"],
+ {
+ hit["_source"]["feature_name"]: hit["_source"]["feature_value"]
+ },
+ )
+ )
+ return results
def update(
self,
From d3619f65abbbe4f237e1091dab1838b4dbe0ec0c Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Thu, 9 May 2024 00:09:19 -0700
Subject: [PATCH 04/17] format
Signed-off-by: cmuhao
---
.../online_stores/contrib/elastichsearch.py | 73 ++++++++++---------
1 file changed, 37 insertions(+), 36 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
index 1a775e4059..21121306e5 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
@@ -38,7 +38,9 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
online_store_config = config.online_store
assert isinstance(online_store_config, ElasticsearchOnlineStoreConfig)
- if not self._client:
+ if self._client:
+ return self._client
+ else:
self._client = Elasticsearch(
hosts=[
{
@@ -48,6 +50,7 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
],
http_auth=(online_store_config.user, online_store_config.password),
)
+ return self._client
def create_index(self, config: RepoConfig, table: FeatureView):
pass
@@ -61,13 +64,13 @@ def _bulk_batch_actions(self, batch):
}
def online_write_batch(
- self,
- config: RepoConfig,
- table: FeatureView,
- data: List[
- Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
- ],
- progress: Optional[Callable[[int], Any]],
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[
+ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+ ],
+ progress: Optional[Callable[[int], Any]],
) -> None:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
@@ -93,16 +96,16 @@ def online_write_batch(
batch_size = config.online_config.batch_size
for i in range(0, len(insert_values), batch_size):
- batch = insert_values[i : i + batch_size]
+ batch = insert_values[i: i + batch_size]
actions = self._bulk_batch_actions(batch)
helpers.bulk(self._client, actions)
def online_read(
- self,
- config: RepoConfig,
- table: FeatureView,
- entity_keys: List[EntityKeyProto],
- requested_features: Optional[List[str]] = None,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if not requested_features:
body = {
@@ -121,27 +124,25 @@ def online_read(
}
},
}
- response = self._client.search(index=self._index, body=body)
+ response = self._get_client(config).search(index=self._index, body=body)
results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for hit in response["hits"]["hits"]:
results.append(
(
hit["_source"]["timestamp"],
- {
- hit["_source"]["feature_name"]: hit["_source"]["feature_value"]
- },
+ {hit["_source"]["feature_name"]: hit["_source"]["feature_value"]},
)
)
return results
def update(
- self,
- config: RepoConfig,
- tables_to_delete: Sequence[FeatureView],
- tables_to_keep: Sequence[FeatureView],
- entities_to_delete: Sequence[Entity],
- entities_to_keep: Sequence[Entity],
- partial: bool,
+ self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity],
+ partial: bool,
):
# implement the update method
for table in tables_to_delete:
@@ -150,20 +151,20 @@ def update(
self.create_index(config, table)
def teardown(
- self,
- config: RepoConfig,
- tables: Sequence[FeatureView],
- entities: Sequence[Entity],
+ self,
+ config: RepoConfig,
+ tables: Sequence[FeatureView],
+ entities: Sequence[Entity],
):
pass
def retrieve_online_documents(
- self,
- config: RepoConfig,
- table: FeatureView,
- requested_feature: str,
- embedding: List[float],
- top_k: int,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float],
+ top_k: int,
) -> List[
Tuple[
Optional[datetime],
@@ -180,7 +181,7 @@ def retrieve_online_documents(
Optional[ValueProto],
]
] = []
- reponse = self._client.search(
+ reponse = self._get_client(config).search(
index=self._index,
knn={
"field": requested_feature,
From 7a091385686ac691f56b45e0cd96d553dbebe52b Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Thu, 9 May 2024 00:12:32 -0700
Subject: [PATCH 05/17] format
Signed-off-by: cmuhao
---
.../feast/infra/online_stores/contrib/elastichsearch.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
index 21121306e5..5dfd901499 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
@@ -98,7 +98,7 @@ def online_write_batch(
for i in range(0, len(insert_values), batch_size):
batch = insert_values[i: i + batch_size]
actions = self._bulk_batch_actions(batch)
- helpers.bulk(self._client, actions)
+ helpers.bulk(self._get_client(config), actions)
def online_read(
self,
@@ -146,7 +146,7 @@ def update(
):
# implement the update method
for table in tables_to_delete:
- self._client.delete_by_query(index=table.name)
+ self._get_client(config).delete_by_query(index=table.name)
for table in tables_to_keep:
self.create_index(config, table)
From f33e7698464b33298a43318a7eb5e353b497fdd1 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Thu, 9 May 2024 00:14:40 -0700
Subject: [PATCH 06/17] format
Signed-off-by: cmuhao
---
.../online_stores/contrib/elastichsearch.py | 60 +++++++++----------
1 file changed, 30 insertions(+), 30 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
index 5dfd901499..d865fdfc67 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
@@ -64,13 +64,13 @@ def _bulk_batch_actions(self, batch):
}
def online_write_batch(
- self,
- config: RepoConfig,
- table: FeatureView,
- data: List[
- Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
- ],
- progress: Optional[Callable[[int], Any]],
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[
+ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+ ],
+ progress: Optional[Callable[[int], Any]],
) -> None:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
@@ -96,16 +96,16 @@ def online_write_batch(
batch_size = config.online_config.batch_size
for i in range(0, len(insert_values), batch_size):
- batch = insert_values[i: i + batch_size]
+ batch = insert_values[i : i + batch_size]
actions = self._bulk_batch_actions(batch)
helpers.bulk(self._get_client(config), actions)
def online_read(
- self,
- config: RepoConfig,
- table: FeatureView,
- entity_keys: List[EntityKeyProto],
- requested_features: Optional[List[str]] = None,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if not requested_features:
body = {
@@ -136,13 +136,13 @@ def online_read(
return results
def update(
- self,
- config: RepoConfig,
- tables_to_delete: Sequence[FeatureView],
- tables_to_keep: Sequence[FeatureView],
- entities_to_delete: Sequence[Entity],
- entities_to_keep: Sequence[Entity],
- partial: bool,
+ self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity],
+ partial: bool,
):
# implement the update method
for table in tables_to_delete:
@@ -151,20 +151,20 @@ def update(
self.create_index(config, table)
def teardown(
- self,
- config: RepoConfig,
- tables: Sequence[FeatureView],
- entities: Sequence[Entity],
+ self,
+ config: RepoConfig,
+ tables: Sequence[FeatureView],
+ entities: Sequence[Entity],
):
pass
def retrieve_online_documents(
- self,
- config: RepoConfig,
- table: FeatureView,
- requested_feature: str,
- embedding: List[float],
- top_k: int,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float],
+ top_k: int,
) -> List[
Tuple[
Optional[datetime],
From 9153983baa92381852ffb1aed7278a39cca85199 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sat, 11 May 2024 17:33:18 -0700
Subject: [PATCH 07/17] fix search
Signed-off-by: cmuhao
---
.../{elastichsearch.py => elasticsearch.py} | 167 +++++++++++-------
.../feast/infra/online_stores/online_store.py | 2 +
sdk/python/feast/infra/provider.py | 3 +-
sdk/python/feast/repo_config.py | 1 +
setup.py | 2 +-
5 files changed, 109 insertions(+), 66 deletions(-)
rename sdk/python/feast/infra/online_stores/contrib/{elastichsearch.py => elasticsearch.py} (52%)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
similarity index 52%
rename from sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
rename to sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index d865fdfc67..2b34db53c2 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elastichsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -1,10 +1,14 @@
from __future__ import absolute_import
+import json
+import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
+import base64
import pytz
from elasticsearch import Elasticsearch, helpers
+from pydantic import PositiveInt
from feast import Entity, FeatureView, RepoConfig
from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key
@@ -14,9 +18,9 @@
from feast.repo_config import FeastConfigBaseModel
-class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
+class ElasticSearchOnlineStoreConfig(FeastConfigBaseModel):
"""
- Configuration for the Elasticsearch online store.
+ Configuration for the ElasticSearch online store.
NOTE: The class *must* end with the `OnlineStoreConfig` suffix.
"""
@@ -27,16 +31,25 @@ class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
password: Optional[str] = None
port: Optional[int] = None
index: Optional[str] = None
+ scheme: Optional[str] = "http"
+ # The number of rows to write in a single batch
+ write_batch_size: Optional[PositiveInt] = 40
+
+ # The length of the vector value
+ vector_len: Optional[int] = 512
+
+ # The vector similarity metric to use in KNN search
+ # more details: https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html
+ similarity: Optional[str] = "cosine"
-class ElasticsearchOnlineStore(OnlineStore):
- _client: Optional[Elasticsearch] = None
- _index: Optional[str] = None
+class ElasticSearchOnlineStore(OnlineStore):
+ _client: Optional[Elasticsearch] = None
def _get_client(self, config: RepoConfig) -> Elasticsearch:
online_store_config = config.online_store
- assert isinstance(online_store_config, ElasticsearchOnlineStoreConfig)
+ assert isinstance(online_store_config, ElasticSearchOnlineStoreConfig)
if self._client:
return self._client
@@ -46,31 +59,29 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
{
"host": online_store_config.host or "localhost",
"port": online_store_config.port or 9200,
+ "scheme": online_store_config.scheme or "http"
}
],
- http_auth=(online_store_config.user, online_store_config.password),
+ basic_auth=(online_store_config.user, online_store_config.password),
)
return self._client
- def create_index(self, config: RepoConfig, table: FeatureView):
- pass
-
- def _bulk_batch_actions(self, batch):
+ def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, Any]]):
for row in batch:
yield {
- "_index": self._index,
+ "_index": table.name,
"_id": f"{row['entity_key']}_{row['feature_name']}_{row['timestamp']}",
"_source": row,
}
def online_write_batch(
- self,
- config: RepoConfig,
- table: FeatureView,
- data: List[
- Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
- ],
- progress: Optional[Callable[[int], Any]],
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[
+ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+ ],
+ progress: Optional[Callable[[int], Any]],
) -> None:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
@@ -78,34 +89,36 @@ def online_write_batch(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
+ encoded_entity_key = base64.b64encode(entity_key_bin).decode("utf-8")
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
for feature_name, value in values.items():
- vector_val = get_list_val_str(value)
+ encoded_value = base64.b64encode(value.SerializeToString()).decode("utf-8")
+ vector_val = json.loads(get_list_val_str(value))
insert_values.append(
{
- "entity_key": entity_key_bin,
+ "entity_key": encoded_entity_key,
"feature_name": feature_name,
- "feature_value": value,
+ "feature_value": encoded_value,
"timestamp": timestamp,
"created_ts": created_ts,
"vector_value": vector_val,
}
)
- batch_size = config.online_config.batch_size
+ batch_size = config.online_config.write_batch_size
for i in range(0, len(insert_values), batch_size):
- batch = insert_values[i : i + batch_size]
- actions = self._bulk_batch_actions(batch)
+ batch = insert_values[i: i + batch_size]
+ actions = self._bulk_batch_actions(table, batch)
helpers.bulk(self._get_client(config), actions)
def online_read(
- self,
- config: RepoConfig,
- table: FeatureView,
- entity_keys: List[EntityKeyProto],
- requested_features: Optional[List[str]] = None,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if not requested_features:
body = {
@@ -124,7 +137,7 @@ def online_read(
}
},
}
- response = self._get_client(config).search(index=self._index, body=body)
+ response = self._get_client(config).search(index=table.name, body=body)
results: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for hit in response["hits"]["hits"]:
results.append(
@@ -135,14 +148,32 @@ def online_read(
)
return results
+ def create_index(self, config: RepoConfig, table: FeatureView):
+ index_mapping = {
+ "properties": {
+ "entity_key": {"type": "binary"},
+ "feature_name": {"type": "keyword"},
+ "feature_value": {"type": "binary"},
+ "timestamp": {"type": "date"},
+ "created_ts": {"type": "date"},
+ "vector_value": {
+ "type": "dense_vector",
+ "dims": config.online_config.vector_len,
+ "index": "true",
+ "similarity": config.online_config.similarity,
+ },
+ }
+ }
+ self._get_client(config).indices.create(index=table.name, mappings=index_mapping)
+
def update(
- self,
- config: RepoConfig,
- tables_to_delete: Sequence[FeatureView],
- tables_to_keep: Sequence[FeatureView],
- entities_to_delete: Sequence[Entity],
- entities_to_keep: Sequence[Entity],
- partial: bool,
+ self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity],
+ partial: bool,
):
# implement the update method
for table in tables_to_delete:
@@ -151,20 +182,28 @@ def update(
self.create_index(config, table)
def teardown(
- self,
- config: RepoConfig,
- tables: Sequence[FeatureView],
- entities: Sequence[Entity],
+ self,
+ config: RepoConfig,
+ tables: Sequence[FeatureView],
+ entities: Sequence[Entity],
):
- pass
+ project = config.project
+ try:
+ for table in tables:
+ self._get_client(config).indices.delete(index=table.name)
+ except Exception as e:
+ logging.exception(f"Error deleting index in project {project}: {e}")
+ raise
def retrieve_online_documents(
- self,
- config: RepoConfig,
- table: FeatureView,
- requested_feature: str,
- embedding: List[float],
- top_k: int,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float],
+ top_k: int,
+ *args,
+ **kwargs,
) -> List[
Tuple[
Optional[datetime],
@@ -181,34 +220,34 @@ def retrieve_online_documents(
Optional[ValueProto],
]
] = []
- reponse = self._get_client(config).search(
- index=self._index,
+ response = self._get_client(config).search(
+ index=table.name,
knn={
- "field": requested_feature,
+ "field": "vector_value",
"query_vector": embedding,
"k": top_k,
},
)
- rows = reponse["hits"]["hits"][0:top_k]
+ rows = response["hits"]["hits"][0:top_k]
for row in rows:
- (
- entity_key,
- feature_value,
- timestamp,
- created_ts,
- vector_value,
- ) = row["_source"]
+ entity_key = row["_source"]["entity_key"]
+ feature_value = row["_source"]["feature_value"]
+ vector_value = row["_source"]["vector_value"]
+ timestamp = row["_source"]["timestamp"]
+ distance = row["_score"]
+ timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")
+
feature_value_proto = ValueProto()
- feature_value_proto.ParseFromString(feature_value)
+ feature_value_proto.ParseFromString(base64.b64decode(feature_value))
- vector_value_proto = ValueProto(string_val=vector_value)
- vector_value_proto.ParseFromString(vector_value)
+ vector_value_proto = ValueProto(string_val=str(vector_value))
+ distance_value_proto = ValueProto(float_val=distance)
result.append(
(
timestamp,
feature_value_proto,
- None,
vector_value_proto,
+ distance_value_proto,
)
)
return result
diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py
index 2a81e37042..9de148a095 100644
--- a/sdk/python/feast/infra/online_stores/online_store.py
+++ b/sdk/python/feast/infra/online_stores/online_store.py
@@ -142,6 +142,7 @@ def retrieve_online_documents(
requested_feature: str,
embedding: List[float],
top_k: int,
+ distance_metric: Optional[str] = None,
) -> List[
Tuple[
Optional[datetime],
@@ -154,6 +155,7 @@ def retrieve_online_documents(
Retrieves online feature values for the specified embeddings.
Args:
+ distance_metric: distance metric to use for retrieval.
config: The config for the current feature store.
table: The feature view whose feature values should be read.
requested_feature: The name of the feature whose embeddings should be used for retrieval.
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 02fba0c1f6..bc6bcf3fac 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -303,7 +303,7 @@ def retrieve_online_documents(
requested_feature: str,
query: List[float],
top_k: int,
- distance_metric: str = "L2",
+ distance_metric: Optional[str] = None,
) -> List[
Tuple[
Optional[datetime],
@@ -316,6 +316,7 @@ def retrieve_online_documents(
Searches for the top-k most similar documents in the online document store.
Args:
+ distance_metric: distance metric to use for the search.
config: The config for the current feature store.
table: The feature view whose embeddings should be searched.
requested_feature: the requested document feature name.
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index 5e38fd1775..00cbac1908 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -64,6 +64,7 @@
"rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore",
"hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
"ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore",
+ "elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore",
}
OFFLINE_STORE_CLASS_FOR_TYPE = {
diff --git a/setup.py b/setup.py
index b1c3806451..7d7c774691 100644
--- a/setup.py
+++ b/setup.py
@@ -150,7 +150,7 @@
DELTA_REQUIRED = ["deltalake"]
-ELASTICSEARCH_REQUIRED = ["elasticsearch>=8"]
+ELASTICSEARCH_REQUIRED = ["elasticsearch==8.13.4"]
CI_REQUIRED = (
[
From 5a212ba487bc5ff4d17409560335238e94f5fc71 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sat, 11 May 2024 17:35:36 -0700
Subject: [PATCH 08/17] format
Signed-off-by: cmuhao
---
.../feast/infra/online_stores/contrib/elasticsearch.py | 8 +++++---
sdk/python/feast/infra/online_stores/contrib/postgres.py | 2 +-
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index 2b34db53c2..4d514c6da4 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -1,10 +1,10 @@
from __future__ import absolute_import
+import base64
import json
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
-import base64
import pytz
from elasticsearch import Elasticsearch, helpers
@@ -51,6 +51,9 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
online_store_config = config.online_store
assert isinstance(online_store_config, ElasticSearchOnlineStoreConfig)
+ user = online_store_config.user if online_store_config.user is not None else ''
+ password = online_store_config.password if online_store_config.password is not None else ''
+
if self._client:
return self._client
else:
@@ -62,7 +65,7 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
"scheme": online_store_config.scheme or "http"
}
],
- basic_auth=(online_store_config.user, online_store_config.password),
+ basic_auth=(user, password)
)
return self._client
@@ -230,7 +233,6 @@ def retrieve_online_documents(
)
rows = response["hits"]["hits"][0:top_k]
for row in rows:
- entity_key = row["_source"]["entity_key"]
feature_value = row["_source"]["feature_value"]
vector_value = row["_source"]["vector_value"]
timestamp = row["_source"]["timestamp"]
diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py
index f2c32fdafd..1043208ab3 100644
--- a/sdk/python/feast/infra/online_stores/contrib/postgres.py
+++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py
@@ -283,7 +283,7 @@ def retrieve_online_documents(
requested_feature: str,
embedding: List[float],
top_k: int,
- distance_metric: str = "L2",
+ distance_metric: Optional[str] = "L2",
) -> List[
Tuple[
Optional[datetime],
From 06890213249dd4db3f99a49f20f56fd4f4a0db0c Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sat, 11 May 2024 17:35:59 -0700
Subject: [PATCH 09/17] format
Signed-off-by: cmuhao
---
.../online_stores/contrib/elasticsearch.py | 84 ++++++++++---------
1 file changed, 46 insertions(+), 38 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index 4d514c6da4..86010e03b9 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -51,8 +51,12 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
online_store_config = config.online_store
assert isinstance(online_store_config, ElasticSearchOnlineStoreConfig)
- user = online_store_config.user if online_store_config.user is not None else ''
- password = online_store_config.password if online_store_config.password is not None else ''
+ user = online_store_config.user if online_store_config.user is not None else ""
+ password = (
+ online_store_config.password
+ if online_store_config.password is not None
+ else ""
+ )
if self._client:
return self._client
@@ -62,10 +66,10 @@ def _get_client(self, config: RepoConfig) -> Elasticsearch:
{
"host": online_store_config.host or "localhost",
"port": online_store_config.port or 9200,
- "scheme": online_store_config.scheme or "http"
+ "scheme": online_store_config.scheme or "http",
}
],
- basic_auth=(user, password)
+ basic_auth=(user, password),
)
return self._client
@@ -78,13 +82,13 @@ def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, Any]]):
}
def online_write_batch(
- self,
- config: RepoConfig,
- table: FeatureView,
- data: List[
- Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
- ],
- progress: Optional[Callable[[int], Any]],
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[
+ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+ ],
+ progress: Optional[Callable[[int], Any]],
) -> None:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
@@ -97,7 +101,9 @@ def online_write_batch(
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
for feature_name, value in values.items():
- encoded_value = base64.b64encode(value.SerializeToString()).decode("utf-8")
+ encoded_value = base64.b64encode(value.SerializeToString()).decode(
+ "utf-8"
+ )
vector_val = json.loads(get_list_val_str(value))
insert_values.append(
{
@@ -112,16 +118,16 @@ def online_write_batch(
batch_size = config.online_config.write_batch_size
for i in range(0, len(insert_values), batch_size):
- batch = insert_values[i: i + batch_size]
+ batch = insert_values[i : i + batch_size]
actions = self._bulk_batch_actions(table, batch)
helpers.bulk(self._get_client(config), actions)
def online_read(
- self,
- config: RepoConfig,
- table: FeatureView,
- entity_keys: List[EntityKeyProto],
- requested_features: Optional[List[str]] = None,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if not requested_features:
body = {
@@ -167,16 +173,18 @@ def create_index(self, config: RepoConfig, table: FeatureView):
},
}
}
- self._get_client(config).indices.create(index=table.name, mappings=index_mapping)
+ self._get_client(config).indices.create(
+ index=table.name, mappings=index_mapping
+ )
def update(
- self,
- config: RepoConfig,
- tables_to_delete: Sequence[FeatureView],
- tables_to_keep: Sequence[FeatureView],
- entities_to_delete: Sequence[Entity],
- entities_to_keep: Sequence[Entity],
- partial: bool,
+ self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity],
+ partial: bool,
):
# implement the update method
for table in tables_to_delete:
@@ -185,10 +193,10 @@ def update(
self.create_index(config, table)
def teardown(
- self,
- config: RepoConfig,
- tables: Sequence[FeatureView],
- entities: Sequence[Entity],
+ self,
+ config: RepoConfig,
+ tables: Sequence[FeatureView],
+ entities: Sequence[Entity],
):
project = config.project
try:
@@ -199,14 +207,14 @@ def teardown(
raise
def retrieve_online_documents(
- self,
- config: RepoConfig,
- table: FeatureView,
- requested_feature: str,
- embedding: List[float],
- top_k: int,
- *args,
- **kwargs,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float],
+ top_k: int,
+ *args,
+ **kwargs,
) -> List[
Tuple[
Optional[datetime],
From 1dc08ba340dd6dd41dd7be924c7eb114226b66ce Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sat, 11 May 2024 17:36:20 -0700
Subject: [PATCH 10/17] format
Signed-off-by: cmuhao
---
.../online_stores/contrib/elasticsearch.py | 64 +++++++++----------
1 file changed, 32 insertions(+), 32 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index 86010e03b9..d54b9e5dba 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -82,13 +82,13 @@ def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, Any]]):
}
def online_write_batch(
- self,
- config: RepoConfig,
- table: FeatureView,
- data: List[
- Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
- ],
- progress: Optional[Callable[[int], Any]],
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[
+ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+ ],
+ progress: Optional[Callable[[int], Any]],
) -> None:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
@@ -118,16 +118,16 @@ def online_write_batch(
batch_size = config.online_config.write_batch_size
for i in range(0, len(insert_values), batch_size):
- batch = insert_values[i : i + batch_size]
+ batch = insert_values[i: i + batch_size]
actions = self._bulk_batch_actions(table, batch)
helpers.bulk(self._get_client(config), actions)
def online_read(
- self,
- config: RepoConfig,
- table: FeatureView,
- entity_keys: List[EntityKeyProto],
- requested_features: Optional[List[str]] = None,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if not requested_features:
body = {
@@ -178,13 +178,13 @@ def create_index(self, config: RepoConfig, table: FeatureView):
)
def update(
- self,
- config: RepoConfig,
- tables_to_delete: Sequence[FeatureView],
- tables_to_keep: Sequence[FeatureView],
- entities_to_delete: Sequence[Entity],
- entities_to_keep: Sequence[Entity],
- partial: bool,
+ self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity],
+ partial: bool,
):
# implement the update method
for table in tables_to_delete:
@@ -193,10 +193,10 @@ def update(
self.create_index(config, table)
def teardown(
- self,
- config: RepoConfig,
- tables: Sequence[FeatureView],
- entities: Sequence[Entity],
+ self,
+ config: RepoConfig,
+ tables: Sequence[FeatureView],
+ entities: Sequence[Entity],
):
project = config.project
try:
@@ -207,14 +207,14 @@ def teardown(
raise
def retrieve_online_documents(
- self,
- config: RepoConfig,
- table: FeatureView,
- requested_feature: str,
- embedding: List[float],
- top_k: int,
- *args,
- **kwargs,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float],
+ top_k: int,
+ *args,
+ **kwargs,
) -> List[
Tuple[
Optional[datetime],
From c689a048ad79e5a3d7bdba3881216a859b3c8232 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sat, 11 May 2024 17:36:50 -0700
Subject: [PATCH 11/17] format
Signed-off-by: cmuhao
---
.../online_stores/contrib/elasticsearch.py | 64 +++++++++----------
1 file changed, 32 insertions(+), 32 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index d54b9e5dba..86010e03b9 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -82,13 +82,13 @@ def _bulk_batch_actions(self, table: FeatureView, batch: List[Dict[str, Any]]):
}
def online_write_batch(
- self,
- config: RepoConfig,
- table: FeatureView,
- data: List[
- Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
- ],
- progress: Optional[Callable[[int], Any]],
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ data: List[
+ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+ ],
+ progress: Optional[Callable[[int], Any]],
) -> None:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
@@ -118,16 +118,16 @@ def online_write_batch(
batch_size = config.online_config.write_batch_size
for i in range(0, len(insert_values), batch_size):
- batch = insert_values[i: i + batch_size]
+ batch = insert_values[i : i + batch_size]
actions = self._bulk_batch_actions(table, batch)
helpers.bulk(self._get_client(config), actions)
def online_read(
- self,
- config: RepoConfig,
- table: FeatureView,
- entity_keys: List[EntityKeyProto],
- requested_features: Optional[List[str]] = None,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if not requested_features:
body = {
@@ -178,13 +178,13 @@ def create_index(self, config: RepoConfig, table: FeatureView):
)
def update(
- self,
- config: RepoConfig,
- tables_to_delete: Sequence[FeatureView],
- tables_to_keep: Sequence[FeatureView],
- entities_to_delete: Sequence[Entity],
- entities_to_keep: Sequence[Entity],
- partial: bool,
+ self,
+ config: RepoConfig,
+ tables_to_delete: Sequence[FeatureView],
+ tables_to_keep: Sequence[FeatureView],
+ entities_to_delete: Sequence[Entity],
+ entities_to_keep: Sequence[Entity],
+ partial: bool,
):
# implement the update method
for table in tables_to_delete:
@@ -193,10 +193,10 @@ def update(
self.create_index(config, table)
def teardown(
- self,
- config: RepoConfig,
- tables: Sequence[FeatureView],
- entities: Sequence[Entity],
+ self,
+ config: RepoConfig,
+ tables: Sequence[FeatureView],
+ entities: Sequence[Entity],
):
project = config.project
try:
@@ -207,14 +207,14 @@ def teardown(
raise
def retrieve_online_documents(
- self,
- config: RepoConfig,
- table: FeatureView,
- requested_feature: str,
- embedding: List[float],
- top_k: int,
- *args,
- **kwargs,
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ requested_feature: str,
+ embedding: List[float],
+ top_k: int,
+ *args,
+ **kwargs,
) -> List[
Tuple[
Optional[datetime],
From 999cba1213f2ac1b7f9e021abd30c40046398430 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sat, 11 May 2024 17:50:53 -0700
Subject: [PATCH 12/17] add test
Signed-off-by: cmuhao
---
Makefile | 19 ++++++++++++
.../elasticsearch_repo_configuration.py | 12 ++++++++
.../universal/online_store/elasticsearch.py | 29 +++++++++++++++++++
3 files changed, 60 insertions(+)
create mode 100644 sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py
create mode 100644 sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
diff --git a/Makefile b/Makefile
index 18006fe7d1..9b53752218 100644
--- a/Makefile
+++ b/Makefile
@@ -310,6 +310,25 @@ test-python-universal-cassandra-no-cloud-providers:
not test_snowflake" \
sdk/python/tests
+ test-python-universal-elasticsearch-online:
+ PYTHONPATH='.' \
+ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.elasticsearch_repo_configuration \
+ PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.elasticsearch \
+ python -m pytest -n 8 --integration \
+ -k "not test_universal_cli and \
+ not test_go_feature_server and \
+ not test_feature_logging and \
+ not test_reorder_columns and \
+ not test_logged_features_validation and \
+ not test_lambda_materialization_consistency and \
+ not test_offline_write and \
+ not test_push_features_to_offline_store and \
+ not gcs_registry and \
+ not s3_registry and \
+ not test_universal_types and \
+ not test_snowflake" \
+ sdk/python/tests
+
test-python-universal:
python -m pytest -n 8 --integration sdk/python/tests
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py
new file mode 100644
index 0000000000..bbc3ae31ff
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py
@@ -0,0 +1,12 @@
+from tests.integration.feature_repos.integration_test_repo_config import (
+ IntegrationTestRepoConfig,
+)
+from tests.integration.feature_repos.universal.online_store.elasticsearch import (
+ ElasticSearchOnlineStoreCreator
+)
+
+FULL_REPO_CONFIGS = [
+ IntegrationTestRepoConfig(
+ online_store="elasticsearch", online_store_creator=ElasticSearchOnlineStoreCreator
+ ),
+]
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
new file mode 100644
index 0000000000..5b56faf5ce
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
@@ -0,0 +1,29 @@
+import os
+from typing import Dict
+
+from testcontainers.elasticsearch import ElasticSearchContainer
+
+from tests.integration.feature_repos.universal.online_store_creator import (
+ OnlineStoreCreator,
+)
+
+
+class ElasticSearchOnlineStoreCreator(OnlineStoreCreator):
+
+ def __init__(self, project_name: str, **kwargs):
+ super().__init__(project_name)
+ self.container = ElasticSearchContainer(
+ "elasticsearch:8.3.3",
+
+ ).with_exposed_ports(9200)
+
+ def create_online_store(self) -> Dict[str, str]:
+ self.container.start()
+ return {
+ "host": "localhost",
+ "type": "elasticsearch",
+ "port": self.container.get_exposed_port(9200)
+ }
+
+ def teardown(self):
+ self.container.stop()
\ No newline at end of file
From e93956d49159d909721127a6d75b142b85346083 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sat, 11 May 2024 17:51:10 -0700
Subject: [PATCH 13/17] add test
Signed-off-by: cmuhao
---
.../contrib/elasticsearch_repo_configuration.py | 5 +++--
.../feature_repos/universal/online_store/elasticsearch.py | 7 ++-----
2 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py
index bbc3ae31ff..4d1f2c3ca1 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch_repo_configuration.py
@@ -2,11 +2,12 @@
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.universal.online_store.elasticsearch import (
- ElasticSearchOnlineStoreCreator
+ ElasticSearchOnlineStoreCreator,
)
FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(
- online_store="elasticsearch", online_store_creator=ElasticSearchOnlineStoreCreator
+ online_store="elasticsearch",
+ online_store_creator=ElasticSearchOnlineStoreCreator,
),
]
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
index 5b56faf5ce..42bf5cad58 100644
--- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
@@ -1,4 +1,3 @@
-import os
from typing import Dict
from testcontainers.elasticsearch import ElasticSearchContainer
@@ -9,12 +8,10 @@
class ElasticSearchOnlineStoreCreator(OnlineStoreCreator):
-
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.container = ElasticSearchContainer(
"elasticsearch:8.3.3",
-
).with_exposed_ports(9200)
def create_online_store(self) -> Dict[str, str]:
@@ -22,8 +19,8 @@ def create_online_store(self) -> Dict[str, str]:
return {
"host": "localhost",
"type": "elasticsearch",
- "port": self.container.get_exposed_port(9200)
+ "port": self.container.get_exposed_port(9200),
}
def teardown(self):
- self.container.stop()
\ No newline at end of file
+ self.container.stop()
From 369cfd412383a1ea1bee38518a24402e3bb3e427 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sun, 12 May 2024 16:29:18 -0700
Subject: [PATCH 14/17] fix e2e test
Signed-off-by: cmuhao
---
.../feast/infra/online_stores/contrib/elasticsearch.py | 8 ++++----
.../feature_repos/universal/online_store/elasticsearch.py | 4 +++-
.../integration/online_store/test_universal_online.py | 2 +-
setup.py | 2 +-
4 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index 86010e03b9..a9132ab7a1 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -34,7 +34,7 @@ class ElasticSearchOnlineStoreConfig(FeastConfigBaseModel):
scheme: Optional[str] = "http"
# The number of rows to write in a single batch
- write_batch_size: Optional[PositiveInt] = 40
+ write_batch_size: Optional[int] = 40
# The length of the vector value
vector_len: Optional[int] = 512
@@ -116,7 +116,7 @@ def online_write_batch(
}
)
- batch_size = config.online_config.write_batch_size
+ batch_size = config.online_store.write_batch_size
for i in range(0, len(insert_values), batch_size):
batch = insert_values[i : i + batch_size]
actions = self._bulk_batch_actions(table, batch)
@@ -167,9 +167,9 @@ def create_index(self, config: RepoConfig, table: FeatureView):
"created_ts": {"type": "date"},
"vector_value": {
"type": "dense_vector",
- "dims": config.online_config.vector_len,
+ "dims": config.online_store.vector_len,
"index": "true",
- "similarity": config.online_config.similarity,
+ "similarity": config.online_store.similarity,
},
}
}
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
index 42bf5cad58..0e35efc981 100644
--- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
@@ -11,7 +11,7 @@ class ElasticSearchOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.container = ElasticSearchContainer(
- "elasticsearch:8.3.3",
+ "elasticsearch:8.13.4",
).with_exposed_ports(9200)
def create_online_store(self) -> Dict[str, str]:
@@ -20,6 +20,8 @@ def create_online_store(self) -> Dict[str, str]:
"host": "localhost",
"type": "elasticsearch",
"port": self.container.get_exposed_port(9200),
+ "vector_len": 2,
+ "similarity": "cosine"
}
def teardown(self):
diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py
index 5d6462e5e3..9beba4d72b 100644
--- a/sdk/python/tests/integration/online_store/test_universal_online.py
+++ b/sdk/python/tests/integration/online_store/test_universal_online.py
@@ -789,7 +789,7 @@ def assert_feature_service_entity_mapping_correctness(
@pytest.mark.integration
-@pytest.mark.universal_online_stores(only=["pgvector"])
+@pytest.mark.universal_online_stores(only=["pgvector", "elasticsearch"])
def test_retrieve_online_documents(environment, fake_document_data):
fs = environment.feature_store
df, data_source = fake_document_data
diff --git a/setup.py b/setup.py
index 7d7c774691..9181e64c2f 100644
--- a/setup.py
+++ b/setup.py
@@ -150,7 +150,7 @@
DELTA_REQUIRED = ["deltalake"]
-ELASTICSEARCH_REQUIRED = ["elasticsearch==8.13.4"]
+ELASTICSEARCH_REQUIRED = ["elasticsearch>=8.13.0"]
CI_REQUIRED = (
[
From ff9612360bf4638c2120209195d25771d5963305 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sun, 12 May 2024 16:29:40 -0700
Subject: [PATCH 15/17] format
Signed-off-by: cmuhao
---
sdk/python/feast/infra/online_stores/contrib/elasticsearch.py | 1 -
.../feature_repos/universal/online_store/elasticsearch.py | 2 +-
2 files changed, 1 insertion(+), 2 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index a9132ab7a1..4b30e6221c 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -8,7 +8,6 @@
import pytz
from elasticsearch import Elasticsearch, helpers
-from pydantic import PositiveInt
from feast import Entity, FeatureView, RepoConfig
from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
index 0e35efc981..16b8a8dd6b 100644
--- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
@@ -21,7 +21,7 @@ def create_online_store(self) -> Dict[str, str]:
"type": "elasticsearch",
"port": self.container.get_exposed_port(9200),
"vector_len": 2,
- "similarity": "cosine"
+ "similarity": "cosine",
}
def teardown(self):
From d76bce6fb5345a700e73d313b0e2d556671b5db0 Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sun, 12 May 2024 17:08:05 -0700
Subject: [PATCH 16/17] fix test
Signed-off-by: cmuhao
---
docs/reference/online-stores/elasticsearch.md | 76 +++++++++++++++++++
sdk/python/feast/feature_store.py | 6 +-
.../feast/infra/passthrough_provider.py | 2 +-
sdk/python/tests/foo_provider.py | 2 +-
.../universal/online_store/elasticsearch.py | 2 +-
5 files changed, 82 insertions(+), 6 deletions(-)
create mode 100644 docs/reference/online-stores/elasticsearch.md
diff --git a/docs/reference/online-stores/elasticsearch.md b/docs/reference/online-stores/elasticsearch.md
new file mode 100644
index 0000000000..624bbbf065
--- /dev/null
+++ b/docs/reference/online-stores/elasticsearch.md
@@ -0,0 +1,76 @@
+# ElasticSearch online store (contrib)
+
+## Description
+
+The ElasticSearch online store provides support for materializing tabular feature values, as well as embedding feature vectors, into an ElasticSearch index for serving online features. \
+The embedding feature vectors are stored as dense vectors, and can be used for similarity search. More information on dense vectors can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html).
+
+## Getting started
+In order to use this online store, you'll need to run `pip install 'feast[elasticsearch]'`. You can get started by then running `feast init -t elasticsearch`.
+
+## Example
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: my_feature_repo
+registry: data/registry.db
+provider: local
+online_store:
+ type: elasticsearch
+ host: ES_HOST
+ port: ES_PORT
+ user: ES_USERNAME
+ password: ES_PASSWORD
+ vector_len: 512
+ write_batch_size: 1000
+```
+{% endcode %}
+
+The full set of configuration options is available in [ElasticsearchOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.contrib.elasticsearch.ElasticsearchOnlineStoreConfig).
+
+## Functionality Matrix
+
+
+| | Postgres |
+| :-------------------------------------------------------- | :------- |
+| write feature values to the online store | yes |
+| read feature values from the online store | yes |
+| update infrastructure (e.g. tables) in the online store | yes |
+| teardown infrastructure (e.g. tables) in the online store | yes |
+| generate a plan of infrastructure changes | no |
+| support for on-demand transforms | yes |
+| readable by Python SDK | yes |
+| readable by Java | no |
+| readable by Go | no |
+| support for entityless feature views | yes |
+| support for concurrent writing to the same key | no |
+| support for ttl (time to live) at retrieval | no |
+| support for deleting expired data | no |
+| collocated by feature view | yes |
+| collocated by feature service | no |
+| collocated by entity key | no |
+
+To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).
+
+## Retrieving online document vectors
+
+The ElasticSearch online store supports retrieving document vectors for a given list of entity keys. The document vectors are returned as a dictionary where the key is the entity key and the value is the document vector. The document vector is a dense vector of floats.
+
+{% code title="python" %}
+```python
+from feast import FeatureStore
+
+feature_store = FeatureStore(repo_path="feature_store.yaml")
+
+query_vector = [1.0, 2.0, 3.0, 4.0, 5.0]
+top_k = 5
+
+# Retrieve the top k closest features to the query vector
+
+feature_values = feature_store.retrieve_online_documents(
+ feature="my_feature",
+ query=query_vector,
+ top_k=top_k
+)
+```
+{% endcode %}
diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index f45dbb1bc8..6ef621523a 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -1740,7 +1740,7 @@ def retrieve_online_documents(
feature: str,
query: Union[str, List[float]],
top_k: int,
- distance_metric: str,
+ distance_metric: Optional[str] = None,
) -> OnlineResponse:
"""
Retrieves the top k closest document features. Note, embeddings are a subset of features.
@@ -1765,7 +1765,7 @@ def _retrieve_online_documents(
feature: str,
query: Union[str, List[float]],
top_k: int,
- distance_metric: str = "L2",
+ distance_metric: Optional[str] = None,
):
if isinstance(query, str):
raise ValueError(
@@ -2030,7 +2030,7 @@ def _retrieve_from_online_store(
requested_feature: str,
query: List[float],
top_k: int,
- distance_metric: str,
+ distance_metric: Optional[str],
) -> List[Tuple[Timestamp, "FieldStatus.ValueType", Value, Value, Value]]:
"""
Search and return document features from the online document store.
diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py
index 2f3e30018a..52d9a73dc5 100644
--- a/sdk/python/feast/infra/passthrough_provider.py
+++ b/sdk/python/feast/infra/passthrough_provider.py
@@ -196,7 +196,7 @@ def retrieve_online_documents(
requested_feature: str,
query: List[float],
top_k: int,
- distance_metric: str,
+ distance_metric: Optional[str] = None,
) -> List:
set_usage_attribute("provider", self.__class__.__name__)
result = []
diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py
index f869d82e11..456ceefb6f 100644
--- a/sdk/python/tests/foo_provider.py
+++ b/sdk/python/tests/foo_provider.py
@@ -111,7 +111,7 @@ def retrieve_online_documents(
requested_feature: str,
query: List[float],
top_k: int,
- distance_metric: str,
+ distance_metric: Optional[str] = None,
) -> List[
Tuple[
Optional[datetime],
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
index 16b8a8dd6b..c62a9009ca 100644
--- a/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store/elasticsearch.py
@@ -11,7 +11,7 @@ class ElasticSearchOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.container = ElasticSearchContainer(
- "elasticsearch:8.13.4",
+ "elasticsearch:8.3.3",
).with_exposed_ports(9200)
def create_online_store(self) -> Dict[str, str]:
From bdb973e2c48a491d73491d1a559f0edb8ce2012d Mon Sep 17 00:00:00 2001
From: cmuhao
Date: Sun, 12 May 2024 17:26:06 -0700
Subject: [PATCH 17/17] fix test
Signed-off-by: cmuhao
---
docs/reference/alpha-vector-database.md | 2 +-
docs/reference/online-stores/elasticsearch.md | 49 +++++++++++++++++++
.../online_stores/contrib/elasticsearch.py | 7 +++
3 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/docs/reference/alpha-vector-database.md b/docs/reference/alpha-vector-database.md
index 3b0c924d84..37d9b9cdf8 100644
--- a/docs/reference/alpha-vector-database.md
+++ b/docs/reference/alpha-vector-database.md
@@ -10,7 +10,7 @@ Below are supported vector databases and implemented features:
| Vector Database | Retrieval | Indexing |
|-----------------|-----------|----------|
| Pgvector | [x] | [ ] |
-| Elasticsearch | [ ] | [ ] |
+| Elasticsearch | [x] | [x] |
| Milvus | [ ] | [ ] |
| Faiss | [ ] | [ ] |
diff --git a/docs/reference/online-stores/elasticsearch.md b/docs/reference/online-stores/elasticsearch.md
index 624bbbf065..bf6f9a58db 100644
--- a/docs/reference/online-stores/elasticsearch.md
+++ b/docs/reference/online-stores/elasticsearch.md
@@ -74,3 +74,52 @@ feature_values = feature_store.retrieve_online_documents(
)
```
{% endcode %}
+
+## Indexing
+Currently, the indexing mapping in the ElasticSearch online store is configured as:
+
+{% code title="indexing_mapping" %}
+```json
+"properties": {
+ "entity_key": {"type": "binary"},
+ "feature_name": {"type": "keyword"},
+ "feature_value": {"type": "binary"},
+ "timestamp": {"type": "date"},
+ "created_ts": {"type": "date"},
+ "vector_value": {
+ "type": "dense_vector",
+ "dims": config.online_store.vector_len,
+ "index": "true",
+ "similarity": config.online_store.similarity,
+ },
+}
+```
+{% endcode %}
+And the online_read API mapping is configured as:
+
+{% code title="online_read_mapping" %}
+```json
+"query": {
+ "bool": {
+ "must": [
+ {"terms": {"entity_key": entity_keys}},
+ {"terms": {"feature_name": requested_features}},
+ ]
+ }
+},
+```
+{% endcode %}
+
+And the similarity search API mapping is configured as:
+
+{% code title="similarity_search_mapping" %}
+```json
+{
+ "field": "vector_value",
+ "query_vector": embedding_vector,
+ "k": top_k,
+}
+```
+{% endcode %}
+
+These APIs are subject to change in future versions of Feast to improve performance and usability.
\ No newline at end of file
diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
index 4b30e6221c..429327e651 100644
--- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
+++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py
@@ -157,6 +157,13 @@ def online_read(
return results
def create_index(self, config: RepoConfig, table: FeatureView):
+ """
+ Create an index in ElasticSearch for the given table.
+ TODO: This method can be exposed to users to customize the indexing functionality.
+ Args:
+ config: Feast repo configuration object.
+ table: FeatureView table for which the index needs to be created.
+ """
index_mapping = {
"properties": {
"entity_key": {"type": "binary"},