Skip to content

Commit 6728f2a

Browse files
author
Vitaly Sergeyev
authored
direct data ingestion into Online store (feast-dev#1939)
* direct data ingestion into Online store Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * feature flags and formatting Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * add test for write_to_online_store Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * formatting Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * move test to test_universal_online Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * formatting Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * remove unused import Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * test flags not needed Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>
1 parent ce3ad03 commit 6728f2a

6 files changed

Lines changed: 94 additions & 0 deletions

File tree

sdk/python/feast/feature_store.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,26 @@ def tqdm_builder(length):
809809
feature_view, self.project, start_date, end_date
810810
)
811811

812+
@log_exceptions_and_usage
813+
def write_to_online_store(
814+
self, feature_view_name: str, df: pd.DataFrame,
815+
):
816+
"""
817+
ingests data directly into the Online store
818+
"""
819+
if not flags_helper.enable_direct_ingestion_to_online_store(self.config):
820+
raise ExperimentalFeatureNotEnabled(
821+
flags.FLAG_DIRECT_INGEST_TO_ONLINE_STORE
822+
)
823+
824+
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
825+
feature_view = self._registry.get_feature_view(feature_view_name, self.project)
826+
entities = []
827+
for entity_name in feature_view.entities:
828+
entities.append(self._registry.get_entity(entity_name, self.project))
829+
provider = self._get_provider()
830+
provider.ingest_df(feature_view, entities, df)
831+
812832
@log_exceptions_and_usage
813833
def get_online_features(
814834
self,

sdk/python/feast/flags.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
FLAG_ON_DEMAND_TRANSFORM_NAME = "on_demand_transforms"
33
FLAG_PYTHON_FEATURE_SERVER_NAME = "python_feature_server"
44
FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME = "aws_lambda_feature_server"
5+
FLAG_DIRECT_INGEST_TO_ONLINE_STORE = "direct_ingest_to_online_store"
56
ENV_FLAG_IS_TEST = "IS_TEST"
67

78
FLAG_NAMES = {
89
FLAG_ALPHA_FEATURES_NAME,
910
FLAG_ON_DEMAND_TRANSFORM_NAME,
1011
FLAG_PYTHON_FEATURE_SERVER_NAME,
1112
FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME,
13+
FLAG_DIRECT_INGEST_TO_ONLINE_STORE,
1214
}

sdk/python/feast/flags_helper.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ def enable_python_feature_server(repo_config: RepoConfig) -> bool:
4141

4242
def enable_aws_lambda_feature_server(repo_config: RepoConfig) -> bool:
4343
return feature_flag_enabled(repo_config, flags.FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME)
44+
45+
46+
def enable_direct_ingestion_to_online_store(repo_config: RepoConfig) -> bool:
47+
return feature_flag_enabled(repo_config, flags.FLAG_DIRECT_INGEST_TO_ONLINE_STORE)

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
33

44
import pandas
5+
import pyarrow as pa
56
from tqdm import tqdm
67

78
from feast.entity import Entity
@@ -82,6 +83,21 @@ def online_read(
8283

8384
return result
8485

86+
def ingest_df(
87+
self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
88+
):
89+
table = pa.Table.from_pandas(df)
90+
91+
if feature_view.batch_source.field_mapping is not None:
92+
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)
93+
94+
join_keys = [entity.join_key for entity in entities]
95+
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
96+
97+
self.online_write_batch(
98+
self.repo_config, feature_view, rows_to_write, progress=None
99+
)
100+
85101
def materialize_single_feature_view(
86102
self,
87103
config: RepoConfig,

sdk/python/feast/infra/provider.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ def online_write_batch(
9898
"""
9999
...
100100

101+
def ingest_df(
102+
self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
103+
):
104+
"""
105+
Ingests a DataFrame directly into the online store
106+
"""
107+
pass
108+
101109
@abc.abstractmethod
102110
def materialize_single_feature_view(
103111
self,

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import itertools
23
import unittest
34
from datetime import timedelta
@@ -19,6 +20,49 @@
1920
driver,
2021
location,
2122
)
23+
from tests.integration.feature_repos.universal.feature_views import (
24+
create_driver_hourly_stats_feature_view,
25+
)
26+
27+
28+
@pytest.mark.integration
29+
def test_write_to_online_store(environment, universal_data_sources):
30+
fs = environment.feature_store
31+
entities, datasets, data_sources = universal_data_sources
32+
driver_hourly_stats = create_driver_hourly_stats_feature_view(
33+
data_sources["driver"]
34+
)
35+
driver_entity = driver()
36+
37+
# Register Feature View and Entity
38+
fs.apply([driver_hourly_stats, driver_entity])
39+
40+
# fake data to ingest into Online Store
41+
data = {
42+
"driver_id": [123],
43+
"conv_rate": [0.85],
44+
"acc_rate": [0.91],
45+
"avg_daily_trips": [14],
46+
"event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
47+
"created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
48+
}
49+
df_data = pd.DataFrame(data)
50+
51+
# directly ingest data into the Online Store
52+
fs.write_to_online_store("driver_stats", df_data)
53+
54+
# assert the right data is in the Online Store
55+
df = fs.get_online_features(
56+
features=[
57+
"driver_stats:avg_daily_trips",
58+
"driver_stats:acc_rate",
59+
"driver_stats:conv_rate",
60+
],
61+
entity_rows=[{"driver": 123}],
62+
).to_df()
63+
assert df["avg_daily_trips"].iloc[0] == 14
64+
assert df["acc_rate"].iloc[0] == 0.91
65+
assert df["conv_rate"].iloc[0] == 0.85
2266

2367

2468
@pytest.mark.integration

0 commit comments

Comments
 (0)