From 6b7c98a121c352d2fdba180e5d08c3009c55c4fd Mon Sep 17 00:00:00 2001 From: yassinnouh21 Date: Wed, 2 Apr 2025 18:41:30 +0200 Subject: [PATCH 1/5] feat: qdrant add `retrieve_online_documents_v2` Signed-off-by: yassinnouh21 --- .../qdrant_online_store/qdrant.py | 310 +++++++++++++++++- 1 file changed, 292 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index 2a2fafdab1c..9a8297fc593 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -1,17 +1,16 @@ from __future__ import absolute_import import base64 -import json import logging import uuid from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, cast from qdrant_client import QdrantClient, models from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import ( - get_list_val_str, + deserialize_entity_key, serialize_entity_key, ) from feast.infra.online_stores.online_store import OnlineStore @@ -60,6 +59,10 @@ class QdrantOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): # If `true`, each request will explicitly wait for the confirmation of completion. Might be slower. # If `false`, each reequest will return immediately after receiving an acknowledgement. upload_wait: bool = True + # Enable text indexing for faster text searches + enable_text_index: bool = True + # Search parameters for HNSW + hnsw_ef: int = 128 class QdrantOnlineStore(OnlineStore): @@ -107,6 +110,7 @@ def online_write_batch( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ) + entity_key_str = base64.b64encode(entity_key_bin).decode("utf-8") timestamp = to_naive_utc(timestamp) if created_ts is not None: @@ -115,20 +119,40 @@ def online_write_batch( encoded_value = base64.b64encode(value.SerializeToString()).decode( "utf-8" ) - vector_val = json.loads(get_list_val_str(value)) - points.append( - models.PointStruct( - id=uuid.uuid4().hex, - payload={ - "entity_key": entity_key_bin, - "feature_name": feature_name, - "feature_value": encoded_value, - "timestamp": timestamp, - "created_ts": created_ts, - }, - vector={config.online_store.vector_name: vector_val}, + + payload = { + "entity_key": entity_key_str, + "feature_name": feature_name, + "feature_value": encoded_value, + "timestamp": timestamp, + "created_ts": created_ts, + } + + if config.online_store.enable_text_index and value.HasField( + "string_val" + ): + payload["text_value"] = value.string_val + + if feature_name == config.online_store.vector_name: + vector_val = list(value.float_list_val.val) + points.append( + models.PointStruct( + id=uuid.uuid4().hex, + payload=payload, + vector={config.online_store.vector_name: vector_val}, + ) + ) + else: + points.append( + models.PointStruct( + id=uuid.uuid4().hex, + payload=payload, + vector={ + config.online_store.vector_name: [0.0] + * config.online_store.vector_len + }, + ) ) - ) self._get_client(config).upload_points( collection_name=table.name, @@ -216,6 +240,16 @@ def create_collection(self, config: RepoConfig, table: FeatureView): field_schema=models.PayloadSchemaType.KEYWORD, ) + if config.online_store.enable_text_index: + try: + client.create_payload_index( + collection_name=table.name, + field_name="text_value", + field_schema=models.PayloadSchemaType.TEXT, + ) + except Exception: + logging.warning("Failed to create text index") + def update( self, config: RepoConfig, @@ -240,8 +274,8 @@ def teardown( try: for table in tables: self._get_client(config).delete_collection(collection_name=table.name) - except Exception as e: - logging.exception(f"Error deleting collection in project {project}: {e}") + except Exception: + logging.exception(f"Error deleting collection in project {project}") raise def retrieve_online_documents( @@ -309,3 +343,243 @@ def retrieve_online_documents( ) ) return result + + def retrieve_online_documents_v2( + self, + config: RepoConfig, + table: FeatureView, + requested_features: List[str], + embedding: Optional[List[float]], + top_k: int, + distance_metric: Optional[str] = None, + query_string: Optional[str] = None, + ) -> List[ + Tuple[ + Optional[datetime], + Optional[EntityKeyProto], + Optional[Dict[str, ValueProto]], + ] + ]: + """ + Retrieves online feature values for the specified embeddings or query string. + + Args: + config: The config for the current feature store. + table: The feature view whose feature values should be read. + requested_features: The list of features to retrieve. + embedding: The embeddings to use for retrieval (optional). + top_k: The number of documents to retrieve. + distance_metric: Distance metric to use for retrieval (optional). + query_string: The query string to search for using keyword search (optional). + + Returns: + List of tuples containing the event timestamp, entity key, and a dictionary of + feature name to feature values. + """ + if embedding is None and query_string is None: + raise ValueError("Either embedding or query_string must be provided") + + if distance_metric and distance_metric.lower() not in DISTANCE_MAPPING: + raise ValueError(f"Unsupported distance metric: {distance_metric}") + + client = self._get_client(config) + result = [] + + # Vector search + if embedding is not None: + search_params = {} + if distance_metric: + search_params["search_params"] = models.SearchParams( + hnsw_ef=config.online_store.hnsw_ef, exact=False + ) + + filter_conditions = None + if query_string is not None and config.online_store.enable_text_index: + # Use text index for efficient search if available + filter_conditions = models.Filter( + must=[ + models.FieldCondition( + key="text_value", + match=models.MatchText(text=query_string), + ), + models.Filter( + should=list([ + models.FieldCondition( + key="feature_name", + match=models.MatchValue(value=feat_name), + ) + for feat_name in requested_features + ]), + must_not=None, + ), + ] + ) + elif query_string is not None: + # Fallback to filtering by feature name only + text_conditions = [] + for feature_name in requested_features: + text_conditions.append( + models.FieldCondition( + key="feature_name", + match=models.MatchValue(value=feature_name), + ) + ) + + filter_conditions = models.Filter( + must=[ + models.Filter( + should=list(text_conditions), + must_not=None, + ) + ] + ) + + points = client.query_points( + collection_name=table.name, + query=embedding, + query_filter=filter_conditions, + limit=top_k, + with_payload=True, + with_vectors=False, + search_params=search_params, + using=config.online_store.vector_name or None, + ).points + elif query_string is not None: + if config.online_store.enable_text_index: + filter_conditions = models.Filter( + must=[ + models.FieldCondition( + key="text_value", + match=models.MatchText(text=query_string), + ), + models.Filter( + should=list([ + models.FieldCondition( + key="feature_name", + match=models.MatchValue(value=feat_name), + ) + for feat_name in requested_features + ]), + must_not=None, + ), + ] + ) + + points = client.query_points( + collection_name=table.name, + query=[0.0] + * config.online_store.vector_len, # Dummy vector for text-only search + query_filter=filter_conditions, + limit=top_k, + with_payload=True, + with_vectors=False, + using=config.online_store.vector_name or None, + ).points + else: + # Fallback to scrolling if text index is not enabled + points = [] + next_offset = None + stop_scrolling = False + + # Create a filter for specific feature names to reduce data to scan + feature_filter = models.Filter( + should=list([ + models.FieldCondition( + key="feature_name", + match=models.MatchValue(value=feat_name), + ) + for feat_name in requested_features + ]) + ) + + while not stop_scrolling: + records, next_offset = client.scroll( + collection_name=table.name, + limit=SCROLL_SIZE, + offset=next_offset, + with_payload=True, + with_vectors=False, + scroll_filter=feature_filter, + ) + stop_scrolling = next_offset is None + + # Filter records that contain the query string + for record in records: + if record.payload and "feature_name" in record.payload: + # Try to decode the feature value and check if it contains the query string + try: + feature_value = record.payload.get("feature_value", "") + if feature_value is not None: + decoded_value = base64.b64decode(feature_value).decode( + "utf-8", errors="ignore" + ) + if query_string.lower() in decoded_value.lower(): + # Cast to ScoredPoint type for compatibility + points.append(record) # type: ignore + + except Exception: + continue + + # Limit to top_k results + if len(points) >= top_k: + points = points[:top_k] + break + + # Process and format results + for point in points: + payload = point.payload or {} + + # Handle entity_key + raw_entity_key = payload.get("entity_key") + if not raw_entity_key: + continue + entity_key_str = cast(str, raw_entity_key) + + # Handle feature name + feature_name = payload.get("feature_name", "") + feature_value_encoded = payload.get("feature_value") + + if not all([entity_key_str, feature_name, feature_value_encoded]): + continue + + # Handle timestamp + timestamp_val = payload.get("timestamp") + if timestamp_val is None: + continue + + # Convert to string explicitly with type checking + if isinstance(timestamp_val, (str, int, float, datetime)): + timestamp_str = str(timestamp_val) + if not timestamp_str: + continue + + try: + timestamp = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%f") + except ValueError: + continue + else: + continue + + entity_key_proto = EntityKeyProto() + if isinstance(entity_key_str, str): + entity_key_bin = base64.b64decode(entity_key_str) + entity_key_proto = deserialize_entity_key( + entity_key_bin, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + + feature_value_proto = ValueProto() + if isinstance(feature_value_encoded, str): + feature_value_proto.ParseFromString(base64.b64decode(feature_value_encoded)) + + # Create a dictionary with the feature values + feature_values = {feature_name: feature_value_proto} + + result.append((timestamp, entity_key_proto, feature_values)) + + return_result: List[ + Tuple[Optional[datetime], Optional[EntityKeyProto], Optional[Dict[str, ValueProto]]] + ] = [] + for entry in result: + return_result.append(entry) + return return_result From c1ef1090ed12a9f97b9d70c5d12342893f027bd6 Mon Sep 17 00:00:00 2001 From: yassinnouh21 Date: Wed, 2 Apr 2025 18:42:13 +0200 Subject: [PATCH 2/5] formatting Signed-off-by: yassinnouh21 --- .../qdrant_online_store/qdrant.py | 74 +++++++++++-------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index 9a8297fc593..589a37e0456 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -403,13 +403,15 @@ def retrieve_online_documents_v2( match=models.MatchText(text=query_string), ), models.Filter( - should=list([ - models.FieldCondition( - key="feature_name", - match=models.MatchValue(value=feat_name), - ) - for feat_name in requested_features - ]), + should=list( + [ + models.FieldCondition( + key="feature_name", + match=models.MatchValue(value=feat_name), + ) + for feat_name in requested_features + ] + ), must_not=None, ), ] @@ -453,13 +455,15 @@ def retrieve_online_documents_v2( match=models.MatchText(text=query_string), ), models.Filter( - should=list([ - models.FieldCondition( - key="feature_name", - match=models.MatchValue(value=feat_name), - ) - for feat_name in requested_features - ]), + should=list( + [ + models.FieldCondition( + key="feature_name", + match=models.MatchValue(value=feat_name), + ) + for feat_name in requested_features + ] + ), must_not=None, ), ] @@ -483,13 +487,15 @@ def retrieve_online_documents_v2( # Create a filter for specific feature names to reduce data to scan feature_filter = models.Filter( - should=list([ - models.FieldCondition( - key="feature_name", - match=models.MatchValue(value=feat_name), - ) - for feat_name in requested_features - ]) + should=list( + [ + models.FieldCondition( + key="feature_name", + match=models.MatchValue(value=feat_name), + ) + for feat_name in requested_features + ] + ) ) while not stop_scrolling: @@ -510,9 +516,9 @@ def retrieve_online_documents_v2( try: feature_value = record.payload.get("feature_value", "") if feature_value is not None: - decoded_value = base64.b64decode(feature_value).decode( - "utf-8", errors="ignore" - ) + decoded_value = base64.b64decode( + feature_value + ).decode("utf-8", errors="ignore") if query_string.lower() in decoded_value.lower(): # Cast to ScoredPoint type for compatibility points.append(record) # type: ignore @@ -528,13 +534,13 @@ def retrieve_online_documents_v2( # Process and format results for point in points: payload = point.payload or {} - + # Handle entity_key raw_entity_key = payload.get("entity_key") if not raw_entity_key: continue entity_key_str = cast(str, raw_entity_key) - + # Handle feature name feature_name = payload.get("feature_name", "") feature_value_encoded = payload.get("feature_value") @@ -546,13 +552,13 @@ def retrieve_online_documents_v2( timestamp_val = payload.get("timestamp") if timestamp_val is None: continue - + # Convert to string explicitly with type checking if isinstance(timestamp_val, (str, int, float, datetime)): timestamp_str = str(timestamp_val) if not timestamp_str: continue - + try: timestamp = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%f") except ValueError: @@ -561,7 +567,7 @@ def retrieve_online_documents_v2( continue entity_key_proto = EntityKeyProto() - if isinstance(entity_key_str, str): + if isinstance(entity_key_str, str): entity_key_bin = base64.b64decode(entity_key_str) entity_key_proto = deserialize_entity_key( entity_key_bin, @@ -570,7 +576,9 @@ def retrieve_online_documents_v2( feature_value_proto = ValueProto() if isinstance(feature_value_encoded, str): - feature_value_proto.ParseFromString(base64.b64decode(feature_value_encoded)) + feature_value_proto.ParseFromString( + base64.b64decode(feature_value_encoded) + ) # Create a dictionary with the feature values feature_values = {feature_name: feature_value_proto} @@ -578,7 +586,11 @@ def retrieve_online_documents_v2( result.append((timestamp, entity_key_proto, feature_values)) return_result: List[ - Tuple[Optional[datetime], Optional[EntityKeyProto], Optional[Dict[str, ValueProto]]] + Tuple[ + Optional[datetime], + Optional[EntityKeyProto], + Optional[Dict[str, ValueProto]], + ] ] = [] for entry in result: return_result.append(entry) From 660fe50f1823e8e989a23f1f59b1f373197312f6 Mon Sep 17 00:00:00 2001 From: yassinnouh21 Date: Wed, 2 Apr 2025 18:44:10 +0200 Subject: [PATCH 3/5] feat: add Qdrant online store support and retrieval tests Signed-off-by: yassinnouh21 --- .../feature_repos/repo_configuration.py | 15 +- .../online_store/test_universal_online.py | 73 +++++++ .../online_store/test_online_retrieval.py | 195 ++++++++++++++++++ sdk/python/tests/utils/cli_repo_creator.py | 17 ++ 4 files changed, 299 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index c24976ffcfc..653051e9cec 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -81,6 +81,9 @@ from tests.integration.feature_repos.universal.online_store.milvus import ( MilvusOnlineStoreCreator, ) +from tests.integration.feature_repos.universal.online_store.qdrant import ( + QdrantOnlineStoreCreator, +) from tests.integration.feature_repos.universal.online_store.redis import ( RedisOnlineStoreCreator, ) @@ -149,7 +152,13 @@ AVAILABLE_ONLINE_STORES: Dict[ str, Tuple[Union[str, Dict[Any, Any]], Optional[Type[OnlineStoreCreator]]] -] = {"sqlite": ({"type": "sqlite"}, None)} +] = { + "sqlite": ({"type": "sqlite"}, None), + "qdrant": ( + {"type": "qdrant", "vector_len": 2, "similarity": "cosine"}, + QdrantOnlineStoreCreator, + ), +} # Only configure Cloud DWH if running full integration tests if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": @@ -167,6 +176,10 @@ AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None) AVAILABLE_ONLINE_STORES["bigtable"] = (BIGTABLE_CONFIG, None) AVAILABLE_ONLINE_STORES["milvus"] = (MILVUS_CONFIG, None) + AVAILABLE_ONLINE_STORES["qdrant"] = ( + {"type": "qdrant", "vector_len": 2, "similarity": "cosine"}, + QdrantOnlineStoreCreator, + ) # Uncomment to test using private IKV account. Currently not enabled as # there is no dedicated IKV instance for CI testing and there is no diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 3445cd27aa3..a8e1308cb49 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -919,3 +919,76 @@ def test_retrieve_online_milvus_documents(environment, fake_document_data): assert len(documents["item_id"]) == 2 assert documents["item_id"] == [2, 3] + + +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["qdrant"]) +def test_retrieve_online_qdrant_documents_v2(environment, fake_document_data): + fs = environment.feature_store + df, data_source = fake_document_data + item_embeddings_feature_view = create_item_embeddings_feature_view(data_source) + fs.apply([item_embeddings_feature_view, item()]) + fs.write_to_online_store("item_embeddings", df) + + # Test vector search + vector_documents = fs.retrieve_online_documents_v2( + features=[ + "item_embeddings:embedding_float", + "item_embeddings:item_id", + "item_embeddings:string_feature", + ], + query=[1.0, 2.0], + top_k=2, + distance_metric="cosine", + ).to_dict() + + assert len(vector_documents["embedding_float"]) > 0 + assert len(vector_documents["item_id"]) > 0 + assert len(vector_documents["string_feature"]) > 0 + + # Test hybrid search (vector + text) + hybrid_documents = fs.retrieve_online_documents_v2( + features=[ + "item_embeddings:embedding_float", + "item_embeddings:item_id", + "item_embeddings:string_feature", + ], + query=[1.0, 2.0], + top_k=2, + distance_metric="cosine", + query_string="item", + ).to_dict() + + assert len(hybrid_documents["embedding_float"]) >= 0 + + try: + text_documents = fs.retrieve_online_documents_v2( + features=[ + "item_embeddings:embedding_float", + "item_embeddings:item_id", + "item_embeddings:string_feature", + ], + query=None, + top_k=2, + query_string="item", + ).to_dict() + + if text_documents and len(text_documents.get("string_feature", [])) > 0: + assert all( + "item" in str(feature).lower() + for feature in text_documents["string_feature"] + ) + except Exception as e: + pytest.fail(f"Text search failed: {e}") + + # Test error case + with pytest.raises(ValueError): + fs.retrieve_online_documents_v2( + features=[ + "item_embeddings:embedding_float", + "item_embeddings:item_id", + "item_embeddings:string_feature", + ], + query=None, + top_k=2, + ) diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py index 409a729ceee..16a3e60e50b 100644 --- a/sdk/python/tests/unit/online_store/test_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py @@ -11,6 +11,7 @@ import pytest import sqlite_vec from pandas.testing import assert_frame_equal +from qdrant_client import models from feast import FeatureStore, RepoConfig from feast.errors import FeatureViewNotFoundException @@ -1659,3 +1660,197 @@ def test_milvus_keyword_search() -> None: assert len(result_hybrid["content"]) > 0 assert any("Feast" in content for content in result_hybrid["content"]) assert len(result_hybrid["vector"]) > 0 + + +def test_qdrant_retrieve_online_documents_v2() -> None: + """ + Test retrieving documents from the online store in local mode. + """ + random.seed(42) + n = 10 # number of samples + vector_length = 10 + runner = CliRunner() + with runner.local_repo( + example_repo_py=get_example_repo("example_rag_feature_repo.py"), + offline_store="file", + online_store="qdrant", + apply=False, + teardown=False, + ) as store: + from datetime import timedelta + + from feast import Entity, FeatureView, Field, FileSource + from feast.types import Array, Float32, Int64, String, UnixTimestamp + + rag_documents_source = FileSource( + path="data/embedded_documents.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + ) + + item = Entity( + name="item_id", + join_keys=["item_id"], + ) + author = Entity( + name="author_id", + join_keys=["author_id"], + value_type=ValueType.STRING, + ) + + document_embeddings = FeatureView( + name="embedded_documents", + entities=[item, author], + schema=[ + Field( + name="vector", + dtype=Array(Float32), + vector_index=True, + vector_search_metric="COSINE", + ), + Field(name="item_id", dtype=Int64), + Field(name="author_id", dtype=String), + Field(name="created_timestamp", dtype=UnixTimestamp), + Field(name="sentence_chunks", dtype=String), + Field(name="event_timestamp", dtype=UnixTimestamp), + ], + source=rag_documents_source, + ttl=timedelta(hours=24), + ) + + store.apply([rag_documents_source, item, document_embeddings]) + + document_embeddings_fv = store.get_feature_view(name="embedded_documents") + + provider = store._get_provider() + + item_keys = [ + EntityKeyProto( + join_keys=["item_id", "author_id"], + entity_values=[ + ValueProto(int64_val=i), + ValueProto(string_val=f"author_{i}"), + ], + ) + for i in range(n) + ] + data = [] + for i, item_key in enumerate(item_keys): + data.append( + ( + item_key, + { + "vector": ValueProto( + float_list_val=FloatListProto( + val=np.random.random( + vector_length, + ) + + i + ) + ), + "sentence_chunks": ValueProto(string_val=f"sentence chunk {i}"), + }, + _utc_now(), + _utc_now(), + ) + ) + + provider.online_write_batch( + config=store.config, + table=document_embeddings_fv, + data=data, + progress=None, + ) + documents_df = pd.DataFrame( + { + "item_id": [str(i) for i in range(n)], + "author_id": [f"author_{i}" for i in range(n)], + "vector": [ + np.random.random( + vector_length, + ) + + i + for i in range(n) + ], + "sentence_chunks": [f"sentence chunk {i}" for i in range(n)], + "event_timestamp": [_utc_now() for _ in range(n)], + "created_timestamp": [_utc_now() for _ in range(n)], + } + ) + + store.write_to_online_store( + feature_view_name="embedded_documents", + df=documents_df, + ) + + query_embedding = np.random.random( + vector_length, + ) + + # Test direct Qdrant client search + client = store._provider._online_store._get_client(store.config) + collection_name = document_embeddings_fv.name + search_params = models.SearchParams(hnsw_ef=128, exact=False) + + results = client.query_points( + collection_name=collection_name, + query=query_embedding, + limit=3, + with_payload=True, + with_vectors=False, + search_params=search_params, + using=store.config.online_store.vector_name or None, + ).points + + # Test Feast API + result = store.retrieve_online_documents_v2( + features=[ + "embedded_documents:vector", + "embedded_documents:item_id", + "embedded_documents:author_id", + "embedded_documents:sentence_chunks", + ], + query=query_embedding, + top_k=3, + ).to_dict() + + # Verify results + for k in ["vector", "item_id", "author_id", "sentence_chunks", "distance"]: + assert k in result, f"Missing {k} in retrieve_online_documents response" + assert len(result["distance"]) == len(results) + + # Test hybrid search (vector + text) + hybrid_result = store.retrieve_online_documents_v2( + features=[ + "embedded_documents:vector", + "embedded_documents:item_id", + "embedded_documents:author_id", + "embedded_documents:sentence_chunks", + ], + query=query_embedding, + query_string="sentence", + top_k=3, + ).to_dict() + + # Verify hybrid results + for k in ["vector", "item_id", "author_id", "sentence_chunks", "distance"]: + assert k in hybrid_result, f"Missing {k} in hybrid search response" + assert len(hybrid_result["distance"]) > 0 + + # Test text-only search + text_result = store.retrieve_online_documents_v2( + features=[ + "embedded_documents:vector", + "embedded_documents:item_id", + "embedded_documents:author_id", + "embedded_documents:sentence_chunks", + ], + query=None, + query_string="sentence", + top_k=3, + ).to_dict() + + # Verify text search results + for k in ["vector", "item_id", "author_id", "sentence_chunks", "distance"]: + assert k in text_result, f"Missing {k} in text search response" + assert len(text_result["distance"]) > 0 diff --git a/sdk/python/tests/utils/cli_repo_creator.py b/sdk/python/tests/utils/cli_repo_creator.py index 4b8f9aad04b..a82f0b0bb89 100644 --- a/sdk/python/tests/utils/cli_repo_creator.py +++ b/sdk/python/tests/utils/cli_repo_creator.py @@ -103,6 +103,23 @@ def local_repo( entity_key_serialization_version: 3 """ ) + elif online_store == "qdrant": + yaml_config = dedent( + f""" + project: {project_id} + registry: {data_path / "registry.db"} + provider: local + online_store: + type: qdrant + location: ":memory:" + vector_len: 10 + similarity: "cosine" + vector_name: "vector" + offline_store: + type: {offline_store} + entity_key_serialization_version: 3 + """ + ) else: pass From 3a4d4e0f7ce6aa8a3635fc84c3e8cef1d4576cef Mon Sep 17 00:00:00 2001 From: Yassin Nouh <70436855+YassinNouh21@users.noreply.github.com> Date: Sat, 5 Apr 2025 22:19:39 +0200 Subject: [PATCH 4/5] Fix Qdrant entity key handling in online store implementation --- .../qdrant_online_store/qdrant.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index d68f1f53a99..5193711bd61 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -174,10 +174,20 @@ def online_read( ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: conditions: List[models.Condition] = [] if entity_keys: + # Convert entity keys to the string format stored in Qdrant + entity_key_strs = [] + for entity_key in entity_keys: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + entity_key_str = base64.b64encode(entity_key_bin).decode("utf-8") + entity_key_strs.append(entity_key_str) + conditions.append( models.FieldCondition( key="entity_key", - match=models.MatchAny(any=entity_keys), # type: ignore + match=models.MatchAny(any=entity_key_strs), ) ) @@ -325,7 +335,10 @@ def retrieve_online_documents( ) for point in points: payload = point.payload or {} - entity_key = str(payload.get("entity_key")) + entity_key_str = payload.get("entity_key") + if not entity_key_str: + continue + feature_value = str(payload.get("feature_value")) timestamp_str = str(payload.get("timestamp")) timestamp = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%f") @@ -338,7 +351,7 @@ def retrieve_online_documents( result.append( _build_retrieve_online_document_record( - entity_key, + entity_key_str, base64.b64decode(feature_value), vector_value, distance, @@ -543,8 +556,9 @@ def retrieve_online_documents_v2( raw_entity_key = payload.get("entity_key") if not raw_entity_key: continue - entity_key_str = cast(str, raw_entity_key) - + + entity_key_str = raw_entity_key + # Handle feature name feature_name = payload.get("feature_name", "") feature_value_encoded = payload.get("feature_value") From e4e0383db330de12bc11b2b289b4a1a24809d4ab Mon Sep 17 00:00:00 2001 From: Yassin Nouh <70436855+YassinNouh21@users.noreply.github.com> Date: Sat, 5 Apr 2025 22:20:38 +0200 Subject: [PATCH 5/5] Fix entity key deserialization in Qdrant online store --- .../qdrant_online_store/qdrant.py | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index 5193711bd61..21be7132418 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -349,16 +349,27 @@ def retrieve_online_documents( else point.vector ) + # Instead of using _build_retrieve_online_document_record directly, handle the deserialization ourselves + # to have more control over the process + entity_key_bin = base64.b64decode(entity_key_str) + entity_key_proto = deserialize_entity_key( + entity_key_bin, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + + feature_value_proto = ValueProto() + feature_value_proto.ParseFromString(base64.b64decode(feature_value)) + + vector_proto = ValueProto() + vector_proto.float_list_val.val.extend(json.loads(vector_value)) + + distance_proto = ValueProto() + distance_proto.double_val = distance + result.append( - _build_retrieve_online_document_record( - entity_key_str, - base64.b64decode(feature_value), - vector_value, - distance, - timestamp, - config.entity_key_serialization_version, - ) + (timestamp, entity_key_proto, feature_value_proto, vector_proto, distance_proto) ) + return result def retrieve_online_documents_v2(