Skip to content

Commit 5c4a9c5

Browse files
authored
fix: Pgvector patch (#4103)
1 parent 504e40e commit 5c4a9c5

File tree

8 files changed

+108
-30
lines changed

8 files changed

+108
-30
lines changed

Makefile

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,25 @@ test-python-universal-postgres-online:
216216
not test_snowflake" \
217217
sdk/python/tests
218218

219+
test-python-universal-pgvector-online:
220+
PYTHONPATH='.' \
221+
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.pgvector_repo_configuration \
222+
PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.postgres \
223+
python -m pytest -n 8 --integration \
224+
-k "not test_universal_cli and \
225+
not test_go_feature_server and \
226+
not test_feature_logging and \
227+
not test_reorder_columns and \
228+
not test_logged_features_validation and \
229+
not test_lambda_materialization_consistency and \
230+
not test_offline_write and \
231+
not test_push_features_to_offline_store and \
232+
not gcs_registry and \
233+
not s3_registry and \
234+
not test_universal_types and \
235+
not test_snowflake" \
236+
sdk/python/tests
237+
219238
test-python-universal-mysql-online:
220239
PYTHONPATH='.' \
221240
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.mysql_repo_configuration \

docs/reference/online-stores/postgres.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ online_store:
3030
sslkey_path: /path/to/client-key.pem
3131
sslcert_path: /path/to/client-cert.pem
3232
sslrootcert_path: /path/to/server-ca.pem
33+
pgvector_enabled: false
34+
vector_len: 512
3335
```
3436
{% endcode %}
3537
@@ -60,3 +62,29 @@ Below is a matrix indicating which functionality is supported by the Postgres on
6062
| collocated by entity key | no |
6163
6264
To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).
65+
66+
## PGVector
67+
The Postgres online store supports the use of [PGVector](https://pgvector.dev/) for storing feature values.
68+
To enable PGVector, set `pgvector_enabled: true` in the online store configuration.
69+
The `vector_len` parameter can be used to specify the length of the vector. The default value is 512.
70+
71+
Then you can use `retrieve_online_documents` to retrieve the top k closest vectors to a query vector.
72+
73+
{% code title="python" %}
74+
```python
75+
from feast import FeatureStore
76+
from feast.infra.online_stores.postgres import retrieve_online_documents
77+
78+
feature_store = FeatureStore(repo_path=".")
79+
80+
query_vector = [0.1, 0.2, 0.3, 0.4, 0.5]
81+
top_k = 5
82+
83+
feature_values = retrieve_online_documents(
84+
feature_store=feature_store,
85+
feature_view_name="document_fv:embedding_float",
86+
query_vector=query_vector,
87+
top_k=top_k,
88+
)
89+
```
90+
{% endcode %}

sdk/python/feast/infra/key_encoding_utils.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,13 @@ def serialize_entity_key(
7474
return b"".join(output)
7575

7676

77-
def get_val_str(val):
78-
accept_value_types = ["float_list_val", "double_list_val", "int_list_val"]
77+
def get_list_val_str(val):
78+
accept_value_types = [
79+
"float_list_val",
80+
"double_list_val",
81+
"int32_list_val",
82+
"int64_list_val",
83+
]
7984
for accept_type in accept_value_types:
8085
if val.HasField(accept_type):
8186
return str(getattr(val, accept_type).val)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from tests.integration.feature_repos.integration_test_repo_config import (
2+
IntegrationTestRepoConfig,
3+
)
4+
from tests.integration.feature_repos.universal.online_store.postgres import (
5+
PGVectorOnlineStoreCreator,
6+
)
7+
8+
FULL_REPO_CONFIGS = [
9+
IntegrationTestRepoConfig(
10+
online_store="pgvector", online_store_creator=PGVectorOnlineStoreCreator
11+
),
12+
]

sdk/python/feast/infra/online_stores/contrib/postgres.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
from collections import defaultdict
44
from datetime import datetime
5-
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
5+
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
66

77
import psycopg2
88
import pytz
@@ -12,7 +12,7 @@
1212

1313
from feast import Entity
1414
from feast.feature_view import FeatureView
15-
from feast.infra.key_encoding_utils import get_val_str, serialize_entity_key
15+
from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key
1616
from feast.infra.online_stores.online_store import OnlineStore
1717
from feast.infra.utils.postgres.connection_utils import _get_conn, _get_connection_pool
1818
from feast.infra.utils.postgres.postgres_config import ConnectionType, PostgreSQLConfig
@@ -74,19 +74,18 @@ def online_write_batch(
7474
created_ts = _to_naive_utc(created_ts)
7575

7676
for feature_name, val in values.items():
77-
val_str: Union[str, bytes]
77+
vector_val = None
7878
if (
79-
"pgvector_enabled" in config.online_config
80-
and config.online_config["pgvector_enabled"]
79+
"pgvector_enabled" in config.online_store
80+
and config.online_store.pgvector_enabled
8181
):
82-
val_str = get_val_str(val)
83-
else:
84-
val_str = val.SerializeToString()
82+
vector_val = get_list_val_str(val)
8583
insert_values.append(
8684
(
8785
entity_key_bin,
8886
feature_name,
89-
val_str,
87+
val.SerializeToString(),
88+
vector_val,
9089
timestamp,
9190
created_ts,
9291
)
@@ -100,11 +99,12 @@ def online_write_batch(
10099
sql.SQL(
101100
"""
102101
INSERT INTO {}
103-
(entity_key, feature_name, value, event_ts, created_ts)
102+
(entity_key, feature_name, value, vector_value, event_ts, created_ts)
104103
VALUES %s
105104
ON CONFLICT (entity_key, feature_name) DO
106105
UPDATE SET
107106
value = EXCLUDED.value,
107+
vector_value = EXCLUDED.vector_value,
108108
event_ts = EXCLUDED.event_ts,
109109
created_ts = EXCLUDED.created_ts;
110110
""",
@@ -226,20 +226,23 @@ def update(
226226

227227
for table in tables_to_keep:
228228
table_name = _table_id(project, table)
229-
value_type = "BYTEA"
230229
if (
231-
"pgvector_enabled" in config.online_config
232-
and config.online_config["pgvector_enabled"]
230+
"pgvector_enabled" in config.online_store
231+
and config.online_store.pgvector_enabled
233232
):
234-
value_type = f'vector({config.online_config["vector_len"]})'
233+
vector_value_type = f"vector({config.online_store.vector_len})"
234+
else:
235+
# keep the vector_value_type as BYTEA if pgvector is not enabled, to maintain compatibility
236+
vector_value_type = "BYTEA"
235237
cur.execute(
236238
sql.SQL(
237239
"""
238240
CREATE TABLE IF NOT EXISTS {}
239241
(
240242
entity_key BYTEA,
241243
feature_name TEXT,
242-
value {},
244+
value BYTEA,
245+
vector_value {} NULL,
243246
event_ts TIMESTAMPTZ,
244247
created_ts TIMESTAMPTZ,
245248
PRIMARY KEY(entity_key, feature_name)
@@ -248,7 +251,7 @@ def update(
248251
"""
249252
).format(
250253
sql.Identifier(table_name),
251-
sql.SQL(value_type),
254+
sql.SQL(vector_value_type),
252255
sql.Identifier(f"{table_name}_ek"),
253256
sql.Identifier(table_name),
254257
)
@@ -294,6 +297,14 @@ def retrieve_online_documents(
294297
"""
295298
project = config.project
296299

300+
if (
301+
"pgvector_enabled" not in config.online_store
302+
or not config.online_store.pgvector_enabled
303+
):
304+
raise ValueError(
305+
"pgvector is not enabled in the online store configuration"
306+
)
307+
297308
# Convert the embedding to a string to be used in postgres vector search
298309
query_embedding_str = f"[{','.join(str(el) for el in embedding)}]"
299310

@@ -311,8 +322,8 @@ def retrieve_online_documents(
311322
SELECT
312323
entity_key,
313324
feature_name,
314-
value,
315-
value <-> %s as distance,
325+
vector_value,
326+
vector_value <-> %s as distance,
316327
event_ts FROM {table_name}
317328
WHERE feature_name = {feature_name}
318329
ORDER BY distance
@@ -327,13 +338,13 @@ def retrieve_online_documents(
327338
)
328339
rows = cur.fetchall()
329340

330-
for entity_key, feature_name, value, distance, event_ts in rows:
341+
for entity_key, feature_name, vector_value, distance, event_ts in rows:
331342
# TODO Deserialize entity_key to return the entity in response
332343
# entity_key_proto = EntityKeyProto()
333344
# entity_key_proto_bin = bytes(entity_key)
334345

335346
# TODO Convert to List[float] for value type proto
336-
feature_value_proto = ValueProto(string_val=value)
347+
feature_value_proto = ValueProto(string_val=vector_value)
337348

338349
distance_value_proto = ValueProto(float_val=distance)
339350
result.append((event_ts, feature_value_proto, distance_value_proto))

sdk/python/feast/infra/online_stores/contrib/postgres_repo_configuration.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,11 @@
22
IntegrationTestRepoConfig,
33
)
44
from tests.integration.feature_repos.universal.online_store.postgres import (
5-
PGVectorOnlineStoreCreator,
65
PostgresOnlineStoreCreator,
76
)
87

98
FULL_REPO_CONFIGS = [
109
IntegrationTestRepoConfig(
1110
online_store="postgres", online_store_creator=PostgresOnlineStoreCreator
1211
),
13-
IntegrationTestRepoConfig(
14-
online_store="pgvector", online_store_creator=PGVectorOnlineStoreCreator
15-
),
1612
]
17-
18-
AVAILABLE_ONLINE_STORES = {"pgvector": PGVectorOnlineStoreCreator}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
CREATE EXTENSION IF NOT EXISTS vector;

sdk/python/tests/integration/feature_repos/universal/online_store/postgres.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from typing import Dict
23

34
from testcontainers.core.container import DockerContainer
@@ -37,12 +38,17 @@ def teardown(self):
3738
class PGVectorOnlineStoreCreator(OnlineStoreCreator):
3839
def __init__(self, project_name: str, **kwargs):
3940
super().__init__(project_name)
41+
script_directory = os.path.dirname(os.path.abspath(__file__))
4042
self.container = (
4143
DockerContainer("pgvector/pgvector:pg16")
4244
.with_env("POSTGRES_USER", "root")
4345
.with_env("POSTGRES_PASSWORD", "test")
4446
.with_env("POSTGRES_DB", "test")
4547
.with_exposed_ports(5432)
48+
.with_volume_mapping(
49+
os.path.join(script_directory, "init.sql"),
50+
"/docker-entrypoint-initdb.d/init.sql",
51+
)
4652
)
4753

4854
def create_online_store(self) -> Dict[str, str]:
@@ -51,8 +57,10 @@ def create_online_store(self) -> Dict[str, str]:
5157
wait_for_logs(
5258
container=self.container, predicate=log_string_to_wait_for, timeout=10
5359
)
54-
command = "psql -h localhost -p 5432 -U root -d test -c 'CREATE EXTENSION IF NOT EXISTS vector;'"
55-
self.container.exec(command)
60+
init_log_string_to_wait_for = "PostgreSQL init process complete"
61+
wait_for_logs(
62+
container=self.container, predicate=init_log_string_to_wait_for, timeout=10
63+
)
5664
return {
5765
"host": "localhost",
5866
"type": "postgres",

0 commit comments

Comments
 (0)