Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
837e34f
feat: Add unified transformation
franciscojavierarceo Nov 26, 2025
d12fbfd
feat: Unify transformations
franciscojavierarceo Nov 27, 2025
9a2df49
feat: Unify Transformations
franciscojavierarceo Nov 28, 2025
9aceb7f
feat: Unify Transformations
franciscojavierarceo Nov 28, 2025
5c8b93c
updated docs
franciscojavierarceo Dec 4, 2025
6d5ce47
refactor: separate transformation logic from execution decisions with…
franciscojavierarceo Dec 23, 2025
9380cf9
format
franciscojavierarceo Dec 23, 2025
5b759ed
incorporaitng feedback
franciscojavierarceo Dec 24, 2025
2d7a43b
updated
franciscojavierarceo Dec 25, 2025
b6299d2
fix
franciscojavierarceo Dec 26, 2025
3726733
linter
franciscojavierarceo Dec 26, 2025
f55d4b4
cleanup
franciscojavierarceo Dec 26, 2025
716e692
cleanup
franciscojavierarceo Dec 26, 2025
a4f2e0a
more fix
franciscojavierarceo Dec 29, 2025
02ae40b
more fix
franciscojavierarceo Dec 29, 2025
74de467
updated
franciscojavierarceo Dec 31, 2025
c360aef
fix
franciscojavierarceo Jan 5, 2026
f73431c
fix
franciscojavierarceo Jan 5, 2026
50e536a
fix
franciscojavierarceo Jan 5, 2026
a87c4b4
fix
franciscojavierarceo Jan 5, 2026
e2f722c
Merge branch 'master' into refactor-odfv
franciscojavierarceo Jan 5, 2026
dde05bd
lint
franciscojavierarceo Jan 5, 2026
0e1f037
fix
franciscojavierarceo Jan 6, 2026
662e21b
linter
franciscojavierarceo Jan 6, 2026
9f48c75
fix linter
franciscojavierarceo Jan 6, 2026
a058aac
fix
franciscojavierarceo Jan 6, 2026
b8771c9
fix(redis): Preserve millisecond timestamp precision for Redis online…
jatin5251 Jan 6, 2026
34d9b52
feat: Add GCS registry store in Go feature server (#5818)
samuelkim7 Jan 6, 2026
6c35d45
chore: Refactor some unit tests into integration tests (#5820)
franciscojavierarceo Jan 6, 2026
5bcd6e6
test: Remove e2e_rhoai package tests
Srihari1192 Jan 5, 2026
61812ba
fix
franciscojavierarceo Jan 7, 2026
d4adcd5
fix
franciscojavierarceo Jan 7, 2026
a007be3
fix
franciscojavierarceo Jan 8, 2026
416b15c
uploading progress
franciscojavierarceo Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Unify transformations
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Jan 5, 2026
commit d12fbfd5072ce5cd63d4624152e25da02943dafc
12 changes: 8 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,21 +966,25 @@ def apply(

# Handle dual registration for online_enabled FeatureViews
online_enabled_views = [
view for view in views_to_update
if hasattr(view, 'online_enabled') and view.online_enabled
view
for view in views_to_update
if hasattr(view, "online_enabled") and view.online_enabled
]

for fv in online_enabled_views:
# Create OnDemandFeatureView for online serving with same transformation
if hasattr(fv, 'feature_transformation') and fv.feature_transformation:
if hasattr(fv, "feature_transformation") and fv.feature_transformation:
# Create ODFV with same transformation logic
online_fv = OnDemandFeatureView(
name=f"{fv.name}_online",
sources=fv.source_views or [], # Use source views for ODFV
schema=fv.schema or [],
feature_transformation=fv.feature_transformation, # Same transformation!
description=f"Online serving for {fv.name}",
tags=dict(fv.tags or {}, **{"generated_from": fv.name, "dual_registration": "true"}),
tags=dict(
fv.tags or {},
**{"generated_from": fv.name, "dual_registration": "true"},
),
owner=fv.owner,
)

Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ def __init__(
schema = schema or []
self.mode = mode
# Don't override feature_transformation if it's already set by subclass (e.g., BatchFeatureView)
if not hasattr(self, 'feature_transformation') or self.feature_transformation is None:
if (
not hasattr(self, "feature_transformation")
or self.feature_transformation is None
):
self.feature_transformation = feature_transformation
self.when = when
self.online_enabled = online_enabled
Expand Down
52 changes: 37 additions & 15 deletions sdk/python/feast/transformation/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
from __future__ import annotations

import functools
from abc import ABC
from typing import Any, Callable, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import dill

from feast.entity import Entity
from feast.field import Field

if TYPE_CHECKING:
from feast.data_source import RequestSource
from feast.feature_view import FeatureView, FeatureViewProjection
from feast.protos.feast.core.Transformation_pb2 import (
SubstraitTransformationV2 as SubstraitTransformationProto,
)
Expand All @@ -15,8 +23,6 @@
get_transformation_class_from_type,
)
from feast.transformation.mode import TransformationMode, TransformationTiming
from feast.entity import Entity
from feast.field import Field

# Online compatibility constants
ONLINE_COMPATIBLE_MODES = {"python", "pandas"}
Expand Down Expand Up @@ -139,7 +145,9 @@ def transformation(
mode: Union[TransformationMode, str], # Support both enum and string
when: Optional[str] = None,
online: Optional[bool] = None,
sources: Optional[List[Union["FeatureView", "FeatureViewProjection", "RequestSource"]]] = None,
sources: Optional[
List[Union["FeatureView", "FeatureViewProjection", "RequestSource"]]
] = None,
schema: Optional[List[Field]] = None,
entities: Optional[List[Entity]] = None,
name: Optional[str] = None,
Expand All @@ -160,19 +168,20 @@ def decorator(user_function):
else:
mode_str = mode.lower() # Normalize to lowercase
try:
mode_enum = TransformationMode(mode_str)
TransformationMode(mode_str) # Validate mode string
except ValueError:
valid_modes = [m.value for m in TransformationMode]
raise ValueError(f"Invalid mode '{mode}'. Valid options: {valid_modes}")

# Validate timing if provided
timing_enum = None
if when is not None:
try:
timing_enum = TransformationTiming(when.lower())
TransformationTiming(when.lower()) # Validate timing string
except ValueError:
valid_timings = [t.value for t in TransformationTiming]
raise ValueError(f"Invalid timing '{when}'. Valid options: {valid_timings}")
raise ValueError(
f"Invalid timing '{when}'. Valid options: {valid_timings}"
)

# Validate online compatibility
if online and not is_online_compatible(mode_str):
Expand All @@ -196,19 +205,29 @@ def decorator(user_function):
)

# If FeatureView parameters are provided, create and return FeatureView
if any(param is not None for param in [when, online, sources, schema, entities]):
if any(
param is not None for param in [when, online, sources, schema, entities]
):
# Import FeatureView here to avoid circular imports
from feast.feature_view import FeatureView

# Validate required parameters when creating FeatureView
if when is None:
raise ValueError("'when' parameter is required when creating FeatureView")
raise ValueError(
"'when' parameter is required when creating FeatureView"
)
if online is None:
raise ValueError("'online' parameter is required when creating FeatureView")
raise ValueError(
"'online' parameter is required when creating FeatureView"
)
if sources is None:
raise ValueError("'sources' parameter is required when creating FeatureView")
raise ValueError(
"'sources' parameter is required when creating FeatureView"
)
if schema is None:
raise ValueError("'schema' parameter is required when creating FeatureView")
raise ValueError(
"'schema' parameter is required when creating FeatureView"
)

# Handle source parameter correctly for FeatureView constructor
if not sources:
Expand All @@ -219,9 +238,12 @@ def decorator(user_function):
else:
# Multiple sources - pass as list (must be List[FeatureView])
from feast.feature_view import FeatureView as FV

for src in sources:
if not isinstance(src, (FV, type(src).__name__ == 'FeatureView')):
raise ValueError("Multiple sources must be FeatureViews, not DataSources")
if not isinstance(src, (FV, type(src).__name__ == "FeatureView")):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not isinstance(src, (FV, type(src).__name__ == "FeatureView")):
if not isinstance(src, FeatureView):

raise ValueError(
"Multiple sources must be FeatureViews, not DataSources"
)
source_param = sources

# Create FeatureView with transformation
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/transformation/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TransformationMode(Enum):


class TransformationTiming(Enum):
ON_READ = "on_read" # Execute during get_online_features()
ON_WRITE = "on_write" # Execute during materialization, cache results
BATCH = "batch" # Scheduled batch processing
STREAMING = "streaming" # Real-time stream processing
ON_READ = "on_read" # Execute during get_online_features()
ON_WRITE = "on_write" # Execute during materialization, cache results
BATCH = "batch" # Scheduled batch processing
STREAMING = "streaming" # Real-time stream processing
61 changes: 35 additions & 26 deletions sdk/python/tests/unit/test_dual_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
as both batch FeatureViews and OnDemandFeatureViews for serving.
"""

import pytest
from unittest.mock import Mock, patch, MagicMock
from unittest.mock import Mock, patch

from feast.entity import Entity
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.transformation.base import transformation, Transformation
from feast.transformation.mode import TransformationMode
from feast.field import Field
from feast.types import Float64, Int64
from feast.entity import Entity
from feast.infra.offline_stores.file_source import FileSource
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.transformation.base import Transformation, transformation
from feast.types import Float64


class TestDualRegistration:
Expand All @@ -29,9 +28,7 @@ def test_online_enabled_creates_odfv(self):

# Create transformation
test_transformation = Transformation(
mode="python",
udf=lambda x: x,
udf_string="lambda x: x"
mode="python", udf=lambda x: x, udf_string="lambda x: x"
)

fv = FeatureView(
Expand All @@ -41,15 +38,15 @@ def test_online_enabled_creates_odfv(self):
schema=[Field(name="feature1", dtype=Float64)],
feature_transformation=test_transformation,
when="on_write",
online_enabled=True
online_enabled=True,
)

# Mock registry and provider
mock_registry = Mock()
mock_provider = Mock()

# Create FeatureStore instance with mocked initialization
with patch.object(FeatureStore, '__init__', return_value=None):
with patch.object(FeatureStore, "__init__", return_value=None):
fs = FeatureStore()
fs._registry = mock_registry
fs._provider = mock_provider
Expand Down Expand Up @@ -89,15 +86,17 @@ def capture_feature_view(view, project, commit):
generated_odfv = None

for view in applied_views:
if isinstance(view, FeatureView) and not isinstance(view, OnDemandFeatureView):
if isinstance(view, FeatureView) and not isinstance(
view, OnDemandFeatureView
):
original_fv = view
elif isinstance(view, OnDemandFeatureView):
generated_odfv = view

# Verify original FV
assert original_fv is not None
assert original_fv.name == "test_fv"
assert original_fv.online_enabled == True
assert original_fv.online_enabled
assert original_fv.feature_transformation is not None

# Verify generated ODFV
Expand All @@ -119,12 +118,12 @@ def test_no_dual_registration_when_online_disabled(self):
source=mock_source,
entities=[driver],
schema=[Field(name="feature1", dtype=Float64)],
online_enabled=False # Disabled
online_enabled=False, # Disabled
)

# Mock FeatureStore
# Create FeatureStore instance with mocked initialization
with patch.object(FeatureStore, '__init__', return_value=None):
with patch.object(FeatureStore, "__init__", return_value=None):
fs = FeatureStore()
fs.config = Mock()
fs.config.project = "test_project"
Expand All @@ -134,7 +133,9 @@ def test_no_dual_registration_when_online_disabled(self):
fs._make_inferences = Mock()

applied_views = []
fs._registry.apply_feature_view.side_effect = lambda view, project, commit: applied_views.append(view)
fs._registry.apply_feature_view.side_effect = (
lambda view, project, commit: applied_views.append(view)
)
fs._registry.apply_entity = Mock()
fs._registry.apply_data_source = Mock()
fs._registry.apply_feature_service = Mock()
Expand Down Expand Up @@ -168,7 +169,7 @@ def test_no_dual_registration_without_transformation(self):

# Mock FeatureStore
# Create FeatureStore instance with mocked initialization
with patch.object(FeatureStore, '__init__', return_value=None):
with patch.object(FeatureStore, "__init__", return_value=None):
fs = FeatureStore()
fs.config = Mock()
fs.config.project = "test_project"
Expand All @@ -178,7 +179,9 @@ def test_no_dual_registration_without_transformation(self):
fs._make_inferences = Mock()

applied_views = []
fs._registry.apply_feature_view.side_effect = lambda view, project, commit: applied_views.append(view)
fs._registry.apply_feature_view.side_effect = (
lambda view, project, commit: applied_views.append(view)
)
fs._registry.apply_entity = Mock()
fs._registry.apply_data_source = Mock()
fs._registry.apply_feature_service = Mock()
Expand All @@ -201,7 +204,9 @@ def test_enhanced_decorator_with_dual_registration(self):
driver = Entity(name="driver", join_keys=["driver_id"])

# Create FeatureView using enhanced decorator with dummy source
dummy_source = FileSource(path="test.parquet", timestamp_field="event_timestamp")
dummy_source = FileSource(
path="test.parquet", timestamp_field="event_timestamp"
)

@transformation(
mode="python",
Expand All @@ -210,19 +215,19 @@ def test_enhanced_decorator_with_dual_registration(self):
sources=[dummy_source],
schema=[Field(name="doubled", dtype=Float64)],
entities=[driver],
name="doubling_transform"
name="doubling_transform",
)
def doubling_transform(inputs):
return [{"doubled": inp.get("value", 0) * 2} for inp in inputs]

# Verify it's a FeatureView with the right properties
assert isinstance(doubling_transform, FeatureView)
assert doubling_transform.online_enabled == True
assert doubling_transform.online_enabled
assert doubling_transform.feature_transformation is not None

# Mock FeatureStore and apply
# Create FeatureStore instance with mocked initialization
with patch.object(FeatureStore, '__init__', return_value=None):
with patch.object(FeatureStore, "__init__", return_value=None):
fs = FeatureStore()
fs.config = Mock()
fs.config.project = "test_project"
Expand All @@ -232,7 +237,9 @@ def doubling_transform(inputs):
fs._make_inferences = Mock()

applied_views = []
fs._registry.apply_feature_view.side_effect = lambda view, project, commit: applied_views.append(view)
fs._registry.apply_feature_view.side_effect = (
lambda view, project, commit: applied_views.append(view)
)
fs._registry.apply_entity = Mock()
fs._registry.apply_data_source = Mock()
fs._registry.apply_feature_service = Mock()
Expand All @@ -249,7 +256,9 @@ def doubling_transform(inputs):
assert len(applied_views) == 2

# Verify the ODFV has the same transformation
odfv = next((v for v in applied_views if isinstance(v, OnDemandFeatureView)), None)
odfv = next(
(v for v in applied_views if isinstance(v, OnDemandFeatureView)), None
)
assert odfv is not None
assert odfv.name == "doubling_transform_online"

Expand All @@ -261,4 +270,4 @@ def doubling_transform(inputs):
odfv_udf = odfv.feature_transformation.udf

assert original_udf(test_input) == expected_output
assert odfv_udf(test_input) == expected_output
assert odfv_udf(test_input) == expected_output
Loading