Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
837e34f
feat: Add unified transformation
franciscojavierarceo Nov 26, 2025
d12fbfd
feat: Unify transformations
franciscojavierarceo Nov 27, 2025
9a2df49
feat: Unify Transformations
franciscojavierarceo Nov 28, 2025
9aceb7f
feat: Unify Transformations
franciscojavierarceo Nov 28, 2025
5c8b93c
updated docs
franciscojavierarceo Dec 4, 2025
6d5ce47
refactor: separate transformation logic from execution decisions with…
franciscojavierarceo Dec 23, 2025
9380cf9
format
franciscojavierarceo Dec 23, 2025
5b759ed
incorporaitng feedback
franciscojavierarceo Dec 24, 2025
2d7a43b
updated
franciscojavierarceo Dec 25, 2025
b6299d2
fix
franciscojavierarceo Dec 26, 2025
3726733
linter
franciscojavierarceo Dec 26, 2025
f55d4b4
cleanup
franciscojavierarceo Dec 26, 2025
716e692
cleanup
franciscojavierarceo Dec 26, 2025
a4f2e0a
more fix
franciscojavierarceo Dec 29, 2025
02ae40b
more fix
franciscojavierarceo Dec 29, 2025
74de467
updated
franciscojavierarceo Dec 31, 2025
c360aef
fix
franciscojavierarceo Jan 5, 2026
f73431c
fix
franciscojavierarceo Jan 5, 2026
50e536a
fix
franciscojavierarceo Jan 5, 2026
a87c4b4
fix
franciscojavierarceo Jan 5, 2026
e2f722c
Merge branch 'master' into refactor-odfv
franciscojavierarceo Jan 5, 2026
dde05bd
lint
franciscojavierarceo Jan 5, 2026
0e1f037
fix
franciscojavierarceo Jan 6, 2026
662e21b
linter
franciscojavierarceo Jan 6, 2026
9f48c75
fix linter
franciscojavierarceo Jan 6, 2026
a058aac
fix
franciscojavierarceo Jan 6, 2026
b8771c9
fix(redis): Preserve millisecond timestamp precision for Redis online…
jatin5251 Jan 6, 2026
34d9b52
feat: Add GCS registry store in Go feature server (#5818)
samuelkim7 Jan 6, 2026
6c35d45
chore: Refactor some unit tests into integration tests (#5820)
franciscojavierarceo Jan 6, 2026
5bcd6e6
test: Remove e2e_rhoai package tests
Srihari1192 Jan 5, 2026
61812ba
fix
franciscojavierarceo Jan 7, 2026
d4adcd5
fix
franciscojavierarceo Jan 7, 2026
a007be3
fix
franciscojavierarceo Jan 8, 2026
416b15c
uploading progress
franciscojavierarceo Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: separate transformation logic from execution decisions with…
… auto-inference

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Jan 5, 2026
commit 6d5ce4736c38ded6f476e3ec3d025c2ec103ebb0
20 changes: 16 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,14 +966,26 @@ def apply(
services_to_update,
)

# Handle dual registration for online_enabled FeatureViews
online_enabled_views = [
# Handle dual registration for FeatureViews with online transform execution
dual_registration_views = [
view
for view in views_to_update
if hasattr(view, "online_enabled") and view.online_enabled
if (
hasattr(view, "transform_when")
and view.transform_when
and (
view.transform_when in ["batch_on_read", "batch_on_write"]
or (
hasattr(view.transform_when, "value")
and view.transform_when.value in ["batch_on_read", "batch_on_write"]
)
)
and hasattr(view, "online")
and view.online
)
]

for fv in online_enabled_views:
for fv in dual_registration_views:
# Create OnDemandFeatureView for online serving with same transformation
if hasattr(fv, "feature_transformation") and fv.feature_transformation:
# Create ODFV with same transformation logic
Expand Down
39 changes: 27 additions & 12 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
FeatureTransformationV2 as FeatureTransformationProto,
)
from feast.transformation.base import Transformation
from feast.transformation.mode import TransformationMode, TransformationTiming
from feast.transformation.mode import TransformationMode, TransformExecutionPattern
from feast.types import from_value_type
from feast.value_type import ValueType

Expand Down Expand Up @@ -109,8 +109,7 @@ class FeatureView(BaseFeatureView):
materialization_intervals: List[Tuple[datetime, datetime]]
mode: Optional[Union["TransformationMode", str]]
feature_transformation: Optional[Transformation]
when: Optional[Union[TransformationTiming, str]]
online_enabled: bool
transform_when: Optional[Union["TransformExecutionPattern", str]]

def __init__(
self,
Expand All @@ -128,8 +127,7 @@ def __init__(
owner: str = "",
mode: Optional[Union["TransformationMode", str]] = None,
feature_transformation: Optional[Transformation] = None,
when: Optional[Union[TransformationTiming, str]] = None,
online_enabled: bool = False,
transform_when: Optional[Union["TransformExecutionPattern", str]] = None,
):
"""
Creates a FeatureView object.
Expand Down Expand Up @@ -157,10 +155,8 @@ def __init__(
when transformations are applied. Choose from TransformationMode enum values.
feature_transformation (optional): The transformation object containing the UDF and
mode for this feature view. Used for derived feature views.
when (optional): The timing for when transformation should execute. Choose from
TransformationTiming enum values (on_read, on_write, batch, streaming).
online_enabled (optional): Whether to enable dual registration for both batch
materialization and online serving with Feature Server.
transform_when (optional): The timing for when transformation should execute. Choose from
TransformExecutionPattern enum values (batch_only, batch_on_read, batch_on_write).

Raises:
ValueError: A field mapping conflicts with an Entity or a Feature.
Expand All @@ -176,8 +172,27 @@ def __init__(
or self.feature_transformation is None
):
self.feature_transformation = feature_transformation
self.when = when
self.online_enabled = online_enabled
self.transform_when = transform_when

# Auto-infer online setting based on transform_when pattern
if transform_when in [TransformExecutionPattern.BATCH_ON_READ, TransformExecutionPattern.BATCH_ON_WRITE]:
if online is False:
raise ValueError(
f"Cannot set online=False with transform_when='{transform_when}'. "
f"Online execution patterns require online=True."
)
self.online = True # Auto-infer online=True
elif transform_when == "batch_on_read" or transform_when == "batch_on_write":
# Handle string values as well
if online is False:
raise ValueError(
f"Cannot set online=False with transform_when='{transform_when}'. "
f"Online execution patterns require online=True."
)
self.online = True # Auto-infer online=True
else:
# For batch_only or None, respect the provided online setting
self.online = online

# Normalize source
self.stream_source = None
Expand Down Expand Up @@ -280,7 +295,7 @@ def __init__(
owner=owner,
source=self.batch_source,
)
self.online = online
# Note: self.online is now set by auto-inference logic above
self.offline = offline
self.mode = mode
self.materialization_intervals = []
Expand Down
92 changes: 4 additions & 88 deletions sdk/python/feast/transformation/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
TRANSFORMATION_CLASS_FOR_TYPE,
get_transformation_class_from_type,
)
from feast.transformation.mode import TransformationMode, TransformationTiming
from feast.transformation.mode import TransformationMode, TransformExecutionPattern

# Online compatibility constants
ONLINE_COMPATIBLE_MODES = {"python", "pandas"}
Expand Down Expand Up @@ -143,13 +143,6 @@ def infer_features(self, *args, **kwargs) -> Any:

def transformation(
mode: Union[TransformationMode, str], # Support both enum and string
when: Optional[str] = None,
online: Optional[bool] = None,
sources: Optional[
List[Union["FeatureView", "FeatureViewProjection", "RequestSource"]]
] = None,
schema: Optional[List[Field]] = None,
entities: Optional[List[Entity]] = None,
name: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
description: Optional[str] = "",
Expand All @@ -173,24 +166,6 @@ def decorator(user_function):
valid_modes = [m.value for m in TransformationMode]
raise ValueError(f"Invalid mode '{mode}'. Valid options: {valid_modes}")

# Validate timing if provided
if when is not None:
try:
TransformationTiming(when.lower()) # Validate timing string
except ValueError:
valid_timings = [t.value for t in TransformationTiming]
raise ValueError(
f"Invalid timing '{when}'. Valid options: {valid_timings}"
)

# Validate online compatibility
if online and not is_online_compatible(mode_str):
compatible_modes = list(ONLINE_COMPATIBLE_MODES)
raise ValueError(
f"Mode '{mode_str}' cannot run online in Feature Server. "
f"Use {compatible_modes} for online transformations."
)

# Create transformation object
udf_string = dill.source.getsource(user_function)
mainify(user_function)
Expand All @@ -204,67 +179,8 @@ def decorator(user_function):
udf_string=udf_string,
)

# If FeatureView parameters are provided, create and return FeatureView
if any(
param is not None for param in [when, online, sources, schema, entities]
):
# Import FeatureView here to avoid circular imports
from feast.feature_view import FeatureView

# Validate required parameters when creating FeatureView
if when is None:
raise ValueError(
"'when' parameter is required when creating FeatureView"
)
if online is None:
raise ValueError(
"'online' parameter is required when creating FeatureView"
)
if sources is None:
raise ValueError(
"'sources' parameter is required when creating FeatureView"
)
if schema is None:
raise ValueError(
"'schema' parameter is required when creating FeatureView"
)

# Handle source parameter correctly for FeatureView constructor
if not sources:
raise ValueError("At least one source must be provided for FeatureView")
elif len(sources) == 1:
# Single source - pass directly (works for DataSource or FeatureView)
source_param = sources[0]
else:
# Multiple sources - pass as list (must be List[FeatureView])
from feast.feature_view import FeatureView as FV

for src in sources:
if not isinstance(src, (FV, type(src).__name__ == "FeatureView")):
raise ValueError(
"Multiple sources must be FeatureViews, not DataSources"
)
source_param = sources

# Create FeatureView with transformation
fv = FeatureView(
name=name or user_function.__name__,
source=source_param,
entities=entities or [],
schema=schema,
feature_transformation=transformation_obj,
when=when,
online_enabled=online,
description=description,
tags=tags,
owner=owner,
mode=mode_str,
)
functools.update_wrapper(wrapper=fv, wrapped=user_function)
return fv
else:
# Backward compatibility: return Transformation object
functools.update_wrapper(wrapper=transformation_obj, wrapped=user_function)
return transformation_obj
# Return Transformation object with function metadata preserved
functools.update_wrapper(wrapper=transformation_obj, wrapped=user_function)
return transformation_obj

return decorator
9 changes: 4 additions & 5 deletions sdk/python/feast/transformation/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ class TransformationMode(Enum):
SUBSTRAIT = "substrait"


class TransformationTiming(Enum):
ON_READ = "on_read" # Execute during get_online_features()
ON_WRITE = "on_write" # Execute during materialization, cache results
BATCH = "batch" # Scheduled batch processing
STREAMING = "streaming" # Real-time stream processing
class TransformExecutionPattern(Enum):
BATCH_ONLY = "batch_only" # Pure batch: only in batch compute engine
BATCH_ON_READ = "batch_on_read" # Batch + feature server on read (lazy)
BATCH_ON_WRITE = "batch_on_write" # Batch + feature server on ingestion (eager)
58 changes: 31 additions & 27 deletions sdk/python/tests/unit/test_dual_registration.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Unit tests for dual registration functionality in FeatureStore.

Tests that online_enabled=True FeatureViews get automatically registered
Tests that online=True FeatureViews get automatically registered
as both batch FeatureViews and OnDemandFeatureViews for serving.
"""

Expand All @@ -20,9 +20,9 @@
class TestDualRegistration:
"""Test dual registration functionality"""

def test_online_enabled_creates_odfv(self):
"""Test that online_enabled=True creates an OnDemandFeatureView"""
# Create a FeatureView with online_enabled=True
def test_online_creates_odfv(self):
"""Test that online=True creates an OnDemandFeatureView"""
# Create a FeatureView with online=True
driver = Entity(name="driver", join_keys=["driver_id"])
mock_source = FileSource(path="test.parquet", timestamp_field="ts")

Expand All @@ -37,8 +37,8 @@ def test_online_enabled_creates_odfv(self):
entities=[driver],
schema=[Field(name="feature1", dtype=Float64)],
feature_transformation=test_transformation,
when="on_write",
online_enabled=True,
transform_when="batch_on_write",
# online=True auto-inferred from transform_when
)

# Mock registry and provider
Expand Down Expand Up @@ -96,7 +96,7 @@ def capture_feature_view(view, project, commit):
# Verify original FV
assert original_fv is not None
assert original_fv.name == "test_fv"
assert original_fv.online_enabled
assert original_fv.online
assert original_fv.feature_transformation is not None

# Verify generated ODFV
Expand All @@ -109,7 +109,7 @@ def capture_feature_view(view, project, commit):
assert generated_odfv.tags["dual_registration"] == "true"

def test_no_dual_registration_when_online_disabled(self):
"""Test that online_enabled=False does not create ODFV"""
"""Test that online=False does not create ODFV"""
driver = Entity(name="driver", join_keys=["driver_id"])
mock_source = FileSource(path="test.parquet", timestamp_field="ts")

Expand All @@ -118,7 +118,7 @@ def test_no_dual_registration_when_online_disabled(self):
source=mock_source,
entities=[driver],
schema=[Field(name="feature1", dtype=Float64)],
online_enabled=False, # Disabled
online=False, # Disabled
)

# Mock FeatureStore
Expand Down Expand Up @@ -163,7 +163,7 @@ def test_no_dual_registration_without_transformation(self):
source=mock_source,
entities=[driver],
schema=[Field(name="feature1", dtype=Float64)],
online_enabled=True, # Enabled
online=True, # Enabled
# No feature_transformation
)

Expand Down Expand Up @@ -199,31 +199,35 @@ def test_no_dual_registration_without_transformation(self):
assert isinstance(applied_views[0], FeatureView)
assert not isinstance(applied_views[0], OnDemandFeatureView)

def test_enhanced_decorator_with_dual_registration(self):
"""Test end-to-end: enhanced @transformation decorator -> dual registration"""
def test_separate_transformation_and_feature_view_with_dual_registration(self):
"""Test: create separate transformation and FeatureView -> dual registration"""
driver = Entity(name="driver", join_keys=["driver_id"])

# Create FeatureView using enhanced decorator with dummy source
# Create transformation separately
@transformation(mode="python", name="doubling_transform")
def doubling_transform_func(inputs):
return [{"doubled": inp.get("value", 0) * 2} for inp in inputs]

# Create FeatureView with transformation and dual registration settings
dummy_source = FileSource(
path="test.parquet", timestamp_field="event_timestamp"
)

@transformation(
mode="python",
when="on_write",
online=True,
sources=[dummy_source],
schema=[Field(name="doubled", dtype=Float64)],
entities=[driver],
fv = FeatureView(
name="doubling_transform",
source=dummy_source,
entities=[driver],
schema=[Field(name="doubled", dtype=Float64)],
feature_transformation=doubling_transform_func,
transform_when="batch_on_write",
# online=True auto-inferred from transform_when
)
def doubling_transform(inputs):
return [{"doubled": inp.get("value", 0) * 2} for inp in inputs]

# Verify it's a FeatureView with the right properties
assert isinstance(doubling_transform, FeatureView)
assert doubling_transform.online_enabled
assert doubling_transform.feature_transformation is not None
assert isinstance(fv, FeatureView)
assert fv.online # Auto-inferred
assert fv.transform_when == "batch_on_write"
assert fv.feature_transformation is not None

# Mock FeatureStore and apply
# Create FeatureStore instance with mocked initialization
Expand All @@ -250,7 +254,7 @@ def doubling_transform(inputs):
fs._provider.teardown_infra = Mock()

# Apply the FeatureView
fs.apply(doubling_transform)
fs.apply(fv)

# Should create both original FV and ODFV
assert len(applied_views) == 2
Expand All @@ -266,7 +270,7 @@ def doubling_transform(inputs):
test_input = [{"value": 5}]
expected_output = [{"doubled": 10}]

original_udf = doubling_transform.feature_transformation.udf
original_udf = fv.feature_transformation.udf
odfv_udf = odfv.feature_transformation.udf

assert original_udf(test_input) == expected_output
Expand Down
Loading