Commit 2d52ce7

Infer features for on demand feature views, support multiple output features (#1845)
* Infer features for on demand feature views
  Signed-off-by: Achal Shah <achals@gmail.com>
* Update all feature_refs
  Signed-off-by: Achal Shah <achals@gmail.com>
* make format
  Signed-off-by: Achal Shah <achals@gmail.com>
* Refactor
  Signed-off-by: Achal Shah <achals@gmail.com>
* remove imports
  Signed-off-by: Achal Shah <achals@gmail.com>
* remove a weird union in the return type for _group_refs
  Signed-off-by: Achal Shah <achals@gmail.com>
* use self._list
  Signed-off-by: Achal Shah <achals@gmail.com>
* build template
  Signed-off-by: Achal Shah <achals@gmail.com>
* fix bug and remove prints
  Signed-off-by: Achal Shah <achals@gmail.com>
* Remove more prints
  Signed-off-by: Achal Shah <achals@gmail.com>
* CR updates
  Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 9e94f0e commit 2d52ce7

14 files changed, +340 -156 lines changed

README.md

Lines changed: 71 additions & 82 deletions
@@ -127,89 +127,78 @@ pprint(feature_vector)
 
 ## 📦 Functionality and Roadmap
 
-The list below contains Feast functionality that contributors are planning to develop
-* Items below may indicate if it is planned for development or whether development is in progress.
-* We welcome contribution to all items in the roadmap, especially those that are not currently planned or in development.
+The list below contains the functionality that contributors are planning to develop for Feast
+
+* Items below that are in development \(or planned for development\) will be indicated in parentheses.
+* We welcome contribution to all items in the roadmap!
 * Want to influence our roadmap and prioritization? Submit your feedback to [this form](https://docs.google.com/forms/d/e/1FAIpQLSfa1nRQ0sKz-JEFnMMCi4Jseag_yDssO_3nV9qMfxfrkil-wA/viewform).
-* Want to speak to a Feast contributor? We are more than happy to jump on a quick call. Please schedule a time using [Calendly](https://calendly.com/d/x2ry-g5bb/meet-with-feast-team).
-
-- **Data Sources**
-  - [x] [Redshift source](https://docs.feast.dev/reference/data-sources/redshift)
-  - [x] [BigQuery source](https://docs.feast.dev/reference/data-sources/bigquery)
-  - [x] [Parquet file source](https://docs.feast.dev/reference/data-sources/file)
-  - [ ] Kafka source (Planned for Q4 2021)
-  - [ ] HTTP source
-  - [ ] Snowflake source
-  - [ ] Synapse source
-
-
-- **Offline Stores**
-  - [x] [Redshift](https://docs.feast.dev/reference/offline-stores/redshift)
-  - [x] [BigQuery](https://docs.feast.dev/reference/offline-stores/bigquery)
-  - [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file)
-  - [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store)
-  - [x] [Hive (community maintained)](https://github.com/baineng/feast-hive)
-  - [ ] Snowflake
-  - [ ] Synapse
-
-
-- **Online Stores**
-  - [x] [DynamoDB](https://docs.feast.dev/reference/online-stores/dynamodb)
-  - [x] [Redis](https://docs.feast.dev/reference/online-stores/redis)
-  - [x] [Datastore](https://docs.feast.dev/reference/online-stores/datastore)
-  - [x] [SQLite](https://docs.feast.dev/reference/online-stores/sqlite)
-  - [x] [Custom online store support](https://docs.feast.dev/how-to-guides/adding-support-for-a-new-online-store)
-  - [ ] Postgres
-  - [ ] Bigtable
-  - [ ] Cassandra
-
-
-- **Streaming**
-  - [ ] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/creating-a-custom-provider)
-  - [ ] Streaming ingestion on AWS (Planned for Q4 2021)
-  - [ ] Streaming ingestion on GCP
-
-
-- **Feature Engineering**
-  - [ ] On-demand Transformations (Development in progress. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#))
-  - [ ] Batch transformation (SQL)
-  - [ ] Streaming transformation
-
-
-- **Deployments**
-  - [ ] AWS Lambda (Development in progress. See [RFC](https://docs.google.com/document/d/1eZWKWzfBif66LDN32IajpaG-j82LSHCCOzY6R7Ax7MI/edit))
-  - [ ] Cloud Run
-  - [ ] Kubernetes
-  - [ ] KNative
-
-
-- **Feature Serving**
-  - [x] Python Client
-  - [ ] REST Feature Server (Python) (Development in progress. See [RFC](https://docs.google.com/document/d/1iXvFhAsJ5jgAhPOpTdB3j-Wj1S9x3Ev_Wr6ZpnLzER4/edit))
-  - [ ] gRPC Feature Server (Java) (See [#1497](https://github.com/feast-dev/feast/issues/1497))
-  - [ ] Java Client
-  - [ ] Go Client
-  - [ ] Push API
-  - [ ] Delete API
-  - [ ] Feature Logging (for training)
-
-
-- **Data Quality Management**
-  - [ ] Data profiling and validation (Great Expectations) (Planned for Q4 2021)
-  - [ ] Metric production
-  - [ ] Training-serving skew detection
-  - [ ] Drift detection
-  - [ ] Alerting
-
-
-- **Feature Discovery and Governance**
-  - [x] Python SDK for browsing feature registry
-  - [x] CLI for browsing feature registry
-  - [x] Model-centric feature tracking (feature services)
-  - [ ] REST API for browsing feature registry
-  - [ ] Feast Web UI (Planned for Q4 2021)
-  - [ ] Feature versioning
-  - [ ] Amundsen integration
+* Want to speak to a Feast contributor? We are more than happy to jump on a call. Please schedule a time using [Calendly](https://calendly.com/d/x2ry-g5bb/meet-with-feast-team).
+
+
+
+* **Data Sources**
+  * [x] [Redshift source](https://docs.feast.dev/reference/data-sources/redshift)
+  * [x] [BigQuery source](https://docs.feast.dev/reference/data-sources/bigquery)
+  * [x] [Parquet file source](https://docs.feast.dev/reference/data-sources/file)
+  * [ ] Kafka source \(Planned for Q4 2021\)
+  * [ ] HTTP source
+  * [ ] Snowflake source
+  * [ ] Synapse source
+* **Offline Stores**
+  * [x] [Redshift](https://docs.feast.dev/reference/offline-stores/redshift)
+  * [x] [BigQuery](https://docs.feast.dev/reference/offline-stores/bigquery)
+  * [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file)
+  * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store)
+  * [x] [Hive \(community maintained\)](https://github.com/baineng/feast-hive)
+  * [ ] Snowflake
+  * [ ] Synapse
+* **Online Stores**
+  * [x] [DynamoDB](https://docs.feast.dev/reference/online-stores/dynamodb)
+  * [x] [Redis](https://docs.feast.dev/reference/online-stores/redis)
+  * [x] [Datastore](https://docs.feast.dev/reference/online-stores/datastore)
+  * [x] [SQLite](https://docs.feast.dev/reference/online-stores/sqlite)
+  * [x] [Custom online store support](https://docs.feast.dev/how-to-guides/adding-support-for-a-new-online-store)
+  * [ ] Postgres
+  * [ ] Bigtable
+  * [ ] Cassandra
+* **Streaming**
+  * [ ] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/creating-a-custom-provider)
+  * [ ] Streaming ingestion on AWS \(Planned for Q4 2021\)
+  * [ ] Streaming ingestion on GCP
+* **Feature Engineering**
+  * [ ] On-demand Transformations \(Development in progress. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#)\)
+  * [ ] Batch transformation \(SQL\)
+  * [ ] Streaming transformation
+* **Deployments**
+  * [ ] AWS Lambda \(Development in progress. See [RFC](https://docs.google.com/document/d/1eZWKWzfBif66LDN32IajpaG-j82LSHCCOzY6R7Ax7MI/edit)\)
+  * [ ] Cloud Run
+  * [ ] Kubernetes
+  * [ ] KNative
+* **Feature Serving**
+  * [x] Python Client
+  * [ ] REST Feature Server \(Python\) \(Development in progress. See [RFC](https://docs.google.com/document/d/1iXvFhAsJ5jgAhPOpTdB3j-Wj1S9x3Ev_Wr6ZpnLzER4/edit)\)
+  * [ ] gRPC Feature Server \(Java\) \(See [\#1497](https://github.com/feast-dev/feast/issues/1497)\)
+  * [ ] Java Client
+  * [ ] Go Client
+  * [ ] Push API
+  * [ ] Delete API
+  * [ ] Feature Logging \(for training\)
+* **Data Quality Management**
+  * [ ] Data profiling and validation \(Great Expectations\) \(Planned for Q4 2021\)
+  * [ ] Metric production
+  * [ ] Training-serving skew detection
+  * [ ] Drift detection
+  * [ ] Alerting
+* **Feature Discovery and Governance**
+  * [x] Python SDK for browsing feature registry
+  * [x] CLI for browsing feature registry
+  * [x] Model-centric feature tracking \(feature services\)
+  * [ ] REST API for browsing feature registry
+  * [ ] Feast Web UI \(Planned for Q4 2021\)
+  * [ ] Feature versioning
+  * [ ] Amundsen integration
+
+
 
 ## 🎓 Important Resources

sdk/python/feast/errors.py

Lines changed: 8 additions & 0 deletions
@@ -129,6 +129,14 @@ def __init__(self, feature_refs_collisions: List[str], full_feature_names: bool)
         )
 
 
+class SpecifiedFeaturesNotPresentError(Exception):
+    def __init__(self, specified_features: List[str], feature_view_name: str):
+        features = ", ".join(specified_features)
+        super().__init__(
+            f"Explicitly specified features {features} not found in inferred list of features for '{feature_view_name}'"
+        )
+
+
 class FeastOnlineStoreInvalidName(Exception):
     def __init__(self, online_store_class_name: str):
         super().__init__(
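
The hunk above only defines the new exception; the call site is not among the files shown here. As a rough sketch of how such a check might look, the snippet below copies the exception from the diff and adds a hypothetical helper, `check_specified_features`, which is an assumption for illustration and not part of this commit.

```python
from typing import List


class SpecifiedFeaturesNotPresentError(Exception):
    # Copied from the diff above.
    def __init__(self, specified_features: List[str], feature_view_name: str):
        features = ", ".join(specified_features)
        super().__init__(
            f"Explicitly specified features {features} not found in inferred list of features for '{feature_view_name}'"
        )


def check_specified_features(
    specified: List[str], inferred: List[str], feature_view_name: str
) -> None:
    """Hypothetical helper: raise if any explicitly specified feature was not inferred."""
    missing = [name for name in specified if name not in inferred]
    if missing:
        raise SpecifiedFeaturesNotPresentError(missing, feature_view_name)
```

Reporting only the missing names, rather than the full specified list, is a choice made for this sketch; the commit may pass a different list.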

sdk/python/feast/feature.py

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ def __lt__(self, other):
 
     def __repr__(self):
         # return string representation of the reference
-        return self.name
+        return f"{self.name}-{self.dtype}"
 
     def __str__(self):
         # readable string of the reference
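
One visible effect of the `__repr__` change is that two features with the same name but different dtypes no longer have the same repr. The sketch below uses simplified stand-ins for `Feature` and `ValueType` (the real classes live in the Feast SDK, and the enum values here are arbitrary) to show the difference.

```python
from enum import Enum


class ValueType(Enum):
    # Stand-in for Feast's ValueType; member values are arbitrary in this sketch.
    INT64 = 1
    STRING = 2


class Feature:
    # Simplified stand-in for feast.feature.Feature, keeping only name and dtype.
    def __init__(self, name: str, dtype: ValueType):
        self.name = name
        self.dtype = dtype

    def __repr__(self):
        # Same format string as the new line in the diff.
        return f"{self.name}-{self.dtype}"


int_feature = Feature("conv_rate", ValueType.INT64)
str_feature = Feature("conv_rate", ValueType.STRING)
print(repr(int_feature), repr(str_feature))
```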

sdk/python/feast/feature_store.py

Lines changed: 81 additions & 33 deletions
@@ -401,6 +401,9 @@ def apply(
         for view in views_to_update:
             view.infer_features_from_batch_source(self.config)
 
+        for odfv in odfvs_to_update:
+            odfv.infer_features_from_batch_source(self.config)
+
         if len(views_to_update) + len(entities_to_update) + len(
             services_to_update
         ) + len(odfvs_to_update) != len(objects):
@@ -535,10 +538,18 @@ def get_historical_features(
         _feature_refs = self._get_features(features, feature_refs)
 
         all_feature_views = self.list_feature_views()
-        feature_views = list(
-            view for view, _ in _group_feature_refs(_feature_refs, all_feature_views)
+        all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
+            project=self.project
         )
 
+        # TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into the provider.
+        # This is a weird interface quirk - we should revisit the `get_historical_features` to
+        # pass in the on demand feature views as well.
+        fvs, _ = _group_feature_refs(
+            _feature_refs, all_feature_views, all_on_demand_feature_views
+        )
+        feature_views = list(view for view, _ in fvs)
+
         _validate_feature_refs(_feature_refs, full_feature_names)
 
         provider = self._get_provider()
@@ -773,8 +784,14 @@ def get_online_features(
         all_feature_views = self._list_feature_views(
             allow_cache=True, hide_dummy_entity=False
         )
+        all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
+            project=self.project, allow_cache=True
+        )
+
         _validate_feature_refs(_feature_refs, full_feature_names)
-        grouped_refs = _group_feature_refs(_feature_refs, all_feature_views)
+        grouped_refs, _ = _group_feature_refs(
+            _feature_refs, all_feature_views, all_on_demand_feature_views
+        )
         feature_views = list(view for view, _ in grouped_refs)
         entityless_case = DUMMY_ENTITY_NAME in [
             entity_name
@@ -863,32 +880,49 @@ def _augment_response_with_on_demand_transforms(
         initial_response: OnlineResponse,
         result_rows: List[GetOnlineFeaturesResponse.FieldValues],
     ) -> OnlineResponse:
-        all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
-            project=self.project, allow_cache=True
-        )
+        all_on_demand_feature_views = {
+            view.name: view
+            for view in self._registry.list_on_demand_feature_views(
+                project=self.project, allow_cache=True
+            )
+        }
+        all_odfv_feature_names = all_on_demand_feature_views.keys()
+
         if len(all_on_demand_feature_views) == 0:
             return initial_response
         initial_response_df = initial_response.to_df()
+
+        odfv_feature_refs = defaultdict(list)
+        for feature_ref in feature_refs:
+            view_name, feature_name = feature_ref.split(":")
+            if view_name in all_odfv_feature_names:
+                odfv_feature_refs[view_name].append(feature_name)
+
         # Apply on demand transformations
-        for odfv in all_on_demand_feature_views:
-            feature_ref = odfv.name
-            if feature_ref in feature_refs:
-                transformed_features_df = odfv.get_transformed_features_df(
-                    full_feature_names, initial_response_df
-                )
-                for row_idx in range(len(result_rows)):
-                    result_row = result_rows[row_idx]
-                    # TODO(adchia): support multiple output features in an ODFV, which requires different naming
-                    # conventions
-                    result_row.fields[odfv.name].CopyFrom(
-                        python_value_to_proto_value(
-                            transformed_features_df[odfv.features[0].name].values[
-                                row_idx
-                            ]
-                        )
+        for odfv_name, _feature_refs in odfv_feature_refs.items():
+            odfv = all_on_demand_feature_views[odfv_name]
+            transformed_features_df = odfv.get_transformed_features_df(
+                full_feature_names, initial_response_df
+            )
+            for row_idx in range(len(result_rows)):
+                result_row = result_rows[row_idx]
+
+                selected_subset = [
+                    f for f in transformed_features_df.columns if f in _feature_refs
+                ]
+
+                for transformed_feature in selected_subset:
+                    transformed_feature_name = (
+                        f"{odfv.name}__{transformed_feature}"
+                        if full_feature_names
+                        else transformed_feature
                     )
+                    proto_value = python_value_to_proto_value(
+                        transformed_features_df[transformed_feature].values[row_idx]
+                    )
+                    result_row.fields[transformed_feature_name].CopyFrom(proto_value)
                     result_row.statuses[
-                        feature_ref
+                        transformed_feature_name
                     ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
         return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))
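
The hunk above replaces the single-output assumption with a per-feature loop: only the requested columns of the transformed DataFrame are kept, and each one is written under either a prefixed or a bare field name. A minimal sketch of that selection and naming rule, pulled out into a hypothetical standalone function (`name_transformed_outputs` does not exist in Feast), might look like this:

```python
from typing import Dict, List


def name_transformed_outputs(
    odfv_name: str,
    transformed_columns: List[str],
    requested_features: List[str],
    full_feature_names: bool,
) -> Dict[str, str]:
    """Map each response field name to the transformed column it is read from."""
    # Keep only the columns that were requested, preserving column order.
    selected_subset = [c for c in transformed_columns if c in requested_features]
    # Prefix with the view name and a double underscore when full names are requested.
    return {
        (f"{odfv_name}__{column}" if full_feature_names else column): column
        for column in selected_subset
    }
```

With `full_feature_names=False`, two on demand feature views that emit a column with the same name would map to the same response field in this sketch; whether the commit guards against that elsewhere is not visible in the hunks shown.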

@@ -941,36 +975,50 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F
 
 
 def _group_feature_refs(
-    features: Union[List[str], FeatureService], all_feature_views: List[FeatureView]
-) -> List[Tuple[FeatureView, List[str]]]:
+    features: Union[List[str], FeatureService],
+    all_feature_views: List[FeatureView],
+    all_on_demand_feature_views: List[OnDemandFeatureView],
+) -> Tuple[
+    List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]]
+]:
     """ Get list of feature views and corresponding feature names based on feature references"""
 
     # view name to view proto
     view_index = {view.name: view for view in all_feature_views}
 
+    # on demand view to on demand view proto
+    on_demand_view_index = {view.name: view for view in all_on_demand_feature_views}
+
     # view name to feature names
     views_features = defaultdict(list)
 
+    # on demand view name to feature names
+    on_demand_view_features = defaultdict(list)
+
     if isinstance(features, list) and isinstance(features[0], str):
         for ref in features:
-            if ":" not in ref:
-                # This is an on demand feature view ref
-                continue
             view_name, feat_name = ref.split(":")
-            if view_name not in view_index:
+            if view_name in view_index:
+                views_features[view_name].append(feat_name)
+            elif view_name in on_demand_view_index:
+                on_demand_view_features[view_name].append(feat_name)
+            else:
                 raise FeatureViewNotFoundException(view_name)
-            views_features[view_name].append(feat_name)
     elif isinstance(features, FeatureService):
         for feature_projection in features.features:
             projected_features = feature_projection.features
             views_features[feature_projection.name].extend(
                 [f.name for f in projected_features]
             )
 
-    result = []
+    fvs_result: List[Tuple[FeatureView, List[str]]] = []
+    odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = []
+
     for view_name, feature_names in views_features.items():
-        result.append((view_index[view_name], feature_names))
-    return result
+        fvs_result.append((view_index[view_name], feature_names))
+    for view_name, feature_names in on_demand_view_features.items():
+        odfvs_result.append((on_demand_view_index[view_name], feature_names))
+    return fvs_result, odfvs_result
 
 
 def _get_feature_refs_from_feature_services(
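
With this change every string reference is expected to have the form `view_name:feature_name`, including references to on demand feature views, and `_group_feature_refs` routes each one to the matching kind of view. The sketch below reproduces that routing using plain view names instead of Feast view objects; `group_refs_by_view_kind` is a hypothetical name, and `KeyError` stands in for `FeatureViewNotFoundException`.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_refs_by_view_kind(
    feature_refs: List[str],
    feature_view_names: List[str],
    on_demand_view_names: List[str],
) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    """Split 'view:feature' refs into regular and on demand groups, keyed by view name."""
    regular: Dict[str, List[str]] = defaultdict(list)
    on_demand: Dict[str, List[str]] = defaultdict(list)
    for ref in feature_refs:
        view_name, feature_name = ref.split(":")
        if view_name in feature_view_names:
            regular[view_name].append(feature_name)
        elif view_name in on_demand_view_names:
            on_demand[view_name].append(feature_name)
        else:
            # The real code raises FeatureViewNotFoundException here.
            raise KeyError(view_name)
    return dict(regular), dict(on_demand)


regular, on_demand = group_refs_by_view_kind(
    ["driver_stats:conv_rate", "driver_stats:acc_rate", "conv_plus:conv_rate_plus_100"],
    feature_view_names=["driver_stats"],
    on_demand_view_names=["conv_plus"],
)
```

Because the old `":" not in ref` branch is removed, a bare on demand view name with no colon would make the `ref.split(":")` unpacking fail with a `ValueError`, both in this sketch and in the diff as shown.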

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 7 additions & 2 deletions
@@ -91,8 +91,13 @@ def get_historical_features(
             raise ValueError(
                 f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
             )
-        feature_views_to_features = _get_requested_feature_views_to_features_dict(
-            feature_refs, feature_views
+        (
+            feature_views_to_features,
+            on_demand_feature_views_to_features,
+        ) = _get_requested_feature_views_to_features_dict(
+            feature_refs,
+            feature_views,
+            registry.list_on_demand_feature_views(config.project),
         )
 
         # Create lazy function that is only called from the RetrievalJob object
