Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dbt-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
uv pip install --system dbt-core dbt-duckdb
- name: Run dbt integration tests
run: make test-python-integration-dbt
run: uv run make test-python-integration-dbt

- name: Minimize uv cache
run: uv cache prune --ci
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ test-python-integration-local: ## Run Python integration tests (local dev mode)
uv run python -m pytest --tb=short -v -n auto --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
-k "not test_lambda_materialization and not test_snowflake_materialization" \
-m "not rbac_remote_integration_test" \
--ignore=sdk/python/tests/integration/compute_engines/ray_compute \
--log-cli-level=INFO -s \
sdk/python/tests

Expand Down
63 changes: 27 additions & 36 deletions sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def _construct_online_read_api_json_request(
entity_keys: List[EntityKeyProto],
table: FeatureView,
requested_features: Optional[List[str]] = None,
) -> str:
) -> dict:
api_requested_features = []
if requested_features is not None:
for requested_feature in requested_features:
Expand All @@ -432,13 +432,10 @@ def _construct_online_read_api_json_request(
getattr(row.entity_values[0], row.entity_values[0].WhichOneof("val"))
)

req_body = json.dumps(
{
"features": api_requested_features,
"entities": {entity_key: entity_values},
}
)
return req_body
return {
"features": api_requested_features,
"entities": {entity_key: entity_values},
}

def _construct_online_documents_api_json_request(
self,
Expand All @@ -447,21 +444,18 @@ def _construct_online_documents_api_json_request(
embedding: Optional[List[float]] = None,
top_k: Optional[int] = None,
distance_metric: Optional[str] = "L2",
) -> str:
) -> dict:
api_requested_features = []
if requested_features is not None:
for requested_feature in requested_features:
api_requested_features.append(f"{table.name}:{requested_feature}")

req_body = json.dumps(
{
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
}
)
return req_body
return {
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
}

def _construct_online_documents_v2_api_json_request(
self,
Expand All @@ -472,23 +466,20 @@ def _construct_online_documents_v2_api_json_request(
distance_metric: Optional[str] = None,
query_string: Optional[str] = None,
api_version: Optional[int] = 2,
) -> str:
) -> dict:
api_requested_features = []
if requested_features is not None:
for requested_feature in requested_features:
api_requested_features.append(f"{table.name}:{requested_feature}")

req_body = json.dumps(
{
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
"query_string": query_string,
"api_version": api_version,
}
)
return req_body
return {
"features": api_requested_features,
"query": embedding,
"top_k": top_k,
"distance_metric": distance_metric,
"query_string": query_string,
"api_version": api_version,
}

def _get_event_ts(self, response_json) -> datetime:
event_ts = ""
Expand Down Expand Up @@ -574,33 +565,33 @@ async def close(self) -> None:

@rest_error_handling_decorator
def get_remote_online_features(
session: requests.Session, config: RepoConfig, req_body: str
session: requests.Session, config: RepoConfig, req_body: dict
) -> requests.Response:
if config.online_store.cert:
return session.post(
f"{config.online_store.path}/get-online-features",
data=req_body,
json=req_body,
verify=config.online_store.cert,
)
else:
return session.post(
f"{config.online_store.path}/get-online-features", data=req_body
f"{config.online_store.path}/get-online-features", json=req_body
)


@rest_error_handling_decorator
def get_remote_online_documents(
session: requests.Session, config: RepoConfig, req_body: str
session: requests.Session, config: RepoConfig, req_body: dict
) -> requests.Response:
if config.online_store.cert:
return session.post(
f"{config.online_store.path}/retrieve-online-documents",
data=req_body,
json=req_body,
verify=config.online_store.cert,
)
else:
return session.post(
f"{config.online_store.path}/retrieve-online-documents", data=req_body
f"{config.online_store.path}/retrieve-online-documents", json=req_body
)


Expand Down
11 changes: 6 additions & 5 deletions sdk/python/tests/benchmarks/test_key_encoding_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ def test_performance_regression_single_entity():
serialize_entity_key(entity_key, 3)
elapsed = time.perf_counter() - start_time

# Should be able to do 1000 single entity serializations in < 10ms
# This is a conservative regression test
assert elapsed < 0.01, (
# Should be able to do 1000 single entity serializations in < 50ms
# Using a generous threshold to avoid flaky failures on CI runners
assert elapsed < 0.05, (
f"Single entity serialization too slow: {elapsed:.4f}s for 1000 operations"
)

Expand All @@ -416,8 +416,9 @@ def test_performance_regression_deserialization():
deserialize_entity_key(serialized, 3)
elapsed = time.perf_counter() - start_time

# Should be able to do 1000 deserializations in < 15ms
assert elapsed < 0.015, (
# Should be able to do 1000 deserializations in < 200ms
# Using a generous threshold to avoid flaky failures on CI runners
assert elapsed < 0.2, (
f"Deserialization too slow: {elapsed:.4f}s for 1000 operations"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@
from feast.data_source import DataSource, RequestSource
from feast.feature_view_projection import FeatureViewProjection
from feast.on_demand_feature_view import PandasTransformation
from feast.types import Array, FeastType, Float32, Float64, Int32, Int64, String
from feast.types import (
Array,
FeastType,
Float32,
Float64,
Int32,
Int64,
Json,
Map,
String,
Struct,
)
from tests.integration.feature_repos.universal.entities import (
customer,
driver,
Expand Down Expand Up @@ -193,6 +204,12 @@ def create_driver_hourly_stats_feature_view(source, infer_features: bool = False
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int32),
Field(name=d.join_key, dtype=Int64),
Field(name="driver_metadata", dtype=Map),
Field(name="driver_config", dtype=Json),
Field(
name="driver_profile",
dtype=Struct({"name": String, "age": String}),
),
Comment on lines +207 to +212
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 New feature view schema fields break tests using fake_ingest_data fixture

Adding driver_metadata, driver_config, and driver_profile fields to the create_driver_hourly_stats_feature_view schema breaks tests that use the fake_ingest_data fixture with this feature view.

Root Cause

The fake_ingest_data fixture in sdk/python/tests/conftest.py:427-437 only provides columns driver_id, conv_rate, acc_rate, avg_daily_trips, event_timestamp, and created. It does NOT include the newly added driver_metadata, driver_config, or driver_profile columns.

Tests like test_connection_pool_online_stores (sdk/python/tests/integration/online_store/test_universal_online.py:67-74) and test_entity_ttl_online_store (sdk/python/tests/integration/online_store/test_universal_online.py:99-106) call:

  1. create_driver_hourly_stats_feature_view(data_sources.driver) — which now includes the 3 new fields in the schema
  2. fs.write_to_online_store("driver_stats", fake_ingest_data) — which passes the incomplete DataFrame

Inside write_to_online_store at sdk/python/feast/feature_store.py:2201-2203:

feature_column_names = [f.name for f in feature_view.features]
if feature_column_names:
    feature_df = df[feature_column_names]  # KeyError here

feature_view.features now includes driver_metadata, driver_config, driver_profile, but the DataFrame from fake_ingest_data doesn't have those columns, causing a KeyError.

Impact: Integration tests test_connection_pool_online_stores and test_entity_ttl_online_store will fail.

Prompt for agents
The fake_ingest_data fixture in sdk/python/tests/conftest.py (around line 427-437) needs to be updated to include the three new fields added to the driver_stats feature view schema. Add the following keys to the data dict:

  "driver_metadata": [{"vehicle_type": "sedan", "rating": "4.5"}],
  "driver_config": [json.dumps({"max_distance_km": 100, "preferred_zones": ["north"]})],
  "driver_profile": [{"name": "driver_1", "age": "30"}],

You will also need to add 'import json' at the top of conftest.py if not already present.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

],
source=source,
ttl=timedelta(hours=2),
Expand All @@ -213,6 +230,12 @@ def create_driver_hourly_stats_batch_feature_view(
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int32),
Field(name="driver_metadata", dtype=Map),
Field(name="driver_config", dtype=Json),
Field(
name="driver_profile",
dtype=Struct({"name": String, "age": String}),
),
],
source=source,
ttl=timedelta(hours=2),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import random
from datetime import timedelta

Expand All @@ -6,7 +7,7 @@
import pytest

from feast import FeatureView, Field
from feast.types import Float32, Int32
from feast.types import Float32, Int32, Json, Map, String, Struct
from feast.utils import _utc_now
from tests.integration.feature_repos.repo_configuration import (
construct_universal_feature_views,
Expand Down Expand Up @@ -36,6 +37,18 @@ def test_reorder_columns(environment, universal_data_sources):
"event_timestamp": [ts, ts],
"acc_rate": [random.random(), random.random()],
"driver_id": [1001, 1001],
"driver_metadata": [
{"vehicle_type": "sedan", "rating": "4.5"},
{"vehicle_type": "suv", "rating": "3.8"},
],
"driver_config": [
json.dumps({"max_distance_km": 100, "preferred_zones": ["north"]}),
json.dumps({"max_distance_km": 50, "preferred_zones": ["south"]}),
],
"driver_profile": [
{"name": "driver_1001", "age": "30"},
{"name": "driver_1001", "age": "30"},
],
},
)

Expand Down Expand Up @@ -66,7 +79,13 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources):
"created": [ts, ts],
},
)
expected_missing = ["acc_rate", "avg_daily_trips"]
expected_missing = [
"acc_rate",
"avg_daily_trips",
"driver_config",
"driver_metadata",
"driver_profile",
]
expected_extra = ["incorrect_schema"]

with pytest.raises(ValueError, match="missing_expected_columns") as excinfo:
Expand All @@ -92,6 +111,12 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
Field(name="avg_daily_trips", dtype=Int32),
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="driver_metadata", dtype=Map),
Field(name="driver_config", dtype=Json),
Field(
name="driver_profile",
dtype=Struct({"name": String, "age": String}),
),
],
source=data_sources.driver,
ttl=timedelta(
Expand Down Expand Up @@ -132,6 +157,18 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
"acc_rate": [random.random(), random.random()],
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
"driver_metadata": [
{"vehicle_type": "sedan", "rating": "4.5"},
{"vehicle_type": "suv", "rating": "3.8"},
],
"driver_config": [
json.dumps({"max_distance_km": 100, "preferred_zones": ["north"]}),
json.dumps({"max_distance_km": 50, "preferred_zones": ["south"]}),
],
"driver_profile": [
{"name": "driver_1001", "age": "30"},
{"name": "driver_1001", "age": "35"},
],
},
)
first_df = first_df.astype({"conv_rate": "float32", "acc_rate": "float32"})
Expand Down Expand Up @@ -176,6 +213,18 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
"acc_rate": [random.random(), random.random()],
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
"driver_metadata": [
{"vehicle_type": "truck", "rating": "4.0"},
{"vehicle_type": "sedan", "rating": "4.2"},
],
"driver_config": [
json.dumps({"max_distance_km": 150, "preferred_zones": ["east"]}),
json.dumps({"max_distance_km": 200, "preferred_zones": ["west"]}),
],
"driver_profile": [
{"name": "driver_1001", "age": "31"},
{"name": "driver_1001", "age": "36"},
],
},
)
second_df = second_df.astype({"conv_rate": "float32", "acc_rate": "float32"})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import os
import tempfile
Expand Down Expand Up @@ -383,6 +384,18 @@ def test_remote_online_store_read_write(auth_config, tls_mode):
"avg_daily_trips": [50, 45],
"event_timestamp": [pd.Timestamp(_utc_now()).round("ms")] * 2,
"created": [pd.Timestamp(_utc_now()).round("ms")] * 2,
"driver_metadata": [
{"vehicle_type": "sedan", "rating": "4.5"},
{"vehicle_type": "suv", "rating": "3.8"},
],
"driver_config": [
json.dumps({"max_distance_km": 100, "preferred_zones": ["north"]}),
json.dumps({"max_distance_km": 50, "preferred_zones": ["south"]}),
],
"driver_profile": [
{"name": "driver_1000", "age": "30"},
{"name": "driver_1001", "age": "35"},
],
}
)

Expand Down
Loading
Loading