4040from feast .protos .feast .types .Value_pb2 import Value as ValueProto
4141from feast .repo_config import FeastConfigBaseModel , RepoConfig
4242from feast .type_map import feast_value_type_to_python_type
43- from feast .types import FEAST_VECTOR_TYPES
43+ from feast .types import FEAST_VECTOR_TYPES , PrimitiveFeastType
4444from feast .utils import (
4545 _build_retrieve_online_document_record ,
4646 _serialize_vector_to_float_list ,
@@ -442,6 +442,7 @@ def retrieve_online_documents_v2(
442442 query : List [float ],
443443 top_k : int ,
444444 distance_metric : Optional [str ] = None ,
445+ query_string : Optional [str ] = None ,
445446 ) -> List [
446447 Tuple [
447448 Optional [datetime ],
@@ -458,72 +459,135 @@ def retrieve_online_documents_v2(
458459 query: Query embedding to search for
459460 top_k: Number of items to return
460461 distance_metric: Distance metric to use (optional)
462+ query_string: The query string to search for using keyword search (bm25) (optional)
461463 Returns:
462464 List of tuples containing the event timestamp, entity key, and feature values
463465 """
464466 online_store = config .online_store
465467 if not isinstance (online_store , SqliteOnlineStoreConfig ):
466468 raise ValueError ("online_store must be SqliteOnlineStoreConfig" )
467- if not online_store .vector_enabled :
468- raise ValueError ("Vector search is not enabled in the online store config" )
469+ if not online_store .vector_enabled and not online_store .text_search_enabled :
470+ raise ValueError (
471+ "You must enable either vector search or text search in the online store config"
472+ )
469473
470474 conn = self ._get_conn (config )
471475 cur = conn .cursor ()
472476
473- if not online_store .vector_len :
477+ if online_store . vector_enabled and not online_store .vector_len :
474478 raise ValueError ("vector_len is not configured in the online store config" )
475479
476- query_embedding_bin = serialize_f32 (query , online_store .vector_len ) # type: ignore
477480 table_name = _table_id (config .project , table )
478481 vector_field = _get_vector_field (table )
479482
480- cur .execute (
481- f"""
482- CREATE VIRTUAL TABLE IF NOT EXISTS vec_table using vec0(
483- vector_value float[{ online_store .vector_len } ]
484- );
485- """
486- )
483+ if online_store .vector_enabled :
484+ query_embedding_bin = serialize_f32 (query , online_store .vector_len ) # type: ignore
485+ cur .execute (
486+ f"""
487+ CREATE VIRTUAL TABLE IF NOT EXISTS vec_table using vec0(
488+ vector_value float[{ online_store .vector_len } ]
489+ );
490+ """
491+ )
492+ cur .execute (
493+ f"""
494+ INSERT INTO vec_table (rowid, vector_value)
495+ select rowid, vector_value from { table_name }
496+ where feature_name = "{ vector_field } "
497+ """
498+ )
499+ elif online_store .text_search_enabled :
500+ string_field_list = [
501+ f .name for f in table .features if f .dtype == PrimitiveFeastType .STRING
502+ ]
503+ string_fields = ", " .join (string_field_list )
504+ # TODO: swap this for a value configurable in each Field()
505+ BM25_DEFAULT_WEIGHTS = ", " .join (
506+ [
507+ str (1.0 )
508+ for f in table .features
509+ if f .dtype == PrimitiveFeastType .STRING
510+ ]
511+ )
512+ cur .execute (
513+ f"""
514+ CREATE VIRTUAL TABLE IF NOT EXISTS search_table using fts5(
515+ entity_key, fv_rowid, { string_fields } , tokenize="porter unicode61"
516+ );
517+ """
518+ )
519+ insert_query = _generate_bm25_search_insert_query (
520+ table_name , string_field_list
521+ )
522+ cur .execute (insert_query )
487523
488- cur .execute (
489- f"""
490- INSERT INTO vec_table (rowid, vector_value)
491- select rowid, vector_value from { table_name }
492- where feature_name = "{ vector_field } "
493- """
494- )
524+ else :
525+ raise ValueError (
526+ "Neither vector search nor text search are enabled in the online store config"
527+ )
495528
496- cur .execute (
497- f"""
529+ if online_store .vector_enabled :
530+ cur .execute (
531+ f"""
532+ select
533+ fv2.entity_key,
534+ fv2.feature_name,
535+ fv2.value,
536+ fv.vector_value,
537+ f.distance,
538+ fv.event_ts,
539+ fv.created_ts
540+ from (
541+ select
542+ rowid,
543+ vector_value,
544+ distance
545+ from vec_table
546+ where vector_value match ?
547+ order by distance
548+ limit ?
549+ ) f
550+ left join { table_name } fv
551+ on f.rowid = fv.rowid
552+ left join { table_name } fv2
553+ on fv.entity_key = fv2.entity_key
554+ where fv2.feature_name != "{ vector_field } "
555+ """ ,
556+ (
557+ query_embedding_bin ,
558+ top_k ,
559+ ),
560+ )
561+ elif online_store .text_search_enabled :
562+ cur .execute (
563+ f"""
498564 select
499- fv2 .entity_key,
500- fv2 .feature_name,
501- fv2 .value,
565+ fv .entity_key,
566+ fv .feature_name,
567+ fv .value,
502568 fv.vector_value,
503569 f.distance,
504570 fv.event_ts,
505571 fv.created_ts
506- from (
507- select
508- rowid,
509- vector_value,
510- distance
511- from vec_table
512- where vector_value match ?
513- order by distance
514- limit ?
515- ) f
516- left join { table_name } fv
517- on f.rowid = fv.rowid
518- left join { table_name } fv2
519- on fv.entity_key = fv2.entity_key
520- where fv2.feature_name != "{ vector_field } "
521- """ ,
522- (
523- query_embedding_bin ,
524- top_k ,
525- ),
526- )
572+ from { table_name } fv
573+ inner join (
574+ select
575+ fv_rowid,
576+ entity_key,
577+ { string_fields } ,
578+ bm25(search_table, { BM25_DEFAULT_WEIGHTS } ) as distance
579+ from search_table
580+ where search_table match ? order by distance limit ?
581+ ) f
582+ on f.entity_key = fv.entity_key
583+ """ ,
584+ (query_string , top_k ),
585+ )
586+
587+ else :
588+ raise ValueError (
589+ "Neither vector search nor text search are enabled in the online store config"
590+ )
527591
528592 rows = cur .fetchall ()
529593 results : List [
@@ -557,9 +621,10 @@ def retrieve_online_documents_v2(
557621 feature_val .ParseFromString (value_bin )
558622 entity_dict [entity_key ]["entity_key_proto" ] = entity_key_proto
559623 entity_dict [entity_key ][feature_name ] = feature_val
560- entity_dict [entity_key ][vector_field ] = _serialize_vector_to_float_list (
561- vector_value
562- )
624+ if online_store .vector_enabled :
625+ entity_dict [entity_key ][vector_field ] = _serialize_vector_to_float_list (
626+ vector_value
627+ )
563628 entity_dict [entity_key ]["distance" ] = ValueProto (float_val = distance )
564629 entity_dict [entity_key ]["event_ts" ] = event_ts
565630 entity_dict [entity_key ]["created_ts" ] = created_ts
@@ -706,3 +771,31 @@ def _get_vector_field(table: FeatureView) -> str:
706771 )
707772 vector_field : str = vector_fields [0 ].name
708773 return vector_field
774+
775+
776+ def _generate_bm25_search_insert_query (
777+ table_name : str , string_field_list : List [str ]
778+ ) -> str :
779+ """
780+ Generates an SQL insertion query for the given table and string fields.
781+
782+ Args:
783+ table_name (str): The name of the table to select data from.
784+ string_field_list (List[str]): The list of string fields to be used in the insertion.
785+
786+ Returns:
787+ str: The generated SQL insertion query.
788+ """
789+ _string_fields = ", " .join (string_field_list )
790+ query = f"INSERT INTO search_table (entity_key, fv_rowid, { _string_fields } )\n SELECT\n \t DISTINCT fv0.entity_key,\n \t fv0.rowid as fv_rowid"
791+ from_query = f"\n FROM (select rowid, * from { table_name } where feature_name = '{ string_field_list [0 ]} ') fv0"
792+
793+ for i , string_field in enumerate (string_field_list ):
794+ query += f"\n \t ,fv{ i } .value as { string_field } "
795+ if i > 0 :
796+ from_query += (
797+ f"\n LEFT JOIN (select rowid, * from { table_name } where feature_name = '{ string_field } ') fv{ i } "
798+ + f"\n \t ON fv0.entity_key = fv{ i } .entity_key"
799+ )
800+
801+ return query + from_query
0 commit comments