From 047437b8320ff8695543090453b1e6b8de7559dc Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 5 Aug 2022 15:49:08 -0700 Subject: [PATCH 1/3] Remove deprecated CLI warnings Signed-off-by: Felix Wang --- sdk/python/feast/cli.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index c6a301e958..6b415dd492 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -13,7 +13,6 @@ # limitations under the License. import json import logging -import warnings from datetime import datetime from pathlib import Path from typing import List, Optional @@ -45,7 +44,6 @@ from feast.utils import maybe_local_tz _logger = logging.getLogger(__name__) -warnings.filterwarnings("ignore", category=DeprecationWarning, module="(?!feast)") class NoOptionDefaultFormat(click.Command): @@ -197,11 +195,6 @@ def data_source_describe(ctx: click.Context, name: str): print(e) exit(1) - warnings.warn( - "Describing data sources will only work properly if all data sources have names or table names specified. " - "Starting Feast 0.24, data source unique names will be required to encourage data source discovery.", - RuntimeWarning, - ) print( yaml.dump( yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False @@ -224,11 +217,6 @@ def data_source_list(ctx: click.Context): from tabulate import tabulate - warnings.warn( - "Listing data sources will only work properly if all data sources have names or table names specified. " - "Starting Feast 0.24, data source unique names will be required to encourage data source discovery", - RuntimeWarning, - ) print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain")) From b0dc405ad186bc8884cfe94df21ad8a4a120d1e0 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 8 Aug 2022 14:48:07 -0700 Subject: [PATCH 2/3] Fix repo parsing logic Signed-off-by: Felix Wang --- sdk/python/feast/repo_operations.py | 60 +++++++++++++++++++---------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 91cab2e992..13f14b7722 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -14,7 +14,7 @@ from feast import PushSource from feast.batch_feature_view import BatchFeatureView -from feast.data_source import DataSource, KafkaSource +from feast.data_source import DataSource, KafkaSource, KinesisSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity from feast.feature_service import FeatureService @@ -114,17 +114,30 @@ def parse_repo(repo_root: Path) -> RepoContents: request_feature_views=[], ) - data_sources_set = set() for repo_file in get_repo_files(repo_root): module_path = py_path_to_module(repo_file) module = importlib.import_module(module_path) + for attr_name in dir(module): obj = getattr(module, attr_name) + if isinstance(obj, DataSource) and not any( (obj is ds) for ds in res.data_sources ): res.data_sources.append(obj) - data_sources_set.add(obj) + + # Handle batch sources defined within stream sources. + if ( + isinstance(obj, PushSource) + or isinstance(obj, KafkaSource) + or isinstance(obj, KinesisSource) + ): + batch_source = obj.batch_source + + if batch_source and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) if ( isinstance(obj, FeatureView) and not any((obj is fv) for fv in res.feature_views) @@ -132,26 +145,33 @@ def parse_repo(repo_root: Path) -> RepoContents: and not isinstance(obj, BatchFeatureView) ): res.feature_views.append(obj) - if isinstance(obj.stream_source, PushSource) and not any( - (obj is ds) for ds in res.data_sources - ): - push_source_dep = obj.stream_source.batch_source - # Don't add if the push source's batch source is a duplicate of an existing batch source - if push_source_dep not in data_sources_set: - res.data_sources.append(push_source_dep) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + assert batch_source + if not any((batch_source is ds) for ds in res.data_sources): + res.data_sources.append(batch_source) + + # Handle stream sources defined with feature views. + if obj.stream_source: + stream_source = obj.stream_source + if not any((stream_source is ds) for ds in res.data_sources): + res.data_sources.append(stream_source) elif isinstance(obj, StreamFeatureView) and not any( (obj is sfv) for sfv in res.stream_feature_views ): res.stream_feature_views.append(obj) - if ( - isinstance(obj.stream_source, PushSource) - or isinstance(obj.stream_source, KafkaSource) - and not any((obj is ds) for ds in res.data_sources) - ): - batch_source_dep = obj.stream_source.batch_source - # Don't add if the push source's batch source is a duplicate of an existing batch source - if batch_source_dep and batch_source_dep not in data_sources_set: - res.data_sources.append(batch_source_dep) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + if not any((batch_source is ds) for ds in res.data_sources): + res.data_sources.append(batch_source) + + # Handle stream sources defined with feature views. + stream_source = obj.stream_source + assert stream_source + if not any((stream_source is ds) for ds in res.data_sources): + res.data_sources.append(stream_source) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities ): @@ -168,6 +188,7 @@ def parse_repo(repo_root: Path) -> RepoContents: (obj is rfv) for rfv in res.request_feature_views ): res.request_feature_views.append(obj) + res.entities.append(DUMMY_ENTITY) return res @@ -300,7 +321,6 @@ def log_infra_changes( @log_exceptions_and_usage def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): - os.chdir(repo_path) project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) apply_total_with_repo_instance( From e733c89eda7ac70504cf6de1382d3908074445a1 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Sat, 6 Aug 2022 15:43:17 -0700 Subject: [PATCH 3/3] Add tests Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 20 +- ...e_feature_repo_with_inline_batch_source.py | 28 ++ ..._feature_repo_with_inline_stream_source.py | 37 +++ ...example_feature_repo_with_stream_source.py | 18 ++ .../test_local_feature_store.py | 296 +++++++++++++++++- .../test_stream_feature_view_apply.py | 149 --------- sdk/python/tests/utils/cli_repo_creator.py | 11 +- 7 files changed, 397 insertions(+), 162 deletions(-) create mode 100644 sdk/python/tests/example_repos/example_feature_repo_with_inline_batch_source.py create mode 100644 sdk/python/tests/example_repos/example_feature_repo_with_inline_stream_source.py create mode 100644 sdk/python/tests/example_repos/example_feature_repo_with_stream_source.py delete mode 100644 sdk/python/tests/unit/local_feast_tests/test_stream_feature_view_apply.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ac682fb6cd..781c9406e1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -43,7 +43,13 @@ from feast import feature_server, flags_helper, ui_server, utils from feast.base_feature_view import BaseFeatureView from feast.batch_feature_view import BatchFeatureView -from feast.data_source import DataSource, PushMode +from feast.data_source import ( + DataSource, + KafkaSource, + KinesisSource, + PushMode, + PushSource, +) from feast.diff.infra_diff import InfraDiff, diff_infra_protos from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between from feast.dqm.errors import ValidationFailed @@ -827,6 +833,18 @@ def apply( ob for ob in objects if isinstance(ob, ValidationReference) ] + batch_sources_to_add: List[DataSource] = [] + for data_source in data_sources_set_to_update: + if ( + isinstance(data_source, PushSource) + or isinstance(data_source, KafkaSource) + or isinstance(data_source, KinesisSource) + ): + assert data_source.batch_source + batch_sources_to_add.append(data_source.batch_source) + for batch_source in batch_sources_to_add: + data_sources_set_to_update.add(batch_source) + for fv in itertools.chain(views_to_update, sfvs_to_update): data_sources_set_to_update.add(fv.batch_source) if fv.stream_source: diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_inline_batch_source.py b/sdk/python/tests/example_repos/example_feature_repo_with_inline_batch_source.py new file mode 100644 index 0000000000..dc79d28195 --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_inline_batch_source.py @@ -0,0 +1,28 @@ +from datetime import timedelta + +from feast import Entity, FeatureView, Field, FileSource +from feast.types import Float32, Int32, Int64 + +driver = Entity( + name="driver_id", + description="driver id", +) + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + Field(name="driver_id", dtype=Int32), + ], + online=True, + source=FileSource( + path="data/driver_stats.parquet", # Fake path + timestamp_field="event_timestamp", + created_timestamp_column="created", + ), + tags={}, +) diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_inline_stream_source.py b/sdk/python/tests/example_repos/example_feature_repo_with_inline_stream_source.py new file mode 100644 index 0000000000..5d01791b73 --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_inline_stream_source.py @@ -0,0 +1,37 @@ +from datetime import timedelta + +from feast import Entity, FeatureView, Field, FileSource, KafkaSource +from feast.data_format import AvroFormat +from feast.types import Float32, Int32, Int64 + +driver = Entity( + name="driver_id", + description="driver id", +) + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + Field(name="driver_id", dtype=Int32), + ], + online=True, + source=KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource( + path="data/driver_stats.parquet", # Fake path + timestamp_field="event_timestamp", + created_timestamp_column="created", + ), + watermark_delay_threshold=timedelta(days=1), + ), + tags={}, +) diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_stream_source.py b/sdk/python/tests/example_repos/example_feature_repo_with_stream_source.py new file mode 100644 index 0000000000..0672e3552c --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_stream_source.py @@ -0,0 +1,18 @@ +from datetime import timedelta + +from feast import FileSource, KafkaSource +from feast.data_format import AvroFormat + +stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource( + path="data/driver_stats.parquet", # Fake path + timestamp_field="event_timestamp", + created_timestamp_column="created", + ), + watermark_delay_threshold=timedelta(days=1), +) diff --git a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py index c93722cd6b..b2da58c4c0 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py +++ b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py @@ -4,15 +4,19 @@ import pytest from pytest_lazyfixture import lazy_fixture -from feast import FileSource -from feast.data_format import ParquetFormat +from feast.aggregation import Aggregation +from feast.data_format import AvroFormat, ParquetFormat +from feast.data_source import KafkaSource from feast.entity import Entity from feast.feature_store import FeatureStore from feast.feature_view import FeatureView from feast.field import Field +from feast.infra.offline_stores.file_source import FileSource from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig from feast.repo_config import RepoConfig -from feast.types import Array, Bytes, Int64, String +from feast.stream_feature_view import stream_feature_view +from feast.types import Array, Bytes, Float32, Int64, String +from tests.utils.cli_repo_creator import CliRunner, get_example_repo from tests.utils.data_source_test_creator import prep_file_source @@ -20,7 +24,7 @@ "test_feature_store", [lazy_fixture("feature_store_with_local_registry")], ) -def test_apply_entity_success(test_feature_store): +def test_apply_entity(test_feature_store): entity = Entity( name="driver_car_id", description="Car driver id", @@ -48,7 +52,7 @@ def test_apply_entity_success(test_feature_store): "test_feature_store", [lazy_fixture("feature_store_with_local_registry")], ) -def test_apply_feature_view_success(test_feature_store): +def test_apply_feature_view(test_feature_store): # Create Feature Views batch_source = FileSource( file_format=ParquetFormat(), @@ -101,7 +105,97 @@ def test_apply_feature_view_success(test_feature_store): "test_feature_store", [lazy_fixture("feature_store_with_local_registry")], ) -def test_apply_object_and_read(test_feature_store): +def test_apply_feature_view_with_inline_batch_source( + test_feature_store, simple_dataset_1 +) -> None: + """Test that a feature view and an inline batch source are both correctly applied.""" + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="driver_entity", join_keys=["test_key"]) + driver_fv = FeatureView( + name="driver_fv", + entities=[entity], + source=file_source, + ) + + test_feature_store.apply([entity, driver_fv]) + + fvs = test_feature_store.list_feature_views() + assert len(fvs) == 1 + assert fvs[0] == driver_fv + + ds = test_feature_store.list_data_sources() + assert len(ds) == 1 + assert ds[0] == file_source + + +def test_apply_feature_view_with_inline_batch_source_from_repo() -> None: + """Test that a feature view and an inline batch source are both correctly applied.""" + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_with_inline_batch_source.py"), "file" + ) as store: + ds = store.list_data_sources() + assert len(ds) == 1 + + +@pytest.mark.parametrize( + "test_feature_store", + [lazy_fixture("feature_store_with_local_registry")], +) +def test_apply_feature_view_with_inline_stream_source( + test_feature_store, simple_dataset_1 +) -> None: + """Test that a feature view and an inline stream source are both correctly applied.""" + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="driver_entity", join_keys=["test_key"]) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=file_source, + watermark_delay_threshold=timedelta(days=1), + ) + + driver_fv = FeatureView( + name="driver_fv", + entities=[entity], + source=stream_source, + ) + + test_feature_store.apply([entity, driver_fv]) + + fvs = test_feature_store.list_feature_views() + assert len(fvs) == 1 + assert fvs[0] == driver_fv + + ds = test_feature_store.list_data_sources() + assert len(ds) == 2 + if isinstance(ds[0], FileSource): + assert ds[0] == file_source + assert ds[1] == stream_source + else: + assert ds[0] == stream_source + assert ds[1] == file_source + + +def test_apply_feature_view_with_inline_stream_source_from_repo() -> None: + """Test that a feature view and an inline stream source are both correctly applied.""" + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_with_inline_stream_source.py"), "file" + ) as store: + ds = store.list_data_sources() + assert len(ds) == 2 + + +@pytest.mark.parametrize( + "test_feature_store", + [lazy_fixture("feature_store_with_local_registry")], +) +def test_apply_entities_and_feature_views(test_feature_store): assert isinstance(test_feature_store, FeatureStore) # Create Feature Views batch_source = FileSource( @@ -163,9 +257,8 @@ def test_apply_object_and_read(test_feature_store): [lazy_fixture("feature_store_with_local_registry")], ) @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) -def test_reapply_feature_view_success(test_feature_store, dataframe_source): +def test_reapply_feature_view(test_feature_store, dataframe_source): with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - e = Entity(name="id", join_keys=["id_join_key"]) # Create Feature View @@ -215,7 +308,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): test_feature_store.teardown() -def test_apply_conflicting_featureview_names(feature_store_with_local_registry): +def test_apply_conflicting_feature_view_names(feature_store_with_local_registry): """Test applying feature views with non-case-insensitively unique names""" driver = Entity(name="driver", join_keys=["driver_id"]) customer = Entity(name="customer", join_keys=["customer_id"]) @@ -251,6 +344,191 @@ def test_apply_conflicting_featureview_names(feature_store_with_local_registry): feature_store_with_local_registry.teardown() +@pytest.mark.parametrize( + "test_feature_store", + [lazy_fixture("feature_store_with_local_registry")], +) +def test_apply_stream_feature_view(test_feature_store, simple_dataset_1) -> None: + """Test that a stream feature view is correctly applied.""" + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="driver_entity", join_keys=["test_key"]) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=file_source, + watermark_delay_threshold=timedelta(days=1), + ) + + @stream_feature_view( + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ), + Aggregation( + column="dummy_field2", + function="count", + time_window=timedelta(days=24), + ), + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + tags={}, + ) + def simple_sfv(df): + return df + + test_feature_store.apply([entity, simple_sfv]) + + stream_feature_views = test_feature_store.list_stream_feature_views() + assert len(stream_feature_views) == 1 + assert stream_feature_views[0] == simple_sfv + + features = test_feature_store.get_online_features( + features=["simple_sfv:dummy_field"], + entity_rows=[{"test_key": 1001}], + ).to_dict(include_event_timestamps=True) + + assert "test_key" in features + assert features["test_key"] == [1001] + assert "dummy_field" in features + assert features["dummy_field"] == [None] + + +@pytest.mark.parametrize( + "test_feature_store", + [lazy_fixture("feature_store_with_local_registry")], +) +def test_apply_stream_feature_view_udf(test_feature_store, simple_dataset_1) -> None: + """Test that a stream feature view with a udf is correctly applied.""" + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="driver_entity", join_keys=["test_key"]) + + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=file_source, + watermark_delay_threshold=timedelta(days=1), + ) + + @stream_feature_view( + entities=[entity], + ttl=timedelta(days=30), + owner="test@example.com", + online=True, + schema=[Field(name="dummy_field", dtype=Float32)], + description="desc", + aggregations=[ + Aggregation( + column="dummy_field", + function="max", + time_window=timedelta(days=1), + ), + Aggregation( + column="dummy_field2", + function="count", + time_window=timedelta(days=24), + ), + ], + timestamp_field="event_timestamp", + mode="spark", + source=stream_source, + tags={}, + ) + def pandas_view(pandas_df): + import pandas as pd + + assert type(pandas_df) == pd.DataFrame + df = pandas_df.transform(lambda x: x + 10, axis=1) + df.insert(2, "C", [20.2, 230.0, 34.0], True) + return df + + import pandas as pd + + test_feature_store.apply([entity, pandas_view]) + + stream_feature_views = test_feature_store.list_stream_feature_views() + assert len(stream_feature_views) == 1 + assert stream_feature_views[0] == pandas_view + + sfv = stream_feature_views[0] + + df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) + new_df = sfv.udf(df) + expected_df = pd.DataFrame( + {"A": [11, 12, 13], "B": [20, 30, 40], "C": [20.2, 230.0, 34.0]} + ) + assert new_df.equals(expected_df) + + +@pytest.mark.parametrize( + "test_feature_store", + [lazy_fixture("feature_store_with_local_registry")], +) +def test_apply_batch_source(test_feature_store, simple_dataset_1) -> None: + """Test that a batch source is applied correctly.""" + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + test_feature_store.apply([file_source]) + + ds = test_feature_store.list_data_sources() + assert len(ds) == 1 + assert ds[0] == file_source + + +@pytest.mark.parametrize( + "test_feature_store", + [lazy_fixture("feature_store_with_local_registry")], +) +def test_apply_stream_source(test_feature_store, simple_dataset_1) -> None: + """Test that a stream source is applied correctly.""" + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=file_source, + watermark_delay_threshold=timedelta(days=1), + ) + + test_feature_store.apply([stream_source]) + + ds = test_feature_store.list_data_sources() + assert len(ds) == 2 + if isinstance(ds[0], FileSource): + assert ds[0] == file_source + assert ds[1] == stream_source + else: + assert ds[0] == stream_source + assert ds[1] == file_source + + +def test_apply_stream_source_from_repo() -> None: + """Test that a stream source is applied correctly.""" + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_with_stream_source.py"), "file" + ) as store: + ds = store.list_data_sources() + assert len(ds) == 2 + + @pytest.fixture def feature_store_with_local_registry(): fd, registry_path = mkstemp() diff --git a/sdk/python/tests/unit/local_feast_tests/test_stream_feature_view_apply.py b/sdk/python/tests/unit/local_feast_tests/test_stream_feature_view_apply.py deleted file mode 100644 index 0def3cc783..0000000000 --- a/sdk/python/tests/unit/local_feast_tests/test_stream_feature_view_apply.py +++ /dev/null @@ -1,149 +0,0 @@ -from datetime import timedelta - -from feast.aggregation import Aggregation -from feast.data_format import AvroFormat -from feast.data_source import KafkaSource -from feast.entity import Entity -from feast.field import Field -from feast.stream_feature_view import stream_feature_view -from feast.types import Float32 -from tests.utils.cli_repo_creator import CliRunner, get_example_repo -from tests.utils.data_source_test_creator import prep_file_source - - -def test_apply_stream_feature_view(simple_dataset_1) -> None: - """ - Test apply of StreamFeatureView. - """ - runner = CliRunner() - with runner.local_repo( - get_example_repo("empty_feature_repo.py"), "file" - ) as fs, prep_file_source( - df=simple_dataset_1, timestamp_field="ts_1" - ) as file_source: - entity = Entity(name="driver_entity", join_keys=["test_key"]) - - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=file_source, - watermark_delay_threshold=timedelta(days=1), - ) - - @stream_feature_view( - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ), - Aggregation( - column="dummy_field2", - function="count", - time_window=timedelta(days=24), - ), - ], - timestamp_field="event_timestamp", - mode="spark", - source=stream_source, - tags={}, - ) - def simple_sfv(df): - return df - - fs.apply([entity, simple_sfv]) - - stream_feature_views = fs.list_stream_feature_views() - assert len(stream_feature_views) == 1 - assert stream_feature_views[0] == simple_sfv - - features = fs.get_online_features( - features=["simple_sfv:dummy_field"], - entity_rows=[{"test_key": 1001}], - ).to_dict(include_event_timestamps=True) - - assert "test_key" in features - assert features["test_key"] == [1001] - assert "dummy_field" in features - assert features["dummy_field"] == [None] - - -def test_stream_feature_view_udf(simple_dataset_1) -> None: - """ - Test apply of StreamFeatureView udfs are serialized correctly and usable. - """ - runner = CliRunner() - with runner.local_repo( - get_example_repo("empty_feature_repo.py"), "file" - ) as fs, prep_file_source( - df=simple_dataset_1, timestamp_field="ts_1" - ) as file_source: - entity = Entity(name="driver_entity", join_keys=["test_key"]) - - stream_source = KafkaSource( - name="kafka", - timestamp_field="event_timestamp", - kafka_bootstrap_servers="", - message_format=AvroFormat(""), - topic="topic", - batch_source=file_source, - watermark_delay_threshold=timedelta(days=1), - ) - - @stream_feature_view( - entities=[entity], - ttl=timedelta(days=30), - owner="test@example.com", - online=True, - schema=[Field(name="dummy_field", dtype=Float32)], - description="desc", - aggregations=[ - Aggregation( - column="dummy_field", - function="max", - time_window=timedelta(days=1), - ), - Aggregation( - column="dummy_field2", - function="count", - time_window=timedelta(days=24), - ), - ], - timestamp_field="event_timestamp", - mode="spark", - source=stream_source, - tags={}, - ) - def pandas_view(pandas_df): - import pandas as pd - - assert type(pandas_df) == pd.DataFrame - df = pandas_df.transform(lambda x: x + 10, axis=1) - df.insert(2, "C", [20.2, 230.0, 34.0], True) - return df - - import pandas as pd - - fs.apply([entity, pandas_view]) - - stream_feature_views = fs.list_stream_feature_views() - assert len(stream_feature_views) == 1 - assert stream_feature_views[0] == pandas_view - - sfv = stream_feature_views[0] - - df = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]}) - new_df = sfv.udf(df) - expected_df = pd.DataFrame( - {"A": [11, 12, 13], "B": [20, 30, 40], "C": [20.2, 230.0, 34.0]} - ) - assert new_df.equals(expected_df) diff --git a/sdk/python/tests/utils/cli_repo_creator.py b/sdk/python/tests/utils/cli_repo_creator.py index cbde49e583..66f67384f9 100644 --- a/sdk/python/tests/utils/cli_repo_creator.py +++ b/sdk/python/tests/utils/cli_repo_creator.py @@ -60,7 +60,6 @@ def local_repo(self, example_repo_py: str, offline_store: str): ) with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: - repo_path = Path(repo_dir_name) data_path = Path(data_dir_name) @@ -85,11 +84,17 @@ def local_repo(self, example_repo_py: str, offline_store: str): repo_example.write_text(example_repo_py) result = self.run(["apply"], cwd=repo_path) - print(f"Apply: stdout: {str(result.stdout)}\n stderr: {str(result.stderr)}") + stdout = result.stdout.decode("utf-8") + stderr = result.stderr.decode("utf-8") + print(f"Apply stdout:\n{stdout}") + print(f"Apply stderr:\n{stderr}") assert result.returncode == 0 yield FeatureStore(repo_path=str(repo_path), config=None) result = self.run(["teardown"], cwd=repo_path) - print(f"Apply: stdout: {str(result.stdout)}\n stderr: {str(result.stderr)}") + stdout = result.stdout.decode("utf-8") + stderr = result.stderr.decode("utf-8") + print(f"Apply stdout:\n{stdout}") + print(f"Apply stderr:\n{stderr}") assert result.returncode == 0