Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
837e34f
feat: Add unified transformation
franciscojavierarceo Nov 26, 2025
d12fbfd
feat: Unify transformations
franciscojavierarceo Nov 27, 2025
9a2df49
feat: Unify Transformations
franciscojavierarceo Nov 28, 2025
9aceb7f
feat: Unify Transformations
franciscojavierarceo Nov 28, 2025
5c8b93c
updated docs
franciscojavierarceo Dec 4, 2025
6d5ce47
refactor: separate transformation logic from execution decisions with…
franciscojavierarceo Dec 23, 2025
9380cf9
format
franciscojavierarceo Dec 23, 2025
5b759ed
incorporaitng feedback
franciscojavierarceo Dec 24, 2025
2d7a43b
updated
franciscojavierarceo Dec 25, 2025
b6299d2
fix
franciscojavierarceo Dec 26, 2025
3726733
linter
franciscojavierarceo Dec 26, 2025
f55d4b4
cleanup
franciscojavierarceo Dec 26, 2025
716e692
cleanup
franciscojavierarceo Dec 26, 2025
a4f2e0a
more fix
franciscojavierarceo Dec 29, 2025
02ae40b
more fix
franciscojavierarceo Dec 29, 2025
74de467
updated
franciscojavierarceo Dec 31, 2025
c360aef
fix
franciscojavierarceo Jan 5, 2026
f73431c
fix
franciscojavierarceo Jan 5, 2026
50e536a
fix
franciscojavierarceo Jan 5, 2026
a87c4b4
fix
franciscojavierarceo Jan 5, 2026
e2f722c
Merge branch 'master' into refactor-odfv
franciscojavierarceo Jan 5, 2026
dde05bd
lint
franciscojavierarceo Jan 5, 2026
0e1f037
fix
franciscojavierarceo Jan 6, 2026
662e21b
linter
franciscojavierarceo Jan 6, 2026
9f48c75
fix linter
franciscojavierarceo Jan 6, 2026
a058aac
fix
franciscojavierarceo Jan 6, 2026
b8771c9
fix(redis): Preserve millisecond timestamp precision for Redis online…
jatin5251 Jan 6, 2026
34d9b52
feat: Add GCS registry store in Go feature server (#5818)
samuelkim7 Jan 6, 2026
6c35d45
chore: Refactor some unit tests into integration tests (#5820)
franciscojavierarceo Jan 6, 2026
5bcd6e6
test: Remove e2e_rhoai package tests
Srihari1192 Jan 5, 2026
61812ba
fix
franciscojavierarceo Jan 7, 2026
d4adcd5
fix
franciscojavierarceo Jan 7, 2026
a007be3
fix
franciscojavierarceo Jan 8, 2026
416b15c
uploading progress
franciscojavierarceo Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Jan 5, 2026
commit c360aef3b4141d79fec3974a675874aac3c30e51
69 changes: 63 additions & 6 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,21 +1010,38 @@ def apply(
for fv in dual_registration_views:
# Create OnDemandFeatureView for online serving with same transformation
if hasattr(fv, "feature_transformation") and fv.feature_transformation:
# Create ODFV with same transformation logic
# Extract the transformation mode from the transformation
transformation_mode = fv.feature_transformation.mode
if hasattr(transformation_mode, "value"):
mode_str = transformation_mode.value
else:
mode_str = str(transformation_mode)

# Create ODFV with same transformation logic and correct mode
# Include both FeatureViews and RequestSources in sources
sources_list = list(fv.source_views or [])
if hasattr(fv, 'source_request_sources') and fv.source_request_sources:
sources_list.extend(fv.source_request_sources.values())

# Disable online serving for the original FeatureView since we're creating an ODFV for online serving
fv.online = False

online_fv = OnDemandFeatureView(
name=f"{fv.name}_online",
sources=cast(
List[Union[FeatureView, FeatureViewProjection, RequestSource]],
fv.source_views or [],
sources_list,
),
schema=fv.schema or [],
feature_transformation=fv.feature_transformation, # Same transformation!
mode=mode_str, # Pass the correct transformation mode!
description=f"Online serving for {fv.name}",
tags=dict(
fv.tags or {},
**{"generated_from": fv.name, "dual_registration": "true"},
),
owner=fv.owner,
write_to_online_store=False, # Always transform on-demand for unified FeatureViews
)

# Add to ODFVs to be registered
Expand Down Expand Up @@ -1250,7 +1267,7 @@ def get_historical_features(
# TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into the provider.
# This is a weird interface quirk - we should revisit the `get_historical_features` to
# pass in the on demand feature views as well.
fvs, odfvs = utils._group_feature_refs(
fvs, odfvs, _ = utils._group_feature_refs(
_feature_refs,
all_feature_views,
all_on_demand_feature_views,
Expand All @@ -1274,19 +1291,59 @@ def get_historical_features(
utils._validate_feature_refs(_feature_refs, full_feature_names)
provider = self._get_provider()

# Handle FeatureViews with feature_transformation for historical retrieval
# These are supported by extracting their source views and applying transformations later
regular_feature_views = []
unified_transformation_views = []
source_feature_views = []

# Separate FeatureViews with transformations from regular ones
for (fv, features_list) in fvs:
if hasattr(fv, 'feature_transformation') and fv.feature_transformation is not None:
# FeatureView with transformation - collect for post-processing
unified_transformation_views.append((fv, features_list))

# Extract source FeatureViews from the transformation view
if hasattr(fv, 'source') and fv.source:
# Handle both single source and list of sources
sources = fv.source if isinstance(fv.source, list) else [fv.source]
for src in sources:
# Only add if it's actually a FeatureView, not a DataSource
if isinstance(src, FeatureView) and src not in source_feature_views:
source_feature_views.append(src)
else:
regular_feature_views.append(fv)

# Combine regular feature views with source feature views needed for transformations
# Do NOT include unified transformation views in the provider call as they would cause
# column selection errors - transformations will be applied post-retrieval
feature_views = regular_feature_views + source_feature_views

# Filter feature_refs to only include those that refer to feature_views being passed to provider
# Unified transformation feature refs will be handled post-retrieval
provider_feature_refs = []
for ref in _feature_refs:
fv_name = ref.split(":")[0] if ":" in ref else ref
for fv in feature_views:
if fv.name == fv_name:
provider_feature_refs.append(ref)
break

# Optional kwargs
kwargs: Dict[str, Any] = {}
if start_date is not None:
kwargs["start_date"] = start_date
if end_date is not None:
kwargs["end_date"] = end_date
# Note: Transformation execution is now handled automatically by providers
# based on feature view configurations and request patterns

# Pass unified feature views for transformation handling
unified_fvs = [fv for fv, _ in unified_transformation_views]
kwargs["unified_feature_views"] = unified_fvs

job = provider.get_historical_features(
self.config,
feature_views,
_feature_refs,
provider_feature_refs,
entity_df,
self._registry,
self.project,
Expand Down
48 changes: 42 additions & 6 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource, KafkaSource, KinesisSource, PushSource
from feast.data_source import DataSource, KafkaSource, KinesisSource, PushSource, RequestSource
from feast.entity import Entity
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field
Expand Down Expand Up @@ -175,18 +175,26 @@ def __init__(
self.stream_source = None
self.data_source: Optional[DataSource] = None
self.source_views: List[FeatureView] = []
self.source_request_sources: Dict[str, RequestSource] = {}

if isinstance(source, DataSource):
self.data_source = source
elif isinstance(source, FeatureView):
self.source_views = [source]
elif isinstance(source, list) and all(
isinstance(sv, FeatureView) for sv in source
):
self.source_views = source
elif isinstance(source, list):
# Handle mixed list of FeatureViews and RequestSources
for sv in source:
if isinstance(sv, FeatureView):
self.source_views.append(sv)
elif isinstance(sv, RequestSource):
self.source_request_sources[sv.name] = sv
else:
raise TypeError(
f"List source items must be FeatureView or RequestSource, got {type(sv)}"
)
else:
raise TypeError(
"source must be a DataSource, a FeatureView, or a list of FeatureView."
"source must be a DataSource, a FeatureView, or a list containing FeatureViews and RequestSources."
)

# Set up stream, batch and derived view sources
Expand Down Expand Up @@ -692,3 +700,31 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

@staticmethod
def get_requested_unified_fvs(feature_refs, project, registry) -> List["FeatureView"]:
"""
Extract FeatureViews with transformations that are requested in feature_refs.

Args:
feature_refs: List of feature references (e.g., ["fv_name:feature_name"])
project: Project name
registry: Registry instance

Returns:
List of FeatureViews with transformations that match the feature_refs
"""
all_feature_views = registry.list_feature_views(
project, allow_cache=True
)
requested_unified_fvs: List[FeatureView] = []

for fv in all_feature_views:
# Only include FeatureViews with transformations
if hasattr(fv, 'feature_transformation') and fv.feature_transformation is not None:
for feature in fv.features:
if f"{fv.name}:{feature.name}" in feature_refs:
requested_unified_fvs.append(fv)
break # Only add once per feature view

return requested_unified_fvs
7 changes: 7 additions & 0 deletions sdk/python/feast/infra/offline_stores/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
full_feature_names: bool,
repo_path: str,
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
unified_feature_views: Optional[List[FeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):
"""Initialize a lazy historical retrieval job"""
Expand All @@ -67,6 +68,7 @@ def __init__(
self.evaluation_function = evaluation_function
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views or []
self._unified_feature_views = unified_feature_views or []
self._metadata = metadata
self.repo_path = repo_path

Expand All @@ -78,6 +80,10 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

@property
def unified_feature_views(self) -> List[FeatureView]:
return self._unified_feature_views

def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function().compute()
Expand Down Expand Up @@ -296,6 +302,7 @@ def evaluate_historical_retrieval():
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
feature_refs, project, registry
),
unified_feature_views=kwargs.get("unified_feature_views", []),
metadata=RetrievalMetadata(
features=feature_refs,
keys=list(set(entity_df.columns) - {entity_df_event_timestamp_col}),
Expand Down
12 changes: 12 additions & 0 deletions sdk/python/feast/infra/offline_stores/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,17 @@ def read_fv(

odfvs = OnDemandFeatureView.get_requested_odfvs(feature_refs, project, registry)

# Extract unified FeatureViews with transformations
unified_fvs = FeatureView.get_requested_unified_fvs(feature_refs, project, registry)

substrait_odfvs = [fv for fv in odfvs if fv.mode == "substrait"]
for odfv in substrait_odfvs:
res = odfv.transform_ibis(res, full_feature_names)

return IbisRetrievalJob(
res,
[fv for fv in odfvs if fv.mode != "substrait"],
unified_fvs,
full_feature_names,
metadata=RetrievalMetadata(
features=feature_refs,
Expand Down Expand Up @@ -481,6 +485,7 @@ def __init__(
self,
table,
on_demand_feature_views,
unified_feature_views,
full_feature_names,
metadata,
data_source_writer,
Expand All @@ -493,6 +498,9 @@ def __init__(
self._on_demand_feature_views: List[OnDemandFeatureView] = (
on_demand_feature_views
)
self._unified_feature_views: List[FeatureView] = (
unified_feature_views
)
self._full_feature_names = full_feature_names
self._metadata = metadata
self.data_source_writer = data_source_writer
Expand All @@ -514,6 +522,10 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

@property
def unified_feature_views(self) -> List[FeatureView]:
return self._unified_feature_views

def persist(
self,
storage: SavedDatasetStorage,
Expand Down
22 changes: 22 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,22 @@ def to_arrow(
col, transformed_arrow[col]
)

# Handle unified FeatureViews with transformations
if self.unified_feature_views:
for unified_fv in self.unified_feature_views:
if hasattr(unified_fv, 'feature_transformation') and unified_fv.feature_transformation is not None:
# Apply the transformation using the transform_arrow method
transformed_arrow = unified_fv.feature_transformation.transform_arrow(
features_table, unified_fv.features
)

for col in transformed_arrow.column_names:
if col.startswith("__index"):
continue
features_table = features_table.append_column(
col, transformed_arrow[col]
)

if validation_reference:
if not flags_helper.is_test():
warnings.warn(
Expand Down Expand Up @@ -255,6 +271,12 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
"""Returns a list containing all the on demand feature views to be handled."""
raise NotImplementedError

@property
def unified_feature_views(self) -> List["FeatureView"]:
"""Returns a list containing all the unified feature views with transformations to be handled."""
# Default implementation returns empty list for backwards compatibility
return []

def persist(
self,
storage: SavedDatasetStorage,
Expand Down
Loading