|
1 | 1 | """Integration tests for feature view versioning.""" |
2 | 2 |
|
| 3 | +import os |
3 | 4 | import tempfile |
4 | 5 | from datetime import timedelta |
5 | 6 | from pathlib import Path |
6 | 7 |
|
7 | 8 | import pytest |
8 | 9 |
|
| 10 | +from feast import FeatureStore |
9 | 11 | from feast.entity import Entity |
10 | 12 | from feast.errors import FeatureViewPinConflict, FeatureViewVersionNotFound |
| 13 | +from feast.feature_service import FeatureService |
11 | 14 | from feast.feature_view import FeatureView |
12 | 15 | from feast.field import Field |
| 16 | +from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig |
13 | 17 | from feast.infra.registry.registry import Registry |
14 | | -from feast.repo_config import RegistryConfig |
| 18 | +from feast.repo_config import RegistryConfig, RepoConfig |
15 | 19 | from feast.stream_feature_view import StreamFeatureView |
16 | 20 | from feast.types import Float32, Int64 |
17 | 21 | from feast.value_type import ValueType |
@@ -915,3 +919,168 @@ def test_version_qualified_ref_raises_when_online_versioning_disabled( |
915 | 919 | allow_cache=False, |
916 | 920 | hide_dummy_entity=False, |
917 | 921 | ) |
| 922 | + |
| 923 | + |
| 924 | +class TestFeatureServiceVersioningGates: |
| 925 | + """Tests that feature services are gated when referencing versioned feature views.""" |
| 926 | + |
| 927 | + @pytest.fixture |
| 928 | + def versioned_fv_and_entity(self): |
| 929 | + """Create a versioned feature view (v1) and its entity.""" |
| 930 | + entity = Entity( |
| 931 | + name="driver_id", |
| 932 | + join_keys=["driver_id"], |
| 933 | + value_type=ValueType.INT64, |
| 934 | + ) |
| 935 | + # v0 definition |
| 936 | + fv_v0 = FeatureView( |
| 937 | + name="driver_stats", |
| 938 | + entities=[entity], |
| 939 | + ttl=timedelta(days=1), |
| 940 | + schema=[ |
| 941 | + Field(name="driver_id", dtype=Int64), |
| 942 | + Field(name="trips_today", dtype=Int64), |
| 943 | + ], |
| 944 | + description="v0", |
| 945 | + ) |
| 946 | + # v1 definition (schema change) |
| 947 | + fv_v1 = FeatureView( |
| 948 | + name="driver_stats", |
| 949 | + entities=[entity], |
| 950 | + ttl=timedelta(days=1), |
| 951 | + schema=[ |
| 952 | + Field(name="driver_id", dtype=Int64), |
| 953 | + Field(name="trips_today", dtype=Int64), |
| 954 | + Field(name="avg_rating", dtype=Float32), |
| 955 | + ], |
| 956 | + description="v1", |
| 957 | + ) |
| 958 | + return entity, fv_v0, fv_v1 |
| 959 | + |
| 960 | + @pytest.fixture |
| 961 | + def unversioned_fv_and_entity(self): |
| 962 | + """Create an unversioned feature view (v0 only) and its entity.""" |
| 963 | + entity = Entity( |
| 964 | + name="driver_id", |
| 965 | + join_keys=["driver_id"], |
| 966 | + value_type=ValueType.INT64, |
| 967 | + ) |
| 968 | + fv = FeatureView( |
| 969 | + name="driver_stats", |
| 970 | + entities=[entity], |
| 971 | + ttl=timedelta(days=1), |
| 972 | + schema=[ |
| 973 | + Field(name="driver_id", dtype=Int64), |
| 974 | + Field(name="trips_today", dtype=Int64), |
| 975 | + ], |
| 976 | + description="only version", |
| 977 | + ) |
| 978 | + return entity, fv |
| 979 | + |
| 980 | + def _make_store(self, tmpdir, enable_versioning=False): |
| 981 | + """Create a FeatureStore with optional online versioning.""" |
| 982 | + registry_path = os.path.join(tmpdir, "registry.db") |
| 983 | + online_path = os.path.join(tmpdir, "online.db") |
| 984 | + return FeatureStore( |
| 985 | + config=RepoConfig( |
| 986 | + registry=RegistryConfig( |
| 987 | + path=registry_path, |
| 988 | + enable_online_feature_view_versioning=enable_versioning, |
| 989 | + ), |
| 990 | + project="test_project", |
| 991 | + provider="local", |
| 992 | + online_store=SqliteOnlineStoreConfig(path=online_path), |
| 993 | + entity_key_serialization_version=3, |
| 994 | + ) |
| 995 | + ) |
| 996 | + |
| 997 | + def test_feature_service_apply_fails_with_versioned_fv_when_flag_off( |
| 998 | + self, versioned_fv_and_entity |
| 999 | + ): |
| 1000 | + """Apply a feature service referencing a versioned FV with flag off -> ValueError.""" |
| 1001 | + entity, fv_v0, fv_v1 = versioned_fv_and_entity |
| 1002 | + |
| 1003 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 1004 | + store = self._make_store(tmpdir, enable_versioning=False) |
| 1005 | + |
| 1006 | + # Apply v0 first, then v1 to create version history |
| 1007 | + store.apply([entity, fv_v0]) |
| 1008 | + store.apply([entity, fv_v1]) |
| 1009 | + |
| 1010 | + # Now create a feature service referencing the versioned FV |
| 1011 | + fs = FeatureService( |
| 1012 | + name="driver_service", |
| 1013 | + features=[fv_v1], |
| 1014 | + ) |
| 1015 | + |
| 1016 | + with pytest.raises(ValueError, match="version v1"): |
| 1017 | + store.apply([fs]) |
| 1018 | + |
| 1019 | + def test_feature_service_apply_succeeds_with_versioned_fv_when_flag_on( |
| 1020 | + self, versioned_fv_and_entity |
| 1021 | + ): |
| 1022 | + """Apply a feature service referencing a versioned FV with flag on -> succeeds.""" |
| 1023 | + entity, fv_v0, fv_v1 = versioned_fv_and_entity |
| 1024 | + |
| 1025 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 1026 | + store = self._make_store(tmpdir, enable_versioning=True) |
| 1027 | + |
| 1028 | + # Apply v0 first, then v1 to create version history |
| 1029 | + store.apply([entity, fv_v0]) |
| 1030 | + store.apply([entity, fv_v1]) |
| 1031 | + |
| 1032 | + # Feature service referencing versioned FV should succeed |
| 1033 | + fs = FeatureService( |
| 1034 | + name="driver_service", |
| 1035 | + features=[fv_v1], |
| 1036 | + ) |
| 1037 | + store.apply([fs]) # Should not raise |
| 1038 | + |
| 1039 | + def test_feature_service_retrieval_fails_with_versioned_fv_when_flag_off( |
| 1040 | + self, versioned_fv_and_entity |
| 1041 | + ): |
| 1042 | + """get_online_features with a feature service referencing a versioned FV, flag off -> ValueError.""" |
| 1043 | + entity, fv_v0, fv_v1 = versioned_fv_and_entity |
| 1044 | + from feast.utils import _get_feature_views_to_use |
| 1045 | + |
| 1046 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 1047 | + # First apply with flag on so the feature service can be registered |
| 1048 | + store_on = self._make_store(tmpdir, enable_versioning=True) |
| 1049 | + store_on.apply([entity, fv_v0]) |
| 1050 | + store_on.apply([entity, fv_v1]) |
| 1051 | + fs = FeatureService( |
| 1052 | + name="driver_service", |
| 1053 | + features=[fv_v1], |
| 1054 | + ) |
| 1055 | + store_on.apply([fs]) |
| 1056 | + |
| 1057 | + # Now create a store with the flag off to test retrieval |
| 1058 | + store_off = self._make_store(tmpdir, enable_versioning=False) |
| 1059 | + registered_fs = store_off.registry.get_feature_service( |
| 1060 | + "driver_service", "test_project" |
| 1061 | + ) |
| 1062 | + |
| 1063 | + with pytest.raises(ValueError, match="online versioning is disabled"): |
| 1064 | + _get_feature_views_to_use( |
| 1065 | + registry=store_off.registry, |
| 1066 | + project="test_project", |
| 1067 | + features=registered_fs, |
| 1068 | + allow_cache=False, |
| 1069 | + hide_dummy_entity=False, |
| 1070 | + ) |
| 1071 | + |
| 1072 | + def test_feature_service_with_unversioned_fv_succeeds( |
| 1073 | + self, unversioned_fv_and_entity |
| 1074 | + ): |
| 1075 | + """Feature service with v0 FV works fine regardless of flag.""" |
| 1076 | + entity, fv = unversioned_fv_and_entity |
| 1077 | + |
| 1078 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 1079 | + store = self._make_store(tmpdir, enable_versioning=False) |
| 1080 | + |
| 1081 | + # Apply unversioned FV and feature service |
| 1082 | + fs = FeatureService( |
| 1083 | + name="driver_service", |
| 1084 | + features=[fv], |
| 1085 | + ) |
| 1086 | + store.apply([entity, fv, fs]) # Should not raise |
0 commit comments