Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dbt-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
uv pip install --system dbt-core dbt-duckdb

- name: Run dbt integration tests
run: make test-python-integration-dbt
run: uv run make test-python-integration-dbt

- name: Minimize uv cache
run: uv cache prune --ci
63 changes: 27 additions & 36 deletions sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def _construct_online_read_api_json_request(
entity_keys: List[EntityKeyProto],
table: FeatureView,
requested_features: Optional[List[str]] = None,
) -> str:
) -> dict:
api_requested_features = []
if requested_features is not None:
for requested_feature in requested_features:
Expand All @@ -432,13 +432,10 @@ def _construct_online_read_api_json_request(
getattr(row.entity_values[0], row.entity_values[0].WhichOneof("val"))
)

req_body = json.dumps(
{
"features": api_requested_features,
"entities": {entity_key: entity_values},
}
)
return req_body
return {
"features": api_requested_features,
"entities": {entity_key: entity_values},
}

def _construct_online_documents_api_json_request(
self,
Expand All @@ -447,21 +444,18 @@ def _construct_online_documents_api_json_request(
embedding: Optional[List[float]] = None,
top_k: Optional[int] = None,
distance_metric: Optional[str] = "L2",
) -> str:
) -> dict:
api_requested_features = []
if requested_features is not None:
for requested_feature in requested_features:
api_requested_features.append(f"{table.name}:{requested_feature}")

req_body = json.dumps(
{
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
}
)
return req_body
return {
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
}

def _construct_online_documents_v2_api_json_request(
self,
Expand All @@ -472,23 +466,20 @@ def _construct_online_documents_v2_api_json_request(
distance_metric: Optional[str] = None,
query_string: Optional[str] = None,
api_version: Optional[int] = 2,
) -> str:
) -> dict:
api_requested_features = []
if requested_features is not None:
for requested_feature in requested_features:
api_requested_features.append(f"{table.name}:{requested_feature}")

req_body = json.dumps(
{
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
"query_string": query_string,
"api_version": api_version,
}
)
return req_body
return {
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
"query_string": query_string,
"api_version": api_version,
}

def _get_event_ts(self, response_json) -> datetime:
event_ts = ""
Expand Down Expand Up @@ -574,33 +565,33 @@ async def close(self) -> None:

@rest_error_handling_decorator
def get_remote_online_features(
session: requests.Session, config: RepoConfig, req_body: str
session: requests.Session, config: RepoConfig, req_body: dict
) -> requests.Response:
if config.online_store.cert:
return session.post(
f"{config.online_store.path}/get-online-features",
data=req_body,
json=req_body,
verify=config.online_store.cert,
)
else:
return session.post(
f"{config.online_store.path}/get-online-features", data=req_body
f"{config.online_store.path}/get-online-features", json=req_body
)


@rest_error_handling_decorator
def get_remote_online_documents(
session: requests.Session, config: RepoConfig, req_body: str
session: requests.Session, config: RepoConfig, req_body: dict
) -> requests.Response:
if config.online_store.cert:
return session.post(
f"{config.online_store.path}/retrieve-online-documents",
data=req_body,
json=req_body,
verify=config.online_store.cert,
)
else:
return session.post(
f"{config.online_store.path}/retrieve-online-documents", data=req_body
f"{config.online_store.path}/retrieve-online-documents", json=req_body
)


Expand Down
11 changes: 6 additions & 5 deletions sdk/python/tests/benchmarks/test_key_encoding_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ def test_performance_regression_single_entity():
serialize_entity_key(entity_key, 3)
elapsed = time.perf_counter() - start_time

# Should be able to do 1000 single entity serializations in < 10ms
# This is a conservative regression test
assert elapsed < 0.01, (
# Should be able to do 1000 single entity serializations in < 50ms
# Using a generous threshold to avoid flaky failures on CI runners
assert elapsed < 0.05, (
f"Single entity serialization too slow: {elapsed:.4f}s for 1000 operations"
)

Expand All @@ -416,8 +416,9 @@ def test_performance_regression_deserialization():
deserialize_entity_key(serialized, 3)
elapsed = time.perf_counter() - start_time

# Should be able to do 1000 deserializations in < 15ms
assert elapsed < 0.015, (
# Should be able to do 1000 deserializations in < 100ms
# Using a generous threshold to avoid flaky failures on CI runners
assert elapsed < 0.1, (
f"Deserialization too slow: {elapsed:.4f}s for 1000 operations"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@
from feast.data_source import DataSource, RequestSource
from feast.feature_view_projection import FeatureViewProjection
from feast.on_demand_feature_view import PandasTransformation
from feast.types import Array, FeastType, Float32, Float64, Int32, Int64, String
from feast.types import (
Array,
FeastType,
Float32,
Float64,
Int32,
Int64,
Json,
Map,
String,
Struct,
)
from tests.integration.feature_repos.universal.entities import (
customer,
driver,
Expand Down Expand Up @@ -193,6 +204,12 @@ def create_driver_hourly_stats_feature_view(source, infer_features: bool = False
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int32),
Field(name=d.join_key, dtype=Int64),
Field(name="driver_metadata", dtype=Map),
Field(name="driver_config", dtype=Json),
Field(
name="driver_profile",
dtype=Struct({"name": String, "age": String}),
),
Comment on lines +207 to +212
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 New feature view schema fields break tests using fake_ingest_data fixture

Adding driver_metadata, driver_config, and driver_profile fields to the create_driver_hourly_stats_feature_view schema breaks tests that use the fake_ingest_data fixture with this feature view.

Root Cause

The fake_ingest_data fixture in sdk/python/tests/conftest.py:427-437 only provides columns driver_id, conv_rate, acc_rate, avg_daily_trips, event_timestamp, and created. It does NOT include the newly added driver_metadata, driver_config, or driver_profile columns.

Tests like test_connection_pool_online_stores (sdk/python/tests/integration/online_store/test_universal_online.py:67-74) and test_entity_ttl_online_store (sdk/python/tests/integration/online_store/test_universal_online.py:99-106) call:

  1. create_driver_hourly_stats_feature_view(data_sources.driver) — which now includes the 3 new fields in the schema
  2. fs.write_to_online_store("driver_stats", fake_ingest_data) — which passes the incomplete DataFrame

Inside write_to_online_store at sdk/python/feast/feature_store.py:2201-2203:

feature_column_names = [f.name for f in feature_view.features]
if feature_column_names:
    feature_df = df[feature_column_names]  # KeyError here

feature_view.features now includes driver_metadata, driver_config, driver_profile, but the DataFrame from fake_ingest_data doesn't have those columns, causing a KeyError.

Impact: Integration tests test_connection_pool_online_stores and test_entity_ttl_online_store will fail.

Prompt for agents
The fake_ingest_data fixture in sdk/python/tests/conftest.py (around line 427-437) needs to be updated to include the three new fields added to the driver_stats feature view schema. Add the following keys to the data dict:

  "driver_metadata": [{"vehicle_type": "sedan", "rating": "4.5"}],
  "driver_config": [json.dumps({"max_distance_km": 100, "preferred_zones": ["north"]})],
  "driver_profile": [{"name": "driver_1", "age": "30"}],

You will also need to add 'import json' at the top of conftest.py if not already present.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

],
source=source,
ttl=timedelta(hours=2),
Expand All @@ -213,6 +230,12 @@ def create_driver_hourly_stats_batch_feature_view(
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int32),
Field(name="driver_metadata", dtype=Map),
Field(name="driver_config", dtype=Json),
Field(
name="driver_profile",
dtype=Struct({"name": String, "age": String}),
),
],
source=source,
ttl=timedelta(hours=2),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import random
from datetime import timedelta

Expand All @@ -6,7 +7,7 @@
import pytest

from feast import FeatureView, Field
from feast.types import Float32, Int32
from feast.types import Float32, Int32, Json, Map, String, Struct
from feast.utils import _utc_now
from tests.integration.feature_repos.repo_configuration import (
construct_universal_feature_views,
Expand Down Expand Up @@ -36,6 +37,18 @@ def test_reorder_columns(environment, universal_data_sources):
"event_timestamp": [ts, ts],
"acc_rate": [random.random(), random.random()],
"driver_id": [1001, 1001],
"driver_metadata": [
{"vehicle_type": "sedan", "rating": "4.5"},
{"vehicle_type": "suv", "rating": "3.8"},
],
"driver_config": [
json.dumps({"max_distance_km": 100, "preferred_zones": ["north"]}),
json.dumps({"max_distance_km": 50, "preferred_zones": ["south"]}),
],
"driver_profile": [
{"name": "driver_1001", "age": "30"},
{"name": "driver_1001", "age": "30"},
],
},
)

Expand Down Expand Up @@ -66,7 +79,13 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources):
"created": [ts, ts],
},
)
expected_missing = ["acc_rate", "avg_daily_trips"]
expected_missing = [
"acc_rate",
"avg_daily_trips",
"driver_config",
"driver_metadata",
"driver_profile",
]
expected_extra = ["incorrect_schema"]

with pytest.raises(ValueError, match="missing_expected_columns") as excinfo:
Expand All @@ -92,6 +111,12 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
Field(name="avg_daily_trips", dtype=Int32),
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="driver_metadata", dtype=Map),
Field(name="driver_config", dtype=Json),
Field(
name="driver_profile",
dtype=Struct({"name": String, "age": String}),
),
],
source=data_sources.driver,
ttl=timedelta(
Expand Down Expand Up @@ -132,6 +157,18 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
"acc_rate": [random.random(), random.random()],
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
"driver_metadata": [
{"vehicle_type": "sedan", "rating": "4.5"},
{"vehicle_type": "suv", "rating": "3.8"},
],
"driver_config": [
json.dumps({"max_distance_km": 100, "preferred_zones": ["north"]}),
json.dumps({"max_distance_km": 50, "preferred_zones": ["south"]}),
],
"driver_profile": [
{"name": "driver_1001", "age": "30"},
{"name": "driver_1001", "age": "35"},
],
},
)
first_df = first_df.astype({"conv_rate": "float32", "acc_rate": "float32"})
Expand Down Expand Up @@ -176,6 +213,18 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
"acc_rate": [random.random(), random.random()],
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
"driver_metadata": [
{"vehicle_type": "truck", "rating": "4.0"},
{"vehicle_type": "sedan", "rating": "4.2"},
],
"driver_config": [
json.dumps({"max_distance_km": 150, "preferred_zones": ["east"]}),
json.dumps({"max_distance_km": 200, "preferred_zones": ["west"]}),
],
"driver_profile": [
{"name": "driver_1001", "age": "31"},
{"name": "driver_1001", "age": "36"},
],
},
)
second_df = second_df.astype({"conv_rate": "float32", "acc_rate": "float32"})
Expand Down
Loading