Skip to content

Commit 02e9881

Browse files
uploading latest progress
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent 868e487 commit 02e9881

File tree

1 file changed

+104
-35
lines changed
  • sdk/python/feast/infra/online_stores

1 file changed

+104
-35
lines changed

sdk/python/feast/infra/online_stores/sqlite.py

Lines changed: 104 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ class SqliteOnlineStoreConfig(FeastConfigBaseModel):
4747
path: StrictStr = "data/online.db"
4848
""" (optional) Path to sqlite db """
4949

50-
faiss_enabled: Optional[bool] = False
51-
""" (optional) Enable or disable faiss indexing for online store (vector search)"""
50+
vss_enabled: Optional[bool] = False
51+
""" (optional) Enable or disable sqlite-vss for vector search"""
5252

5353

5454
class SqliteOnlineStore(OnlineStore):
@@ -109,35 +109,70 @@ def online_write_batch(
109109
created_ts = to_naive_utc(created_ts)
110110

111111
for feature_name, val in values.items():
112-
conn.execute(
113-
f"""
114-
UPDATE {_table_id(project, table)}
115-
SET value = ?, event_ts = ?, created_ts = ?
116-
WHERE (entity_key = ? AND feature_name = ?)
117-
""",
118-
(
119-
# SET
120-
val.SerializeToString(),
121-
timestamp,
122-
created_ts,
123-
# WHERE
124-
entity_key_bin,
125-
feature_name,
126-
),
127-
)
128-
129-
conn.execute(
130-
f"""INSERT OR IGNORE INTO {_table_id(project, table)}
131-
(entity_key, feature_name, value, event_ts, created_ts)
132-
VALUES (?, ?, ?, ?, ?)""",
133-
(
134-
entity_key_bin,
135-
feature_name,
136-
val.SerializeToString(),
137-
timestamp,
138-
created_ts,
139-
),
140-
)
112+
vector_val = None
113+
if config.online_store.vss_enabled:
114+
vector_val = get_list_val_str(val)
115+
conn.execute(
116+
f"""
117+
UPDATE {_table_id(project, table)}
118+
SET value = ?, vector_value = ?, event_ts = ?, created_ts = ?
119+
WHERE (entity_key = ? AND feature_name = ?)
120+
""",
121+
(
122+
# SET
123+
val.SerializeToString(),
124+
vector_val,
125+
timestamp,
126+
created_ts,
127+
# WHERE
128+
entity_key_bin,
129+
feature_name,
130+
),
131+
)
132+
133+
conn.execute(
134+
f"""INSERT OR IGNORE INTO {_table_id(project, table)}
135+
(entity_key, feature_name, value, vector_value, event_ts, created_ts)
136+
VALUES (?, ?, ?, ?, ?, ?)""",
137+
(
138+
entity_key_bin,
139+
feature_name,
140+
val.SerializeToString(),
141+
vector_val,
142+
timestamp,
143+
created_ts,
144+
),
145+
)
146+
else:
147+
conn.execute(
148+
f"""
149+
UPDATE {_table_id(project, table)}
150+
SET value = ?, event_ts = ?, created_ts = ?
151+
WHERE (entity_key = ? AND feature_name = ?)
152+
""",
153+
(
154+
# SET
155+
val.SerializeToString(),
156+
timestamp,
157+
created_ts,
158+
# WHERE
159+
entity_key_bin,
160+
feature_name,
161+
),
162+
)
163+
164+
conn.execute(
165+
f"""INSERT OR IGNORE INTO {_table_id(project, table)}
166+
(entity_key, feature_name, value, event_ts, created_ts)
167+
VALUES (?, ?, ?, ?, ?)""",
168+
(
169+
entity_key_bin,
170+
feature_name,
171+
val.SerializeToString(),
172+
timestamp,
173+
created_ts,
174+
),
175+
)
141176
if progress:
142177
progress(1)
143178

@@ -271,15 +306,32 @@ def retrieve_online_documents(
271306
embedding: The query embedding to search for
272307
top_k: The number of items to return
273308
Returns:
274-
List of tuples containing the event timestamp and the document feature
309+
List of tuples containing the event timestamp, the document feature, the vector value, and the distance
275310
"""
276311
project = config.project
277-
if not config.online_store.faiss_enabled:
278-
raise ValueError("Faiss is not enabled in the online store config")
312+
313+
if not config.online_store.vss_enabled:
314+
raise ValueError("sqlite-vss is not enabled in the online store config")
315+
316+
conn = self._get_conn(config)
317+
cur = conn.cursor()
279318

280319
# Convert the embedding to a string to be used in postgres vector search
281320
query_embedding_str = f"[{','.join(str(el) for el in embedding)}]"
282321

322+
cur.execute(
323+
f"""
324+
SELECT entity_key, feature_name, value, vector_value, vector_value <=> ? AS distance, event_ts
325+
FROM {_table_id(project, table)}
326+
WHERE feature_name = ?
327+
ORDER BY distance
328+
LIMIT ?
329+
""",
330+
(query_embedding_str, requested_feature, top_k),
331+
)
332+
333+
rows = cur.fetchall()
334+
283335
result: List[
284336
Tuple[
285337
Optional[datetime],
@@ -289,7 +341,24 @@ def retrieve_online_documents(
289341
]
290342
] = []
291343

292-
raise NotImplementedError("SQLiteOnlineStore does not support retrieval")
344+
for entity_key, feature_name, val_bin, vector_val, distance, event_ts in rows:
345+
feature_value_proto = ValueProto()
346+
feature_value_proto.ParseFromString(val_bin)
347+
348+
vector_value_proto = ValueProto(string_val=vector_val)
349+
distance_value_proto = ValueProto(float_val=distance)
350+
351+
result.append(
352+
(
353+
event_ts,
354+
feature_value_proto,
355+
vector_value_proto,
356+
distance_value_proto,
357+
)
358+
)
359+
360+
return result
361+
293362

294363

295364
def _initialize_conn(db_path: str):

0 commit comments

Comments
 (0)