Skip to content

Commit ca628b2

Browse files
committed
Feedback
Signed-off-by: Nick Quinn <nicholas_quinn@apple.com>
1 parent 9f57fcd commit ca628b2

2 files changed

Lines changed: 83 additions & 8 deletions

File tree

docs/reference/beta-on-demand-feature-view.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,42 @@ def driver_aggregated_stats(inputs):
6969

7070
Aggregated columns are automatically named using the pattern `{function}_{column}` (e.g., `sum_trips`, `mean_rating`).
7171

72+
### Using `input_schema` with Aggregations
73+
74+
When the input data is not already stored as a feature view, use `input_schema` instead of `sources` to describe the fields that will be passed at request time. Feast will create an internal `RequestSource` automatically.
75+
76+
```python
77+
from datetime import timedelta
78+
from feast import Field, on_demand_feature_view
79+
from feast.aggregation import Aggregation
80+
from feast.types import Float64, Int64
81+
82+
@on_demand_feature_view(
83+
input_schema=[
84+
Field(name="txn_amount", dtype=Float64),
85+
],
86+
schema=[
87+
Field(name="txn_count", dtype=Int64),
88+
Field(name="total_txn_amount", dtype=Float64),
89+
Field(name="avg_txn_amount", dtype=Float64),
90+
],
91+
aggregations=[
92+
Aggregation(column="txn_amount", function="count", name="txn_count",
93+
time_window=timedelta(days=30)),
94+
Aggregation(column="txn_amount", function="sum", name="total_txn_amount",
95+
time_window=timedelta(days=30)),
96+
Aggregation(column="txn_amount", function="mean", name="avg_txn_amount",
97+
time_window=timedelta(days=30)),
98+
],
99+
entities=[user],
100+
)
101+
def user_transaction_stats(inputs):
102+
# Aggregations replace the transformation function — no body needed.
103+
pass
104+
```
105+
106+
`input_schema` also accepts fields that are not aggregation columns — for example, thresholds, currency codes, or other contextual values passed at request time that your UDF needs but that are not stored as features.
107+
72108
## Example
73109
See [https://github.com/feast-dev/on-demand-feature-views-demo](https://github.com/feast-dev/on-demand-feature-views-demo) for an example of how to use on demand feature views.
74110

sdk/python/feast/on_demand_feature_view.py

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,11 @@ def __init__( # noqa: C901
185185
sources: A map from input source names to the actual input sources, which may be
186186
feature views, or request data sources. These sources serve as inputs to the udf,
187187
which will refer to them by name.
188-
input_schema (optional): A list of Fields describing the schema of the input data
189-
for aggregation-based views. When provided, sources is not required — an
190-
internal RequestSource will be created automatically.
188+
input_schema (optional): A list of Fields describing data that is accepted as input
189+
but not stored directly as features — e.g. aggregation columns, normalization
190+
parameters, thresholds, or other contextual values passed at request time.
191+
When provided, sources is not required — an internal RequestSource will be
192+
created automatically.
191193
udf: The user defined transformation function, which must take pandas
192194
dataframes as inputs.
193195
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
@@ -225,6 +227,7 @@ def __init__( # noqa: C901
225227
self.udf_string = udf_string
226228
self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
227229
self.source_request_sources: dict[str, RequestSource] = {}
230+
self._input_schema_sentinel: Optional[RequestSource] = None
228231

229232
# Strip any existing sentinel from sources (handles __copy__ round-trip)
230233
effective_sources: List[OnDemandSourceType] = [
@@ -237,12 +240,13 @@ def __init__( # noqa: C901
237240
]
238241

239242
if input_schema is not None:
240-
# Automatically create an internal RequestSource from input_schema
241-
sentinel = RequestSource(
243+
# Automatically create an internal RequestSource from input_schema.
244+
# Stored privately so it does not appear in source_request_sources for
245+
# external consumers (e.g. the feature server, apply(), utils.py).
246+
self._input_schema_sentinel = RequestSource(
242247
name=f"{self._INPUT_SCHEMA_SOURCE_PREFIX}{name}",
243248
schema=input_schema,
244249
)
245-
self.source_request_sources[sentinel.name] = sentinel
246250
elif not effective_sources:
247251
raise ValueError(
248252
"Either 'sources' or 'input_schema' must be provided for OnDemandFeatureView."
@@ -300,6 +304,20 @@ def __init__( # noqa: C901
300304
self.track_metrics = track_metrics
301305
self.aggregations = aggregations or []
302306

307+
if input_schema is not None and self.aggregations:
308+
input_field_names = {f.name for f in input_schema}
309+
unknown = [
310+
agg.column
311+
for agg in self.aggregations
312+
if agg.column and agg.column not in input_field_names
313+
]
314+
if unknown:
315+
raise ValueError(
316+
f"Aggregation column(s) {unknown} not found in input_schema "
317+
f"for OnDemandFeatureView '{name}'. "
318+
f"Available fields: {sorted(input_field_names)}"
319+
)
320+
303321
def _add_source_to_collections(self, odfv_source: OnDemandSourceType) -> None:
304322
"""
305323
Add a source to the appropriate collection with explicit type checking.
@@ -564,6 +582,14 @@ def to_proto(self) -> OnDemandFeatureViewProto:
564582
request_data_source=request_sources.to_proto()
565583
)
566584

585+
# Serialize the input_schema sentinel so that from_proto() can reconstruct
586+
# input_schema correctly; it is excluded from source_request_sources so that
587+
# external consumers never see it as a real data source.
588+
if self._input_schema_sentinel is not None:
589+
sources[self._input_schema_sentinel.name] = OnDemandSource(
590+
request_data_source=self._input_schema_sentinel.to_proto()
591+
)
592+
567593
feature_transformation = transformation_to_proto(self.feature_transformation)
568594

569595
tags = dict(self.tags) if self.tags else {}
@@ -851,6 +877,10 @@ def get_request_data_schema(self) -> dict[str, ValueType]:
851877
raise TypeError(
852878
f"Request source schema is not correct type: ${str(type(request_source.schema))}"
853879
)
880+
# Include fields from the input_schema sentinel (stored privately)
881+
if self._input_schema_sentinel is not None:
882+
for field in self._input_schema_sentinel.schema:
883+
schema[field.name] = field.dtype.to_value_type()
854884
return schema
855885

856886
def _get_projected_feature_name(self, feature: str) -> str:
@@ -1168,6 +1198,13 @@ def _construct_random_input(
11681198
sample_value = sample_values.get(value_type, default_value)
11691199
feature_dict[field.name] = sample_value
11701200

1201+
# Add input_schema fields (stored privately outside source_request_sources)
1202+
if self._input_schema_sentinel is not None:
1203+
for field in self._input_schema_sentinel.schema:
1204+
value_type = field.dtype.to_value_type()
1205+
sample_value = sample_values.get(value_type, default_value)
1206+
feature_dict[field.name] = sample_value
1207+
11711208
return feature_dict
11721209

11731210
def _get_sample_values_by_type(self) -> dict[ValueType, list[Any]]:
@@ -1287,8 +1324,10 @@ def on_demand_feature_view(
12871324
sources: A map from input source names to the actual input sources, which may be
12881325
feature views, or request data sources. These sources serve as inputs to the udf,
12891326
which will refer to them by name.
1290-
input_schema (optional): A list of Fields describing the schema of the input data
1291-
for aggregation-based views. When provided, sources is not required.
1327+
input_schema (optional): A list of Fields describing data that is accepted as input
1328+
but not stored directly as features — e.g. aggregation columns, normalization
1329+
parameters, thresholds, or other contextual values passed at request time.
1330+
When provided, sources is not required.
12921331
mode: The mode of execution (e.g., Pandas or Python Native)
12931332
description (optional): A human-readable description.
12941333
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.

0 commit comments

Comments
 (0)