Skip to content

Commit 6f09686

Browse files
committed
Merge changes from master
2 parents 8dc58aa + 2e0113e commit 6f09686

21 files changed

Lines changed: 348 additions & 136 deletions

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ print(training_df.head())
7575
# model = ml.fit(training_df)
7676
```
7777
```commandline
78-
event_timestamp driver_id driver_hourly_stats__conv_rate driver_hourly_stats__acc_rate
79-
2021-04-12 08:12:10 1002 0.497279 0.357702
80-
2021-04-12 10:59:42 1001 0.979747 0.008166
81-
2021-04-12 15:01:12 1004 0.151432 0.551748
82-
2021-04-12 16:40:26 1003 0.951506 0.753572
78+
event_timestamp driver_id conv_rate acc_rate avg_daily_trips
79+
0 2021-04-12 08:12:10+00:00 1002 0.713465 0.597095 531
80+
1 2021-04-12 10:59:42+00:00 1001 0.072752 0.044344 11
81+
2 2021-04-12 15:01:12+00:00 1004 0.658182 0.079150 220
82+
3 2021-04-12 16:40:26+00:00 1003 0.162092 0.309035 959
8383
8484
```
8585

docs/quickstart.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ pprint(feature_vector)
119119
```python
120120
{
121121
'driver_id': [1001],
122-
'driver_hourly_stats__conv_rate': [0.49274],
123-
'driver_hourly_stats__acc_rate': [0.92743],
124-
'driver_hourly_stats__avg_daily_trips': [72],
122+
'conv_rate': [0.49274],
123+
'acc_rate': [0.92743],
124+
'avg_daily_trips': [72],
125125
}
126126
```
127127

sdk/python/feast/errors.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Set
1+
from typing import List, Set
22

33
from colorama import Fore, Style
44

@@ -98,6 +98,27 @@ def __init__(self, offline_store_name: str, data_source_name: str):
9898
)
9999

100100

101+
class FeatureNameCollisionError(Exception):
102+
def __init__(self, feature_refs_collisions: List[str], full_feature_names: bool):
103+
if full_feature_names:
104+
collisions = [ref.replace(":", "__") for ref in feature_refs_collisions]
105+
error_message = (
106+
"To resolve this collision, please ensure that the features in question "
107+
"have different names."
108+
)
109+
else:
110+
collisions = [ref.split(":")[1] for ref in feature_refs_collisions]
111+
error_message = (
112+
"To resolve this collision, either use the full feature name by setting "
113+
"'full_feature_names=True', or ensure that the features in question have different names."
114+
)
115+
116+
feature_names = ", ".join(set(collisions))
117+
super().__init__(
118+
f"Duplicate features named {feature_names} found.\n{error_message}"
119+
)
120+
121+
101122
class FeastOnlineStoreInvalidName(Exception):
102123
def __init__(self, online_store_class_name: str):
103124
super().__init__(

sdk/python/feast/feature_store.py

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import os
15-
import sys
16-
from collections import OrderedDict, defaultdict
15+
from collections import Counter, OrderedDict, defaultdict
1716
from datetime import datetime, timedelta
1817
from pathlib import Path
1918
from typing import Any, Dict, List, Optional, Tuple, Union
@@ -24,8 +23,8 @@
2423

2524
from feast import utils
2625
from feast.entity import Entity
27-
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
2826
from feast.feature_service import FeatureService
27+
from feast.errors import FeatureNameCollisionError, FeatureViewNotFoundException
2928
from feast.feature_view import FeatureView
3029
from feast.inference import (
3130
update_data_sources_with_inferred_event_timestamp_col,
@@ -270,9 +269,11 @@ def apply(
270269
update_entities_with_inferred_types_from_feature_views(
271270
entities_to_update, views_to_update, self.config
272271
)
272+
273273
update_data_sources_with_inferred_event_timestamp_col(
274274
[view.input for view in views_to_update], self.config
275275
)
276+
276277
for view in views_to_update:
277278
view.infer_features_from_input_source(self.config)
278279

@@ -303,6 +304,7 @@ def get_historical_features(
303304
entity_df: Union[pd.DataFrame, str],
304305
features: Union[List[str], FeatureService],
305306
feature_refs: Optional[List[str]] = None,
307+
full_feature_names: bool = False,
306308
) -> RetrievalJob:
307309
"""Enrich an entity dataframe with historical feature values for either training or batch scoring.
308310
@@ -324,6 +326,9 @@ def get_historical_features(
324326
SQL query. The query must be of a format supported by the configured offline store (e.g., BigQuery)
325327
features: A list of features that should be retrieved from the offline store. Feature references are of
326328
the format "feature_view:feature", e.g., "customer_fv:daily_transactions".
329+
full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
330+
changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
331+
"customer_fv__daily_transactions"). By default, this value is set to False.
327332
328333
Returns:
329334
RetrievalJob which can be used to materialize the results.
@@ -341,12 +346,12 @@ def get_historical_features(
341346
>>> feature_data = retrieval_job.to_df()
342347
>>> model.fit(feature_data) # insert your modeling framework here.
343348
"""
344-
345349
all_feature_views = self._registry.list_feature_views(project=self.project)
346-
try:
347-
feature_views = _get_requested_feature_views(features, all_feature_views)
348-
except FeatureViewNotFoundException as e:
349-
sys.exit(e)
350+
351+
_validate_feature_refs(feature_refs, full_feature_names)
352+
feature_views = list(
353+
view for view, _ in _group_feature_refs(feature_refs, all_feature_views)
354+
)
350355

351356
_features = features or feature_refs
352357
if not _features:
@@ -362,17 +367,16 @@ def get_historical_features(
362367
_feature_refs = _features
363368

364369
provider = self._get_provider()
365-
try:
366-
job = provider.get_historical_features(
367-
self.config,
368-
feature_views,
369-
_feature_refs,
370-
entity_df,
371-
self._registry,
372-
self.project,
373-
)
374-
except FeastProviderLoginError as e:
375-
sys.exit(e)
370+
371+
job = provider.get_historical_features(
372+
self.config,
373+
feature_views,
374+
feature_refs,
375+
entity_df,
376+
self._registry,
377+
self.project,
378+
full_feature_names,
379+
)
376380

377381
return job
378382

@@ -542,6 +546,7 @@ def get_online_features(
542546
features: Union[List[str], FeatureService],
543547
entity_rows: List[Dict[str, Any]],
544548
feature_refs: Optional[List[str]] = None,
549+
full_feature_names: bool = False,
545550
) -> OnlineResponse:
546551
"""
547552
Retrieves the latest online feature data.
@@ -617,7 +622,8 @@ def get_online_features(
617622
project=self.project, allow_cache=True
618623
)
619624

620-
grouped_refs = _group_refs(_features, all_feature_views)
625+
_validate_feature_refs(feature_refs, full_feature_names)
626+
grouped_refs = _group_feature_refs(feature_refs, all_feature_views)
621627
for table, requested_features in grouped_refs:
622628
entity_keys = _get_table_entity_keys(
623629
table, union_of_entity_keys, entity_name_to_join_key_map
@@ -634,13 +640,21 @@ def get_online_features(
634640

635641
if feature_data is None:
636642
for feature_name in requested_features:
637-
feature_ref = f"{table.name}__{feature_name}"
643+
feature_ref = (
644+
f"{table.name}__{feature_name}"
645+
if full_feature_names
646+
else feature_name
647+
)
638648
result_row.statuses[
639649
feature_ref
640650
] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND
641651
else:
642652
for feature_name in feature_data:
643-
feature_ref = f"{table.name}__{feature_name}"
653+
feature_ref = (
654+
f"{table.name}__{feature_name}"
655+
if full_feature_names
656+
else feature_name
657+
)
644658
if feature_name in requested_features:
645659
result_row.fields[feature_ref].CopyFrom(
646660
feature_data[feature_name]
@@ -668,8 +682,32 @@ def _entity_row_to_field_values(
668682
return result
669683

670684

671-
def _group_refs(
672-
features: Union[List[str], FeatureService], all_feature_views: List[FeatureView],
685+
def _validate_feature_refs(feature_refs: Union[List[str], FeatureService], full_feature_names: bool = False):
686+
collided_feature_refs = []
687+
688+
if full_feature_names:
689+
collided_feature_refs = [
690+
ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1
691+
]
692+
else:
693+
feature_names = [ref.split(":")[1] for ref in feature_refs]
694+
collided_feature_names = [
695+
ref
696+
for ref, occurrences in Counter(feature_names).items()
697+
if occurrences > 1
698+
]
699+
700+
for feature_name in collided_feature_names:
701+
collided_feature_refs.extend(
702+
[ref for ref in feature_refs if ref.endswith(":" + feature_name)]
703+
)
704+
705+
if len(collided_feature_refs) > 0:
706+
raise FeatureNameCollisionError(collided_feature_refs, full_feature_names)
707+
708+
709+
def _group_feature_refs(
710+
features: Union[List[str], FeatureService], all_feature_views: List[FeatureView]
673711
) -> List[Tuple[FeatureView, List[str]]]:
674712
""" Get list of feature views and corresponding feature names based on feature references"""
675713

@@ -703,14 +741,6 @@ def _get_features_refs_from_feature_services(
703741
return [f"{feature_service.name}:{f.name}" for f in feature_service.features]
704742

705743

706-
def _get_requested_feature_views(
707-
features: Union[List[str], FeatureService], all_feature_views: List[FeatureView],
708-
) -> List[FeatureView]:
709-
"""Get list of feature views based on feature references"""
710-
# TODO: Get rid of this function. We only need _group_refs
711-
return list(view for view, _ in _group_refs(features, all_feature_views))
712-
713-
714744
def _get_table_entity_keys(
715745
table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str],
716746
) -> List[EntityKeyProto]:

sdk/python/feast/feature_view.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,6 @@ def to_proto(self) -> FeatureViewProto:
182182
ttl_duration = Duration()
183183
ttl_duration.FromTimedelta(self.ttl)
184184

185-
print(f"Stream soruce: {self.stream_source}, {type(self.stream_source)}")
186-
187185
spec = FeatureViewSpecProto(
188186
name=self.name,
189187
entities=self.entities,

sdk/python/feast/infra/aws.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ def get_historical_features(
129129
entity_df: Union[pandas.DataFrame, str],
130130
registry: Registry,
131131
project: str,
132+
full_feature_names: bool,
132133
) -> RetrievalJob:
133134
job = self.offline_store.get_historical_features(
134135
config=config,
@@ -137,5 +138,6 @@ def get_historical_features(
137138
entity_df=entity_df,
138139
registry=registry,
139140
project=project,
141+
full_feature_names=full_feature_names,
140142
)
141143
return job

sdk/python/feast/infra/gcp.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ def get_historical_features(
131131
entity_df: Union[pandas.DataFrame, str],
132132
registry: Registry,
133133
project: str,
134+
full_feature_names: bool,
134135
) -> RetrievalJob:
135136
job = self.offline_store.get_historical_features(
136137
config=config,
@@ -139,5 +140,6 @@ def get_historical_features(
139140
entity_df=entity_df,
140141
registry=registry,
141142
project=project,
143+
full_feature_names=full_feature_names,
142144
)
143145
return job

sdk/python/feast/infra/local.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def get_historical_features(
130130
entity_df: Union[pd.DataFrame, str],
131131
registry: Registry,
132132
project: str,
133+
full_feature_names: bool,
133134
) -> RetrievalJob:
134135
return self.offline_store.get_historical_features(
135136
config=config,
@@ -138,6 +139,7 @@ def get_historical_features(
138139
entity_df=entity_df,
139140
registry=registry,
140141
project=project,
142+
full_feature_names=full_feature_names,
141143
)
142144

143145

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ def get_historical_features(
9595
entity_df: Union[pandas.DataFrame, str],
9696
registry: Registry,
9797
project: str,
98+
full_feature_names: bool = False,
9899
) -> RetrievalJob:
99100
# TODO: Add entity_df validation in order to fail before interacting with BigQuery
100101
assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
@@ -121,7 +122,11 @@ def get_historical_features(
121122

122123
# Build a query context containing all information required to template the BigQuery SQL query
123124
query_context = get_feature_view_query_context(
124-
feature_refs, feature_views, registry, project
125+
feature_refs,
126+
feature_views,
127+
registry,
128+
project,
129+
full_feature_names=full_feature_names,
125130
)
126131

127132
# TODO: Infer min_timestamp and max_timestamp from entity_df
@@ -132,6 +137,7 @@ def get_historical_features(
132137
max_timestamp=datetime.now() + timedelta(days=1),
133138
left_table_query_string=str(table.reference),
134139
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
140+
full_feature_names=full_feature_names,
135141
)
136142

137143
job = BigQueryRetrievalJob(query=query, client=client, config=config)
@@ -373,6 +379,7 @@ def get_feature_view_query_context(
373379
feature_views: List[FeatureView],
374380
registry: Registry,
375381
project: str,
382+
full_feature_names: bool = False,
376383
) -> List[FeatureViewQueryContext]:
377384
"""Build a query context containing all information required to template a BigQuery point-in-time SQL query"""
378385

@@ -432,6 +439,7 @@ def build_point_in_time_query(
432439
max_timestamp: datetime,
433440
left_table_query_string: str,
434441
entity_df_event_timestamp_col: str,
442+
full_feature_names: bool = False,
435443
):
436444
"""Build point-in-time query between each feature view table and the entity dataframe"""
437445
template = Environment(loader=BaseLoader()).from_string(
@@ -448,6 +456,7 @@ def build_point_in_time_query(
448456
[entity for fv in feature_view_query_contexts for entity in fv.entities]
449457
),
450458
"featureviews": [asdict(context) for context in feature_view_query_contexts],
459+
"full_feature_names": full_feature_names,
451460
}
452461

453462
query = template.render(template_context)
@@ -521,7 +530,7 @@ def _get_bigquery_client(project: Optional[str] = None):
521530
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
522531
{{ featureview.entity_selections | join(', ')}},
523532
{% for feature in featureview.features %}
524-
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
533+
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
525534
{% endfor %}
526535
FROM {{ featureview.table_subquery }}
527536
),
@@ -614,7 +623,7 @@ def _get_bigquery_client(project: Optional[str] = None):
614623
SELECT
615624
entity_row_unique_id,
616625
{% for feature in featureview.features %}
617-
{{ featureview.name }}__{{ feature }},
626+
{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %},
618627
{% endfor %}
619628
FROM {{ featureview.name }}__cleaned
620629
) USING (entity_row_unique_id)

0 commit comments

Comments
 (0)