Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix source type
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
  • Loading branch information
HaoXuAI committed Jul 14, 2025
commit fd695e82d796d56750682ca40565f6960c9f0912
25 changes: 25 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
}

# Maps a DataSourceProto.SourceType to the fully-qualified class path of the
# offline store that can serve that source type. HybridOfflineStore uses this
# table (via get_source_key_from_type) to route a data source to the matching
# offline store instance.
_DATA_SOURCE_FOR_OFFLINE_STORE = {
    DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.dask.DaskOfflineStore",
    DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery.BigQueryOfflineStore",
    DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift.RedshiftOfflineStore",
    DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore",
    DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore",
    DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore",
    DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
}


@typechecked
class DataSource(ABC):
Expand Down Expand Up @@ -401,6 +411,9 @@ def _set_timestamps_in_proto(self, data_source_proto: DataSourceProto):
self.last_updated_timestamp
)

# Each concrete DataSource must report its DataSourceProto.SourceType so that
# callers (e.g. HybridOfflineStore) can route the source without isinstance checks.
@abstractmethod
def source_type(self) -> DataSourceProto.SourceType.ValueType: ...


@typechecked
class KafkaSource(DataSource):
Expand Down Expand Up @@ -564,6 +577,9 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_query_string(self) -> str:
raise NotImplementedError

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Kafka stream sources."""
    return DataSourceProto.STREAM_KAFKA


@typechecked
class RequestSource(DataSource):
Expand Down Expand Up @@ -679,6 +695,9 @@ def get_table_query_string(self) -> str:
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for request sources."""
    return DataSourceProto.REQUEST_SOURCE


@typechecked
class KinesisSource(DataSource):
Expand Down Expand Up @@ -811,6 +830,9 @@ def _to_proto_impl(self) -> DataSourceProto:

return data_source_proto

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Kinesis stream sources."""
    return DataSourceProto.STREAM_KINESIS


class PushMode(enum.Enum):
ONLINE = 1
Expand Down Expand Up @@ -911,3 +933,6 @@ def get_table_query_string(self) -> str:
@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for push sources."""
    return DataSourceProto.PUSH_SOURCE
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
class BigQuerySource(DataSource):
"""A BigQuerySource object defines a data source that a BigQueryOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for BigQuery batch sources."""
    return DataSourceProto.BATCH_BIGQUERY

def __init__(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@


class AthenaSource(DataSource):
def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Athena batch sources."""
    return DataSourceProto.BATCH_ATHENA

def __init__(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:


class ClickhouseSource(DataSource):
def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Clickhouse sources.

    Falls back to CUSTOM_SOURCE because the proto enum has no Clickhouse entry.
    """
    # TODO: Add ClickhouseSourceType to DataSourceProto
    return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
class CouchbaseColumnarSource(DataSource):
"""A CouchbaseColumnarSource object defines a data source that a CouchbaseColumnarOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
# TODO: Add Couchbase to DataSourceProto.SourceType
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just add all of this now? Is the scope large?

Copy link
Copy Markdown
Collaborator Author

@HaoXuAI HaoXuAI Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it will require updating the protobuf and some other places, which I will do in the next PR.

return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:
class MsSqlServerSource(DataSource):
"""A MsSqlServerSource object defines a data source that a MsSqlServerOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for MsSqlServer sources.

    Falls back to CUSTOM_SOURCE because the proto enum has no MSSQL entry.
    """
    # TODO: Add MsSqlServerSource to DataSourceProto.SourceType
    return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
class PostgreSQLSource(DataSource):
"""A PostgreSQLSource object defines a data source that a PostgreSQLOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for PostgreSQL sources.

    Falls back to CUSTOM_SOURCE because the proto enum has no Postgres entry.
    """
    # TODO: Add Postgres to DataSourceProto.SourceType
    return DataSourceProto.CUSTOM_SOURCE

def __init__(
self,
name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class SparkSourceFormat(Enum):
class SparkSource(DataSource):
"""A SparkSource object defines a data source that a Spark offline store can use"""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Spark batch sources."""
    return DataSourceProto.BATCH_SPARK

def __init__(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def to_proto(self) -> DataSourceProto.TrinoOptions:
class TrinoSource(DataSource):
"""A TrinoSource object defines a data source that a TrinoOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Trino batch sources."""
    return DataSourceProto.BATCH_TRINO

def __init__(
self,
*,
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
class FileSource(DataSource):
"""A FileSource object defines a data source that a DaskOfflineStore or DuckDBOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for file-based batch sources."""
    return DataSourceProto.BATCH_FILE

def __init__(
self,
*,
Expand Down
67 changes: 38 additions & 29 deletions sdk/python/feast/infra/offline_stores/hybrid_offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@

import pandas as pd
import pyarrow
from feature_logging import LoggingConfig, LoggingSource
from infra.registry.base_registry import BaseRegistry

from feast import FeatureView, RepoConfig
from feast.data_source import DataSource
from feast.infra.offline_stores.file_source import FileSource
from feast.data_source import _DATA_SOURCE_FOR_OFFLINE_STORE, DataSource
from feast.errors import FeastOfflineStoreInvalidName
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
from feast.repo_config import FeastConfigBaseModel, get_offline_config_from_type
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import (
FeastConfigBaseModel,
get_offline_config_from_type,
get_offline_store_type,
)


class HybridOfflineStoreConfig(FeastConfigBaseModel):
Expand All @@ -31,12 +35,6 @@ class HybridOfflineStore(OfflineStore):
_initialized: bool
offline_stores: Dict[str, OfflineStore]

_source_to_store_key = {
FileSource: "file",
SnowflakeSource: "snowflake",
LoggingSource: "logging",
}

def __new__(cls):
if cls._instance is None:
cls._instance = super(HybridOfflineStore, cls).__new__(cls)
Expand All @@ -48,35 +46,46 @@ def _initialize_offline_stores(self, config: RepoConfig):
if self._initialized:
return
for store_cfg in getattr(config.offline_store, "offline_stores", []):
config_cls = get_offline_config_from_type(
store_cfg.type.split(".")[-1].lower()
)
config_instance = config_cls(**store_cfg.conf)
store = get_offline_store_from_config(config_instance)
store_key = (
store_cfg.type.split(".")[-1].replace("OfflineStore", "").lower()
)
self.offline_stores[store_key] = store
try:
offline_store_type = get_offline_store_type(store_cfg.type)
config_cls = get_offline_config_from_type(store_cfg.type)
config_instance = config_cls(**store_cfg.conf)
store = get_offline_store_from_config(config_instance)
self.offline_stores[offline_store_type] = store
except FeastOfflineStoreInvalidName as e:
raise FeastOfflineStoreInvalidName(
f"Failed to initialize Hybrid offline store {store_cfg.type}: {e}"
)
self._initialized = True

def get_source_key_from_type(
    self, source_type: DataSourceProto.SourceType.ValueType
) -> Optional[str]:
    """Return the offline store class path that serves ``source_type``.

    Args:
        source_type: The proto source type reported by a DataSource.

    Returns:
        The fully-qualified offline store class path registered for the type.

    Raises:
        ValueError: If the source type has no offline store mapping.
    """
    # Membership test directly on the dict — no need to materialize the keys.
    if source_type not in _DATA_SOURCE_FOR_OFFLINE_STORE:
        raise ValueError(
            f"Unsupported DataSource type for HybridOfflineStore: {source_type}. "
            f"Supported types are: {list(_DATA_SOURCE_FOR_OFFLINE_STORE.keys())}"
        )
    # Direct indexing: the key is guaranteed present after the check above,
    # so the previous `.get(..., None)` fallback was unreachable.
    return _DATA_SOURCE_FOR_OFFLINE_STORE[source_type]

def _get_offline_store_for_feature_view(
    self, feature_view: FeatureView, config: RepoConfig
) -> OfflineStore:
    """Route ``feature_view`` to the offline store serving its batch source.

    Raises:
        ValueError: If no offline store is registered for the source type.
    """
    self._initialize_offline_stores(config)
    # Ask the source itself for its proto type instead of dispatching on the
    # Python class; the stale type()-based lookup referenced the removed
    # `_source_to_store_key` attribute and has been dropped.
    source_type = feature_view.batch_source.source_type()
    store_key = self.get_source_key_from_type(source_type)
    if store_key is None:
        raise ValueError(
            f"Unsupported FeatureView batch_source type: {source_type}"
        )
    return self.offline_stores[store_key]

def _get_offline_store_for_source(
    self, data_source: DataSource, config: RepoConfig
) -> OfflineStore:
    """Route ``data_source`` to the offline store serving its source type.

    Raises:
        ValueError: If no offline store is registered for the source type.
    """
    self._initialize_offline_stores(config)
    # Dispatch on the source's self-reported proto type; the stale
    # type()/`_source_to_store_key` lookup lines have been removed.
    source_type = data_source.source_type()
    store_key = self.get_source_key_from_type(source_type)
    if store_key is None:
        raise ValueError(f"Unsupported DataSource type: {source_type}")
    return self.offline_stores[store_key]
Expand Down Expand Up @@ -158,9 +167,9 @@ def write_logged_features(
logging_config: LoggingConfig,
registry: BaseRegistry,
):
store = HybridOfflineStore()._get_offline_store_for_source(source, config)
return store.write_logged_features(
config, data, source, logging_config, registry
raise NotImplementedError(
"HybridOfflineStore does not support write_logged_features. "
"Please use the specific offline store for logging."
)

@staticmethod
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
class RedshiftSource(DataSource):
"""A RedshiftSource object defines a data source that a RedshiftOfflineStore class can use."""

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Redshift batch sources."""
    return DataSourceProto.BATCH_REDSHIFT

def __init__(
self,
*,
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ def get_table_column_names_and_types(
for column in metadata
]

def source_type(self) -> DataSourceProto.SourceType.ValueType:
    """Return the proto source type for Snowflake batch sources."""
    return DataSourceProto.BATCH_SNOWFLAKE


snowflake_type_code_map = {
0: "NUMBER",
Expand Down
9 changes: 7 additions & 2 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,16 @@ def get_auth_config_from_type(auth_config_type: str):
return import_class(module_name, config_class_name, config_class_name)


def get_offline_store_type(offline_store_type: str) -> str:
    """Resolve an offline store type name to its fully-qualified class path.

    Known short aliases are looked up in OFFLINE_STORE_CLASS_FOR_TYPE; any
    other value must already be a class path ending in ``OfflineStore`` and is
    returned unchanged.

    Args:
        offline_store_type: A registered alias or a full class path.

    Returns:
        The fully-qualified offline store class path.

    Raises:
        FeastOfflineStoreInvalidName: If the name is neither a known alias nor
            a class path ending in ``OfflineStore``.
    """
    if offline_store_type in OFFLINE_STORE_CLASS_FOR_TYPE:
        return OFFLINE_STORE_CLASS_FOR_TYPE[offline_store_type]
    elif not offline_store_type.endswith("OfflineStore"):
        raise FeastOfflineStoreInvalidName(offline_store_type)
    return offline_store_type


def get_offline_config_from_type(offline_store_type: str):
offline_store_type = get_offline_store_type(offline_store_type)
module_name, offline_store_class_type = offline_store_type.rsplit(".", 1)
config_class_name = f"{offline_store_class_type}Config"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from unittest.mock import MagicMock, patch

import pytest

from feast import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.hybrid_offline_store import (
HybridOfflineStore,
HybridOfflineStoreConfig,
)
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
from feast.repo_config import RepoConfig

driver = Entity(name="driver_id", description="driver id")


@pytest.fixture
def mock_repo_config():
    """RepoConfig whose offline store is a HybridOfflineStore wrapping a file
    store and a snowflake store."""
    hybrid_stores = [
        HybridOfflineStoreConfig.OfflineStoresWithConfig(
            type="file",
            conf={},
        ),
        HybridOfflineStoreConfig.OfflineStoresWithConfig(
            type="snowflake.offline",
            conf={"database": "db", "schema": "public"},
        ),
    ]
    return RepoConfig(
        registry="/tmp/registry.db",
        project="test_project",
        provider="local",
        offline_store=HybridOfflineStoreConfig(offline_stores=hybrid_stores),
        online_store=None,
    )


@patch("feast.infra.offline_stores.hybrid_offline_store.get_offline_store_from_config")
def test_file_source_routing(mock_get_offline_store_from_config, mock_repo_config):
    """Feature views backed by file and snowflake sources each route to the
    mock offline store created for that backend."""
    # Reset the singleton so this test builds its own store instances.
    HybridOfflineStore._instance = None

    mock_file_store = MagicMock()
    mock_snowflake_store = MagicMock()

    # Dispatch on the config class name, exactly as the if/elif chain did.
    store_for_config_cls = {
        "DaskOfflineStoreConfig": mock_file_store,
        "SnowflakeOfflineStoreConfig": mock_snowflake_store,
    }

    def side_effect(conf):
        cls_name = conf.__class__.__name__
        if cls_name in store_for_config_cls:
            return store_for_config_cls[cls_name]
        raise ValueError(f"Unexpected config class: {cls_name}")

    mock_get_offline_store_from_config.side_effect = side_effect

    hybrid_store = HybridOfflineStore()

    file_backed_fv = FeatureView(
        name="my_feature_1",
        entities=[driver],
        schema=[],
        source=FileSource(path="data.parquet"),
    )
    routed = hybrid_store._get_offline_store_for_feature_view(
        file_backed_fv, mock_repo_config
    )
    assert routed is mock_file_store

    snowflake_backed_fv = FeatureView(
        name="my_feature_2",
        entities=[driver],
        schema=[],
        source=SnowflakeSource(database="db", schema="public", table="table"),
    )
    routed = hybrid_store._get_offline_store_for_feature_view(
        snowflake_backed_fv, mock_repo_config
    )
    assert routed is mock_snowflake_store
Loading