Skip to content

Commit 592af75

Browse files
historical_field_mappings2 merge for one sign off commit (feast-dev#2252)
Signed-off-by: Michelle Rascati <michelle.rascati@sailpoint.com>
1 parent 5fc0b52 commit 592af75

9 files changed

Lines changed: 102 additions & 8 deletions

File tree

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ Setting up your development environment for Feast Python SDK / CLI:
5050
3. _Recommended:_ Create a virtual environment to isolate development dependencies to be installed
5151
```sh
5252
# create & activate a virtual environment
53-
python -v venv venv/
53+
python -m venv venv/
5454
source venv/bin/activate
5555
```
5656

sdk/python/feast/driver_test_data.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,29 @@ def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame:
264264
# TODO: Remove created timestamp in order to test whether it's really optional
265265
df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
266266
return df_daily
267+
268+
269+
def create_field_mapping_df(start_date, end_date) -> pd.DataFrame:
270+
"""
271+
Example df generated by this function:
272+
| event_timestamp | column_name | created |
273+
|------------------+-------------+------------------|
274+
| 2021-03-17 19:00 | 99 | 2021-03-24 19:38 |
275+
| 2021-03-17 19:00 | 22 | 2021-03-24 19:38 |
276+
| 2021-03-17 19:00 | 7 | 2021-03-24 19:38 |
277+
| 2021-03-17 19:00 | 45 | 2021-03-24 19:38 |
278+
"""
279+
size = 10
280+
df = pd.DataFrame()
281+
df["column_name"] = np.random.randint(1, 100, size=size).astype(np.int32)
282+
df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [
283+
_convert_event_timestamp(
284+
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
285+
EventTimestampType(idx % 4),
286+
)
287+
for idx, dt in enumerate(
288+
pd.date_range(start=start_date, end=end_date, periods=size)
289+
)
290+
]
291+
df["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
292+
return df

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
598598
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
599599
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
600600
{% for feature in featureview.features %}
601-
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
601+
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
602602
{% endfor %}
603603
FROM {{ featureview.table_subquery }}
604604
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
@@ -699,7 +699,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
699699
SELECT
700700
{{featureview.name}}__entity_row_unique_id
701701
{% for feature in featureview.features %}
702-
,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
702+
,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
703703
{% endfor %}
704704
FROM {{ featureview.name }}__cleaned
705705
) USING ({{featureview.name}}__entity_row_unique_id)

sdk/python/feast/infra/offline_stores/offline_utils.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class FeatureViewQueryContext:
8585
ttl: int
8686
entities: List[str]
8787
features: List[str] # feature reference format
88+
field_mapping: Dict[str, str]
8889
event_timestamp_column: str
8990
created_timestamp_column: Optional[str]
9091
table_subquery: str
@@ -144,7 +145,10 @@ def get_feature_view_query_context(
144145
name=feature_view.projection.name_to_use(),
145146
ttl=ttl_seconds,
146147
entities=join_keys,
147-
features=features,
148+
features=[
149+
reverse_field_mapping.get(feature, feature) for feature in features
150+
],
151+
field_mapping=feature_view.input.field_mapping,
148152
event_timestamp_column=reverse_field_mapping.get(
149153
event_timestamp_column, event_timestamp_column
150154
),
@@ -175,7 +179,11 @@ def build_point_in_time_query(
175179
final_output_feature_names = list(entity_df_columns)
176180
final_output_feature_names.extend(
177181
[
178-
(f"{fv.name}__{feature}" if full_feature_names else feature)
182+
(
183+
f"{fv.name}__{fv.field_mapping.get(feature, feature)}"
184+
if full_feature_names
185+
else fv.field_mapping.get(feature, feature)
186+
)
179187
for fv in feature_view_query_contexts
180188
for feature in fv.features
181189
]

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ def _get_entity_df_event_timestamp_range(
563563
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
564564
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
565565
{% for feature in featureview.features %}
566-
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
566+
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
567567
{% endfor %}
568568
FROM {{ featureview.table_subquery }}
569569
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
@@ -664,7 +664,7 @@ def _get_entity_df_event_timestamp_range(
664664
SELECT
665665
{{featureview.name}}__entity_row_unique_id
666666
{% for feature in featureview.features %}
667-
,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
667+
,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
668668
{% endfor %}
669669
FROM {{ featureview.name }}__cleaned
670670
) USING ({{featureview.name}}__entity_row_unique_id)

sdk/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@
132132
+ AWS_REQUIRED
133133
)
134134

135-
DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED
135+
# Extra requirements for local development, layered on top of the CI set.
# PEP 440 permits a trailing ".*" wildcard only with the == and != operators,
# so the mypy-protobuf lower bound is written as a plain ">=1".
DEV_REQUIRED = ["mypy-protobuf>=1", "grpcio-testing==1.*"] + CI_REQUIRED
136136

137137
# Get git repo root directory
138138
repo_root = str(pathlib.Path(__file__).resolve().parent.parent.parent)

sdk/python/tests/integration/feature_repos/repo_configuration.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
create_customer_daily_profile_feature_view,
3636
create_driver_age_request_feature_view,
3737
create_driver_hourly_stats_feature_view,
38+
create_field_mapping_feature_view,
3839
create_global_stats_feature_view,
3940
create_location_stats_feature_view,
4041
create_order_feature_view,
@@ -126,6 +127,7 @@ def construct_universal_datasets(
126127
order_count=20,
127128
)
128129
global_df = driver_test_data.create_global_daily_stats_df(start_time, end_time)
130+
field_mapping_df = driver_test_data.create_field_mapping_df(start_time, end_time)
129131
entity_df = orders_df[
130132
[
131133
"customer_id",
@@ -143,6 +145,7 @@ def construct_universal_datasets(
143145
"location": location_df,
144146
"orders": orders_df,
145147
"global": global_df,
148+
"field_mapping": field_mapping_df,
146149
"entity": entity_df,
147150
}
148151

@@ -180,12 +183,20 @@ def construct_universal_data_sources(
180183
event_timestamp_column="event_timestamp",
181184
created_timestamp_column="created",
182185
)
186+
field_mapping_ds = data_source_creator.create_data_source(
187+
datasets["field_mapping"],
188+
destination_name="field_mapping",
189+
event_timestamp_column="event_timestamp",
190+
created_timestamp_column="created",
191+
field_mapping={"column_name": "feature_name"},
192+
)
183193
return {
184194
"customer": customer_ds,
185195
"driver": driver_ds,
186196
"location": location_ds,
187197
"orders": orders_ds,
188198
"global": global_ds,
199+
"field_mapping": field_mapping_ds,
189200
}
190201

191202

@@ -210,6 +221,9 @@ def construct_universal_feature_views(
210221
"driver_age_request_fv": create_driver_age_request_feature_view(),
211222
"order": create_order_feature_view(data_sources["orders"]),
212223
"location": create_location_stats_feature_view(data_sources["location"]),
224+
"field_mapping": create_field_mapping_feature_view(
225+
data_sources["field_mapping"]
226+
),
213227
}
214228

215229

sdk/python/tests/integration/feature_repos/universal/feature_views.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,13 @@ def create_location_stats_feature_view(source, infer_features: bool = False):
217217
ttl=timedelta(days=2),
218218
)
219219
return location_stats_feature_view
220+
221+
222+
def create_field_mapping_feature_view(source):
223+
return FeatureView(
224+
name="field_mapping",
225+
entities=[],
226+
features=[Feature(name="feature_name", dtype=ValueType.INT32)],
227+
batch_source=source,
228+
ttl=timedelta(days=2),
229+
)

sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ def get_expected_training_df(
8282
location_fv: FeatureView,
8383
global_df: pd.DataFrame,
8484
global_fv: FeatureView,
85+
field_mapping_df: pd.DataFrame,
86+
field_mapping_fv: FeatureView,
8587
entity_df: pd.DataFrame,
8688
event_timestamp: str,
8789
full_feature_names: bool = False,
@@ -102,6 +104,10 @@ def get_expected_training_df(
102104
global_records = convert_timestamp_records_to_utc(
103105
global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column
104106
)
107+
field_mapping_records = convert_timestamp_records_to_utc(
108+
field_mapping_df.to_dict("records"),
109+
field_mapping_fv.batch_source.event_timestamp_column,
110+
)
105111
entity_rows = convert_timestamp_records_to_utc(
106112
entity_df.to_dict("records"), event_timestamp
107113
)
@@ -156,6 +162,13 @@ def get_expected_training_df(
156162
ts_end=order_record[event_timestamp],
157163
)
158164

165+
field_mapping_record = find_asof_record(
166+
field_mapping_records,
167+
ts_key=field_mapping_fv.batch_source.event_timestamp_column,
168+
ts_start=order_record[event_timestamp] - field_mapping_fv.ttl,
169+
ts_end=order_record[event_timestamp],
170+
)
171+
159172
entity_row.update(
160173
{
161174
(
@@ -197,6 +210,16 @@ def get_expected_training_df(
197210
}
198211
)
199212

213+
# get field_mapping_record by column name, but label by feature name
214+
entity_row.update(
215+
{
216+
(
217+
f"field_mapping__{feature}" if full_feature_names else feature
218+
): field_mapping_record.get(column, None)
219+
for (column, feature) in field_mapping_fv.input.field_mapping.items()
220+
}
221+
)
222+
200223
# Convert records back to pandas dataframe
201224
expected_df = pd.DataFrame(entity_rows)
202225

@@ -213,6 +236,7 @@ def get_expected_training_df(
213236
"customer_profile__current_balance": "float32",
214237
"customer_profile__avg_passenger_count": "float32",
215238
"global_stats__avg_ride_length": "float32",
239+
"field_mapping__feature_name": "int32",
216240
}
217241
else:
218242
expected_column_types = {
@@ -221,6 +245,7 @@ def get_expected_training_df(
221245
"current_balance": "float32",
222246
"avg_passenger_count": "float32",
223247
"avg_ride_length": "float32",
248+
"feature_name": "int32",
224249
}
225250

226251
for col, typ in expected_column_types.items():
@@ -311,6 +336,8 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
311336
feature_views["location"],
312337
datasets["global"],
313338
feature_views["global"],
339+
datasets["field_mapping"],
340+
feature_views["field_mapping"],
314341
entity_df_with_request_data,
315342
event_timestamp,
316343
full_feature_names,
@@ -336,6 +363,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
336363
"global_stats:num_rides",
337364
"global_stats:avg_ride_length",
338365
"driver_age:driver_age",
366+
"field_mapping:feature_name",
339367
],
340368
full_feature_names=full_feature_names,
341369
)
@@ -404,6 +432,7 @@ def test_historical_features_with_missing_request_data(
404432
"conv_rate_plus_100:conv_rate_plus_val_to_add",
405433
"global_stats:num_rides",
406434
"global_stats:avg_ride_length",
435+
"field_mapping:feature_name",
407436
],
408437
full_feature_names=full_feature_names,
409438
)
@@ -419,6 +448,7 @@ def test_historical_features_with_missing_request_data(
419448
"driver_age:driver_age",
420449
"global_stats:num_rides",
421450
"global_stats:avg_ride_length",
451+
"field_mapping:feature_name",
422452
],
423453
full_feature_names=full_feature_names,
424454
)
@@ -452,6 +482,7 @@ def test_historical_features_with_entities_from_query(
452482
"order:order_is_success",
453483
"global_stats:num_rides",
454484
"global_stats:avg_ride_length",
485+
"field_mapping:feature_name",
455486
],
456487
full_feature_names=full_feature_names,
457488
)
@@ -477,6 +508,8 @@ def test_historical_features_with_entities_from_query(
477508
feature_views["location"],
478509
datasets["global"],
479510
feature_views["global"],
511+
datasets["field_mapping"],
512+
feature_views["field_mapping"],
480513
datasets["entity"],
481514
event_timestamp,
482515
full_feature_names,
@@ -538,6 +571,7 @@ def test_historical_features_persisting(
538571
"order:order_is_success",
539572
"global_stats:num_rides",
540573
"global_stats:avg_ride_length",
574+
"field_mapping:feature_name",
541575
],
542576
full_feature_names=full_feature_names,
543577
)
@@ -561,6 +595,8 @@ def test_historical_features_persisting(
561595
feature_views["location"],
562596
datasets["global"],
563597
feature_views["global"],
598+
datasets["field_mapping"],
599+
feature_views["field_mapping"],
564600
entity_df,
565601
event_timestamp,
566602
full_feature_names,

0 commit comments

Comments
 (0)