Skip to content

Commit ac44967

Browse files
authored
feat: Enable keyword search for Milvus (#5199)
* feat: Add support for hybrid search with vector and text queries in Milvus Signed-off-by: Yassin Nouh <70436855+YassinNouh21@users.noreply.github.com> Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> * test: Add keyword and hybrid search tests for Milvus online store Signed-off-by: Yassin Nouh <70436855+YassinNouh21@users.noreply.github.com> Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> * fix linter Signed-off-by: Yassin Nouh <70436855+YassinNouh21@users.noreply.github.com> Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> * fix linter 2 Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com> --------- Signed-off-by: Yassin Nouh <70436855+YassinNouh21@users.noreply.github.com> Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com>
1 parent 306acca commit ac44967

File tree

2 files changed

+301
-22
lines changed

2 files changed

+301
-22
lines changed

sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py

Lines changed: 126 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig):
9595
metric_type: Optional[str] = "COSINE"
9696
embedding_dim: Optional[int] = 128
9797
vector_enabled: Optional[bool] = True
98+
text_search_enabled: Optional[bool] = False
9899
nlist: Optional[int] = 128
99100
username: Optional[StrictStr] = ""
100101
password: Optional[StrictStr] = ""
@@ -492,7 +493,19 @@ def retrieve_online_documents_v2(
492493
Optional[Dict[str, ValueProto]],
493494
]
494495
]:
495-
assert embedding is not None, "Key Word Search not yet implemented for Milvus"
496+
"""
497+
Retrieve documents using vector similarity search or keyword search in Milvus.
498+
Args:
499+
config: Feast configuration object
500+
table: FeatureView object as the table to search
501+
requested_features: List of requested features to retrieve
502+
embedding: Query embedding to search for (optional)
503+
top_k: Number of items to return
504+
distance_metric: Distance metric to use (optional)
505+
query_string: The query string to search for using keyword search (optional)
506+
Returns:
507+
List of tuples containing the event timestamp, entity key, and feature values
508+
"""
496509
entity_name_feast_primitive_type_map = {
497510
k.name: k.dtype for k in table.entity_columns
498511
}
@@ -502,10 +515,8 @@ def retrieve_online_documents_v2(
502515
if not config.online_store.vector_enabled:
503516
raise ValueError("Vector search is not enabled in the online store config")
504517

505-
search_params = {
506-
"metric_type": distance_metric or config.online_store.metric_type,
507-
"params": {"nprobe": 10},
508-
}
518+
if embedding is None and query_string is None:
519+
raise ValueError("Either embedding or query_string must be provided")
509520

510521
composite_key_name = _get_composite_key_name(table)
511522

@@ -520,25 +531,118 @@ def retrieve_online_documents_v2(
520531
), (
521532
f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema"
522533
)
523-
# Note we choose the first vector field as the field to search on. Not ideal but it's something.
534+
535+
# Find the vector search field if we need it
524536
ann_search_field = None
525-
for field in collection["fields"]:
526-
if (
527-
field["type"] in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]
528-
and field["name"] in output_fields
529-
):
530-
ann_search_field = field["name"]
531-
break
537+
if embedding is not None:
538+
for field in collection["fields"]:
539+
if (
540+
field["type"] in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]
541+
and field["name"] in output_fields
542+
):
543+
ann_search_field = field["name"]
544+
break
532545

533546
self.client.load_collection(collection_name)
534-
results = self.client.search(
535-
collection_name=collection_name,
536-
data=[embedding],
537-
anns_field=ann_search_field,
538-
search_params=search_params,
539-
limit=top_k,
540-
output_fields=output_fields,
541-
)
547+
548+
if (
549+
embedding is not None
550+
and query_string is not None
551+
and config.online_store.vector_enabled
552+
):
553+
string_field_list = [
554+
f.name
555+
for f in table.features
556+
if isinstance(f.dtype, PrimitiveFeastType)
557+
and f.dtype.to_value_type() == ValueType.STRING
558+
]
559+
560+
if not string_field_list:
561+
raise ValueError(
562+
"No string fields found in the feature view for text search in hybrid mode"
563+
)
564+
565+
# Create a filter expression for text search
566+
filter_expressions = []
567+
for field in string_field_list:
568+
if field in output_fields:
569+
filter_expressions.append(f"{field} LIKE '%{query_string}%'")
570+
571+
# Combine filter expressions with OR
572+
filter_expr = " OR ".join(filter_expressions) if filter_expressions else ""
573+
574+
# Vector search with text filter
575+
search_params = {
576+
"metric_type": distance_metric or config.online_store.metric_type,
577+
"params": {"nprobe": 10},
578+
}
579+
580+
# For hybrid search, use filter parameter instead of expr
581+
results = self.client.search(
582+
collection_name=collection_name,
583+
data=[embedding],
584+
anns_field=ann_search_field,
585+
search_params=search_params,
586+
limit=top_k,
587+
output_fields=output_fields,
588+
filter=filter_expr if filter_expr else None,
589+
)
590+
591+
elif embedding is not None and config.online_store.vector_enabled:
592+
# Vector search only
593+
search_params = {
594+
"metric_type": distance_metric or config.online_store.metric_type,
595+
"params": {"nprobe": 10},
596+
}
597+
598+
results = self.client.search(
599+
collection_name=collection_name,
600+
data=[embedding],
601+
anns_field=ann_search_field,
602+
search_params=search_params,
603+
limit=top_k,
604+
output_fields=output_fields,
605+
)
606+
607+
elif query_string is not None:
608+
string_field_list = [
609+
f.name
610+
for f in table.features
611+
if isinstance(f.dtype, PrimitiveFeastType)
612+
and f.dtype.to_value_type() == ValueType.STRING
613+
]
614+
615+
if not string_field_list:
616+
raise ValueError(
617+
"No string fields found in the feature view for text search"
618+
)
619+
620+
filter_expressions = []
621+
for field in string_field_list:
622+
if field in output_fields:
623+
filter_expressions.append(f"{field} LIKE '%{query_string}%'")
624+
625+
filter_expr = " OR ".join(filter_expressions)
626+
627+
if not filter_expr:
628+
raise ValueError(
629+
"No text fields found in requested features for search"
630+
)
631+
632+
query_results = self.client.query(
633+
collection_name=collection_name,
634+
filter=filter_expr,
635+
output_fields=output_fields,
636+
limit=top_k,
637+
)
638+
639+
results = [
640+
[{"entity": entity, "distance": -1.0}] for entity in query_results
641+
]
642+
else:
643+
raise ValueError(
644+
"Either vector_enabled must be True for embedding search or query_string must be provided for keyword search"
645+
)
542646

543647
result_list = []
544648
for hits in results:
@@ -559,7 +663,7 @@ def retrieve_online_documents_v2(
559663
# entity_key_proto = None
560664
if field in ["created_ts", "event_ts"]:
561665
res_ts = datetime.fromtimestamp(field_value / 1e6)
562-
elif field == ann_search_field:
666+
elif field == ann_search_field and embedding is not None:
563667
serialized_embedding = _serialize_vector_to_float_list(
564668
embedding
565669
)

sdk/python/tests/unit/online_store/test_online_retrieval.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,3 +1484,178 @@ def test_milvus_native_from_feast_data() -> None:
14841484

14851485
# Clean up the collection
14861486
client.drop_collection(collection_name=COLLECTION_NAME)
1487+
1488+
1489+
def test_milvus_keyword_search() -> None:
1490+
"""
1491+
Test retrieving documents from the Milvus online store using keyword search.
1492+
"""
1493+
random.seed(42)
1494+
n = 10 # number of samples
1495+
vector_length = 10
1496+
runner = CliRunner()
1497+
with runner.local_repo(
1498+
example_repo_py=get_example_repo("example_rag_feature_repo.py"),
1499+
offline_store="file",
1500+
online_store="milvus",
1501+
apply=False,
1502+
teardown=False,
1503+
) as store:
1504+
from datetime import timedelta
1505+
1506+
from feast import Entity, FeatureView, Field, FileSource
1507+
from feast.types import Array, Float32, Int64, String, UnixTimestamp
1508+
1509+
rag_documents_source = FileSource(
1510+
path="data/embedded_documents.parquet",
1511+
timestamp_field="event_timestamp",
1512+
created_timestamp_column="created_timestamp",
1513+
)
1514+
1515+
item = Entity(
1516+
name="item_id",
1517+
join_keys=["item_id"],
1518+
value_type=ValueType.INT64,
1519+
)
1520+
author = Entity(
1521+
name="author_id",
1522+
join_keys=["author_id"],
1523+
value_type=ValueType.STRING,
1524+
)
1525+
1526+
document_embeddings = FeatureView(
1527+
name="text_documents",
1528+
entities=[item, author],
1529+
schema=[
1530+
Field(
1531+
name="vector",
1532+
dtype=Array(Float32),
1533+
vector_index=True,
1534+
vector_search_metric="COSINE",
1535+
),
1536+
Field(name="item_id", dtype=Int64),
1537+
Field(name="author_id", dtype=String),
1538+
Field(name="content", dtype=String),
1539+
Field(name="title", dtype=String),
1540+
Field(name="created_timestamp", dtype=UnixTimestamp),
1541+
Field(name="event_timestamp", dtype=UnixTimestamp),
1542+
],
1543+
source=rag_documents_source,
1544+
ttl=timedelta(hours=24),
1545+
)
1546+
1547+
store.apply([rag_documents_source, item, document_embeddings])
1548+
1549+
# Write some data with specific text content for keyword search
1550+
document_embeddings_fv = store.get_feature_view(name="text_documents")
1551+
provider = store._get_provider()
1552+
1553+
contents = [
1554+
"Feast is an open source feature store for machine learning",
1555+
"Feature stores solve the problem of coordinating features for training and serving",
1556+
"Milvus is a vector database that can be used with Feast",
1557+
"Keyword search uses BM25 algorithm for relevance ranking",
1558+
"Vector search uses embeddings for semantic similarity",
1559+
"Python is a popular programming language for machine learning",
1560+
"Feast supports multiple storage backends for online and offline use cases",
1561+
"Online stores are used for low-latency feature serving",
1562+
"Offline stores are used for batch feature retrieval during training",
1563+
"Feast enables data scientists to define, manage, and share features",
1564+
]
1565+
1566+
titles = [
1567+
"Introduction to Feast",
1568+
"Feature Store Benefits",
1569+
"Using Milvus with Feast",
1570+
"Keyword Search Fundamentals",
1571+
"Vector Search Overview",
1572+
"Python for ML",
1573+
"Feast Storage Options",
1574+
"Online Serving with Feast",
1575+
"Offline Training Support",
1576+
"Feast for Data Scientists",
1577+
]
1578+
1579+
item_keys = [
1580+
EntityKeyProto(
1581+
join_keys=["item_id", "author_id"],
1582+
entity_values=[
1583+
ValueProto(int64_val=i),
1584+
ValueProto(string_val=f"author_{i}"),
1585+
],
1586+
)
1587+
for i in range(n)
1588+
]
1589+
data = []
1590+
for i, item_key in enumerate(item_keys):
1591+
data.append(
1592+
(
1593+
item_key,
1594+
{
1595+
"vector": ValueProto(
1596+
float_list_val=FloatListProto(
1597+
val=np.random.random(vector_length)
1598+
)
1599+
),
1600+
"content": ValueProto(string_val=contents[i]),
1601+
"title": ValueProto(string_val=titles[i]),
1602+
},
1603+
_utc_now(),
1604+
_utc_now(),
1605+
)
1606+
)
1607+
1608+
provider.online_write_batch(
1609+
config=store.config,
1610+
table=document_embeddings_fv,
1611+
data=data,
1612+
progress=None,
1613+
)
1614+
1615+
# Test keyword search for "Milvus"
1616+
result_milvus = store.retrieve_online_documents_v2(
1617+
features=[
1618+
"text_documents:content",
1619+
"text_documents:title",
1620+
],
1621+
query_string="Milvus",
1622+
top_k=3,
1623+
).to_dict()
1624+
1625+
# Verify that documents containing "Milvus" are returned
1626+
assert len(result_milvus["content"]) > 0
1627+
assert any("Milvus" in content for content in result_milvus["content"])
1628+
1629+
# Test keyword search for "machine learning"
1630+
result_ml = store.retrieve_online_documents_v2(
1631+
features=[
1632+
"text_documents:content",
1633+
"text_documents:title",
1634+
],
1635+
query_string="machine learning",
1636+
top_k=3,
1637+
).to_dict()
1638+
1639+
# Verify that documents containing "machine learning" are returned
1640+
assert len(result_ml["content"]) > 0
1641+
assert any(
1642+
"machine learning" in content.lower() for content in result_ml["content"]
1643+
)
1644+
1645+
# Test hybrid search (vector + keyword)
1646+
query_embedding = np.random.random(vector_length).tolist()
1647+
result_hybrid = store.retrieve_online_documents_v2(
1648+
features=[
1649+
"text_documents:content",
1650+
"text_documents:title",
1651+
"text_documents:vector",
1652+
],
1653+
query=query_embedding,
1654+
query_string="Feast",
1655+
top_k=3,
1656+
).to_dict()
1657+
1658+
# Verify hybrid search results
1659+
assert len(result_hybrid["content"]) > 0
1660+
assert any("Feast" in content for content in result_hybrid["content"])
1661+
assert len(result_hybrid["vector"]) > 0

0 commit comments

Comments
 (0)