Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Draft: multi source support
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
  • Loading branch information
HaoXuAI committed Jun 30, 2025
commit 98b61793cc0d41e3d16ab57779deca0440346b74
6 changes: 4 additions & 2 deletions sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ def __init__(
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)
elif source_view is None:
raise ValueError("BatchFeatureView must have either 'source' or 'source_view'.")
raise ValueError(
"BatchFeatureView must have either 'source' or 'source_view'."
)

self.mode = mode
self.udf = udf
Expand All @@ -129,7 +131,7 @@ def __init__(
owner=owner,
schema=schema,
source=source,
source_view=source_view
source_view=source_view,
)

def get_feature_transformation(self) -> Optional[Transformation]:
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ def __init__(
schema = schema or []

if (source is None) == (source_view is None):
raise ValueError("FeatureView must have exactly one of 'source' or 'source_view', not both/neither.")
raise ValueError(
"FeatureView must have exactly one of 'source' or 'source_view', not both/neither."
)

# Initialize data sources.
if (
Expand Down Expand Up @@ -427,7 +429,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
else None
)
source_view = (
FeatureView.from_proto(FeatureViewProto(spec=feature_view_proto.spec.source_view, meta=None))
FeatureView.from_proto(
FeatureViewProto(spec=feature_view_proto.spec.source_view, meta=None)
)
if feature_view_proto.spec.HasField("source_view")
else None
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# feast/infra/compute_engines/dag/utils.py

from typing import List, Set

from feast.infra.compute_engines.dag.node import DAGNode


Expand Down
6 changes: 2 additions & 4 deletions sdk/python/feast/infra/compute_engines/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence, Union
from typing import List, Sequence, Union

import pyarrow as pa

Expand All @@ -12,13 +12,12 @@
MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
from feast.infra.compute_engines.dag.context import ExecutionContext
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names


class ComputeEngine(ABC):
Expand Down Expand Up @@ -132,4 +131,3 @@ def get_execution_context(
entity_defs=entity_defs,
entity_df=entity_df,
)

68 changes: 26 additions & 42 deletions sdk/python/feast/infra/compute_engines/feature_builder.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from abc import ABC, abstractmethod
from typing import Union, List, Optional
from typing import List, Optional, Union

from feast import BatchFeatureView, StreamFeatureView, FeatureView
from feast import BatchFeatureView, FeatureView, StreamFeatureView
from feast.infra.common.materialization_job import MaterializationTask
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.dag.context import ColumnInfo
from feast.infra.compute_engines.dag.node import DAGNode
from feast.infra.compute_engines.dag.plan import ExecutionPlan
from feast.infra.compute_engines.feature_resolver import FeatureResolver, FeatureViewNode
from feast.infra.compute_engines.dag.context import ColumnInfo
from feast.infra.compute_engines.feature_resolver import (
FeatureResolver,
FeatureViewNode,
)
from feast.infra.registry.base_registry import BaseRegistry
from feast.utils import _get_column_names

Expand All @@ -19,10 +22,10 @@ class FeatureBuilder(ABC):
"""

def __init__(
self,
registry: BaseRegistry,
feature_view,
task: Union[MaterializationTask, HistoricalRetrievalTask],
self,
registry: BaseRegistry,
feature_view,
task: Union[MaterializationTask, HistoricalRetrievalTask],
):
self.registry = registry
self.task = task
Expand All @@ -36,64 +39,46 @@ def build_source_node(self, view):
raise NotImplementedError

@abstractmethod
def build_aggregation_node(self,
view,
input_node):
def build_aggregation_node(self, view, input_node):
raise NotImplementedError

@abstractmethod
def build_join_node(self,
view,
input_node):
def build_join_node(self, view, input_node):
raise NotImplementedError

@abstractmethod
def build_filter_node(self,
view,
input_node):
def build_filter_node(self, view, input_node):
raise NotImplementedError

@abstractmethod
def build_dedup_node(self,
view,
input_node):
def build_dedup_node(self, view, input_node):
raise NotImplementedError

@abstractmethod
def build_transformation_node(self,
view,
input_node):
def build_transformation_node(self, view, input_node):
raise NotImplementedError

@abstractmethod
def build_output_nodes(self,
final_node):
def build_output_nodes(self, final_node):
raise NotImplementedError

@abstractmethod
def build_validation_node(self,
view,
input_node):
def build_validation_node(self, view, input_node):
raise NotImplementedError

def _should_aggregate(self,
view):
def _should_aggregate(self, view):
return bool(getattr(view, "aggregations", []))

def _should_transform(self,
view):
def _should_transform(self, view):
return bool(getattr(view, "feature_transformation", None))

def _should_validate(self,
view):
def _should_validate(self, view):
return getattr(view, "enable_validation", False)

def _should_dedupe(self,
view):
def _should_dedupe(self, view):
return isinstance(self.task, HistoricalRetrievalTask) or self.task.only_latest

def _build(self,
current_node: FeatureViewNode) -> DAGNode:
def _build(self, current_node: FeatureViewNode) -> DAGNode:
current_view = current_node.view

# Step 1: build source or parent join
Expand Down Expand Up @@ -128,8 +113,8 @@ def build(self) -> ExecutionPlan:
return ExecutionPlan(self.nodes)

def get_column_info(
self,
view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
self,
view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
) -> ColumnInfo:
entities = []
for entity_name in view.entities:
Expand Down Expand Up @@ -157,8 +142,7 @@ def get_column_info(
)

def get_field_mapping(
self,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
self, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
) -> Optional[dict]:
"""
Get the field mapping for a feature view.
Expand Down
19 changes: 8 additions & 11 deletions sdk/python/feast/infra/compute_engines/feature_resolver.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from feast.feature_view import FeatureView
from typing import List, Optional, Set

from typing import Set, List, Optional
from feast.feature_view import FeatureView


class FeatureViewNode:
"""
Logical representation of a node in the FeatureView dependency DAG.
"""
def __init__(self,
view: FeatureView):

def __init__(self, view: FeatureView):
self.view: FeatureView = view
self.parent: Optional["FeatureViewNode"] = None

Expand All @@ -19,13 +19,13 @@ class FeatureResolver:
This graph represents the logical dependencies between FeatureViews, allowing
for ordered execution and cycle detection.
"""

def __init__(self):
# Used to detect and prevent cycles in the FeatureView graph.
self.visited: Set[str] = set()
self.resolution_path: List[str] = []

def resolve(self,
feature_view: FeatureView) -> FeatureViewNode:
def resolve(self, feature_view: FeatureView) -> FeatureViewNode:
"""
Entry point for resolving a FeatureView into a DAG node.

Expand All @@ -39,8 +39,7 @@ def resolve(self,
self._walk(root)
return root

def _walk(self,
node: FeatureViewNode):
def _walk(self, node: FeatureViewNode):
"""
Recursive traversal of the FeatureView graph.

Expand Down Expand Up @@ -80,9 +79,7 @@ def dfs(node: FeatureViewNode):
dfs(root)
return ordered

def debug_dag(self,
node: FeatureViewNode,
depth=0):
def debug_dag(self, node: FeatureViewNode, depth=0):
"""
Prints the FeatureView dependency DAG for debugging.

Expand Down
21 changes: 15 additions & 6 deletions sdk/python/feast/infra/compute_engines/local/feature_builder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Union

from feast.data_source import DataSource
from feast.infra.common.materialization_job import MaterializationTask
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.feature_builder import FeatureBuilder
Expand Down Expand Up @@ -47,15 +46,19 @@ def build_filter_node(self, view, input_node):
filter_expr = getattr(view, "filter", None)
ttl = getattr(view, "ttl", None)
column_info = self.get_column_info(view)
node = LocalFilterNode("filter", column_info, self.backend, filter_expr, ttl, inputs=[input_node])
node = LocalFilterNode(
"filter", column_info, self.backend, filter_expr, ttl, inputs=[input_node]
)
self.nodes.append(node)
return node

def build_aggregation_node(self, view, input_node):
agg_specs = view.aggregations
agg_ops = self._get_aggregate_operations(agg_specs)
group_by_keys = view.entities
node = LocalAggregationNode("agg", self.backend, group_by_keys, agg_ops, inputs=[input_node])
node = LocalAggregationNode(
"agg", self.backend, group_by_keys, agg_ops, inputs=[input_node]
)
self.nodes.append(node)
return node

Expand All @@ -67,13 +70,17 @@ def build_dedup_node(self, view, input_node):

def build_transformation_node(self, view, input_node):
transform_config = view.feature_transformation
node = LocalTransformationNode("transform", transform_config, self.backend, inputs=[input_node])
node = LocalTransformationNode(
"transform", transform_config, self.backend, inputs=[input_node]
)
self.nodes.append(node)
return node

def build_validation_node(self, view, input_node):
validation_config = view.validation_config
node = LocalValidationNode("validate", validation_config, self.backend, inputs=[input_node])
node = LocalValidationNode(
"validate", validation_config, self.backend, inputs=[input_node]
)
self.nodes.append(node)
return node

Expand All @@ -86,7 +93,9 @@ def _get_aggregate_operations(agg_specs):
agg_ops = {}
for agg in agg_specs:
if agg.time_window is not None:
raise ValueError("Time window aggregation is not supported in the local compute engine.")
raise ValueError(
"Time window aggregation is not supported in the local compute engine."
)
alias = f"{agg.function}_{agg.column}"
agg_ops[alias] = (agg.function, agg.column)
return agg_ops
Loading
Loading