Skip to content

Commit 2368f42

Browse files
authored
feat: Remote Write to Online Store completes client / server architecture (#5422)
* Remote Write to Online Store
  Signed-off-by: jyejare <jyejare@redhat.com>
* Comments Resolved and code shortened
  Signed-off-by: jyejare <jyejare@redhat.com>
---------
Signed-off-by: jyejare <jyejare@redhat.com>
1 parent f1aaa61 commit 2368f42

File tree

2 files changed

+216
-4
lines changed

2 files changed

+216
-4
lines changed

sdk/python/feast/infra/online_stores/remote.py

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,24 @@
1313
# limitations under the License.
1414
import json
1515
import logging
16+
from collections import defaultdict
1617
from datetime import datetime
1718
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
1819

1920
import requests
2021
from pydantic import StrictStr
2122

2223
from feast import Entity, FeatureView, RepoConfig
24+
from feast.infra.online_stores.helpers import _to_naive_utc
2325
from feast.infra.online_stores.online_store import OnlineStore
2426
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2527
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
2628
from feast.repo_config import FeastConfigBaseModel
2729
from feast.rest_error_handler import rest_error_handling_decorator
28-
from feast.type_map import python_values_to_proto_values
30+
from feast.type_map import (
31+
feast_value_type_to_python_type,
32+
python_values_to_proto_values,
33+
)
2934
from feast.value_type import ValueType
3035

3136
logger = logging.getLogger(__name__)
@@ -60,7 +65,55 @@ def online_write_batch(
6065
],
6166
progress: Optional[Callable[[int], Any]],
6267
) -> None:
63-
raise NotImplementedError
68+
"""
69+
Writes a batch of feature rows to the remote online store via the remote API.
70+
"""
71+
assert isinstance(config.online_store, RemoteOnlineStoreConfig)
72+
config.online_store.__class__ = RemoteOnlineStoreConfig
73+
74+
columnar_data: Dict[str, List[Any]] = defaultdict(list)
75+
76+
# Iterate through each row to populate columnar data directly
77+
for entity_key_proto, feature_values_proto, event_ts, created_ts in data:
78+
# Populate entity key values
79+
for join_key, entity_value_proto in zip(
80+
entity_key_proto.join_keys, entity_key_proto.entity_values
81+
):
82+
columnar_data[join_key].append(
83+
feast_value_type_to_python_type(entity_value_proto)
84+
)
85+
86+
# Populate feature values
87+
for feature_name, feature_value_proto in feature_values_proto.items():
88+
columnar_data[feature_name].append(
89+
feast_value_type_to_python_type(feature_value_proto)
90+
)
91+
92+
# Populate timestamps
93+
columnar_data["event_timestamp"].append(_to_naive_utc(event_ts).isoformat())
94+
columnar_data["created"].append(
95+
_to_naive_utc(created_ts).isoformat() if created_ts else None
96+
)
97+
98+
req_body = {
99+
"feature_view_name": table.name,
100+
"df": columnar_data,
101+
"allow_registry_cache": False,
102+
}
103+
104+
response = post_remote_online_write(config=config, req_body=req_body)
105+
106+
if response.status_code != 200:
107+
error_msg = f"Unable to write online store data using feature server API. Error_code={response.status_code}, error_message={response.text}"
108+
logger.error(error_msg)
109+
raise RuntimeError(error_msg)
110+
111+
if progress:
112+
data_length = len(data)
113+
logger.info(
114+
f"Writing {data_length} rows to the remote store for feature view {table.name}."
115+
)
116+
progress(data_length)
64117

65118
def online_read(
66119
self,
@@ -184,3 +237,14 @@ def get_remote_online_features(
184237
return session.post(
185238
f"{config.online_store.path}/get-online-features", data=req_body
186239
)
240+
241+
242+
@rest_error_handling_decorator
def post_remote_online_write(
    session: requests.Session, config: RepoConfig, req_body: dict
) -> requests.Response:
    """
    POST ``req_body`` as JSON to the ``/write-to-online-store`` endpoint.

    The endpoint URL is built from ``config.online_store.path``. When
    ``config.online_store.cert`` is set, it is forwarded to ``requests`` as
    the ``verify`` argument; otherwise the session's default verification
    behaviour is used.

    NOTE(review): callers in this module invoke this function without a
    ``session`` argument, so ``rest_error_handling_decorator`` presumably
    supplies it -- confirm against the decorator's implementation.

    Args:
        session: Session used to issue the HTTP request.
        config: Repo configuration whose ``online_store`` provides ``path``
            and ``cert``.
        req_body: JSON-serialisable request payload.

    Returns:
        The raw ``requests.Response`` from the feature server.
    """
    online_store = config.online_store
    endpoint = f"{online_store.path}/write-to-online-store"
    # Only pass ``verify`` when a certificate is configured, so the default
    # verification behaviour is left untouched otherwise.
    tls_kwargs = {"verify": online_store.cert} if online_store.cert else {}
    return session.post(endpoint, json=req_body, **tls_kwargs)

sdk/python/tests/integration/online_store/test_remote_online_store.py

Lines changed: 150 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
11
import logging
22
import os
33
import tempfile
4+
from datetime import timedelta
45
from textwrap import dedent
56

7+
import pandas as pd
68
import pytest
79

8-
from feast import FeatureView, OnDemandFeatureView, StreamFeatureView
10+
from feast import (
11+
Entity,
12+
FeatureView,
13+
Field,
14+
FileSource,
15+
OnDemandFeatureView,
16+
PushSource,
17+
StreamFeatureView,
18+
)
19+
from feast.data_source import PushMode
920
from feast.feature_store import FeatureStore
1021
from feast.permissions.action import AuthzedAction
1122
from feast.permissions.permission import Permission
1223
from feast.permissions.policy import RoleBasedPolicy
24+
from feast.types import Float32, Int64
25+
from feast.utils import _utc_now
1326
from tests.utils.auth_permissions_util import (
1427
PROJECT_NAME,
1528
default_store,
@@ -235,7 +248,6 @@ def _create_remote_client_feature_store(
235248
if is_tls_mode and ca_trust_store_path:
236249
# configure trust store path only when is_tls_mode and ca_trust_store_path exists.
237250
os.environ["FEAST_CA_CERT_FILE_PATH"] = ca_trust_store_path
238-
239251
return FeatureStore(repo_path=repo_path)
240252

241253

@@ -265,3 +277,139 @@ def _overwrite_remote_client_feature_store_yaml(
265277

266278
with open(repo_config, "w") as repo_config_file:
267279
repo_config_file.write(config_content)
280+
281+
282+
@pytest.mark.integration
@pytest.mark.rbac_remote_integration_test
@pytest.mark.parametrize(
    "tls_mode", [("True", "True"), ("True", "False"), ("False", "")], indirect=True
)
def test_remote_online_store_read_write(auth_config, tls_mode):
    """
    Push rows through a remote client store and read them back.

    The test spins up a feature server with read/write online permissions,
    creates a client store pointing at it, pushes two driver rows with
    ``PushMode.ONLINE`` and then checks that ``get_online_features`` returns
    the pushed values. The server store is always torn down, even when the
    push or an assertion fails, so that a failing run does not leave state
    behind for other tests.
    """
    with (
        tempfile.TemporaryDirectory() as remote_server_tmp_dir,
        tempfile.TemporaryDirectory() as remote_client_tmp_dir,
    ):
        # One permission per (operation, view type) pair: "reader" may read
        # online, "writer" may write online, for every kind of feature view.
        permissions_list = [
            Permission(
                name=f"online_{operation}_{view_tag}_perm",
                types=view_type,
                policy=RoleBasedPolicy(roles=[role]),
                actions=[action],
            )
            for operation, role, action in (
                ("list", "reader", AuthzedAction.READ_ONLINE),
                ("write", "writer", AuthzedAction.WRITE_ONLINE),
            )
            for view_tag, view_type in (
                ("fv", FeatureView),
                ("odfv", OnDemandFeatureView),
                ("sfv", StreamFeatureView),
            )
        ]
        server_store, server_url, registry_path = (
            _create_server_store_spin_feature_server(
                temp_dir=remote_server_tmp_dir,
                auth_config=auth_config,
                permissions_list=permissions_list,
                tls_mode=tls_mode,
            )
        )
        assert None not in (server_store, server_url, registry_path)

        try:
            client_store = _create_remote_client_feature_store(
                temp_dir=remote_client_tmp_dir,
                server_registry_path=str(registry_path),
                feature_server_url=server_url,
                auth_config=auth_config,
                tls_mode=tls_mode,
            )
            assert client_store is not None

            # Define a simple FeatureView for testing write operations
            driver = Entity(name="driver_id", description="Drivers id")

            driver_hourly_stats_source = FileSource(
                path="data/driver_stats.parquet",  # Path is not used for online writes in this context
                timestamp_field="event_timestamp",
                created_timestamp_column="created",
            )

            # NOTE(review): this PushSource is neither bound to a name nor
            # passed to apply(); the push and the "driver_hourly_stats_fresh"
            # reads below presumably rely on objects already registered by
            # the server-side repo -- confirm before removing it.
            PushSource(
                name="driver_stats_push_source",
                batch_source=driver_hourly_stats_source,
            )

            driver_hourly_stats_fv = FeatureView(
                name="driver_hourly_stats",
                entities=[driver],
                ttl=timedelta(days=1),
                schema=[
                    Field(name="conv_rate", dtype=Float32),
                    Field(name="acc_rate", dtype=Float32),
                    Field(name="avg_daily_trips", dtype=Int64),
                ],
                source=driver_hourly_stats_source,
                tags={},
            )

            # Apply the feature view to the client store
            client_store.apply([driver, driver_hourly_stats_fv])
            event_df = pd.DataFrame(
                {
                    "driver_id": [1000, 1001],
                    "conv_rate": [0.56, 0.74],
                    "acc_rate": [0.95, 0.93],
                    "avg_daily_trips": [50, 45],
                    "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")] * 2,
                    "created": [pd.Timestamp(_utc_now()).round("ms")] * 2,
                }
            )

            # Perform the online write
            client_store.push(
                push_source_name="driver_stats_push_source",
                df=event_df,
                to=PushMode.ONLINE,
            )

            # Verify the data by reading it back
            read_features = [
                "driver_hourly_stats_fresh:conv_rate",
                "driver_hourly_stats_fresh:acc_rate",
                "driver_hourly_stats_fresh:avg_daily_trips",
            ]
            online_features = client_store.get_online_features(
                features=read_features,
                entity_rows=[{"driver_id": 1000}, {"driver_id": 1001}],
            ).to_dict()

            # Assertions for read data
            assert online_features is not None
            assert len(online_features["driver_id"]) == 2
            assert online_features["driver_id"] == [1000, 1001]
            # Float features are rounded before comparison so the check
            # tolerates small precision differences in the returned values.
            assert [round(val, 2) for val in online_features["conv_rate"]] == [
                0.56,
                0.74,
            ]
            assert [round(val, 2) for val in online_features["acc_rate"]] == [
                0.95,
                0.93,
            ]
            assert online_features["avg_daily_trips"] == [50, 45]
        finally:
            # Clean up the server store to avoid interference with other
            # tests; done in ``finally`` so a failed assertion still cleans up.
            server_store.teardown()

0 commit comments

Comments
 (0)