-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Compute Engine Initial Implementation #5223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
1183a9a
add compute engine
HaoXuAI 2825ee4
fix linting
HaoXuAI 4210f68
fix linting
HaoXuAI a398075
fix linting
HaoXuAI 68c48dc
fix linting
HaoXuAI 1c9ae31
add doc
HaoXuAI 25af94e
add test
HaoXuAI ed0cdf4
add integration test
HaoXuAI 6b57e94
update API
HaoXuAI 227f8f4
update API
HaoXuAI e9362de
update API
HaoXuAI 68cf242
update API
HaoXuAI 3a5cf92
update API
HaoXuAI c1ba3d6
fix linting
HaoXuAI 95f757d
update doc
HaoXuAI e5445a2
update doc
HaoXuAI 263024b
Merge branch 'master' into compute-engine
HaoXuAI 2433064
update test
HaoXuAI 87e51c7
update doc
HaoXuAI c698b64
Merge branch 'master' into compute-engine
HaoXuAI 0877c03
Merge branch 'master' into compute-engine
HaoXuAI File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix linting
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
- Loading branch information
commit 2825ee4592e392a019dcffae045084a4b01f3490
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,60 +1,57 @@ | ||
| from abc import ABC, abstractmethod | ||
| from typing import Union | ||
|
|
||
| from feast import BatchFeatureView, StreamFeatureView | ||
| from feast.infra.compute_engines.dag.plan import ExecutionPlan | ||
| from feast import BatchFeatureView, StreamFeatureView, FeatureView | ||
| from feast.infra.compute_engines.base import HistoricalRetrievalTask | ||
| from feast.infra.compute_engines.dag.plan import ExecutionPlan | ||
| from feast.infra.materialization.batch_materialization_engine import MaterializationTask | ||
| from feast.infra.compute_engines.dag.node import DAGNode | ||
|
|
||
|
|
||
| class DAGBuilder(ABC): | ||
| def __init__(self, | ||
| feature_view: Union[BatchFeatureView, StreamFeatureView], | ||
| task: Union[MaterializationTask, HistoricalRetrievalTask] | ||
| ): | ||
| def __init__( | ||
| self, | ||
| feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], | ||
| task: Union[MaterializationTask, HistoricalRetrievalTask], | ||
| ): | ||
| self.feature_view = feature_view | ||
| self.task = task | ||
| self.nodes = [] | ||
| self.nodes: list[DAGNode] = [] | ||
|
|
||
| @abstractmethod | ||
| def build_source_node(self): | ||
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def build_aggregation_node(self, | ||
| input_node): | ||
| def build_aggregation_node(self, input_node): | ||
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def build_join_node(self, | ||
| input_node): | ||
| def build_join_node(self, input_node): | ||
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def build_transformation_node(self, | ||
| input_node): | ||
| def build_transformation_node(self, input_node): | ||
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def build_output_nodes(self, | ||
| input_node): | ||
| def build_output_nodes(self, input_node): | ||
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def build_validation_node(self, | ||
| input_node): | ||
| def build_validation_node(self, input_node): | ||
| raise | ||
|
|
||
| def build(self) -> ExecutionPlan: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not: build_dag(self)
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest to keep build(), more consistent with the |
||
| last_node = self.build_source_node() | ||
|
|
||
| if getattr(self.feature_view.transformation, "requires_aggregation", False): | ||
| if hasattr(self.feature_view, "aggregation") and self.feature_view.aggregation is not None: | ||
| last_node = self.build_aggregation_node(last_node) | ||
|
|
||
| if self._should_join(): | ||
| last_node = self.build_join_node(last_node) | ||
|
|
||
| if self.feature_view.transformation: | ||
| if hasattr(self.feature_view, "feature_transformation") and self.feature_view.feature_transformation: | ||
| last_node = self.build_transformation_node(last_node) | ||
|
|
||
| if getattr(self.feature_view, "enable_validation", False): | ||
|
|
@@ -65,6 +62,6 @@ def build(self) -> ExecutionPlan: | |
|
|
||
| def _should_join(self): | ||
| return ( | ||
| self.feature_view.compute_config.join_strategy == "engine" | ||
| or self.task.config.compute_engine.get("point_in_time_join") == "engine" | ||
| self.feature_view.compute_config.join_strategy == "engine" | ||
| or self.task.config.compute_engine.get("point_in_time_join") == "engine" | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,29 +1,26 @@ | ||
| from abc import ABC, abstractmethod | ||
| from typing import List | ||
|
|
||
| from feast.infra.compute_engines.dag.model import ExecutionContext | ||
| from infra.compute_engines.dag.value import DAGValue | ||
|
|
||
| from feast.infra.compute_engines.dag.model import ExecutionContext | ||
|
|
||
|
|
||
| class DAGNode(ABC): | ||
| name: str | ||
| inputs: List["DAGNode"] | ||
| outputs: List["DAGNode"] | ||
|
|
||
| def __init__(self, | ||
| name: str): | ||
| def __init__(self, name: str): | ||
| self.name = name | ||
| self.inputs = [] | ||
| self.outputs = [] | ||
|
|
||
| def add_input(self, | ||
| node: "DAGNode"): | ||
| def add_input(self, node: "DAGNode"): | ||
| if node in self.inputs: | ||
| raise ValueError(f"Input node {node.name} already added to {self.name}") | ||
| self.inputs.append(node) | ||
| node.outputs.append(self) | ||
|
|
||
| @abstractmethod | ||
| def execute(self, | ||
| context: ExecutionContext) -> DAGValue: | ||
| ... | ||
| def execute(self, context: ExecutionContext) -> DAGValue: ... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should remove the 'node' suffix from all of there, as I think it's implicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah sounds good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On second thought, I think it's better to keep the 'node' suffix, as it makes the output more descriptive and clearly indicates that it's a node, which can then be chained in the build method