@@ -185,9 +185,11 @@ def __init__( # noqa: C901
185185 sources: A map from input source names to the actual input sources, which may be
186186 feature views, or request data sources. These sources serve as inputs to the udf,
187187 which will refer to them by name.
188- input_schema (optional): A list of Fields describing the schema of the input data
189- for aggregation-based views. When provided, sources is not required — an
190- internal RequestSource will be created automatically.
188+ input_schema (optional): A list of Fields describing data that is accepted as input
189+ but not stored directly as features — e.g. aggregation columns, normalization
190+ parameters, thresholds, or other contextual values passed at request time.
191+ When provided, sources is not required — an internal RequestSource will be
192+ created automatically.
191193 udf: The user defined transformation function, which must take pandas
192194 dataframes as inputs.
193195 udf_string: The source code version of the udf (for diffing and displaying in Web UI)
@@ -225,6 +227,7 @@ def __init__( # noqa: C901
225227 self .udf_string = udf_string
226228 self .source_feature_view_projections : dict [str , FeatureViewProjection ] = {}
227229 self .source_request_sources : dict [str , RequestSource ] = {}
230+ self ._input_schema_sentinel : Optional [RequestSource ] = None
228231
229232 # Strip any existing sentinel from sources (handles __copy__ round-trip)
230233 effective_sources : List [OnDemandSourceType ] = [
@@ -237,12 +240,13 @@ def __init__( # noqa: C901
237240 ]
238241
239242 if input_schema is not None :
240- # Automatically create an internal RequestSource from input_schema
241- sentinel = RequestSource (
243+ # Automatically create an internal RequestSource from input_schema.
244+ # Stored privately so it does not appear in source_request_sources for
245+ # external consumers (e.g. the feature server, apply(), utils.py).
246+ self ._input_schema_sentinel = RequestSource (
242247 name = f"{ self ._INPUT_SCHEMA_SOURCE_PREFIX } { name } " ,
243248 schema = input_schema ,
244249 )
245- self .source_request_sources [sentinel .name ] = sentinel
246250 elif not effective_sources :
247251 raise ValueError (
248252 "Either 'sources' or 'input_schema' must be provided for OnDemandFeatureView."
@@ -300,6 +304,20 @@ def __init__( # noqa: C901
300304 self .track_metrics = track_metrics
301305 self .aggregations = aggregations or []
302306
307+ if input_schema is not None and self .aggregations :
308+ input_field_names = {f .name for f in input_schema }
309+ unknown = [
310+ agg .column
311+ for agg in self .aggregations
312+ if agg .column and agg .column not in input_field_names
313+ ]
314+ if unknown :
315+ raise ValueError (
316+ f"Aggregation column(s) { unknown } not found in input_schema "
317+ f"for OnDemandFeatureView '{ name } '. "
318+ f"Available fields: { sorted (input_field_names )} "
319+ )
320+
303321 def _add_source_to_collections (self , odfv_source : OnDemandSourceType ) -> None :
304322 """
305323 Add a source to the appropriate collection with explicit type checking.
@@ -564,6 +582,14 @@ def to_proto(self) -> OnDemandFeatureViewProto:
564582 request_data_source = request_sources .to_proto ()
565583 )
566584
585+ # Serialize the input_schema sentinel so that from_proto() can reconstruct
586+ # input_schema correctly; it is excluded from source_request_sources so that
587+ # external consumers never see it as a real data source.
588+ if self ._input_schema_sentinel is not None :
589+ sources [self ._input_schema_sentinel .name ] = OnDemandSource (
590+ request_data_source = self ._input_schema_sentinel .to_proto ()
591+ )
592+
567593 feature_transformation = transformation_to_proto (self .feature_transformation )
568594
569595 tags = dict (self .tags ) if self .tags else {}
@@ -851,6 +877,10 @@ def get_request_data_schema(self) -> dict[str, ValueType]:
851877 raise TypeError (
852878 f"Request source schema is not correct type: ${ str (type (request_source .schema ))} "
853879 )
880+ # Include fields from the input_schema sentinel (stored privately)
881+ if self ._input_schema_sentinel is not None :
882+ for field in self ._input_schema_sentinel .schema :
883+ schema [field .name ] = field .dtype .to_value_type ()
854884 return schema
855885
856886 def _get_projected_feature_name (self , feature : str ) -> str :
@@ -1168,6 +1198,13 @@ def _construct_random_input(
11681198 sample_value = sample_values .get (value_type , default_value )
11691199 feature_dict [field .name ] = sample_value
11701200
1201+ # Add input_schema fields (stored privately outside source_request_sources)
1202+ if self ._input_schema_sentinel is not None :
1203+ for field in self ._input_schema_sentinel .schema :
1204+ value_type = field .dtype .to_value_type ()
1205+ sample_value = sample_values .get (value_type , default_value )
1206+ feature_dict [field .name ] = sample_value
1207+
11711208 return feature_dict
11721209
11731210 def _get_sample_values_by_type (self ) -> dict [ValueType , list [Any ]]:
@@ -1287,8 +1324,10 @@ def on_demand_feature_view(
12871324 sources: A map from input source names to the actual input sources, which may be
12881325 feature views, or request data sources. These sources serve as inputs to the udf,
12891326 which will refer to them by name.
1290- input_schema (optional): A list of Fields describing the schema of the input data
1291- for aggregation-based views. When provided, sources is not required.
1327+ input_schema (optional): A list of Fields describing data that is accepted as input
1328+ but not stored directly as features — e.g. aggregation columns, normalization
1329+ parameters, thresholds, or other contextual values passed at request time.
1330+ When provided, sources is not required.
12921331 mode: The mode of execution (e.g,. Pandas or Python Native)
12931332 description (optional): A human-readable description.
12941333 tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
0 commit comments