Skip to content

Commit 101e014

Browse files
authored
Support adding request data in on demand transforms (feast-dev#1851)
* Merge Signed-off-by: Danny Chiao <danny@tecton.ai> * Fixing bugs Signed-off-by: Danny Chiao <danny@tecton.ai> * Fix lint Signed-off-by: Danny Chiao <danny@tecton.ai> * Add tests Signed-off-by: Danny Chiao <danny@tecton.ai> * Address comments Signed-off-by: Danny Chiao <danny@tecton.ai> * Fix inference test Signed-off-by: Danny Chiao <danny@tecton.ai> * Fix merge Signed-off-by: Danny Chiao <danny@tecton.ai> * Fix feature service test Signed-off-by: Danny Chiao <danny@tecton.ai> * Fixing entity df test Signed-off-by: Danny Chiao <danny@tecton.ai> * Fix tests Signed-off-by: Danny Chiao <danny@tecton.ai> * Uncomment temporary test disable Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent b066695 commit 101e014

File tree

14 files changed

+359
-65
lines changed

14 files changed

+359
-65
lines changed

protos/feast/core/DataSource.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ option java_outer_classname = "DataSourceProto";
2323
option java_package = "feast.proto.core";
2424

2525
import "feast/core/DataFormat.proto";
26+
import "feast/types/Value.proto";
2627

2728
// Defines a Data Source that can be used source Feature data
2829
message DataSource {
@@ -39,6 +40,7 @@ message DataSource {
3940
STREAM_KINESIS = 4;
4041
BATCH_REDSHIFT = 5;
4142
CUSTOM_SOURCE = 6;
43+
REQUEST_SOURCE = 7;
4244
}
4345
SourceType type = 1;
4446

@@ -133,13 +135,23 @@ message DataSource {
133135
bytes configuration = 1;
134136
}
135137

138+
// Defines options for DataSource that sources features from request data
139+
message RequestDataOptions {
140+
// Name of the request data source
141+
string name = 1;
142+
143+
// Mapping of feature name to type
144+
map<string, feast.types.ValueType.Enum> schema = 2;
145+
}
146+
136147
// DataSource options.
137148
oneof options {
138149
FileOptions file_options = 11;
139150
BigQueryOptions bigquery_options = 12;
140151
KafkaOptions kafka_options = 13;
141152
KinesisOptions kinesis_options = 14;
142153
RedshiftOptions redshift_options = 15;
154+
RequestDataOptions request_data_options = 18;
143155
CustomSourceOptions custom_options = 16;
144156
}
145157
}

protos/feast/core/OnDemandFeatureView.proto

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ option java_package = "feast.proto.core";
2424

2525
import "feast/core/FeatureView.proto";
2626
import "feast/core/Feature.proto";
27+
import "feast/core/DataSource.proto";
2728

2829
message OnDemandFeatureView {
2930
// User-specified specifications of this feature view.
@@ -41,12 +42,18 @@ message OnDemandFeatureViewSpec {
4142
repeated FeatureSpecV2 features = 3;
4243

4344
// List of features specifications for each feature defined with this feature view.
44-
// TODO(adchia): add support for request data
45-
map<string, FeatureView> inputs = 4;
45+
map<string, OnDemandInput> inputs = 4;
4646

4747
UserDefinedFunction user_defined_function = 5;
4848
}
4949

50+
message OnDemandInput {
51+
oneof input {
52+
FeatureView feature_view = 1;
53+
DataSource request_data_source = 2;
54+
}
55+
}
56+
5057
// Serialized representation of python function.
5158
message UserDefinedFunction {
5259
// The function name

sdk/python/feast/data_source.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,74 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
512512
return type_map.redshift_to_feast_value_type
513513

514514

515+
class RequestDataSource(DataSource):
516+
"""
517+
RequestDataSource that can be used to provide input features for on demand transforms
518+
519+
Args:
520+
name: Name of the request data source
521+
schema: Schema mapping from the input feature name to a ValueType
522+
"""
523+
524+
@staticmethod
525+
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
526+
raise NotImplementedError
527+
528+
_name: str
529+
_schema: Dict[str, ValueType]
530+
531+
def __init__(
532+
self, name: str, schema: Dict[str, ValueType],
533+
):
534+
"""Creates a RequestDataSource object."""
535+
super().__init__()
536+
self._name = name
537+
self._schema = schema
538+
539+
@property
540+
def name(self) -> str:
541+
"""
542+
Returns the name of this data source
543+
"""
544+
return self._name
545+
546+
@property
547+
def schema(self) -> Dict[str, ValueType]:
548+
"""
549+
Returns the schema for this request data source
550+
"""
551+
return self._schema
552+
553+
def validate(self, config: RepoConfig):
554+
pass
555+
556+
def get_table_column_names_and_types(
557+
self, config: RepoConfig
558+
) -> Iterable[Tuple[str, str]]:
559+
pass
560+
561+
@staticmethod
562+
def from_proto(data_source: DataSourceProto):
563+
schema_pb = data_source.request_data_options.schema
564+
schema = {}
565+
for key in schema_pb.keys():
566+
schema[key] = ValueType(schema_pb.get(key))
567+
return RequestDataSource(
568+
name=data_source.request_data_options.name, schema=schema
569+
)
570+
571+
def to_proto(self) -> DataSourceProto:
572+
schema_pb = {}
573+
for key, value in self._schema.items():
574+
schema_pb[key] = value.value
575+
options = DataSourceProto.RequestDataOptions(name=self._name, schema=schema_pb)
576+
data_source_proto = DataSourceProto(
577+
type=DataSourceProto.REQUEST_SOURCE, request_data_options=options
578+
)
579+
580+
return data_source_proto
581+
582+
515583
class KinesisSource(DataSource):
516584
def validate(self, config: RepoConfig):
517585
pass

sdk/python/feast/errors.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@ def __init__(self, name, project=None):
5050
super().__init__(f"On demand feature view {name} does not exist")
5151

5252

53+
class RequestDataNotFoundInEntityDfException(FeastObjectNotFoundException):
54+
def __init__(self, feature_name, feature_view_name):
55+
super().__init__(
56+
f"Feature {feature_name} not found in the entity dataframe, but required by on demand feature view {feature_view_name}"
57+
)
58+
59+
60+
class RequestDataNotFoundInEntityRowsException(FeastObjectNotFoundException):
61+
def __init__(self, feature_names):
62+
super().__init__(
63+
f"Required request data source features {feature_names} not found in the entity rows, but required by on demand feature views"
64+
)
65+
66+
5367
class FeatureTableNotFoundException(FeastObjectNotFoundException):
5468
def __init__(self, name, project=None):
5569
if project:

sdk/python/feast/feature_store.py

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@
1616
from collections import Counter, OrderedDict, defaultdict
1717
from datetime import datetime, timedelta
1818
from pathlib import Path
19-
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
19+
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
2020

2121
import pandas as pd
2222
from colorama import Fore, Style
2323
from tqdm import tqdm
2424

2525
from feast import feature_server, utils
26+
from feast.data_source import RequestDataSource
2627
from feast.entity import Entity
2728
from feast.errors import (
2829
EntityNotFoundException,
2930
FeatureNameCollisionError,
3031
FeatureViewNotFoundException,
32+
RequestDataNotFoundInEntityDfException,
33+
RequestDataNotFoundInEntityRowsException,
3134
)
3235
from feast.feature_service import FeatureService
3336
from feast.feature_table import FeatureTable
@@ -402,7 +405,7 @@ def apply(
402405
view.infer_features_from_batch_source(self.config)
403406

404407
for odfv in odfvs_to_update:
405-
odfv.infer_features_from_batch_source(self.config)
408+
odfv.infer_features()
406409

407410
if len(views_to_update) + len(entities_to_update) + len(
408411
services_to_update
@@ -545,10 +548,26 @@ def get_historical_features(
545548
# TODO(achal): _group_feature_refs returns the on demand feature views, but it's no passed into the provider.
546549
# This is a weird interface quirk - we should revisit the `get_historical_features` to
547550
# pass in the on demand feature views as well.
548-
fvs, _ = _group_feature_refs(
551+
fvs, odfvs = _group_feature_refs(
549552
_feature_refs, all_feature_views, all_on_demand_feature_views
550553
)
551554
feature_views = list(view for view, _ in fvs)
555+
on_demand_feature_views = list(view for view, _ in odfvs)
556+
557+
# Check that the right request data is present in the entity_df
558+
if type(entity_df) == pd.DataFrame:
559+
entity_pd_df = cast(pd.DataFrame, entity_df)
560+
for odfv in on_demand_feature_views:
561+
odfv_inputs = odfv.inputs.values()
562+
for odfv_input in odfv_inputs:
563+
if type(odfv_input) == RequestDataSource:
564+
request_data_source = cast(RequestDataSource, odfv_input)
565+
for feature_name in request_data_source.schema.keys():
566+
if feature_name not in entity_pd_df.columns:
567+
raise RequestDataNotFoundInEntityDfException(
568+
feature_name=feature_name,
569+
feature_view_name=odfv.name,
570+
)
552571

553572
_validate_feature_refs(_feature_refs, full_feature_names)
554573

@@ -789,7 +808,7 @@ def get_online_features(
789808
)
790809

791810
_validate_feature_refs(_feature_refs, full_feature_names)
792-
grouped_refs, _ = _group_feature_refs(
811+
grouped_refs, grouped_odfv_refs = _group_feature_refs(
793812
_feature_refs, all_feature_views, all_on_demand_feature_views
794813
)
795814
feature_views = list(view for view, _ in grouped_refs)
@@ -805,28 +824,61 @@ def get_online_features(
805824
for entity in entities:
806825
entity_name_to_join_key_map[entity.name] = entity.join_key
807826

827+
needed_request_data_features = self._get_needed_request_data_features(
828+
grouped_odfv_refs
829+
)
830+
808831
join_key_rows = []
832+
request_data_features: Dict[str, List[Any]] = {}
833+
# Entity rows may be either entities or request data.
809834
for row in entity_rows:
810835
join_key_row = {}
811836
for entity_name, entity_value in row.items():
837+
# Found request data
838+
if entity_name in needed_request_data_features:
839+
if entity_name not in request_data_features:
840+
request_data_features[entity_name] = []
841+
request_data_features[entity_name].append(entity_value)
842+
continue
812843
try:
813844
join_key = entity_name_to_join_key_map[entity_name]
814845
except KeyError:
815846
raise EntityNotFoundException(entity_name, self.project)
816847
join_key_row[join_key] = entity_value
817848
if entityless_case:
818849
join_key_row[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL
819-
join_key_rows.append(join_key_row)
850+
if len(join_key_row) > 0:
851+
# May be empty if this entity row was request data
852+
join_key_rows.append(join_key_row)
853+
854+
if len(needed_request_data_features) != len(request_data_features.keys()):
855+
raise RequestDataNotFoundInEntityRowsException(
856+
feature_names=needed_request_data_features
857+
)
820858

821859
entity_row_proto_list = _infer_online_entity_rows(join_key_rows)
822860

823-
union_of_entity_keys = []
861+
union_of_entity_keys: List[EntityKeyProto] = []
824862
result_rows: List[GetOnlineFeaturesResponse.FieldValues] = []
825863

826864
for entity_row_proto in entity_row_proto_list:
865+
# Create a list of entity keys to filter down for each feature view at lookup time.
827866
union_of_entity_keys.append(_entity_row_to_key(entity_row_proto))
867+
# Also create entity values to append to the result
828868
result_rows.append(_entity_row_to_field_values(entity_row_proto))
829869

870+
# Add more feature values to the existing result rows for the request data features
871+
for feature_name, feature_values in request_data_features.items():
872+
for row_idx, feature_value in enumerate(feature_values):
873+
result_row = result_rows[row_idx]
874+
result_row.fields[feature_name].CopyFrom(
875+
python_value_to_proto_value(feature_value)
876+
)
877+
result_row.statuses[
878+
feature_name
879+
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
880+
881+
# Note: each "table" is a feature view
830882
for table, requested_features in grouped_refs:
831883
entity_keys = _get_table_entity_keys(
832884
table, union_of_entity_keys, entity_name_to_join_key_map
@@ -837,6 +889,7 @@ def get_online_features(
837889
entity_keys=entity_keys,
838890
requested_features=requested_features,
839891
)
892+
# Each row is a set of features for a given entity key
840893
for row_idx, read_row in enumerate(read_rows):
841894
row_ts, feature_data = read_row
842895
result_row = result_rows[row_idx]
@@ -873,6 +926,18 @@ def get_online_features(
873926
_feature_refs, full_feature_names, initial_response, result_rows
874927
)
875928

929+
def _get_needed_request_data_features(self, grouped_odfv_refs) -> Set[str]:
930+
needed_request_data_features = set()
931+
for odfv_to_feature_names in grouped_odfv_refs:
932+
odfv, requested_feature_names = odfv_to_feature_names
933+
odfv_inputs = odfv.inputs.values()
934+
for odfv_input in odfv_inputs:
935+
if type(odfv_input) == RequestDataSource:
936+
request_data_source = cast(RequestDataSource, odfv_input)
937+
for feature_name in request_data_source.schema.keys():
938+
needed_request_data_features.add(feature_name)
939+
return needed_request_data_features
940+
876941
def _augment_response_with_on_demand_transforms(
877942
self,
878943
feature_refs: List[str],

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ def get_historical_features(
156156
client=client,
157157
config=config,
158158
full_feature_names=full_feature_names,
159-
on_demand_feature_views=registry.list_on_demand_feature_views(
160-
project, allow_cache=True
159+
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
160+
feature_refs, project, registry
161161
),
162162
)
163163

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,8 @@ def evaluate_historical_retrieval():
247247
job = FileRetrievalJob(
248248
evaluation_function=evaluate_historical_retrieval,
249249
full_feature_names=full_feature_names,
250-
on_demand_feature_views=registry.list_on_demand_feature_views(
251-
project, allow_cache=True
250+
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
251+
feature_refs, project, registry
252252
),
253253
)
254254
return job

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,8 @@ def query_generator() -> Iterator[str]:
170170
s3_resource=s3_resource,
171171
config=config,
172172
full_feature_names=full_feature_names,
173-
on_demand_feature_views=registry.list_on_demand_feature_views(
174-
project=project, allow_cache=True
173+
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
174+
feature_refs, project, registry
175175
),
176176
drop_columns=["entity_timestamp"]
177177
+ [

0 commit comments

Comments
 (0)