From 2a7935b04726d2b5a5fcfaacd597f8df1e1ab50e Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Wed, 26 Jan 2022 11:12:49 -0600 Subject: [PATCH 01/16] historical_field_mappings fix CONTRIBUTING docs for creating venv Signed-off-by: Michelle Rascati --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6918d7f1de9..dbf44d4bef9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -50,7 +50,7 @@ Setting up your development environment for Feast Python SDK / CLI: 3. _Recommended:_ Create a virtual environment to isolate development dependencies to be installed ```sh # create & activate a virtual environment -python -v venv venv/ +python -m venv venv/ source venv/bin/activate ``` From 8b95281749cb1269e5840555157203934729dcfe Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Wed, 26 Jan 2022 11:13:42 -0600 Subject: [PATCH 02/16] historical_field_mappings fix mypy-protobuf conflict Signed-off-by: Michelle Rascati --- sdk/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index d35ee9de11b..4f01c7b4e01 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -132,7 +132,7 @@ + AWS_REQUIRED ) -DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED +DEV_REQUIRED = ["mypy-protobuf>=1.*", "grpcio-testing==1.*"] + CI_REQUIRED # Get git repo root directory repo_root = str(pathlib.Path(__file__).resolve().parent.parent.parent) From 6a8d1266625d1c493b42d421b77a6e700dc68b57 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 11:31:37 -0600 Subject: [PATCH 03/16] historical_field_mappings adding tests for get_historical_features to return field mapped names Signed-off-by: Michelle Rascati --- sdk/python/feast/driver_test_data.py | 26 +++++++++++++++ .../infra/offline_stores/offline_utils.py | 2 +- .../feature_repos/repo_configuration.py | 14 ++++++++ .../feature_repos/universal/feature_views.py | 12 +++++++ .../test_universal_historical_retrieval.py | 33 ++++++++++++++++++- 5 files changed, 85 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/driver_test_data.py b/sdk/python/feast/driver_test_data.py index 1c9a1dd20bc..117bfcbd9cb 100644 --- a/sdk/python/feast/driver_test_data.py +++ b/sdk/python/feast/driver_test_data.py @@ -264,3 +264,29 @@ def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame: # TODO: Remove created timestamp in order to test whether its really optional df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) return df_daily + + +def create_field_mapping_df(start_date, end_date) -> pd.DataFrame: + """ + Example df generated by this function: + | event_timestamp | column_name | created | + |------------------+-------------+------------------| + | 2021-03-17 19:00 | 99 | 2021-03-24 19:38 | + | 2021-03-17 19:00 | 22 | 2021-03-24 19:38 | + | 2021-03-17 19:00 | 7 | 2021-03-24 19:38 | + | 2021-03-17 19:00 | 45 | 2021-03-24 19:38 | + """ + size = 10 + df = pd.DataFrame() + df["column_name"] = np.random.randint(1, 100, size=size).astype(np.int32) + df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [ + _convert_event_timestamp( + pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"), + EventTimestampType(idx % 4), + ) + for idx, dt in enumerate( + pd.date_range(start=start_date, end=end_date, periods=size) + ) + ] + df["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) + return df diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 0b60c3493df..c9d462fbc36 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -144,7 +144,7 @@ def get_feature_view_query_context( name=feature_view.projection.name_to_use(), ttl=ttl_seconds, entities=join_keys, - features=features, + features=[reverse_field_mapping.get(feature, feature) for feature in features], event_timestamp_column=reverse_field_mapping.get( event_timestamp_column, event_timestamp_column ), diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 45044574e01..022bd2badaf 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -38,6 +38,7 @@ create_global_stats_feature_view, create_location_stats_feature_view, create_order_feature_view, + create_field_mapping_feature_view, ) DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"} @@ -126,6 +127,7 @@ def construct_universal_datasets( order_count=20, ) global_df = driver_test_data.create_global_daily_stats_df(start_time, end_time) + field_mapping_df = driver_test_data.create_field_mapping_df(start_time, end_time) entity_df = orders_df[ [ "customer_id", @@ -143,6 +145,7 @@ def construct_universal_datasets( "location": location_df, "orders": orders_df, "global": global_df, + "field_mapping": field_mapping_df, "entity": entity_df, } @@ -180,12 +183,22 @@ def construct_universal_data_sources( event_timestamp_column="event_timestamp", created_timestamp_column="created", ) + field_mapping_ds = data_source_creator.create_data_source( + datasets["field_mapping"], + destination_name="field_mapping", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", + field_mapping={ + "column_name": "feature_name" + } + ) return { "customer": customer_ds, "driver": driver_ds, "location": location_ds, "orders": orders_ds, "global": global_ds, + "field_mapping": field_mapping_ds } @@ -210,6 +223,7 @@ def construct_universal_feature_views( "driver_age_request_fv": create_driver_age_request_feature_view(), "order": create_order_feature_view(data_sources["orders"]), "location": create_location_stats_feature_view(data_sources["location"]), + "field_mapping": create_field_mapping_feature_view(data_sources["field_mapping"]) } diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index f68add88cbb..3420d725dd6 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -217,3 +217,15 @@ def create_location_stats_feature_view(source, infer_features: bool = False): ttl=timedelta(days=2), ) return location_stats_feature_view + + +def create_field_mapping_feature_view(source): + return FeatureView( + name="field_mapping", + entities=[], + features=[ + Feature(name="feature_name", dtype=ValueType.INT32), + ], + batch_source=source, + ttl=timedelta(days=2), + ) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 99f111a3462..9fe070d63de 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -82,6 +82,8 @@ def get_expected_training_df( location_fv: FeatureView, global_df: pd.DataFrame, global_fv: FeatureView, + field_mapping_df: pd.DataFrame, + field_mapping_fv: FeatureView, entity_df: pd.DataFrame, event_timestamp: str, full_feature_names: bool = False, @@ -102,6 +104,9 @@ def get_expected_training_df( global_records = convert_timestamp_records_to_utc( global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column ) + field_mapping_records = convert_timestamp_records_to_utc( + field_mapping_df.to_dict("records"), field_mapping_fv.batch_source.event_timestamp_column + ) entity_rows = convert_timestamp_records_to_utc( entity_df.to_dict("records"), event_timestamp ) @@ -156,6 +161,13 @@ def get_expected_training_df( ts_end=order_record[event_timestamp], ) + field_mapping_record = find_asof_record( + field_mapping_records, + ts_key=field_mapping_fv.batch_source.event_timestamp_column, + ts_start=order_record[event_timestamp] - field_mapping_fv.ttl, + ts_end=order_record[event_timestamp], + ) + entity_row.update( { ( @@ -197,6 +209,14 @@ def get_expected_training_df( } ) + # get field_mapping_record by column name, but label by feature name + entity_row.update( + { + (f"field_mapping__{feature}" if full_feature_names else feature): field_mapping_record.get(column, None) + for (column, feature) in field_mapping_fv.input.field_mapping.items() + } + ) + # Convert records back to pandas dataframe expected_df = pd.DataFrame(entity_rows) @@ -213,6 +233,7 @@ def get_expected_training_df( "customer_profile__current_balance": "float32", "customer_profile__avg_passenger_count": "float32", "global_stats__avg_ride_length": "float32", + "field_mapping__feature_name": "int32", } else: expected_column_types = { @@ -221,6 +242,7 @@ def get_expected_training_df( "current_balance": "float32", "avg_passenger_count": "float32", "avg_ride_length": "float32", + "feature_name": "int32", } for col, typ in expected_column_types.items(): @@ -255,13 +277,14 @@ def test_historical_features(environment, universal_data_sources, full_feature_n (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - customer_df, driver_df, location_df, orders_df, global_df, entity_df = ( + customer_df, driver_df, location_df, orders_df, global_df, entity_df, field_mapping_df = ( datasets["customer"], datasets["driver"], datasets["location"], datasets["orders"], datasets["global"], datasets["entity"], + datasets["field_mapping"], ) entity_df_with_request_data = entity_df.copy(deep=True) entity_df_with_request_data["val_to_add"] = [ @@ -279,6 +302,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n order_fv, global_fv, driver_age_request_fv, + field_mapping_fv, ) = ( feature_views["customer"], feature_views["driver"], @@ -287,6 +311,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n feature_views["order"], feature_views["global"], feature_views["driver_age_request_fv"], + feature_views["field_mapping"], ) feature_service = FeatureService( @@ -324,6 +349,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n location(), feature_service, feature_service_entity_mapping, + field_mapping_fv, ] ) store.apply(feast_objects) @@ -349,6 +375,8 @@ def test_historical_features(environment, universal_data_sources, full_feature_n location_fv, global_df, global_fv, + field_mapping_df, + field_mapping_fv, entity_df_with_request_data, event_timestamp, full_feature_names, @@ -444,6 +472,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "global_stats:num_rides", "global_stats:avg_ride_length", "driver_age:driver_age", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) @@ -516,6 +545,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "conv_rate_plus_100:conv_rate_plus_val_to_add", "global_stats:num_rides", "global_stats:avg_ride_length", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) @@ -532,6 +562,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "driver_age:driver_age", "global_stats:num_rides", "global_stats:avg_ride_length", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) From 99bf1ed2cf143b0450965b2ec2df1321a2c12b0e Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 11:33:52 -0600 Subject: [PATCH 04/16] historical_field_mappings bigquery tests passing Signed-off-by: Michelle Rascati --- sdk/python/feast/infra/offline_stores/bigquery.py | 4 ++-- sdk/python/feast/infra/offline_stores/offline_utils.py | 5 ++++- .../offline_store/test_universal_historical_retrieval.py | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 34dde7aa7b9..cf27da0e2d8 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -531,7 +531,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} - {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %} + {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' @@ -632,7 +632,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] SELECT {{featureview.name}}__entity_row_unique_id {% for feature in featureview.features %} - ,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %} + ,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %} {% endfor %} FROM {{ featureview.name }}__cleaned ) USING ({{featureview.name}}__entity_row_unique_id) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index c9d462fbc36..7e9640ca4ee 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -85,6 +85,7 @@ class FeatureViewQueryContext: ttl: int entities: List[str] features: List[str] # feature reference format + field_mapping: Optional[Dict[str, str]] event_timestamp_column: str created_timestamp_column: Optional[str] table_subquery: str @@ -145,6 +146,7 @@ def get_feature_view_query_context( ttl=ttl_seconds, entities=join_keys, features=[reverse_field_mapping.get(feature, feature) for feature in features], + field_mapping=feature_view.input.field_mapping, event_timestamp_column=reverse_field_mapping.get( event_timestamp_column, event_timestamp_column ), @@ -175,7 +177,8 @@ def build_point_in_time_query( final_output_feature_names = list(entity_df_columns) final_output_feature_names.extend( [ - (f"{fv.name}__{feature}" if full_feature_names else feature) + (f"{fv.name}__{fv.field_mapping.get(feature, feature)}" if full_feature_names + else fv.field_mapping.get(feature, feature)) for fv in feature_view_query_contexts for feature in fv.features ] diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 9fe070d63de..faf68dec638 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -399,6 +399,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", + "field_mapping:feature_name" ], full_feature_names=full_feature_names, ) From a89559434180f44303931bb4bbf9a03afab8f538 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 12:24:06 -0600 Subject: [PATCH 05/16] historical_field_mappings redshift tests pass --- sdk/python/feast/infra/offline_stores/redshift.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index df363967d6e..04adae85bf1 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -489,7 +489,7 @@ def _get_entity_df_event_timestamp_range( {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} - {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %} + {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' @@ -590,7 +590,7 @@ def _get_entity_df_event_timestamp_range( SELECT {{featureview.name}}__entity_row_unique_id {% for feature in featureview.features %} - ,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %} + ,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %} {% endfor %} FROM {{ featureview.name }}__cleaned ) USING ({{featureview.name}}__entity_row_unique_id) From 12753f32e831689c59ece043f8e7e9371da78d8c Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 13:45:01 -0600 Subject: [PATCH 06/16] historical_field_mappings formatting Signed-off-by: Michelle Rascati --- .../infra/offline_stores/offline_utils.py | 11 ++++++++--- .../feature_repos/repo_configuration.py | 12 ++++++------ .../feature_repos/universal/feature_views.py | 4 +--- .../test_universal_historical_retrieval.py | 19 +++++++++++++++---- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 7e9640ca4ee..78ebd3f0487 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -145,7 +145,9 @@ def get_feature_view_query_context( name=feature_view.projection.name_to_use(), ttl=ttl_seconds, entities=join_keys, - features=[reverse_field_mapping.get(feature, feature) for feature in features], + features=[ + reverse_field_mapping.get(feature, feature) for feature in features + ], field_mapping=feature_view.input.field_mapping, event_timestamp_column=reverse_field_mapping.get( event_timestamp_column, event_timestamp_column @@ -177,8 +179,11 @@ def build_point_in_time_query( final_output_feature_names = list(entity_df_columns) final_output_feature_names.extend( [ - (f"{fv.name}__{fv.field_mapping.get(feature, feature)}" if full_feature_names - else fv.field_mapping.get(feature, feature)) + ( + f"{fv.name}__{fv.field_mapping.get(feature, feature)}" + if full_feature_names + else fv.field_mapping.get(feature, feature) + ) for fv in feature_view_query_contexts for feature in fv.features ] diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 022bd2badaf..b7d841f5af9 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -35,10 +35,10 @@ create_customer_daily_profile_feature_view, create_driver_age_request_feature_view, create_driver_hourly_stats_feature_view, + create_field_mapping_feature_view, create_global_stats_feature_view, create_location_stats_feature_view, create_order_feature_view, - create_field_mapping_feature_view, ) DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"} @@ -188,9 +188,7 @@ def construct_universal_data_sources( destination_name="field_mapping", event_timestamp_column="event_timestamp", created_timestamp_column="created", - field_mapping={ - "column_name": "feature_name" - } + field_mapping={"column_name": "feature_name"}, ) return { "customer": customer_ds, @@ -198,7 +196,7 @@ def construct_universal_data_sources( "location": location_ds, "orders": orders_ds, "global": global_ds, - "field_mapping": field_mapping_ds + "field_mapping": field_mapping_ds, } @@ -223,7 +221,9 @@ def construct_universal_feature_views( "driver_age_request_fv": create_driver_age_request_feature_view(), "order": create_order_feature_view(data_sources["orders"]), "location": create_location_stats_feature_view(data_sources["location"]), - "field_mapping": create_field_mapping_feature_view(data_sources["field_mapping"]) + "field_mapping": create_field_mapping_feature_view( + data_sources["field_mapping"] + ), } diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 3420d725dd6..4470bea966c 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -223,9 +223,7 @@ def create_field_mapping_feature_view(source): return FeatureView( name="field_mapping", entities=[], - features=[ - Feature(name="feature_name", dtype=ValueType.INT32), - ], + features=[Feature(name="feature_name", dtype=ValueType.INT32),], batch_source=source, ttl=timedelta(days=2), ) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index faf68dec638..65eb57036ef 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -105,7 +105,8 @@ def get_expected_training_df( global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column ) field_mapping_records = convert_timestamp_records_to_utc( - field_mapping_df.to_dict("records"), field_mapping_fv.batch_source.event_timestamp_column + field_mapping_df.to_dict("records"), + field_mapping_fv.batch_source.event_timestamp_column, ) entity_rows = convert_timestamp_records_to_utc( entity_df.to_dict("records"), event_timestamp @@ -212,7 +213,9 @@ def get_expected_training_df( # get field_mapping_record by column name, but label by feature name entity_row.update( { - (f"field_mapping__{feature}" if full_feature_names else feature): field_mapping_record.get(column, None) + ( + f"field_mapping__{feature}" if full_feature_names else feature + ): field_mapping_record.get(column, None) for (column, feature) in field_mapping_fv.input.field_mapping.items() } ) @@ -277,7 +280,15 @@ def test_historical_features(environment, universal_data_sources, full_feature_n (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - customer_df, driver_df, location_df, orders_df, global_df, entity_df, field_mapping_df = ( + ( + customer_df, + driver_df, + location_df, + orders_df, + global_df, + entity_df, + field_mapping_df, + ) = ( datasets["customer"], datasets["driver"], datasets["location"], @@ -399,7 +410,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", - "field_mapping:feature_name" + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) From d15a210aef09a19cebe6cd59fcb6c37ccd8c69cc Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 13:55:34 -0600 Subject: [PATCH 07/16] historical_field_mappings make required so no .get() from None --- sdk/python/feast/infra/offline_stores/offline_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 78ebd3f0487..eaf4925266d 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -85,7 +85,7 @@ class FeatureViewQueryContext: ttl: int entities: List[str] features: List[str] # feature reference format - field_mapping: Optional[Dict[str, str]] + field_mapping: Dict[str, str] event_timestamp_column: str created_timestamp_column: Optional[str] table_subquery: str From ed011a3769fed81cd0eb6660836cc9d5911f1046 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:03:13 -0600 Subject: [PATCH 08/16] historical_field_mappings type the registry so linter is happy --- .../tests/integration/feature_repos/repo_configuration.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index b7d841f5af9..6c12561c8ab 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -7,7 +7,7 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import pandas as pd import yaml @@ -17,6 +17,7 @@ from feast.data_source import DataSource from feast.errors import FeastModuleImportError from feast.repo_config import RegistryConfig, RepoConfig +from pydantic import StrictStr from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) @@ -287,6 +288,8 @@ def construct_test_environment( repo_dir_name = tempfile.mkdtemp() + registry: Union[StrictStr, RegistryConfig] + if test_repo_config.python_feature_server and test_repo_config.provider == "aws": from feast.infra.feature_servers.aws_lambda.config import ( AwsLambdaFeatureServerConfig, From af8fc8530b26baa88764ffd29bb7214fb84699a2 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:21:32 -0600 Subject: [PATCH 09/16] historical_field_mappings making pyling happy --- sdk/python/feast/feature_store.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c4d03fd01cc..64b89efa2d4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -70,7 +70,7 @@ from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, GetOnlineFeaturesResponse, -) + FieldStatusValue) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value from feast.registry import Registry @@ -1349,7 +1349,7 @@ def _read_from_online_store( provider: Provider, requested_features: List[str], table: FeatureView, - ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: + ) -> List[Tuple[List[Timestamp], List[FieldStatusValue], List[Value]]]: """ Read and process data from the OnlineStore for a given FeatureView. This method guarentees that the order of the data in each element of the @@ -1404,7 +1404,7 @@ def _read_from_online_store( def _populate_response_from_feature_data( feature_data: Iterable[ Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] + Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value] ] ], indexes: Iterable[Iterable[int]], From e804fe9fb67b52dcb4d469de7d7cc80c72c93cb6 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:24:09 -0600 Subject: [PATCH 10/16] historical_field_mappings formatting --- sdk/python/feast/feature_store.py | 7 +++---- .../tests/integration/feature_repos/repo_configuration.py | 2 +- .../integration/feature_repos/universal/feature_views.py | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 64b89efa2d4..c75fad8ab5c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -69,8 +69,9 @@ from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, + FieldStatusValue, GetOnlineFeaturesResponse, - FieldStatusValue) +) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value from feast.registry import Registry @@ -1403,9 +1404,7 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value] - ] + Tuple[Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value]] ], indexes: Iterable[Iterable[int]], online_features_response: GetOnlineFeaturesResponse, diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 6c12561c8ab..27cf0f38f93 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -11,13 +11,13 @@ import pandas as pd import yaml +from pydantic import StrictStr from feast import FeatureStore, FeatureView, driver_test_data from feast.constants import FULL_REPO_CONFIGS_MODULE_ENV_NAME from feast.data_source import DataSource from feast.errors import FeastModuleImportError from feast.repo_config import RegistryConfig, RepoConfig -from pydantic import StrictStr from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 4470bea966c..b0dc34197f3 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -223,7 +223,7 @@ def create_field_mapping_feature_view(source): return FeatureView( name="field_mapping", entities=[], - features=[Feature(name="feature_name", dtype=ValueType.INT32),], + features=[Feature(name="feature_name", dtype=ValueType.INT32)], batch_source=source, ttl=timedelta(days=2), ) From 0c1e79f927489e26d68127b68de3a31341f15d27 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:46:35 -0600 Subject: [PATCH 11/16] historical_field_mappings Revert "historical_field_mappings making pyling happy" This reverts commit af8fc8530b26baa88764ffd29bb7214fb84699a2. --- sdk/python/feast/feature_store.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 8965b1560f1..2cce0cb0af8 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1438,7 +1438,7 @@ def _read_from_online_store( provider: Provider, requested_features: List[str], table: FeatureView, - ) -> List[Tuple[List[Timestamp], List[FieldStatusValue], List[Value]]]: + ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: """ Read and process data from the OnlineStore for a given FeatureView. This method guarentees that the order of the data in each element of the @@ -1492,7 +1492,9 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( feature_data: Iterable[ - Tuple[Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value]] + Tuple[ + Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] + ] ], indexes: Iterable[Iterable[int]], online_features_response: GetOnlineFeaturesResponse, From 60e3692df0a353e2755f680ae018323828bdb32e Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:49:36 -0600 Subject: [PATCH 12/16] historical_field_mappings redo FieldStatusValue --- sdk/python/feast/feature_store.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 2cce0cb0af8..5f30b64265e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -69,9 +69,8 @@ from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, - FieldStatusValue, GetOnlineFeaturesResponse, -) + FieldStatusValue) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value from feast.registry import Registry @@ -1438,7 +1437,7 @@ def _read_from_online_store( provider: Provider, requested_features: List[str], table: FeatureView, - ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: + ) -> List[Tuple[List[Timestamp], List[FieldStatusValue], List[Value]]]: """ Read and process data from the OnlineStore for a given FeatureView. This method guarentees that the order of the data in each element of the @@ -1493,7 +1492,7 @@ def _read_from_online_store( def _populate_response_from_feature_data( feature_data: Iterable[ Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] + Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value] ] ], indexes: Iterable[Iterable[int]], From 33dd5b90f4cff39775f0079967573ab719a243fc Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:56:56 -0600 Subject: [PATCH 13/16] historical_field_mappings already fixed upstream --- .../tests/integration/feature_repos/repo_configuration.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index aca9d381ccf..d7d2b46b045 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -288,8 +288,6 @@ def construct_test_environment( repo_dir_name = tempfile.mkdtemp() - registry: Union[StrictStr, RegistryConfig] - if test_repo_config.python_feature_server and test_repo_config.provider == "aws": from feast.infra.feature_servers.aws_lambda.config import ( AwsLambdaFeatureServerConfig, From c3c6747f00e157caad1c96fd8724c1cc6da172f9 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:57:08 -0600 Subject: [PATCH 14/16] historical_field_mappings formatting --- sdk/python/feast/feature_store.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5f30b64265e..8965b1560f1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -69,8 +69,9 @@ from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, + FieldStatusValue, GetOnlineFeaturesResponse, - FieldStatusValue) +) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value from feast.registry import Registry @@ -1491,9 +1492,7 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value] - ] + Tuple[Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value]] ], indexes: Iterable[Iterable[int]], online_features_response: GetOnlineFeaturesResponse, From 2d43b7469f4e67fe4111d44f9c37f2f848ce6c46 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 14:58:37 -0600 Subject: [PATCH 15/16] historical_field_mappings remove unused import --- sdk/python/tests/integration/feature_repos/repo_configuration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index d7d2b46b045..f0fb0b28fda 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -11,7 +11,6 @@ import pandas as pd import yaml -from pydantic import StrictStr from feast import FeatureStore, FeatureView, driver_test_data from feast.constants import FULL_REPO_CONFIGS_MODULE_ENV_NAME From cbcbed737f387bf42041cf18386cac8dcf8dda3b Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 15:28:29 -0600 Subject: [PATCH 16/16] historical_field_mappings Revert "historical_field_mappings redo FieldStatusValue" This reverts commit 60e3692df0a353e2755f680ae018323828bdb32e. --- sdk/python/feast/feature_store.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 8965b1560f1..0024b368fe0 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -69,7 +69,6 @@ from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, - FieldStatusValue, GetOnlineFeaturesResponse, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -1438,7 +1437,7 @@ def _read_from_online_store( provider: Provider, requested_features: List[str], table: FeatureView, - ) -> List[Tuple[List[Timestamp], List[FieldStatusValue], List[Value]]]: + ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: """ Read and process data from the OnlineStore for a given FeatureView. This method guarentees that the order of the data in each element of the @@ -1492,7 +1491,9 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( feature_data: Iterable[ - Tuple[Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value]] + Tuple[ + Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] + ] ], indexes: Iterable[Iterable[int]], online_features_response: GetOnlineFeaturesResponse,