Skip to content

Commit b509092

Browse files
authored
Enable entityless featureviews (feast-dev#1804)
* Iniitial commit for entityless featureviews Signed-off-by: Cody Lin <codyl@twitter.com> * Add entityless entity to entityless featureviews Signed-off-by: Cody Lin <codyl@twitter.com> * reorder applies of entities, views, and services Signed-off-by: Cody Lin <codyl@twitter.com> * Address some review comments Signed-off-by: Cody Lin <codyl@twitter.com> * hide entityless from get_entity Signed-off-by: Cody Lin <codyl@twitter.com> * address more comments, still need to test online Signed-off-by: Cody Lin <codyl@twitter.com> * hide hide_entityless from user functions Signed-off-by: Cody Lin <codyl@twitter.com> * restore pytest.mark.integration Signed-off-by: Cody Lin <codyl@twitter.com> * test entityless fv in local get_online_features and address more comments Signed-off-by: Cody Lin <codyl@twitter.com> * restore changes to is_valid Signed-off-by: Cody Lin <codyl@twitter.com> * fix a couple integration tests Signed-off-by: Cody Lin <codyl@twitter.com> * one more entityless edge case change in query Signed-off-by: Cody Lin <codyl@twitter.com> * fix typo in queries Signed-off-by: Cody Lin <codyl@twitter.com> * attempt to do entityless view integration test Signed-off-by: Cody Lin <codyl@twitter.com> * fix non-default follows default arg error Signed-off-by: Cody Lin <codyl@twitter.com> * fix entityless test table name conflict Signed-off-by: Cody Lin <codyl@twitter.com> * try to fix redshift query errors Signed-off-by: Cody Lin <codyl@twitter.com> * fix expected value for entityless join Signed-off-by: Cody Lin <codyl@twitter.com> * fix pull_latest to select entityless_entity col Signed-off-by: Cody Lin <codyl@twitter.com> * fix expected_entityless_value Signed-off-by: Cody Lin <codyl@twitter.com> * don't hide entityless_id in _get_columns Signed-off-by: Cody Lin <codyl@twitter.com> * hide entityless_id in _get_columns Signed-off-by: Cody Lin <codyl@twitter.com> * remove extra column from bigquery pull_latest Signed-off-by: Cody Lin <codyl@twitter.com> * fix lint issue 
after merge conflict Signed-off-by: Cody Lin <codyl@twitter.com> * fix unit tests Signed-off-by: Cody Lin <codyl@twitter.com> * initial round for test refactoring Signed-off-by: Cody Lin <codyl@twitter.com> * test entityless views universal_online Signed-off-by: Cody Lin <codyl@twitter.com> * fix some integration test failures Signed-off-by: Cody Lin <codyl@twitter.com> * fix table name conflicts in e2e tests Signed-off-by: Cody Lin <codyl@twitter.com> * convert to use timedelta Signed-off-by: Cody Lin <codyl@twitter.com> * remove unneeded line Signed-off-by: Cody Lin <codyl@twitter.com> * remove changes to e2e; change entityless_id to dummy_id Signed-off-by: Cody Lin <codyl@twitter.com> * hopefully fix test_usage failure Signed-off-by: Cody Lin <codyl@twitter.com>
1 parent ac71cdf commit b509092

16 files changed

Lines changed: 327 additions & 79 deletions

File tree

sdk/python/feast/driver_test_data.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,41 @@ def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.Data
198198
# TODO: Remove created timestamp in order to test whether it's really optional
199199
df_all_customers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
200200
return df_all_customers
201+
202+
203+
def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame:
    """
    Build a synthetic table of global daily ride statistics.

    One row is generated per day in the half-open interval
    ``[start_date, end_date)``. The table has no entity / join-key column.

    Args:
        start_date: First day to generate (inclusive).
        end_date: End of the range (exclusive).

    Returns:
        A DataFrame with the columns ``event_timestamp`` (UTC, rounded to
        milliseconds), ``num_rides`` (random int32 in [50, 100)),
        ``avg_ride_length`` (random float32 in [0, 1)) and ``created``
        (the time the frame was generated).

    Example df generated by this function:

    | event_timestamp  | num_rides | avg_ride_length | created          |
    |------------------+-----------+-----------------+------------------|
    | 2021-03-17 19:00 | 99        | 0.889188        | 2021-03-24 19:38 |
    | 2021-03-18 19:00 | 52        | 0.979273        | 2021-03-24 19:38 |
    | 2021-03-19 19:00 | 66        | 0.976549        | 2021-03-24 19:38 |
    | 2021-03-20 19:00 | 84        | 0.273697        | 2021-03-24 19:38 |
    | 2021-03-21 19:00 | 89        | 0.438262        | 2021-03-24 19:38 |
    |                  | ...       | ...             |                  |
    | 2021-03-24 19:00 | 54        | 0.738860        | 2021-03-24 19:38 |
    | 2021-03-25 19:00 | 58        | 0.848397        | 2021-03-24 19:38 |
    | 2021-03-26 19:00 | 69        | 0.301552        | 2021-03-24 19:38 |
    | 2021-03-27 19:00 | 63        | 0.943030        | 2021-03-24 19:38 |
    | 2021-03-28 19:00 | 79        | 0.354919        | 2021-03-24 19:38 |
    """
    # Drop the right endpoint by hand instead of passing ``closed="left"``:
    # that keyword was deprecated in pandas 1.4 and removed in 2.0, while its
    # replacement ``inclusive`` does not exist in older releases.
    days = pd.date_range(start=start_date, end=end_date, freq="1D")
    days = days[days < pd.Timestamp(end_date)]

    # Naive timestamps are interpreted as UTC; aware ones are converted to UTC.
    if days.tz is None:
        days = days.tz_localize("UTC")
    else:
        days = days.tz_convert("UTC")

    df_daily = pd.DataFrame({"event_timestamp": days.round("ms")})
    rows = len(df_daily)

    df_daily["num_rides"] = np.random.randint(50, 100, size=rows).astype(np.int32)
    df_daily["avg_ride_length"] = np.random.random(size=rows).astype(np.float32)

    # TODO: Remove created timestamp in order to test whether it's really optional
    df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
    return df_daily

sdk/python/feast/feature_store.py

Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@
3131
)
3232
from feast.feature_service import FeatureService
3333
from feast.feature_table import FeatureTable
34-
from feast.feature_view import FeatureView
34+
from feast.feature_view import (
35+
DUMMY_ENTITY_ID,
36+
DUMMY_ENTITY_NAME,
37+
DUMMY_ENTITY_VAL,
38+
FeatureView,
39+
)
3540
from feast.inference import (
3641
update_data_sources_with_inferred_event_timestamp_col,
3742
update_entities_with_inferred_types_from_feature_views,
@@ -48,6 +53,7 @@
4853
from feast.repo_config import RepoConfig, load_repo_config
4954
from feast.type_map import python_value_to_proto_value
5055
from feast.usage import log_exceptions, log_exceptions_and_usage
56+
from feast.value_type import ValueType
5157
from feast.version import get_version
5258

5359
warnings.simplefilter("once", DeprecationWarning)
@@ -94,6 +100,12 @@ def __init__(
94100
repo_path=self.repo_path,
95101
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
96102
)
103+
DUMMY_ENTITY = Entity(
104+
name=DUMMY_ENTITY_NAME,
105+
join_key=DUMMY_ENTITY_ID,
106+
value_type=ValueType.INT32,
107+
)
108+
self.apply(DUMMY_ENTITY)
97109

98110
@log_exceptions
99111
def version(self) -> str:
@@ -148,7 +160,19 @@ def list_entities(self, allow_cache: bool = False) -> List[Entity]:
148160
Returns:
149161
A list of entities.
150162
"""
151-
return self._registry.list_entities(self.project, allow_cache=allow_cache)
163+
return self._list_entities(allow_cache)
164+
165+
def _list_entities(
166+
self, allow_cache: bool = False, hide_dummy_entity: bool = True
167+
) -> List[Entity]:
168+
all_entities = self._registry.list_entities(
169+
self.project, allow_cache=allow_cache
170+
)
171+
return [
172+
entity
173+
for entity in all_entities
174+
if entity.name != DUMMY_ENTITY_NAME or not hide_dummy_entity
175+
]
152176

153177
@log_exceptions_and_usage
154178
def list_feature_services(self) -> List[FeatureService]:
@@ -161,14 +185,29 @@ def list_feature_services(self) -> List[FeatureService]:
161185
return self._registry.list_feature_services(self.project)
162186

163187
@log_exceptions_and_usage
164-
def list_feature_views(self) -> List[FeatureView]:
188+
def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]:
165189
"""
166190
Retrieves the list of feature views from the registry.
167191
192+
Args:
193+
allow_cache: Whether to allow returning feature views from a cached registry.
194+
168195
Returns:
169196
A list of feature views.
170197
"""
171-
return self._registry.list_feature_views(self.project)
198+
return self._list_feature_views(allow_cache)
199+
200+
def _list_feature_views(
201+
self, allow_cache: bool = False, hide_dummy_entity: bool = True
202+
) -> List[FeatureView]:
203+
feature_views = []
204+
for fv in self._registry.list_feature_views(
205+
self.project, allow_cache=allow_cache
206+
):
207+
if hide_dummy_entity and fv.entities[0] == DUMMY_ENTITY_NAME:
208+
fv.entities = []
209+
feature_views.append(fv)
210+
return feature_views
172211

173212
@log_exceptions_and_usage
174213
def list_on_demand_feature_views(self) -> List[OnDemandFeatureView]:
@@ -226,7 +265,15 @@ def get_feature_view(self, name: str) -> FeatureView:
226265
Raises:
227266
FeatureViewNotFoundException: The feature view could not be found.
228267
"""
229-
return self._registry.get_feature_view(name, self.project)
268+
return self._get_feature_view(name)
269+
270+
def _get_feature_view(
271+
self, name: str, hide_dummy_entity: bool = True
272+
) -> FeatureView:
273+
feature_view = self._registry.get_feature_view(name, self.project)
274+
if hide_dummy_entity and feature_view.entities[0] == DUMMY_ENTITY_NAME:
275+
feature_view.entities = []
276+
return feature_view
230277

231278
@log_exceptions_and_usage
232279
def get_on_demand_feature_view(self, name: str) -> OnDemandFeatureView:
@@ -485,7 +532,7 @@ def get_historical_features(
485532

486533
_feature_refs = self._get_features(features, feature_refs)
487534

488-
all_feature_views = self._registry.list_feature_views(project=self.project)
535+
all_feature_views = self.list_feature_views()
489536
feature_views = list(
490537
view for view, _ in _group_feature_refs(_feature_refs, all_feature_views)
491538
)
@@ -540,12 +587,12 @@ def materialize_incremental(
540587
"""
541588
feature_views_to_materialize = []
542589
if feature_views is None:
543-
feature_views_to_materialize = self._registry.list_feature_views(
544-
self.project
590+
feature_views_to_materialize = self._list_feature_views(
591+
hide_dummy_entity=False
545592
)
546593
else:
547594
for name in feature_views:
548-
feature_view = self._registry.get_feature_view(name, self.project)
595+
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
549596
feature_views_to_materialize.append(feature_view)
550597

551598
_print_materialization_log(
@@ -632,12 +679,12 @@ def materialize(
632679

633680
feature_views_to_materialize = []
634681
if feature_views is None:
635-
feature_views_to_materialize = self._registry.list_feature_views(
636-
self.project
682+
feature_views_to_materialize = self._list_feature_views(
683+
hide_dummy_entity=False
637684
)
638685
else:
639686
for name in feature_views:
640-
feature_view = self._registry.get_feature_view(name, self.project)
687+
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
641688
feature_views_to_materialize.append(feature_view)
642689

643690
_print_materialization_log(
@@ -721,9 +768,20 @@ def get_online_features(
721768
>>> online_response_dict = online_response.to_dict()
722769
"""
723770
_feature_refs = self._get_features(features, feature_refs)
771+
all_feature_views = self._list_feature_views(
772+
allow_cache=True, hide_dummy_entity=False
773+
)
774+
_validate_feature_refs(_feature_refs, full_feature_names)
775+
grouped_refs = _group_feature_refs(_feature_refs, all_feature_views)
776+
feature_views = list(view for view, _ in grouped_refs)
777+
entityless_case = DUMMY_ENTITY_NAME in [
778+
entity_name
779+
for feature_view in feature_views
780+
for entity_name in feature_view.entities
781+
]
724782

725783
provider = self._get_provider()
726-
entities = self.list_entities(allow_cache=True)
784+
entities = self._list_entities(allow_cache=True, hide_dummy_entity=False)
727785
entity_name_to_join_key_map = {}
728786
for entity in entities:
729787
entity_name_to_join_key_map[entity.name] = entity.join_key
@@ -737,6 +795,8 @@ def get_online_features(
737795
except KeyError:
738796
raise EntityNotFoundException(entity_name, self.project)
739797
join_key_row[join_key] = entity_value
798+
if entityless_case:
799+
join_key_row[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL
740800
join_key_rows.append(join_key_row)
741801

742802
entity_row_proto_list = _infer_online_entity_rows(join_key_rows)
@@ -748,11 +808,6 @@ def get_online_features(
748808
union_of_entity_keys.append(_entity_row_to_key(entity_row_proto))
749809
result_rows.append(_entity_row_to_field_values(entity_row_proto))
750810

751-
all_feature_views = self._registry.list_feature_views(
752-
project=self.project, allow_cache=True
753-
)
754-
_validate_feature_refs(_feature_refs, full_feature_names)
755-
grouped_refs = _group_feature_refs(_feature_refs, all_feature_views)
756811
for table, requested_features in grouped_refs:
757812
entity_keys = _get_table_entity_keys(
758813
table, union_of_entity_keys, entity_name_to_join_key_map

sdk/python/feast/feature_view.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040

4141
warnings.simplefilter("once", DeprecationWarning)
4242

43+
# DUMMY_ENTITY is a placeholder entity used in entityless FeatureViews
44+
DUMMY_ENTITY_ID = "__dummy_id"
45+
DUMMY_ENTITY_NAME = "__dummy"
46+
DUMMY_ENTITY_VAL = ""
47+
4348

4449
class FeatureView:
4550
"""
@@ -117,7 +122,7 @@ def __init__(
117122
)
118123

119124
self.name = name
120-
self.entities = entities
125+
self.entities = entities if entities else [DUMMY_ENTITY_NAME]
121126
self.features = _features
122127
self.tags = tags if tags is not None else {}
123128

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
FeastProviderLoginError,
1818
InvalidEntityType,
1919
)
20-
from feast.feature_view import FeatureView
20+
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
2121
from feast.infra.offline_stores import offline_utils
2222
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
2323
from feast.on_demand_feature_view import OnDemandFeatureView
@@ -79,7 +79,9 @@ def pull_latest_from_table_or_query(
7979

8080
client = _get_bigquery_client(project=config.offline_store.project_id)
8181
query = f"""
82-
SELECT {field_string}
82+
SELECT
83+
{field_string}
84+
{f", {repr(DUMMY_ENTITY_VAL)} AS {DUMMY_ENTITY_ID}" if not join_key_columns else ""}
8385
FROM (
8486
SELECT {field_string},
8587
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
@@ -375,12 +377,16 @@ def _get_bigquery_client(project: Optional[str] = None):
375377
SELECT *,
376378
{{entity_df_event_timestamp_col}} AS entity_timestamp
377379
{% for featureview in featureviews %}
380+
{% if featureview.entities %}
378381
,CONCAT(
379382
{% for entity in featureview.entities %}
380383
CAST({{entity}} AS STRING),
381384
{% endfor %}
382385
CAST({{entity_df_event_timestamp_col}} AS STRING)
383386
) AS {{featureview.name}}__entity_row_unique_id
387+
{% else %}
388+
,CAST({{entity_df_event_timestamp_col}} AS STRING) AS {{featureview.name}}__entity_row_unique_id
389+
{% endif %}
384390
{% endfor %}
385391
FROM {{ left_table_query_string }}
386392
),
@@ -389,11 +395,14 @@ def _get_bigquery_client(project: Optional[str] = None):
389395
390396
{{ featureview.name }}__entity_dataframe AS (
391397
SELECT
392-
{{ featureview.entities | join(', ')}},
398+
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
393399
entity_timestamp,
394400
{{featureview.name}}__entity_row_unique_id
395401
FROM entity_dataframe
396-
GROUP BY {{ featureview.entities | join(', ')}}, entity_timestamp, {{featureview.name}}__entity_row_unique_id
402+
GROUP BY
403+
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
404+
entity_timestamp,
405+
{{featureview.name}}__entity_row_unique_id
397406
),
398407
399408
/*
@@ -417,7 +426,7 @@ def _get_bigquery_client(project: Optional[str] = None):
417426
SELECT
418427
{{ featureview.event_timestamp_column }} as event_timestamp,
419428
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
420-
{{ featureview.entity_selections | join(', ')}},
429+
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
421430
{% for feature in featureview.features %}
422431
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
423432
{% endfor %}

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from feast import FileSource, OnDemandFeatureView
1010
from feast.data_source import DataSource
1111
from feast.errors import FeastJoinKeysDuringMaterialization
12-
from feast.feature_view import FeatureView
12+
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
1313
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
1414
from feast.infra.offline_stores.offline_utils import (
1515
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
@@ -217,7 +217,7 @@ def evaluate_historical_retrieval():
217217
df_to_join,
218218
left_on=entity_df_event_timestamp_col,
219219
right_on=event_timestamp_column,
220-
by=right_entity_columns,
220+
by=right_entity_columns or None,
221221
tolerance=feature_view.ttl,
222222
)
223223

@@ -296,13 +296,19 @@ def evaluate_offline_job():
296296
(source_df[event_timestamp_column] >= start_date)
297297
& (source_df[event_timestamp_column] < end_date)
298298
]
299-
last_values_df = filtered_df.drop_duplicates(
300-
join_key_columns, keep="last", ignore_index=True
301-
)
302299

303300
columns_to_extract = set(
304301
join_key_columns + feature_name_columns + ts_columns
305302
)
303+
if join_key_columns:
304+
last_values_df = filtered_df.drop_duplicates(
305+
join_key_columns, keep="last", ignore_index=True
306+
)
307+
else:
308+
last_values_df = filtered_df
309+
last_values_df[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL
310+
columns_to_extract.add(DUMMY_ENTITY_ID)
311+
306312
return last_values_df[columns_to_extract]
307313

308314
# When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized

0 commit comments

Comments
 (0)