Skip to content

Commit 0fffe21

Browse files
feat: Add SQLite retrieve_online_documents_v2 (#5032)
* feat: Add SQLite Offline store Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * removing commented out thing Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * updated entity key serialization Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * updated Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * reverting some changes Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * updated Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * fixed implementation Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * updated docs Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> --------- Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent a13fa9b commit 0fffe21

File tree

3 files changed

+211
-25
lines changed

3 files changed

+211
-25
lines changed

docs/reference/alpha-vector-database.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ Vector database allows user to store and retrieve embeddings. Feast provides gen
77
## Integration
88
Below are supported vector databases and implemented features:
99

10-
| Vector Database | Retrieval | Indexing | V2 Support* |
11-
|-----------------|-----------|----------|-------------|
12-
| Pgvector | [x] | [ ] | [] |
13-
| Elasticsearch | [x] | [x] | [] |
14-
| Milvus | [x] | [x] | [x] |
15-
| Faiss | [ ] | [ ] | [] |
16-
| SQLite | [x] | [ ] | [] |
17-
| Qdrant | [x] | [x] | [] |
10+
| Vector Database | Retrieval | Indexing | V2 Support* | Online Read |
11+
|-----------------|-----------|----------|-------------|-------------|
12+
| Pgvector | [x] | [ ] | [] | [] |
13+
| Elasticsearch | [x] | [x] | [] | [] |
14+
| Milvus | [x] | [x] | [x] | [x] |
15+
| Faiss | [ ] | [ ] | [] | [] |
16+
| SQLite | [x] | [ ] | [x] | [x] |
17+
| Qdrant | [x] | [x] | [] | [] |
1818

1919
*Note: V2 Support means the SDK supports retrieval of features along with vector embeddings from vector similarity search.
2020

@@ -30,7 +30,7 @@ Beyond that, we will then have `retrieve_online_documents` and `retrieve_online_
3030
backwards compatibility and the adopt industry standard naming conventions.
3131
{% endhint %}
3232

33-
**Note**: Milvus implements the v2 `retrieve_online_documents_v2` method in the SDK. This will be the longer-term solution so that Data Scientists can easily enable vector similarity search by just flipping a flag.
33+
**Note**: Milvus and SQLite implement the v2 `retrieve_online_documents_v2` method in the SDK. This will be the longer-term solution so that Data Scientists can easily enable vector similarity search by just flipping a flag.
3434

3535
## Examples
3636

sdk/python/feast/infra/online_stores/sqlite.py

Lines changed: 139 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from feast.feature_view import FeatureView
2727
from feast.infra.infra_object import SQLITE_INFRA_OBJECT_CLASS_TYPE, InfraObject
2828
from feast.infra.key_encoding_utils import (
29+
deserialize_entity_key,
2930
serialize_entity_key,
3031
serialize_f32,
3132
)
@@ -91,6 +92,9 @@ class SqliteOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig):
9192
path: StrictStr = "data/online.db"
9293
""" (optional) Path to sqlite db """
9394

95+
vector_enabled: bool = False
96+
vector_len: Optional[int] = None
97+
9498

9599
class SqliteOnlineStore(OnlineStore):
96100
"""
@@ -116,14 +120,12 @@ def _get_db_path(config: RepoConfig) -> str:
116120
return db_path
117121

118122
def _get_conn(self, config: RepoConfig):
123+
enable_sqlite_vec = (
124+
sys.version_info[0:2] == (3, 10) and config.online_store.vector_enabled
125+
)
119126
if not self._conn:
120127
db_path = self._get_db_path(config)
121-
self._conn = _initialize_conn(db_path)
122-
if sys.version_info[0:2] == (3, 10) and config.online_store.vector_enabled:
123-
import sqlite_vec # noqa: F401
124-
125-
self._conn.enable_load_extension(True) # type: ignore
126-
sqlite_vec.load(self._conn)
128+
self._conn = _initialize_conn(db_path, enable_sqlite_vec)
127129

128130
return self._conn
129131

@@ -370,7 +372,7 @@ def retrieve_online_documents(
370372

371373
cur.execute(
372374
f"""
373-
CREATE VIRTUAL TABLE vec_example using vec0(
375+
CREATE VIRTUAL TABLE vec_table using vec0(
374376
vector_value float[{config.online_store.vector_len}]
375377
);
376378
"""
@@ -379,13 +381,13 @@ def retrieve_online_documents(
379381
# Currently I can only insert the embedding value without crashing SQLite, will report a bug
380382
cur.execute(
381383
f"""
382-
INSERT INTO vec_example(rowid, vector_value)
384+
INSERT INTO vec_table(rowid, vector_value)
383385
select rowid, vector_value from {table_name}
384386
"""
385387
)
386388
cur.execute(
387389
"""
388-
INSERT INTO vec_example(rowid, vector_value)
390+
INSERT INTO vec_table(rowid, vector_value)
389391
VALUES (?, ?)
390392
""",
391393
(0, query_embedding_bin),
@@ -406,7 +408,7 @@ def retrieve_online_documents(
406408
rowid,
407409
vector_value,
408410
distance
409-
from vec_example
411+
from vec_table
410412
where vector_value match ?
411413
order by distance
412414
limit ?
@@ -444,18 +446,139 @@ def retrieve_online_documents(
444446

445447
return result
446448

449+
def retrieve_online_documents_v2(
450+
self,
451+
config: RepoConfig,
452+
table: FeatureView,
453+
requested_features: List[str],
454+
query: List[float],
455+
top_k: int,
456+
distance_metric: Optional[str] = None,
457+
) -> List[
458+
Tuple[
459+
Optional[datetime],
460+
Optional[EntityKeyProto],
461+
Optional[Dict[str, ValueProto]],
462+
]
463+
]:
464+
"""
465+
Retrieve documents using vector similarity search.
466+
Args:
467+
config: Feast configuration object
468+
table: FeatureView object as the table to search
469+
requested_features: List of requested features to retrieve
470+
query: Query embedding to search for
471+
top_k: Number of items to return
472+
distance_metric: Distance metric to use (optional)
473+
Returns:
474+
List of tuples containing the event timestamp, entity key, and feature values
475+
"""
476+
online_store = config.online_store
477+
if not isinstance(online_store, SqliteOnlineStoreConfig):
478+
raise ValueError("online_store must be SqliteOnlineStoreConfig")
479+
if not online_store.vector_enabled:
480+
raise ValueError("Vector search is not enabled in the online store config")
481+
482+
conn = self._get_conn(config)
483+
cur = conn.cursor()
484+
485+
online_store = config.online_store
486+
if not isinstance(online_store, SqliteOnlineStoreConfig):
487+
raise ValueError("online_store must be SqliteOnlineStoreConfig")
488+
if not online_store.vector_len:
489+
raise ValueError("vector_len is not configured in the online store config")
490+
query_embedding_bin = serialize_f32(query, online_store.vector_len) # type: ignore
491+
table_name = _table_id(config.project, table)
492+
493+
cur.execute(
494+
f"""
495+
CREATE VIRTUAL TABLE IF NOT EXISTS vec_table using vec0(
496+
vector_value float[{online_store.vector_len}]
497+
);
498+
"""
499+
)
500+
501+
cur.execute(
502+
f"""
503+
INSERT INTO vec_table(rowid, vector_value)
504+
select rowid, vector_value from {table_name}
505+
"""
506+
)
507+
508+
cur.execute(
509+
f"""
510+
select
511+
fv.entity_key,
512+
fv.feature_name,
513+
fv.value,
514+
f.distance,
515+
fv.event_ts,
516+
fv.created_ts
517+
from (
518+
select
519+
rowid,
520+
vector_value,
521+
distance
522+
from vec_table
523+
where vector_value match ?
524+
order by distance
525+
limit ?
526+
) f
527+
left join {table_name} fv
528+
on f.rowid = fv.rowid
529+
where fv.feature_name in ({",".join(["?" for _ in requested_features])})
530+
""",
531+
(
532+
query_embedding_bin,
533+
top_k,
534+
*[f.split(":")[-1] for f in requested_features],
535+
),
536+
)
537+
538+
rows = cur.fetchall()
539+
result: List[
540+
Tuple[
541+
Optional[datetime],
542+
Optional[EntityKeyProto],
543+
Optional[Dict[str, ValueProto]],
544+
]
545+
] = []
546+
547+
for entity_key, feature_name, value_bin, distance, event_ts, created_ts in rows:
548+
val = ValueProto()
549+
val.ParseFromString(value_bin)
550+
entity_key_proto = None
551+
if entity_key:
552+
entity_key_proto = deserialize_entity_key(
553+
entity_key,
554+
entity_key_serialization_version=config.entity_key_serialization_version,
555+
)
556+
res = {feature_name: val}
557+
res["distance"] = ValueProto(float_val=distance)
558+
result.append((event_ts, entity_key_proto, res))
559+
560+
return result
561+
447562

448-
def _initialize_conn(db_path: str):
449-
try:
450-
import sqlite_vec # noqa: F401
451-
except ModuleNotFoundError:
452-
logging.warning("Cannot use sqlite_vec for vector search")
563+
def _initialize_conn(
564+
db_path: str, enable_sqlite_vec: bool = False
565+
) -> sqlite3.Connection:
453566
Path(db_path).parent.mkdir(exist_ok=True)
454-
return sqlite3.connect(
567+
db = sqlite3.connect(
455568
db_path,
456569
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
457570
check_same_thread=False,
458571
)
572+
if enable_sqlite_vec:
573+
try:
574+
import sqlite_vec # noqa: F401
575+
except ModuleNotFoundError:
576+
logging.warning("Cannot use sqlite_vec for vector search")
577+
578+
db.enable_load_extension(True)
579+
sqlite_vec.load(db)
580+
581+
return db
459582

460583

461584
def _table_id(project: str, table: FeatureView) -> str:

sdk/python/tests/unit/online_store/test_online_retrieval.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,69 @@ def test_sqlite_vec_import() -> None:
753753
assert result == [(2, 2.39), (1, 2.39)]
754754

755755

756+
@pytest.mark.skipif(
757+
sys.version_info[0:2] != (3, 10),
758+
reason="Only works on Python 3.10",
759+
)
760+
def test_sqlite_get_online_documents_v2() -> None:
761+
"""Test retrieving documents using v2 method with vector similarity search."""
762+
n = 10
763+
vector_length = 8
764+
runner = CliRunner()
765+
with runner.local_repo(
766+
get_example_repo("example_feature_repo_1.py"), "file"
767+
) as store:
768+
store.config.online_store.vector_enabled = True
769+
store.config.online_store.vector_len = vector_length
770+
store.config.entity_key_serialization_version = 3
771+
document_embeddings_fv = store.get_feature_view(name="document_embeddings")
772+
773+
provider = store._get_provider()
774+
775+
# Create test data
776+
item_keys = [
777+
EntityKeyProto(
778+
join_keys=["item_id"], entity_values=[ValueProto(int64_val=i)]
779+
)
780+
for i in range(n)
781+
]
782+
data = []
783+
for item_key in item_keys:
784+
data.append(
785+
(
786+
item_key,
787+
{
788+
"Embeddings": ValueProto(
789+
float_list_val=FloatListProto(
790+
val=[float(x) for x in np.random.random(vector_length)]
791+
)
792+
)
793+
},
794+
_utc_now(),
795+
_utc_now(),
796+
)
797+
)
798+
799+
provider.online_write_batch(
800+
config=store.config,
801+
table=document_embeddings_fv,
802+
data=data,
803+
progress=None,
804+
)
805+
806+
# Test vector similarity search
807+
query_embedding = [float(x) for x in np.random.random(vector_length)]
808+
result = store.retrieve_online_documents_v2(
809+
features=["document_embeddings:Embeddings"],
810+
query=query_embedding,
811+
top_k=3,
812+
).to_dict()
813+
814+
assert "Embeddings" in result
815+
assert "distance" in result
816+
assert len(result["distance"]) == 3
817+
818+
756819
@pytest.mark.skip(reason="Skipping this test as CI struggles with it")
757820
def test_local_milvus() -> None:
758821
import random

0 commit comments

Comments
 (0)