From b905ade7dce057f1f50a7ad76e7fe619478b0fe5 Mon Sep 17 00:00:00 2001 From: Michelle Rascati Date: Thu, 27 Jan 2022 15:45:37 -0600 Subject: [PATCH] historical_field_mappings2 merge for one sign off commit Signed-off-by: Michelle Rascati --- CONTRIBUTING.md | 2 +- sdk/python/feast/driver_test_data.py | 26 ++++++++++++++ .../feast/infra/offline_stores/bigquery.py | 4 +-- .../infra/offline_stores/offline_utils.py | 12 +++++-- .../feast/infra/offline_stores/redshift.py | 4 +-- sdk/python/setup.py | 2 +- .../feature_repos/repo_configuration.py | 14 ++++++++ .../feature_repos/universal/feature_views.py | 10 ++++++ .../test_universal_historical_retrieval.py | 36 +++++++++++++++++++ 9 files changed, 102 insertions(+), 8 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6918d7f1de..dbf44d4bef 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -50,7 +50,7 @@ Setting up your development environment for Feast Python SDK / CLI: 3. _Recommended:_ Create a virtual environment to isolate development dependencies to be installed ```sh # create & activate a virtual environment -python -v venv venv/ +python -m venv venv/ source venv/bin/activate ``` diff --git a/sdk/python/feast/driver_test_data.py b/sdk/python/feast/driver_test_data.py index 1c9a1dd20b..117bfcbd9c 100644 --- a/sdk/python/feast/driver_test_data.py +++ b/sdk/python/feast/driver_test_data.py @@ -264,3 +264,29 @@ def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame: # TODO: Remove created timestamp in order to test whether its really optional df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) return df_daily + + +def create_field_mapping_df(start_date, end_date) -> pd.DataFrame: + """ + Example df generated by this function: + | event_timestamp | column_name | created | + |------------------+-------------+------------------| + | 2021-03-17 19:00 | 99 | 2021-03-24 19:38 | + | 2021-03-17 19:00 | 22 | 2021-03-24 19:38 | + | 2021-03-17 19:00 | 7 | 2021-03-24 19:38 | + | 2021-03-17 19:00 | 45 | 2021-03-24 19:38 | + """ + size = 10 + df = pd.DataFrame() + df["column_name"] = np.random.randint(1, 100, size=size).astype(np.int32) + df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [ + _convert_event_timestamp( + pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"), + EventTimestampType(idx % 4), + ) + for idx, dt in enumerate( + pd.date_range(start=start_date, end=end_date, periods=size) + ) + ] + df["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) + return df diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 42a1a83907..837ab20ce7 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -598,7 +598,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} - {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %} + {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' @@ -699,7 +699,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] SELECT {{featureview.name}}__entity_row_unique_id {% for feature in featureview.features %} - ,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %} + ,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %} {% endfor %} FROM {{ featureview.name }}__cleaned ) USING ({{featureview.name}}__entity_row_unique_id) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 0b60c3493d..eaf4925266 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -85,6 +85,7 @@ class FeatureViewQueryContext: ttl: int entities: List[str] features: List[str] # feature reference format + field_mapping: Dict[str, str] event_timestamp_column: str created_timestamp_column: Optional[str] table_subquery: str @@ -144,7 +145,10 @@ def get_feature_view_query_context( name=feature_view.projection.name_to_use(), ttl=ttl_seconds, entities=join_keys, - features=features, + features=[ + reverse_field_mapping.get(feature, feature) for feature in features + ], + field_mapping=feature_view.input.field_mapping, event_timestamp_column=reverse_field_mapping.get( event_timestamp_column, event_timestamp_column ), @@ -175,7 +179,11 @@ def build_point_in_time_query( final_output_feature_names = list(entity_df_columns) final_output_feature_names.extend( [ - (f"{fv.name}__{feature}" if full_feature_names else feature) + ( + f"{fv.name}__{fv.field_mapping.get(feature, feature)}" + if full_feature_names + else fv.field_mapping.get(feature, feature) + ) for fv in feature_view_query_contexts for feature in fv.features ] diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 2aa3d5c41c..3efd45bc74 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -563,7 +563,7 @@ def _get_entity_df_event_timestamp_range( {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} {% for feature in featureview.features %} - {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %} + {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}' @@ -664,7 +664,7 @@ def _get_entity_df_event_timestamp_range( SELECT {{featureview.name}}__entity_row_unique_id {% for feature in featureview.features %} - ,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %} + ,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %} {% endfor %} FROM {{ featureview.name }}__cleaned ) USING ({{featureview.name}}__entity_row_unique_id) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index d35ee9de11..4f01c7b4e0 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -132,7 +132,7 @@ + AWS_REQUIRED ) -DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED +DEV_REQUIRED = ["mypy-protobuf>=1.*", "grpcio-testing==1.*"] + CI_REQUIRED # Get git repo root directory repo_root = str(pathlib.Path(__file__).resolve().parent.parent.parent) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index f66a92c9d6..f0fb0b28fd 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -35,6 +35,7 @@ create_customer_daily_profile_feature_view, create_driver_age_request_feature_view, create_driver_hourly_stats_feature_view, + create_field_mapping_feature_view, create_global_stats_feature_view, create_location_stats_feature_view, create_order_feature_view, @@ -126,6 +127,7 @@ def construct_universal_datasets( order_count=20, ) global_df = driver_test_data.create_global_daily_stats_df(start_time, end_time) + field_mapping_df = driver_test_data.create_field_mapping_df(start_time, end_time) entity_df = orders_df[ [ "customer_id", @@ -143,6 +145,7 @@ def construct_universal_datasets( "location": location_df, "orders": orders_df, "global": global_df, + "field_mapping": field_mapping_df, "entity": entity_df, } @@ -180,12 +183,20 @@ def construct_universal_data_sources( event_timestamp_column="event_timestamp", created_timestamp_column="created", ) + field_mapping_ds = data_source_creator.create_data_source( + datasets["field_mapping"], + destination_name="field_mapping", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", + field_mapping={"column_name": "feature_name"}, + ) return { "customer": customer_ds, "driver": driver_ds, "location": location_ds, "orders": orders_ds, "global": global_ds, + "field_mapping": field_mapping_ds, } @@ -210,6 +221,9 @@ def construct_universal_feature_views( "driver_age_request_fv": create_driver_age_request_feature_view(), "order": create_order_feature_view(data_sources["orders"]), "location": create_location_stats_feature_view(data_sources["location"]), + "field_mapping": create_field_mapping_feature_view( + data_sources["field_mapping"] + ), } diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index f68add88cb..b0dc34197f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -217,3 +217,13 @@ def create_location_stats_feature_view(source, infer_features: bool = False): ttl=timedelta(days=2), ) return location_stats_feature_view + + +def create_field_mapping_feature_view(source): + return FeatureView( + name="field_mapping", + entities=[], + features=[Feature(name="feature_name", dtype=ValueType.INT32)], + batch_source=source, + ttl=timedelta(days=2), + ) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 5e4bd00460..147e20aee1 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -82,6 +82,8 @@ def get_expected_training_df( location_fv: FeatureView, global_df: pd.DataFrame, global_fv: FeatureView, + field_mapping_df: pd.DataFrame, + field_mapping_fv: FeatureView, entity_df: pd.DataFrame, event_timestamp: str, full_feature_names: bool = False, @@ -102,6 +104,10 @@ def get_expected_training_df( global_records = convert_timestamp_records_to_utc( global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column ) + field_mapping_records = convert_timestamp_records_to_utc( + field_mapping_df.to_dict("records"), + field_mapping_fv.batch_source.event_timestamp_column, + ) entity_rows = convert_timestamp_records_to_utc( entity_df.to_dict("records"), event_timestamp ) @@ -156,6 +162,13 @@ def get_expected_training_df( ts_end=order_record[event_timestamp], ) + field_mapping_record = find_asof_record( + field_mapping_records, + ts_key=field_mapping_fv.batch_source.event_timestamp_column, + ts_start=order_record[event_timestamp] - field_mapping_fv.ttl, + ts_end=order_record[event_timestamp], + ) + entity_row.update( { ( @@ -197,6 +210,16 @@ def get_expected_training_df( } ) + # get field_mapping_record by column name, but label by feature name + entity_row.update( + { + ( + f"field_mapping__{feature}" if full_feature_names else feature + ): field_mapping_record.get(column, None) + for (column, feature) in field_mapping_fv.input.field_mapping.items() + } + ) + # Convert records back to pandas dataframe expected_df = pd.DataFrame(entity_rows) @@ -213,6 +236,7 @@ def get_expected_training_df( "customer_profile__current_balance": "float32", "customer_profile__avg_passenger_count": "float32", "global_stats__avg_ride_length": "float32", + "field_mapping__feature_name": "int32", } else: expected_column_types = { @@ -221,6 +245,7 @@ def get_expected_training_df( "current_balance": "float32", "avg_passenger_count": "float32", "avg_ride_length": "float32", + "feature_name": "int32", } for col, typ in expected_column_types.items(): @@ -311,6 +336,8 @@ def test_historical_features(environment, universal_data_sources, full_feature_n feature_views["location"], datasets["global"], feature_views["global"], + datasets["field_mapping"], + feature_views["field_mapping"], entity_df_with_request_data, event_timestamp, full_feature_names, @@ -336,6 +363,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "global_stats:num_rides", "global_stats:avg_ride_length", "driver_age:driver_age", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) @@ -404,6 +432,7 @@ def test_historical_features_with_missing_request_data( "conv_rate_plus_100:conv_rate_plus_val_to_add", "global_stats:num_rides", "global_stats:avg_ride_length", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) @@ -419,6 +448,7 @@ def test_historical_features_with_missing_request_data( "driver_age:driver_age", "global_stats:num_rides", "global_stats:avg_ride_length", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) @@ -452,6 +482,7 @@ def test_historical_features_with_entities_from_query( "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) @@ -477,6 +508,8 @@ def test_historical_features_with_entities_from_query( feature_views["location"], datasets["global"], feature_views["global"], + datasets["field_mapping"], + feature_views["field_mapping"], datasets["entity"], event_timestamp, full_feature_names, @@ -538,6 +571,7 @@ def test_historical_features_persisting( "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", + "field_mapping:feature_name", ], full_feature_names=full_feature_names, ) @@ -561,6 +595,8 @@ def test_historical_features_persisting( feature_views["location"], datasets["global"], feature_views["global"], + datasets["field_mapping"], + feature_views["field_mapping"], entity_df, event_timestamp, full_feature_names,