Skip to content

Commit 8f1af55

Browse files
authored
feat: Hybrid offline store (#5510)
* add hybrid offline store Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * add hybrid offline store Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * fix source type Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * update doc Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * update doc Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * update doc Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * update doc Signed-off-by: HaoXuAI <sduxuhao@gmail.com> --------- Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
1 parent f287ca5 commit 8f1af55

File tree

17 files changed

+464
-2
lines changed

17 files changed

+464
-2
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Hybrid Offline Store
2+
3+
## Description
4+
The HybridOfflineStore allows routing offline feature operations to different offline store backends based on the `batch_source` of the FeatureView. This enables a single Feast deployment to support multiple offline store backends, each configured independently and selected dynamically at runtime.
5+
6+
## Getting started
7+
To use the HybridOfflineStore, install Feast with the offline store dependencies (e.g., Spark, Snowflake) for each store you plan to use. For example:
8+
9+
```bash
10+
pip install 'feast[spark,snowflake]'
11+
```
12+
13+
## Example
14+
15+
{% code title="feature_store.yaml" %}
16+
```yaml
17+
project: my_feature_repo
18+
registry: data/registry.db
19+
provider: local
20+
offline_store:
21+
type: hybrid_offline_store.HybridOfflineStore
22+
offline_stores:
23+
- type: spark
24+
conf:
25+
spark_master: local[*]
26+
spark_app_name: feast_spark_app
27+
- type: snowflake
28+
conf:
29+
account: my_snowflake_account
30+
user: feast_user
31+
password: feast_password
32+
database: feast_database
33+
schema: feast_schema
34+
```
35+
{% endcode %}
36+
37+
### Example FeatureView
38+
```python
39+
from feast import FeatureView, Entity, ValueType
40+
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
41+
SparkSource,
42+
)
43+
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
44+
45+
46+
entity = Entity(name="user_id", value_type=ValueType.INT64, join_keys=["user_id"])
47+
feature_view1 = FeatureView(
48+
name="user_features",
49+
entities=["user_id"],
50+
ttl=None,
51+
features=[
52+
# Define your features here
53+
],
54+
source=SparkSource(
55+
path="s3://my-bucket/user_features_data",
56+
),
57+
)
58+
59+
feature_view2 = FeatureView(
60+
name="user_activity",
61+
entities=["user_id"],
62+
ttl=None,
63+
features=[
64+
# Define your features here
65+
],
66+
source=SnowflakeSource(
67+
table="user_activity_data",
68+
),
69+
)
70+
71+
```
72+
73+
Then you can use the `materialize` API to materialize data; each FeatureView is routed to the appropriate offline store based on its `batch_source`.
74+
75+
```python
76+
from feast import FeatureStore
77+
store = FeatureStore(repo_path=".")
78+
store.materialize(
79+
start_date="2025-01-01",
80+
end_date="2025-07-31",
81+
feature_views=[feature_view1, feature_view2],
82+
)
83+
```
84+
85+
## Functionality Matrix
86+
| Feature/Functionality | Supported |
87+
|---------------------------------------------------|----------------------------|
88+
| pull_latest_from_table_or_query | Yes |
89+
| pull_all_from_table_or_query | Yes |
90+
| offline_write_batch | Yes |
91+
| validate_data_source | Yes |
92+
| get_table_column_names_and_types_from_data_source | Yes |
93+
| write_logged_features | No |
94+
| get_historical_features | Only with same data source |

sdk/python/feast/data_source.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
163163
DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
164164
}
165165

166+
_DATA_SOURCE_FOR_OFFLINE_STORE = {
167+
DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.dask.DaskOfflineStore",
168+
DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery.BigQueryOfflineStore",
169+
DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift.RedshiftOfflineStore",
170+
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake.SnowflakeOfflineStore",
171+
DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore",
172+
DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore",
173+
DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
174+
}
175+
166176

167177
@typechecked
168178
class DataSource(ABC):
@@ -401,6 +411,9 @@ def _set_timestamps_in_proto(self, data_source_proto: DataSourceProto):
401411
self.last_updated_timestamp
402412
)
403413

414+
@abstractmethod
415+
def source_type(self) -> DataSourceProto.SourceType.ValueType: ...
416+
404417

405418
@typechecked
406419
class KafkaSource(DataSource):
@@ -564,6 +577,9 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
564577
def get_table_query_string(self) -> str:
565578
raise NotImplementedError
566579

580+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
581+
return DataSourceProto.STREAM_KAFKA
582+
567583

568584
@typechecked
569585
class RequestSource(DataSource):
@@ -679,6 +695,9 @@ def get_table_query_string(self) -> str:
679695
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
680696
raise NotImplementedError
681697

698+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
699+
return DataSourceProto.REQUEST_SOURCE
700+
682701

683702
@typechecked
684703
class KinesisSource(DataSource):
@@ -811,6 +830,9 @@ def _to_proto_impl(self) -> DataSourceProto:
811830

812831
return data_source_proto
813832

833+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
834+
return DataSourceProto.STREAM_KINESIS
835+
814836

815837
class PushMode(enum.Enum):
816838
ONLINE = 1
@@ -911,3 +933,6 @@ def get_table_query_string(self) -> str:
911933
@staticmethod
912934
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
913935
raise NotImplementedError
936+
937+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
938+
return DataSourceProto.PUSH_SOURCE

sdk/python/feast/infra/offline_stores/bigquery_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
class BigQuerySource(DataSource):
2424
"""A BigQuerySource object defines a data source that a BigQueryOfflineStore class can use."""
2525

26+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
27+
return DataSourceProto.BATCH_BIGQUERY
28+
2629
def __init__(
2730
self,
2831
*,

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818

1919
class AthenaSource(DataSource):
20+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
21+
return DataSourceProto.BATCH_ATHENA
22+
2023
def __init__(
2124
self,
2225
*,

sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:
5656

5757

5858
class ClickhouseSource(DataSource):
59+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
60+
# TODO: Add ClickhouseSourceType to DataSourceProto
61+
return DataSourceProto.CUSTOM_SOURCE
62+
5963
def __init__(
6064
self,
6165
name: Optional[str] = None,

sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
class CouchbaseColumnarSource(DataSource):
2727
"""A CouchbaseColumnarSource object defines a data source that a CouchbaseColumnarOfflineStore class can use."""
2828

29+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
30+
# TODO: Add Couchbase to DataSourceProto.SourceType
31+
return DataSourceProto.CUSTOM_SOURCE
32+
2933
def __init__(
3034
self,
3135
name: Optional[str] = None,

sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssqlserver_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ def to_proto(self) -> DataSourceProto.CustomSourceOptions:
113113
class MsSqlServerSource(DataSource):
114114
"""A MsSqlServerSource object defines a data source that a MsSqlServerOfflineStore class can use."""
115115

116+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
117+
# TODO: Add MsSqlServerSource to DataSourceProto.SourceType
118+
return DataSourceProto.CUSTOM_SOURCE
119+
116120
def __init__(
117121
self,
118122
name: str,

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
class PostgreSQLSource(DataSource):
2121
"""A PostgreSQLSource object defines a data source that a PostgreSQLOfflineStore class can use."""
2222

23+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
24+
# TODO: Add Postgres to DataSourceProto.SourceType
25+
return DataSourceProto.CUSTOM_SOURCE
26+
2327
def __init__(
2428
self,
2529
name: Optional[str] = None,

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ class SparkSourceFormat(Enum):
3131
class SparkSource(DataSource):
3232
"""A SparkSource object defines a data source that a Spark offline store can use"""
3333

34+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
35+
return DataSourceProto.BATCH_SPARK
36+
3437
def __init__(
3538
self,
3639
*,

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ def to_proto(self) -> DataSourceProto.TrinoOptions:
8686
class TrinoSource(DataSource):
8787
"""A TrinoSource object defines a data source that a TrinoOfflineStore class can use."""
8888

89+
def source_type(self) -> DataSourceProto.SourceType.ValueType:
90+
return DataSourceProto.BATCH_TRINO
91+
8992
def __init__(
9093
self,
9194
*,

0 commit comments

Comments
 (0)