|
14 | 14 | from datetime import timedelta |
15 | 15 | from tempfile import mkstemp |
16 | 16 |
|
| 17 | +import pandas as pd |
17 | 18 | import pytest |
18 | 19 | from pytest_lazyfixture import lazy_fixture |
19 | 20 |
|
| 21 | +from feast import FileSource |
| 22 | +from feast.data_format import AvroFormat |
| 23 | +from feast.data_source import KafkaSource |
20 | 24 | from feast.entity import Entity |
21 | | -from feast.feature_store import FeatureStore |
| 25 | +from feast.errors import ConflictingFeatureViewNames |
| 26 | +from feast.feature_store import FeatureStore, _validate_feature_views |
22 | 27 | from feast.feature_view import FeatureView |
| 28 | +from feast.field import Field |
23 | 29 | from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig |
| 30 | +from feast.on_demand_feature_view import on_demand_feature_view |
24 | 31 | from feast.repo_config import RepoConfig |
25 | | -from feast.types import Float64, Int64, String |
| 32 | +from feast.stream_feature_view import StreamFeatureView |
| 33 | +from feast.types import Float32, Float64, Int64, String |
26 | 34 | from tests.utils.data_source_test_creator import prep_file_source |
27 | 35 |
|
28 | 36 |
|
@@ -75,3 +83,146 @@ def feature_store_with_local_registry(): |
75 | 83 | entity_key_serialization_version=3, |
76 | 84 | ) |
77 | 85 | ) |
| 86 | + |
| 87 | + |
| 88 | +@pytest.mark.integration |
| 89 | +def test_validate_feature_views_cross_type_conflict(): |
| 90 | + """ |
| 91 | + Test that _validate_feature_views() catches cross-type name conflicts. |
| 92 | +
|
| 93 | + This is a unit test for the validation that happens during feast plan/apply. |
| 94 | + The validation must catch conflicts across FeatureView, StreamFeatureView, |
| 95 | + and OnDemandFeatureView to prevent silent data correctness bugs in |
| 96 | + get_online_features (which uses fixed-order lookup). |
| 97 | +
|
| 98 | + See: https://github.com/feast-dev/feast/issues/5995 |
| 99 | + """ |
| 100 | + # Create a simple entity |
| 101 | + entity = Entity(name="driver_entity", join_keys=["test_key"]) |
| 102 | + |
| 103 | + # Create a regular FeatureView |
| 104 | + file_source = FileSource(name="my_file_source", path="test.parquet") |
| 105 | + feature_view = FeatureView( |
| 106 | + name="my_feature_view", |
| 107 | + entities=[entity], |
| 108 | + schema=[Field(name="feature1", dtype=Float32)], |
| 109 | + source=file_source, |
| 110 | + ) |
| 111 | + |
| 112 | + # Create a StreamFeatureView with the SAME name |
| 113 | + stream_source = KafkaSource( |
| 114 | + name="kafka", |
| 115 | + timestamp_field="event_timestamp", |
| 116 | + kafka_bootstrap_servers="", |
| 117 | + message_format=AvroFormat(""), |
| 118 | + topic="topic", |
| 119 | + batch_source=file_source, |
| 120 | + watermark_delay_threshold=timedelta(days=1), |
| 121 | + ) |
| 122 | + stream_feature_view = StreamFeatureView( |
| 123 | + name="my_feature_view", # Same name as FeatureView! |
| 124 | + entities=[entity], |
| 125 | + ttl=timedelta(days=30), |
| 126 | + schema=[Field(name="feature1", dtype=Float32)], |
| 127 | + source=stream_source, |
| 128 | + ) |
| 129 | + |
| 130 | + # Validate should raise ConflictingFeatureViewNames |
| 131 | + with pytest.raises(ConflictingFeatureViewNames) as exc_info: |
| 132 | + _validate_feature_views([feature_view, stream_feature_view]) |
| 133 | + |
| 134 | + # Verify error message contains type information |
| 135 | + error_message = str(exc_info.value) |
| 136 | + assert "my_feature_view" in error_message |
| 137 | + assert "FeatureView" in error_message |
| 138 | + assert "StreamFeatureView" in error_message |
| 139 | + |
| 140 | + |
| 141 | +def test_validate_feature_views_same_type_conflict(): |
| 142 | + """ |
| 143 | + Test that _validate_feature_views() also catches same-type name conflicts |
| 144 | + with a proper error message indicating duplicate FeatureViews. |
| 145 | + """ |
| 146 | + # Create a simple entity |
| 147 | + entity = Entity(name="driver_entity", join_keys=["test_key"]) |
| 148 | + |
| 149 | + # Create two FeatureViews with the same name |
| 150 | + file_source = FileSource(name="my_file_source", path="test.parquet") |
| 151 | + fv1 = FeatureView( |
| 152 | + name="duplicate_fv", |
| 153 | + entities=[entity], |
| 154 | + schema=[Field(name="feature1", dtype=Float32)], |
| 155 | + source=file_source, |
| 156 | + ) |
| 157 | + fv2 = FeatureView( |
| 158 | + name="duplicate_fv", # Same name! |
| 159 | + entities=[entity], |
| 160 | + schema=[Field(name="feature2", dtype=Float32)], |
| 161 | + source=file_source, |
| 162 | + ) |
| 163 | + |
| 164 | + # Validate should raise ConflictingFeatureViewNames |
| 165 | + with pytest.raises(ConflictingFeatureViewNames) as exc_info: |
| 166 | + _validate_feature_views([fv1, fv2]) |
| 167 | + |
| 168 | + # Verify error message indicates same-type duplicate |
| 169 | + error_message = str(exc_info.value) |
| 170 | + assert "duplicate_fv" in error_message |
| 171 | + assert "Multiple FeatureViews" in error_message |
| 172 | + assert "case-insensitively unique" in error_message |
| 173 | + |
| 174 | + |
| 175 | +def test_validate_feature_views_case_insensitive(): |
| 176 | + """ |
| 177 | + Test that _validate_feature_views() catches case-insensitive conflicts. |
| 178 | + """ |
| 179 | + entity = Entity(name="driver_entity", join_keys=["test_key"]) |
| 180 | + file_source = FileSource(name="my_file_source", path="test.parquet") |
| 181 | + |
| 182 | + fv1 = FeatureView( |
| 183 | + name="MyFeatureView", |
| 184 | + entities=[entity], |
| 185 | + schema=[Field(name="feature1", dtype=Float32)], |
| 186 | + source=file_source, |
| 187 | + ) |
| 188 | + fv2 = FeatureView( |
| 189 | + name="myfeatureview", # Same name, different case! |
| 190 | + entities=[entity], |
| 191 | + schema=[Field(name="feature2", dtype=Float32)], |
| 192 | + source=file_source, |
| 193 | + ) |
| 194 | + |
| 195 | + # Validate should raise ConflictingFeatureViewNames (case-insensitive) |
| 196 | + with pytest.raises(ConflictingFeatureViewNames): |
| 197 | + _validate_feature_views([fv1, fv2]) |
| 198 | + |
| 199 | + |
| 200 | +def test_validate_feature_views_odfv_conflict(): |
| 201 | + """ |
| 202 | + Test that _validate_feature_views() catches OnDemandFeatureView name conflicts. |
| 203 | + """ |
| 204 | + entity = Entity(name="driver_entity", join_keys=["test_key"]) |
| 205 | + file_source = FileSource(name="my_file_source", path="test.parquet") |
| 206 | + |
| 207 | + fv = FeatureView( |
| 208 | + name="shared_name", |
| 209 | + entities=[entity], |
| 210 | + schema=[Field(name="feature1", dtype=Float32)], |
| 211 | + source=file_source, |
| 212 | + ) |
| 213 | + |
| 214 | + @on_demand_feature_view( |
| 215 | + sources=[fv], |
| 216 | + schema=[Field(name="output", dtype=Float32)], |
| 217 | + ) |
| 218 | + def shared_name(inputs: pd.DataFrame) -> pd.DataFrame: |
| 219 | + return pd.DataFrame({"output": inputs["feature1"] * 2}) |
| 220 | + |
| 221 | + # Validate should raise ConflictingFeatureViewNames |
| 222 | + with pytest.raises(ConflictingFeatureViewNames) as exc_info: |
| 223 | + _validate_feature_views([fv, shared_name]) |
| 224 | + |
| 225 | + error_message = str(exc_info.value) |
| 226 | + assert "shared_name" in error_message |
| 227 | + assert "FeatureView" in error_message |
| 228 | + assert "OnDemandFeatureView" in error_message |
0 commit comments