diff --git a/docs/tutorials/validating-historical-features.md b/docs/tutorials/validating-historical-features.md index addd309902a..70be38eced2 100644 --- a/docs/tutorials/validating-historical-features.md +++ b/docs/tutorials/validating-historical-features.md @@ -107,7 +107,7 @@ pyarrow.parquet.write_table(entities_2019_table, "entities.parquet") import pyarrow.parquet import pandas as pd -from feast import FeatureView, Entity, FeatureStore, Field +from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView from feast.types import Float64, Int64 from feast.value_type import ValueType from feast.data_format import ParquetFormat @@ -134,7 +134,7 @@ taxi_entity = Entity(name='taxi', join_keys=['taxi_id']) ```python -trips_stats_fv = FeatureView( +trips_stats_fv = BatchFeatureView( name='trip_stats', entities=['taxi'], features=[ @@ -160,9 +160,9 @@ trips_stats_fv = FeatureView( Field("avg_trip_seconds", Float64), Field("earned_per_hour", Float64), ], - sources={ - "stats": trips_stats_fv - } + sources=[ + trips_stats_fv, + ] ) def on_demand_stats(inp): out = pd.DataFrame() diff --git a/examples/java-demo/feature_repo/driver_repo.py b/examples/java-demo/feature_repo/driver_repo.py index ce9469647f6..c9bdc7ce770 100644 --- a/examples/java-demo/feature_repo/driver_repo.py +++ b/examples/java-demo/feature_repo/driver_repo.py @@ -7,7 +7,7 @@ from google.protobuf.duration_pb2 import Duration from feast.field import Field -from feast import Entity, Feature, FeatureView, FileSource, ValueType +from feast import Entity, Feature, BatchFeatureView, FileSource, ValueType driver_hourly_stats = FileSource( path="data/driver_stats_with_string.parquet", @@ -15,7 +15,7 @@ created_timestamp_column="created", ) driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) -driver_hourly_stats_view = FeatureView( +driver_hourly_stats_view = BatchFeatureView( name="driver_hourly_stats", entities=["driver_id"], ttl=Duration(seconds=86400000), @@ -43,10 +43,10 @@ # Define an on demand feature view which can generate new features based on # existing feature views and RequestSource features @on_demand_feature_view( - inputs={ - "driver_hourly_stats": driver_hourly_stats_view, - "vals_to_add": input_request, - }, + inputs=[ + driver_hourly_stats_view, + input_request, + ], schema=[ Field(name="conv_rate_plus_val1", dtype=Float64), Field(name="conv_rate_plus_val2", dtype=Float64), diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index 5127e03b560..62cc52215c9 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -7,6 +7,7 @@ from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from .batch_feature_view import BatchFeatureView from .data_source import ( KafkaSource, KinesisSource, @@ -23,6 +24,7 @@ from .on_demand_feature_view import OnDemandFeatureView from .repo_config import RepoConfig from .request_feature_view import RequestFeatureView +from .stream_feature_view import StreamFeatureView from .value_type import ValueType logging.basicConfig( @@ -38,6 +40,7 @@ pass __all__ = [ + "BatchFeatureView", "Entity", "KafkaSource", "KinesisSource", @@ -49,6 +52,7 @@ "OnDemandFeatureView", "RepoConfig", "SourceType", + "StreamFeatureView", "ValueType", "BigQuerySource", "FileSource", diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 7060870780b..65a4914a8f7 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -439,8 +439,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): else feature_view_proto.spec.ttl.ToTimedelta() ), source=batch_source, - stream_source=stream_source, ) + if stream_source: + feature_view.stream_source = stream_source # FeatureViewProjections are not saved in the FeatureView proto. # Create the default projection. diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index a807f3b4a40..d2cec18e520 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -8,6 +8,7 @@ import pandas as pd from feast.base_feature_view import BaseFeatureView +from feast.batch_feature_view import BatchFeatureView from feast.data_source import RequestSource from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError from feast.feature import Feature @@ -25,6 +26,7 @@ from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( UserDefinedFunction as UserDefinedFunctionProto, ) +from feast.stream_feature_view import StreamFeatureView from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, @@ -66,14 +68,21 @@ class OnDemandFeatureView(BaseFeatureView): tags: Dict[str, str] owner: str - @log_exceptions - def __init__( + @log_exceptions # noqa: C901 + def __init__( # noqa: C901 self, *args, name: Optional[str] = None, features: Optional[List[Feature]] = None, sources: Optional[ - Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]] + List[ + Union[ + BatchFeatureView, + StreamFeatureView, + RequestSource, + FeatureViewProjection, + ] + ] ] = None, udf: Optional[MethodType] = None, inputs: Optional[ @@ -92,11 +101,11 @@ def __init__( features (deprecated): The list of features in the output of the on demand feature view, after the transformation has been applied. sources (optional): A map from input source names to the actual input sources, - which may be feature views, feature view projections, or request data sources. + which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. udf (optional): The user defined transformation function, which must take pandas dataframes as inputs. - inputs (optional): A map from input source names to the actual input sources, + inputs (optional): (Deprecated) A map from input source names to the actual input sources, which may be feature views, feature view projections, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. schema (optional): The list of features in the output of the on demand feature @@ -123,8 +132,7 @@ def __init__( ), DeprecationWarning, ) - - _sources = sources or inputs + _sources = sources or [] if inputs and sources: raise ValueError("At most one of `sources` or `inputs` can be specified.") elif inputs: @@ -135,7 +143,17 @@ def __init__( ), DeprecationWarning, ) - + for _, source in inputs.items(): + if isinstance(source, FeatureView): + _sources.append(feature_view_to_batch_feature_view(source)) + elif isinstance(source, RequestSource) or isinstance( + source, FeatureViewProjection + ): + _sources.append(source) + else: + raise ValueError( + "input can only accept FeatureView, FeatureViewProjection, or RequestSource" + ) _udf = udf if args: @@ -169,7 +187,18 @@ def __init__( DeprecationWarning, ) if len(args) >= 3: - _sources = args[2] + _inputs = args[2] + for _, source in _inputs.items(): + if isinstance(source, FeatureView): + _sources.append(feature_view_to_batch_feature_view(source)) + elif isinstance(source, RequestSource) or isinstance( + source, FeatureViewProjection + ): + _sources.append(source) + else: + raise ValueError( + "input can only accept FeatureView, FeatureViewProjection, or RequestSource" + ) warnings.warn( ( "The `inputs` parameter is being deprecated. Please use `sources` instead. " @@ -195,18 +224,17 @@ def __init__( tags=tags, owner=owner, ) - assert _sources is not None self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} self.source_request_sources: Dict[str, RequestSource] = {} - for source_name, odfv_source in _sources.items(): + for odfv_source in _sources: if isinstance(odfv_source, RequestSource): - self.source_request_sources[source_name] = odfv_source + self.source_request_sources[odfv_source.name] = odfv_source elif isinstance(odfv_source, FeatureViewProjection): - self.source_feature_view_projections[source_name] = odfv_source + self.source_feature_view_projections[odfv_source.name] = odfv_source else: self.source_feature_view_projections[ - source_name + odfv_source.name ] = odfv_source.projection if _udf is None: @@ -219,12 +247,12 @@ def proto_class(self) -> Type[OnDemandFeatureViewProto]: return OnDemandFeatureViewProto def __copy__(self): + fv = OnDemandFeatureView( name=self.name, schema=self.features, - sources=dict( - **self.source_feature_view_projections, **self.source_request_sources, - ), + sources=list(self.source_feature_view_projections.values()) + + list(self.source_request_sources.values()), udf=self.udf, description=self.description, tags=self.tags, @@ -302,22 +330,21 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): Returns: A OnDemandFeatureView object based on the on-demand feature view protobuf. """ - sources = {} - for ( - source_name, - on_demand_source, - ) in on_demand_feature_view_proto.spec.sources.items(): + sources = [] + for (_, on_demand_source,) in on_demand_feature_view_proto.spec.sources.items(): if on_demand_source.WhichOneof("source") == "feature_view": - sources[source_name] = FeatureView.from_proto( - on_demand_source.feature_view - ).projection + sources.append( + FeatureView.from_proto(on_demand_source.feature_view).projection + ) elif on_demand_source.WhichOneof("source") == "feature_view_projection": - sources[source_name] = FeatureViewProjection.from_proto( - on_demand_source.feature_view_projection + sources.append( + FeatureViewProjection.from_proto( + on_demand_source.feature_view_projection + ) ) else: - sources[source_name] = RequestSource.from_proto( - on_demand_source.request_data_source + sources.append( + RequestSource.from_proto(on_demand_source.request_data_source) ) on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, @@ -476,7 +503,16 @@ def get_requested_odfvs(feature_refs, project, registry): def on_demand_feature_view( *args, features: Optional[List[Feature]] = None, - sources: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None, + sources: Optional[ + List[ + Union[ + BatchFeatureView, + StreamFeatureView, + RequestSource, + FeatureViewProjection, + ] + ] + ] = None, inputs: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None, schema: Optional[List[Field]] = None, description: str = "", @@ -490,7 +526,7 @@ def on_demand_feature_view( features (deprecated): The list of features in the output of the on demand feature view, after the transformation has been applied. sources (optional): A map from input source names to the actual input sources, - which may be feature views, feature view projections, or request data sources. + which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. inputs (optional): A map from input source names to the actual input sources, which may be feature views, feature view projections, or request data sources. @@ -517,8 +553,7 @@ def on_demand_feature_view( ), DeprecationWarning, ) - - _sources = sources or inputs + _sources = sources or [] if inputs and sources: raise ValueError("At most one of `sources` or `inputs` can be specified.") elif inputs: @@ -529,6 +564,17 @@ def on_demand_feature_view( ), DeprecationWarning, ) + for _, source in inputs.items(): + if isinstance(source, FeatureView): + _sources.append(feature_view_to_batch_feature_view(source)) + elif isinstance(source, RequestSource) or isinstance( + source, FeatureViewProjection + ): + _sources.append(source) + else: + raise ValueError( + "input can only accept FeatureView, FeatureViewProjection, or RequestSource" + ) if args: warnings.warn( @@ -559,14 +605,25 @@ def on_demand_feature_view( DeprecationWarning, ) if len(args) >= 2: - _sources = args[1] - warnings.warn( - ( - "The `inputs` parameter is being deprecated. Please use `sources` instead. " - "Feast 0.21 and onwards will not support the `inputs` parameter." - ), - DeprecationWarning, - ) + _inputs = args[1] + for _, source in _inputs.items(): + if isinstance(source, FeatureView): + _sources.append(feature_view_to_batch_feature_view(source)) + elif isinstance(source, RequestSource) or isinstance( + source, FeatureViewProjection + ): + _sources.append(source) + else: + raise ValueError( + "input can only accept FeatureView, FeatureViewProjection, or RequestSource" + ) + warnings.warn( + ( + "The `inputs` parameter is being deprecated. Please use `sources` instead. " + "Feast 0.21 and onwards will not support the `inputs` parameter." + ), + DeprecationWarning, + ) if not _sources: raise ValueError("The `sources` parameter must be specified.") @@ -587,3 +644,16 @@ def decorator(user_function): return on_demand_feature_view_obj return decorator + + +def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: + return BatchFeatureView( + name=fv.name, + entities=fv.entities, + ttl=fv.ttl, + tags=fv.tags, + online=fv.online, + owner=fv.owner, + schema=fv.schema, + source=fv.source, + ) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 9902f7c7b8e..cc094a055aa 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -38,6 +38,7 @@ conv_rate_plus_100_feature_view, create_conv_rate_request_source, create_customer_daily_profile_feature_view, + create_driver_hourly_stats_batch_feature_view, create_driver_hourly_stats_feature_view, create_field_mapping_feature_view, create_global_stats_feature_view, @@ -311,15 +312,15 @@ def construct_universal_feature_views( data_sources: UniversalDataSources, with_odfv: bool = True, ) -> UniversalFeatureViews: driver_hourly_stats = create_driver_hourly_stats_feature_view(data_sources.driver) + driver_hourly_stats_base_feature_view = create_driver_hourly_stats_batch_feature_view( + data_sources.driver + ) return UniversalFeatureViews( customer=create_customer_daily_profile_feature_view(data_sources.customer), global_fv=create_global_stats_feature_view(data_sources.global_ds), driver=driver_hourly_stats, driver_odfv=conv_rate_plus_100_feature_view( - { - "driver": driver_hourly_stats, - "input_request": create_conv_rate_request_source(), - } + [driver_hourly_stats_base_feature_view, create_conv_rate_request_source()] ) if with_odfv else None, diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index a6786528e1f..26c2513995f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -5,6 +5,7 @@ import pandas as pd from feast import ( + BatchFeatureView, Feature, FeatureView, Field, @@ -150,6 +151,24 @@ def create_item_embeddings_feature_view(source, infer_features: bool = False): return item_embeddings_feature_view +def create_item_embeddings_batch_feature_view( + source, infer_features: bool = False +) -> BatchFeatureView: + item_embeddings_feature_view = BatchFeatureView( + name="item_embeddings", + entities=["item"], + schema=None + if infer_features + else [ + Field(name="embedding_double", dtype=Array(Float64)), + Field(name="embedding_float", dtype=Array(Float32)), + ], + source=source, + ttl=timedelta(hours=2), + ) + return item_embeddings_feature_view + + def create_driver_hourly_stats_feature_view(source, infer_features: bool = False): driver_stats_feature_view = FeatureView( name="driver_stats", @@ -167,6 +186,25 @@ def create_driver_hourly_stats_feature_view(source, infer_features: bool = False return driver_stats_feature_view +def create_driver_hourly_stats_batch_feature_view( + source, infer_features: bool = False +) -> BatchFeatureView: + driver_stats_feature_view = BatchFeatureView( + name="driver_stats", + entities=["driver"], + schema=None + if infer_features + else [ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int32), + ], + source=source, + ttl=timedelta(hours=2), + ) + return driver_stats_feature_view + + def create_customer_daily_profile_feature_view(source, infer_features: bool = False): customer_profile_feature_view = FeatureView( name="customer_profile", diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 6e9aff1f034..c6819ac3c0c 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -173,7 +173,7 @@ def test_on_demand_features_type_inference(): ) @on_demand_feature_view( - sources={"date_request": date_request}, + sources=[date_request], schema=[ Field(name="output", dtype=UnixTimestamp), Field(name="string_output", dtype=String), @@ -245,7 +245,7 @@ def test_datasource_inference(request_source_schema): Feature(name="output", dtype=ValueType.UNIX_TIMESTAMP), Feature(name="string_output", dtype=ValueType.STRING), ], - sources={"date_request": date_request}, + sources=[date_request], ) def test_view(features_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() @@ -256,7 +256,7 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: test_view.infer_features() @on_demand_feature_view( - sources={"date_request": date_request}, + sources=[date_request], schema=[ Field(name="output", dtype=UnixTimestamp), Field(name="object_output", dtype=String), @@ -272,7 +272,7 @@ def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: invalid_test_view.infer_features() @on_demand_feature_view( - sources={"date_request": date_request}, + sources=[date_request], features=[ Feature(name="output", dtype=ValueType.UNIX_TIMESTAMP), Feature(name="missing", dtype=ValueType.STRING), diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 5f72fb7125b..f011d73d2dd 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -267,7 +267,7 @@ def test_modify_feature_views_success(test_registry, request_source_schema): Feature(name="odfv1_my_feature_1", dtype=ValueType.STRING), Feature(name="odfv1_my_feature_2", dtype=ValueType.INT32), ], - sources={"request_source": request_source}, + sources=[request_source], ) def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() @@ -287,7 +287,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: Feature(name="odfv1_my_feature_1", dtype=ValueType.FLOAT), Feature(name="odfv1_my_feature_2", dtype=ValueType.INT32), ], - sources={"request_source": request_source}, + sources=[request_source], ) def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() diff --git a/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py index 9dced8f13ad..04cb56367bd 100644 --- a/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py +++ b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py @@ -11,8 +11,8 @@ from tests.integration.feature_repos.universal.feature_views import ( conv_rate_plus_100_feature_view, create_conv_rate_request_source, - create_driver_hourly_stats_feature_view, - create_item_embeddings_feature_view, + create_driver_hourly_stats_batch_feature_view, + create_item_embeddings_batch_feature_view, create_similarity_request_source, similarity_feature_view, ) @@ -26,11 +26,12 @@ def test_infer_odfv_features(environment, universal_data_sources, infer_features (entities, datasets, data_sources) = universal_data_sources - driver_hourly_stats = create_driver_hourly_stats_feature_view(data_sources.driver) + driver_hourly_stats = create_driver_hourly_stats_batch_feature_view( + data_sources.driver + ) request_source = create_conv_rate_request_source() driver_odfv = conv_rate_plus_100_feature_view( - {"driver": driver_hourly_stats, "input_request": request_source}, - infer_features=infer_features, + [driver_hourly_stats, request_source], infer_features=infer_features, ) feast_objects = [driver_hourly_stats, driver_odfv, driver(), customer()] @@ -59,13 +60,13 @@ def test_infer_odfv_list_features(environment, infer_features, tmp_path): timestamp_field="event_timestamp", created_timestamp_column="created", ) - items = create_item_embeddings_feature_view(fake_items_src) + item_feature_view = create_item_embeddings_batch_feature_view(fake_items_src) sim_odfv = similarity_feature_view( - {"items": items, "input_request": create_similarity_request_source()}, + [item_feature_view, create_similarity_request_source()], infer_features=infer_features, ) store = environment.feature_store - store.apply([item(), items, sim_odfv]) + store.apply([item(), item_feature_view, sim_odfv]) odfv = store.get_on_demand_feature_view("similarity") assert len(odfv.features) == 2 @@ -78,11 +79,12 @@ def test_infer_odfv_features_with_error(environment, universal_data_sources): (entities, datasets, data_sources) = universal_data_sources features = [Field(name="conv_rate_plus_200", dtype=Float64)] - driver_hourly_stats = create_driver_hourly_stats_feature_view(data_sources.driver) + driver_hourly_stats = create_driver_hourly_stats_batch_feature_view( + data_sources.driver + ) request_source = create_conv_rate_request_source() driver_odfv = conv_rate_plus_100_feature_view( - {"driver": driver_hourly_stats, "input_request": request_source}, - features=features, + [driver_hourly_stats, request_source], features=features, ) feast_objects = [driver_hourly_stats, driver_odfv, driver(), customer()] diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 9d45cfbb0b7..33435b8557e 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -11,13 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import pandas as pd +import pytest +from feast import RequestSource from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.on_demand_feature_view import OnDemandFeatureView -from feast.types import Float32 +from feast.on_demand_feature_view import OnDemandFeatureView, on_demand_feature_view +from feast.types import Float32, String, UnixTimestamp def udf1(features_df: pd.DataFrame) -> pd.DataFrame: @@ -45,7 +48,7 @@ def test_hash(): ], source=file_source, ) - sources = {"my-feature-view": feature_view} + sources = [feature_view] on_demand_feature_view_1 = OnDemandFeatureView( name="my-on-demand-feature-view", sources=sources, @@ -100,3 +103,62 @@ def test_hash(): on_demand_feature_view_4, } assert len(s4) == 3 + + +def test_inputs_parameter_deprecation_in_odfv(): + date_request = RequestSource( + name="date_request", schema=[Field(name="some_date", dtype=UnixTimestamp)], + ) + with pytest.warns(DeprecationWarning): + + @on_demand_feature_view( + inputs={"date_request": date_request}, + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="string_output", dtype=String), + ], + ) + def test_view(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + data["string_output"] = features_df["some_date"].astype(pd.StringDtype()) + return data + + odfv = test_view + assert odfv.name == "test_view" + assert len(odfv.source_request_sources) == 1 + assert odfv.source_request_sources["date_request"].name == "date_request" + assert odfv.source_request_sources["date_request"].schema == date_request.schema + + with pytest.raises(ValueError): + + @on_demand_feature_view( + inputs={"date_request": date_request}, + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="string_output", dtype=String), + ], + ) + def incorrect_testview(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + data["string_output"] = features_df["some_date"].astype(pd.StringDtype()) + return data + + @on_demand_feature_view( + inputs={"odfv": date_request}, + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="string_output", dtype=String), + ], + ) + def test_correct_view(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + data["string_output"] = features_df["some_date"].astype(pd.StringDtype()) + return data + + odfv = test_correct_view + assert odfv.name == "test_correct_view" + assert odfv.source_request_sources["date_request"].schema == date_request.schema