Skip to content

Commit 2ce4198

Browse files
authored
feat: Feast dataframe phase1 (#5611)
* Add FeastDataFrame
  Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
* linting
  Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
* linting
  Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
* linting
  Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
* update init
  Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
* linting
  Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
* fix testing
  Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
---------
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
1 parent 8f0e22d commit 2ce4198

File tree

4 files changed

+241
-3
lines changed

4 files changed

+241
-3
lines changed

sdk/python/feast/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from .batch_feature_view import BatchFeatureView
1313
from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource
14+
from .dataframe import DataFrameEngine, FeastDataFrame
1415
from .entity import Entity
1516
from .feature import Feature
1617
from .feature_service import FeatureService
@@ -32,9 +33,11 @@
3233

3334
__all__ = [
3435
"BatchFeatureView",
36+
"DataFrameEngine",
3537
"Entity",
3638
"KafkaSource",
3739
"KinesisSource",
40+
"FeastDataFrame",
3841
"Feature",
3942
"Field",
4043
"FeatureService",

sdk/python/feast/dataframe.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""FeastDataFrame: A lightweight container for DataFrame-like objects in Feast."""
2+
3+
from enum import Enum
4+
from typing import Any, Dict, Optional
5+
6+
import pandas as pd
7+
import pyarrow as pa
8+
9+
10+
class DataFrameEngine(str, Enum):
    """Supported DataFrame engines.

    Members also subclass ``str``, so each member compares equal to its
    plain string value (``DataFrameEngine.PANDAS == "pandas"``).
    """

    PANDAS = "pandas"
    SPARK = "spark"
    DASK = "dask"
    RAY = "ray"
    ARROW = "arrow"
    POLARS = "polars"
    UNKNOWN = "unknown"


class FeastDataFrame:
    """
    A lightweight container for DataFrame-like objects in Feast.

    This class wraps any DataFrame implementation and provides metadata
    about the engine type for proper routing in Feast's processing pipeline.

    Attributes:
        data: The wrapped object, stored as given.
        metadata: Caller-supplied metadata dict (a new empty dict if omitted).
    """

    # Optional engines are recognised by the top-level package of the wrapped
    # object's type, so those libraries never need to be imported here.
    # Order matters only if two entries could match the same package name.
    _OPTIONAL_ENGINE_PACKAGES = (
        ("pyspark", DataFrameEngine.SPARK),
        ("dask", DataFrameEngine.DASK),
        ("ray", DataFrameEngine.RAY),
        ("polars", DataFrameEngine.POLARS),
    )

    # Kept as a tuple (equality-based membership) rather than a set: Enum
    # members hash by name, not by value, so a set would behave differently
    # from ``==`` comparisons against plain strings.
    _LAZY_ENGINES = (
        DataFrameEngine.SPARK,
        DataFrameEngine.DASK,
        DataFrameEngine.RAY,
    )

    def __init__(
        self,
        data: Any,
        engine: Optional[DataFrameEngine] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize a FeastDataFrame.

        Args:
            data: The wrapped DataFrame object (pandas, Spark, Dask, etc.)
            engine: Explicitly specify the engine type (auto-detected if None).
                A plain string equal to a member value (e.g. ``"pandas"``)
                is accepted and converted to the enum member.
            metadata: Additional metadata (schema hints, etc.)

        Raises:
            ValueError: If ``engine`` is not a valid ``DataFrameEngine`` value,
                or if it does not match the engine detected from ``data``.
        """
        self.data = data
        self.metadata = metadata or {}

        # Detect the actual engine from the data
        detected_engine = self._detect_engine()

        if engine is not None:
            # Coerce first: a raw string compares equal to the matching member
            # but has no ``.value``, which previously broke both the error
            # message below and ``__repr__``.
            engine = DataFrameEngine(engine)
            if engine != detected_engine:
                raise ValueError(
                    f"Provided engine '{engine.value}' does not match detected engine '{detected_engine.value}' "
                    f"for data type {type(data).__name__}"
                )

        # After validation the provided engine (if any) equals the detected
        # one, so the detected enum member is always what gets stored.
        self._engine = detected_engine

    def _detect_engine(self) -> DataFrameEngine:
        """Auto-detect the DataFrame engine based on the type of ``self.data``.

        pandas and Arrow are checked with ``isinstance``. Every other engine
        is matched on the top-level package name of the object's type, which
        avoids importing optional dependencies.

        Returns:
            The matching ``DataFrameEngine``, or ``DataFrameEngine.UNKNOWN``
            when nothing matches.
        """
        if isinstance(self.data, pd.DataFrame):
            return DataFrameEngine.PANDAS
        if isinstance(self.data, pa.Table):
            return DataFrameEngine.ARROW

        # Compare the root package instead of doing a substring search on the
        # whole dotted path: "ray" is a substring of "array" and "xarray",
        # which would otherwise be misreported as Ray (and therefore lazy).
        module = type(self.data).__module__ or ""
        root_package = module.partition(".")[0]
        for package, engine in self._OPTIONAL_ENGINE_PACKAGES:
            # ``<package>_*`` keeps companion distributions such as
            # ``dask_expr`` mapped to their parent engine.
            if root_package == package or root_package.startswith(package + "_"):
                return engine
        return DataFrameEngine.UNKNOWN

    @property
    def engine(self) -> DataFrameEngine:
        """Get the detected or specified engine type."""
        return self._engine

    def __repr__(self) -> str:
        """Return a short description with the engine value and wrapped type name."""
        return f"FeastDataFrame(engine={self.engine.value}, type={type(self.data).__name__})"

    @property
    def is_lazy(self) -> bool:
        """Check if the underlying DataFrame is lazy (Spark, Dask, Ray).

        NOTE(review): any object from the ``polars`` package maps to POLARS,
        which is not in the lazy set; a polars ``LazyFrame`` would therefore
        report ``False`` here. Confirm whether that is intended.
        """
        return self.engine in self._LAZY_ENGINES

sdk/python/feast/online_response.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import TYPE_CHECKING, Any, Dict, List, Union
15+
from typing import TYPE_CHECKING, Any, Dict, List, TypeAlias, Union
1616

1717
import pandas as pd
1818
import pyarrow as pa
@@ -25,9 +25,9 @@
2525
if TYPE_CHECKING:
2626
import torch
2727

28-
TorchTensor = torch.Tensor
28+
TorchTensor: TypeAlias = torch.Tensor
2929
else:
30-
TorchTensor = Any
30+
TorchTensor: TypeAlias = Any
3131

3232
TIMESTAMP_POSTFIX: str = "__ts"
3333

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""Unit tests for FeastDataFrame."""
2+
3+
import pandas as pd
4+
import pyarrow as pa
5+
import pytest
6+
7+
from feast.dataframe import DataFrameEngine, FeastDataFrame
8+
9+
10+
class TestFeastDataFrame:
    """Behavioural checks for the FeastDataFrame wrapper."""

    def test_pandas_detection(self):
        """A pandas DataFrame is detected as PANDAS and is eager."""
        frame = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
        wrapped = FeastDataFrame(frame)

        assert wrapped.engine == DataFrameEngine.PANDAS
        assert not wrapped.is_lazy
        assert isinstance(wrapped.data, pd.DataFrame)

    def test_arrow_detection(self):
        """A pyarrow Table is detected as ARROW and is eager."""
        arrow_table = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
        wrapped = FeastDataFrame(arrow_table)

        assert wrapped.engine == DataFrameEngine.ARROW
        assert not wrapped.is_lazy
        assert isinstance(wrapped.data, pa.Table)

    def test_explicit_engine(self):
        """Explicitly passing UNKNOWN is accepted for unrecognised data."""
        payload = {"mock": "data"}
        wrapped = FeastDataFrame(payload, engine=DataFrameEngine.UNKNOWN)

        assert wrapped.engine == DataFrameEngine.UNKNOWN
        assert not wrapped.is_lazy

    def test_unknown_engine(self):
        """Unrecognised data falls back to the UNKNOWN engine."""
        wrapped = FeastDataFrame({"some": "dict"})

        assert wrapped.engine == DataFrameEngine.UNKNOWN

    def test_metadata(self):
        """Metadata passed to the constructor is exposed unchanged."""
        frame = pd.DataFrame({"a": [1, 2, 3]})
        expected_metadata = {"features": ["a"], "source": "test"}
        wrapped = FeastDataFrame(frame, metadata=expected_metadata)

        assert wrapped.metadata == expected_metadata
        assert wrapped.metadata["features"] == ["a"]

    def test_repr(self):
        """repr() names the class, the engine and the wrapped type."""
        wrapped = FeastDataFrame(pd.DataFrame({"a": [1, 2, 3]}))

        text = repr(wrapped)
        assert "FeastDataFrame" in text
        assert "engine=pandas" in text
        assert "DataFrame" in text

    def test_is_lazy_property(self):
        """is_lazy is False for eager/unknown data and True for lazy engines."""
        unknown_data = {"mock": "data"}

        # pandas, Arrow and unrecognised inputs must all report eager.
        eager_inputs = [
            pd.DataFrame({"a": [1, 2, 3]}),
            pa.table({"a": [1, 2, 3]}),
            unknown_data,
        ]
        for eager_input in eager_inputs:
            assert not FeastDataFrame(eager_input).is_lazy

        # Lazy engines: build with UNKNOWN so validation passes, then force
        # the private engine field to the engine under test.
        lazy_engines = (
            DataFrameEngine.SPARK,
            DataFrameEngine.DASK,
            DataFrameEngine.RAY,
        )
        for lazy_engine in lazy_engines:
            wrapped = FeastDataFrame(unknown_data, engine=DataFrameEngine.UNKNOWN)
            wrapped._engine = lazy_engine  # Override for testing
            assert wrapped.is_lazy

    def test_polars_detection(self):
        """An object whose type lives in a polars module is detected as POLARS."""

        # Stand-in for a polars DataFrame so polars need not be installed.
        class MockPolarsDF:
            __module__ = "polars.dataframe.frame"

            def __init__(self):
                pass

        wrapped = FeastDataFrame(MockPolarsDF())

        assert wrapped.engine == DataFrameEngine.POLARS
        assert not wrapped.is_lazy

    def test_engine_validation_valid(self):
        """A correct explicit engine passes validation."""
        frame = pd.DataFrame({"a": [1, 2, 3]})
        wrapped = FeastDataFrame(frame, engine=DataFrameEngine.PANDAS)

        assert wrapped.engine == DataFrameEngine.PANDAS
        assert isinstance(wrapped.data, pd.DataFrame)

    def test_engine_validation_invalid(self):
        """A mismatching explicit engine raises ValueError."""
        frame = pd.DataFrame({"a": [1, 2, 3]})
        expected_message = (
            "Provided engine 'spark' does not match detected engine 'pandas'"
        )

        with pytest.raises(ValueError, match=expected_message):
            FeastDataFrame(frame, engine=DataFrameEngine.SPARK)

    def test_engine_validation_arrow(self):
        """Engine validation accepts ARROW and rejects PANDAS for an Arrow table."""
        arrow_table = pa.table({"a": [1, 2, 3]})

        # Matching engine is accepted.
        wrapped = FeastDataFrame(arrow_table, engine=DataFrameEngine.ARROW)
        assert wrapped.engine == DataFrameEngine.ARROW

        # Mismatching engine is rejected.
        expected_message = (
            "Provided engine 'pandas' does not match detected engine 'arrow'"
        )
        with pytest.raises(ValueError, match=expected_message):
            FeastDataFrame(arrow_table, engine=DataFrameEngine.PANDAS)

0 commit comments

Comments
 (0)