Skip to content

Commit 3c08212

Browse files
committed
Made simple feature names default on data retrieval, provides option for names prefixed with featureviews
Signed-off-by: Mwad22 <51929507+Mwad22@users.noreply.github.com>
1 parent 68cacd9 commit 3c08212

File tree

15 files changed

+316
-34
lines changed

15 files changed

+316
-34
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ training_df = store.get_historical_features(
6767
'driver_hourly_stats:acc_rate',
6868
'driver_hourly_stats:avg_daily_trips'
6969
],
70+
full_feature_names=True
7071
).to_df()
7172

7273
print(training_df.head())

docs/quickstart.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ feature_vector = store.get_online_features(
234234
'driver_hourly_stats:acc_rate',
235235
'driver_hourly_stats:avg_daily_trips'
236236
],
237-
entity_rows=[{"driver_id": 1001}]
237+
entity_rows=[{"driver_id": 1001}],
238+
full_feature_names=True
238239
).to_dict()
239240
240241
pprint(feature_vector)

sdk/python/feast/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ def __init__(self, offline_store_name: str, data_source_name: str):
7171
)
7272

7373

74+
class FeatureNameCollisionError(Exception):
    """Raised when requested features share a name and unprefixed names are in use.

    Args:
        feature_name_collisions: Human-readable listing of the colliding
            feature names (callers pass a comma-separated string).
    """

    def __init__(self, feature_name_collisions: str):
        # The retrieval functions expose this switch as 'full_feature_names';
        # setting it to True prefixes every feature with its feature view
        # name, which is what resolves the collision.
        super().__init__(
            f"The following feature name(s) have collisions: {feature_name_collisions}. "
            "Set the 'full_feature_names' argument in the data retrieval function to True "
            "to use the full feature name which is prefixed by the feature view name."
        )
79+
80+
7481
class FeastOnlineStoreUnsupportedDataSource(Exception):
7582
def __init__(self, online_store_name: str, data_source_name: str):
7683
super().__init__(

sdk/python/feast/feature_store.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424

2525
from feast import utils
2626
from feast.entity import Entity
27-
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
27+
from feast.errors import (
28+
FeastProviderLoginError,
29+
FeatureNameCollisionError,
30+
FeatureViewNotFoundException,
31+
)
2832
from feast.feature_view import FeatureView
2933
from feast.inference import infer_entity_value_type_from_feature_views
3034
from feast.infra.provider import Provider, RetrievalJob, get_provider
@@ -244,7 +248,10 @@ def apply(
244248

245249
@log_exceptions_and_usage
246250
def get_historical_features(
247-
self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str],
251+
self,
252+
entity_df: Union[pd.DataFrame, str],
253+
feature_refs: List[str],
254+
full_feature_names: bool = False,
248255
) -> RetrievalJob:
249256
"""Enrich an entity dataframe with historical feature values for either training or batch scoring.
250257
@@ -266,6 +273,10 @@ def get_historical_features(
266273
SQL query. The query must be of a format supported by the configured offline store (e.g., BigQuery)
267274
feature_refs: A list of features that should be retrieved from the offline store. Feature references are of
268275
the format "feature_view:feature", e.g., "customer_fv:daily_transactions".
276+
full_feature_names: By default, this value is set to False. This strips the feature view prefixes from the data
277+
and returns only the feature name, changing them from the format "feature_view__feature" to "feature"
278+
(e.g., "customer_fv__daily_transactions" changes to "daily_transactions"). Set the value to True for
279+
the feature names to be prefixed by the feature view name in the format "feature_view__feature".
269280
270281
Returns:
271282
RetrievalJob which can be used to materialize the results.
@@ -278,12 +289,12 @@ def get_historical_features(
278289
>>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
279290
>>> retrieval_job = fs.get_historical_features(
280291
>>> entity_df="SELECT event_timestamp, order_id, customer_id from gcp_project.my_ds.customer_orders",
281-
>>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"]
282-
>>> )
292+
>>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"],
293+
>>> full_feature_names=False
294+
>>> )
283295
>>> feature_data = retrieval_job.to_df()
284296
>>> model.fit(feature_data) # insert your modeling framework here.
285297
"""
286-
287298
all_feature_views = self._registry.list_feature_views(project=self.project)
288299
try:
289300
feature_views = _get_requested_feature_views(
@@ -301,6 +312,7 @@ def get_historical_features(
301312
entity_df,
302313
self._registry,
303314
self.project,
315+
full_feature_names,
304316
)
305317
except FeastProviderLoginError as e:
306318
sys.exit(e)
@@ -467,7 +479,10 @@ def tqdm_builder(length):
467479

468480
@log_exceptions_and_usage
469481
def get_online_features(
470-
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
482+
self,
483+
feature_refs: List[str],
484+
entity_rows: List[Dict[str, Any]],
485+
full_feature_names: bool = False,
471486
) -> OnlineResponse:
472487
"""
473488
Retrieves the latest online feature data.
@@ -535,7 +550,7 @@ def get_online_features(
535550
project=self.project, allow_cache=True
536551
)
537552

538-
grouped_refs = _group_refs(feature_refs, all_feature_views)
553+
grouped_refs = _group_refs(feature_refs, all_feature_views, full_feature_names)
539554
for table, requested_features in grouped_refs:
540555
entity_keys = _get_table_entity_keys(
541556
table, union_of_entity_keys, entity_name_to_join_key_map
@@ -552,13 +567,21 @@ def get_online_features(
552567

553568
if feature_data is None:
554569
for feature_name in requested_features:
555-
feature_ref = f"{table.name}__{feature_name}"
570+
feature_ref = (
571+
f"{table.name}__{feature_name}"
572+
if full_feature_names
573+
else feature_name
574+
)
556575
result_row.statuses[
557576
feature_ref
558577
] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND
559578
else:
560579
for feature_name in feature_data:
561-
feature_ref = f"{table.name}__{feature_name}"
580+
feature_ref = (
581+
f"{table.name}__{feature_name}"
582+
if full_feature_names
583+
else feature_name
584+
)
562585
if feature_name in requested_features:
563586
result_row.fields[feature_ref].CopyFrom(
564587
feature_data[feature_name]
@@ -587,7 +610,9 @@ def _entity_row_to_field_values(
587610

588611

589612
def _group_refs(
590-
feature_refs: List[str], all_feature_views: List[FeatureView]
613+
feature_refs: List[str],
614+
all_feature_views: List[FeatureView],
615+
full_feature_names: bool = False,
591616
) -> List[Tuple[FeatureView, List[str]]]:
592617
""" Get list of feature views and corresponding feature names based on feature references"""
593618

@@ -597,12 +622,23 @@ def _group_refs(
597622
# view name to feature names
598623
views_features = defaultdict(list)
599624

625+
feature_set = set()
626+
feature_collision_set = set()
627+
600628
for ref in feature_refs:
601629
view_name, feat_name = ref.split(":")
630+
if feat_name in feature_set:
631+
feature_collision_set.add(feat_name)
632+
else:
633+
feature_set.add(feat_name)
602634
if view_name not in view_index:
603635
raise FeatureViewNotFoundException(view_name)
604636
views_features[view_name].append(feat_name)
605637

638+
if not full_feature_names and len(feature_collision_set) > 0:
639+
err = ", ".join(x for x in feature_collision_set)
640+
raise FeatureNameCollisionError(err)
641+
606642
result = []
607643
for view_name, feature_names in views_features.items():
608644
result.append((view_index[view_name], feature_names))

sdk/python/feast/infra/gcp.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def get_historical_features(
128128
entity_df: Union[pandas.DataFrame, str],
129129
registry: Registry,
130130
project: str,
131+
full_feature_names: bool = False,
131132
) -> RetrievalJob:
132133
job = self.offline_store.get_historical_features(
133134
config=config,
@@ -136,5 +137,6 @@ def get_historical_features(
136137
entity_df=entity_df,
137138
registry=registry,
138139
project=project,
140+
full_feature_names=full_feature_names,
139141
)
140142
return job

sdk/python/feast/infra/local.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ def get_historical_features(
127127
entity_df: Union[pd.DataFrame, str],
128128
registry: Registry,
129129
project: str,
130+
full_feature_names: bool = False,
130131
) -> RetrievalJob:
131132
return self.offline_store.get_historical_features(
132133
config=config,
@@ -135,6 +136,7 @@ def get_historical_features(
135136
entity_df=entity_df,
136137
registry=registry,
137138
project=project,
139+
full_feature_names=full_feature_names,
138140
)
139141

140142

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def get_historical_features(
8484
entity_df: Union[pandas.DataFrame, str],
8585
registry: Registry,
8686
project: str,
87+
full_feature_names: bool = False,
8788
) -> RetrievalJob:
8889
# TODO: Add entity_df validation in order to fail before interacting with BigQuery
8990

@@ -105,7 +106,7 @@ def get_historical_features(
105106

106107
# Build a query context containing all information required to template the BigQuery SQL query
107108
query_context = get_feature_view_query_context(
108-
feature_refs, feature_views, registry, project
109+
feature_refs, feature_views, registry, project, full_feature_names
109110
)
110111

111112
# TODO: Infer min_timestamp and max_timestamp from entity_df
@@ -116,6 +117,7 @@ def get_historical_features(
116117
max_timestamp=datetime.now() + timedelta(days=1),
117118
left_table_query_string=str(table.reference),
118119
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
120+
full_feature_names=full_feature_names,
119121
)
120122

121123
job = BigQueryRetrievalJob(query=query, client=client)
@@ -292,11 +294,12 @@ def get_feature_view_query_context(
292294
feature_views: List[FeatureView],
293295
registry: Registry,
294296
project: str,
297+
full_feature_names: bool = False,
295298
) -> List[FeatureViewQueryContext]:
296299
"""Build a query context containing all information required to template a BigQuery point-in-time SQL query"""
297300

298301
feature_views_to_feature_map = _get_requested_feature_views_to_features_dict(
299-
feature_refs, feature_views
302+
feature_refs, feature_views, full_feature_names
300303
)
301304

302305
query_context = []
@@ -351,6 +354,7 @@ def build_point_in_time_query(
351354
max_timestamp: datetime,
352355
left_table_query_string: str,
353356
entity_df_event_timestamp_col: str,
357+
full_feature_names: bool = False,
354358
):
355359
"""Build point-in-time query between each feature view table and the entity dataframe"""
356360
template = Environment(loader=BaseLoader()).from_string(
@@ -367,6 +371,7 @@ def build_point_in_time_query(
367371
[entity for fv in feature_view_query_contexts for entity in fv.entities]
368372
),
369373
"featureviews": [asdict(context) for context in feature_view_query_contexts],
374+
"full_feature_names": full_feature_names,
370375
}
371376

372377
query = template.render(template_context)
@@ -440,7 +445,7 @@ def _get_bigquery_client():
440445
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
441446
{{ featureview.entity_selections | join(', ')}},
442447
{% for feature in featureview.features %}
443-
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
448+
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
444449
{% endfor %}
445450
FROM {{ featureview.table_subquery }}
446451
),
@@ -533,7 +538,7 @@ def _get_bigquery_client():
533538
SELECT
534539
entity_row_unique_id,
535540
{% for feature in featureview.features %}
536-
{{ featureview.name }}__{{ feature }},
541+
{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %},
537542
{% endfor %}
538543
FROM {{ featureview.name }}__cleaned
539544
) USING (entity_row_unique_id)

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def get_historical_features(
4040
entity_df: Union[pd.DataFrame, str],
4141
registry: Registry,
4242
project: str,
43+
full_feature_names: bool = False,
4344
) -> FileRetrievalJob:
4445
if not isinstance(entity_df, pd.DataFrame):
4546
raise ValueError(
@@ -59,9 +60,8 @@ def get_historical_features(
5960
raise ValueError(
6061
f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
6162
)
62-
6363
feature_views_to_features = _get_requested_feature_views_to_features_dict(
64-
feature_refs, feature_views
64+
feature_refs, feature_views, full_feature_names
6565
)
6666

6767
# Create lazy function that is only called from the RetrievalJob object
@@ -125,14 +125,16 @@ def evaluate_historical_retrieval():
125125
# Modify the separator for feature refs in column names to double underscore. We are using
126126
# double underscore as separator for consistency with other databases like BigQuery,
127127
# where there are very few characters available for use as separators
128-
prefixed_feature_name = f"{feature_view.name}__{feature}"
129-
128+
if full_feature_names:
129+
formatted_feature_name = f"{feature_view.name}__{feature}"
130+
else:
131+
formatted_feature_name = feature
130132
# Add the feature name to the list of columns
131-
feature_names.append(prefixed_feature_name)
133+
feature_names.append(formatted_feature_name)
132134

133135
# Rename the source dataframe feature column to the formatted feature name (prefixed with the feature view name only when full_feature_names is True)
134136
df_to_join.rename(
135-
columns={feature: prefixed_feature_name}, inplace=True,
137+
columns={feature: formatted_feature_name}, inplace=True,
136138
)
137139

138140
# Build a list of entity columns to join on (from the right table)

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,6 @@ def get_historical_features(
6666
entity_df: Union[pd.DataFrame, str],
6767
registry: Registry,
6868
project: str,
69+
full_feature_names: bool = False,
6970
) -> RetrievalJob:
7071
pass

sdk/python/feast/infra/provider.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import importlib
33
from datetime import datetime
44
from pathlib import Path
5-
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
5+
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union
66

77
import pandas
88
import pyarrow
@@ -116,6 +116,7 @@ def get_historical_features(
116116
entity_df: Union[pandas.DataFrame, str],
117117
registry: Registry,
118118
project: str,
119+
full_feature_names: bool = False,
119120
) -> RetrievalJob:
120121
pass
121122

@@ -179,15 +180,24 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
179180

180181

181182
def _get_requested_feature_views_to_features_dict(
182-
feature_refs: List[str], feature_views: List[FeatureView]
183+
feature_refs: List[str], feature_views: List[FeatureView], full_feature_names: bool
183184
) -> Dict[FeatureView, List[str]]:
184-
"""Create a dict of FeatureView -> List[Feature] for all requested features"""
185+
"""Create a dict of FeatureView -> List[Feature] for all requested features.
186+
Raises FeatureNameCollisionError when full_feature_names is False and the same feature name appears more than once in feature_refs."""
185187

186188
feature_views_to_feature_map = {} # type: Dict[FeatureView, List[str]]
189+
feature_set = set() # type: Set[str]
190+
feature_collision_set = set() # type: Set[str]
191+
187192
for ref in feature_refs:
188193
ref_parts = ref.split(":")
189194
feature_view_from_ref = ref_parts[0]
190195
feature_from_ref = ref_parts[1]
196+
if feature_from_ref in feature_set:
197+
feature_collision_set.add(feature_from_ref)
198+
else:
199+
feature_set.add(feature_from_ref)
200+
191201
found = False
192202
for feature_view_from_registry in feature_views:
193203
if feature_view_from_registry.name == feature_view_from_ref:
@@ -203,6 +213,11 @@ def _get_requested_feature_views_to_features_dict(
203213

204214
if not found:
205215
raise ValueError(f"Could not find feature view from reference {ref}")
216+
217+
if not full_feature_names and len(feature_collision_set) > 0:
218+
err = ", ".join(x for x in feature_collision_set)
219+
raise errors.FeatureNameCollisionError(err)
220+
206221
return feature_views_to_feature_map
207222

208223

0 commit comments

Comments
 (0)