Skip to content

Commit dcc0b1c

Browse files
committed
Remote Write to Online Store
Signed-off-by: jyejare <jyejare@redhat.com>
1 parent 5f15329 commit dcc0b1c

File tree

2 files changed

+232
-3
lines changed

2 files changed

+232
-3
lines changed

sdk/python/feast/infra/online_stores/remote.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,77 @@ def online_write_batch(
6060
],
6161
progress: Optional[Callable[[int], Any]],
6262
) -> None:
63-
raise NotImplementedError
63+
"""
64+
Writes a batch of feature rows to the remote online store via the remote API.
65+
"""
66+
assert isinstance(config.online_store, RemoteOnlineStoreConfig)
67+
config.online_store.__class__ = RemoteOnlineStoreConfig
68+
69+
# Restructure data into a columnar dictionary format for the 'df' key
70+
columnar_data: Dict[str, List[Any]] = {}
71+
72+
# Collect all unique entity key names and feature names
73+
all_keys = set()
74+
for entity_key_proto, feature_values_proto, _, _ in data:
75+
for join_key in entity_key_proto.join_keys:
76+
all_keys.add(join_key)
77+
for feature_name in feature_values_proto.keys():
78+
all_keys.add(feature_name)
79+
all_keys.add("event_timestamp")
80+
if data and data[0][3] is not None: # Check if created_ts is present
81+
all_keys.add("created")
82+
83+
# Initialize columnar data dictionary with empty lists
84+
for key in all_keys:
85+
columnar_data[key] = []
86+
87+
# Populate the columnar data
88+
for entity_key_proto, feature_values_proto, event_ts, created_ts in data:
89+
# Populate entity key values
90+
for join_key, entity_value_proto in zip(
91+
entity_key_proto.join_keys, entity_key_proto.entity_values
92+
):
93+
columnar_data[join_key].append(
94+
self.value_proto_to_python(entity_value_proto)
95+
)
96+
97+
# Populate feature values
98+
for feature_name, feature_value_proto in feature_values_proto.items():
99+
columnar_data[feature_name].append(
100+
self.value_proto_to_python(feature_value_proto)
101+
)
102+
103+
# Populate timestamps
104+
columnar_data["event_timestamp"].append(event_ts.isoformat())
105+
if "created" in columnar_data:
106+
columnar_data["created"].append(
107+
created_ts.isoformat() if created_ts else None
108+
)
109+
110+
req_body = {
111+
"feature_view_name": table.name,
112+
"df": columnar_data,
113+
"allow_registry_cache": False,
114+
}
115+
116+
response = post_remote_online_write(config=config, req_body=req_body)
117+
118+
if response.status_code != 200:
119+
error_msg = f"Unable to write online store data using feature server API. Error_code={response.status_code}, error_message={response.text}"
120+
logger.error(error_msg)
121+
raise RuntimeError(error_msg)
122+
123+
if progress:
124+
progress(len(data))
125+
126+
def value_proto_to_python(self, value_proto: ValueProto):
    """
    Convert a ValueProto to a native Python value for JSON serialization.

    Args:
        value_proto: Message whose ``val`` oneof holds the value to extract.

    Returns:
        ``None`` when no member of the ``val`` oneof is set; a plain ``list``
        when the populated member is a wrapper exposing a repeated ``val``
        field (the list-typed members); otherwise the populated member
        unchanged.
    """
    kind = value_proto.WhichOneof("val")
    if kind is None:
        return None

    value = getattr(value_proto, kind)

    # List-typed members are wrapper messages carrying their items in a
    # repeated ``val`` field. Returning the wrapper itself would hand a
    # protobuf object to the JSON encoder, so unwrap it into a real list.
    if hasattr(value, "val"):
        return list(value.val)

    # NOTE(review): a bytes member is still returned as raw ``bytes`` here;
    # confirm the transport can encode it before relying on that path.
    return value
64134

65135
def online_read(
66136
self,
@@ -184,3 +254,14 @@ def get_remote_online_features(
184254
return session.post(
185255
f"{config.online_store.path}/get-online-features", data=req_body
186256
)
257+
258+
259+
@rest_error_handling_decorator
def post_remote_online_write(
    session: requests.Session, config: RepoConfig, req_body: dict
) -> requests.Response:
    """
    POST ``req_body`` as JSON to the ``/write-to-online-store`` endpoint.

    The endpoint is resolved relative to ``config.online_store.path``. When
    ``config.online_store.cert`` is set it is forwarded as the ``verify``
    argument of the request; otherwise the session's default is used.

    Args:
        session: HTTP session used to issue the request (presumably supplied
            by ``rest_error_handling_decorator`` -- confirm against it).
        config: Repo configuration providing the online store path and cert.
        req_body: Payload sent as the JSON body.

    Returns:
        The response returned by ``session.post``.
    """
    endpoint = f"{config.online_store.path}/write-to-online-store"

    # Without a configured certificate, fall back to the session defaults.
    if not config.online_store.cert:
        return session.post(endpoint, json=req_body)

    return session.post(endpoint, json=req_body, verify=config.online_store.cert)

sdk/python/tests/integration/online_store/test_remote_online_store.py

Lines changed: 150 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
11
import logging
22
import os
33
import tempfile
4+
from datetime import timedelta
45
from textwrap import dedent
56

7+
import pandas as pd
68
import pytest
79

8-
from feast import FeatureView, OnDemandFeatureView, StreamFeatureView
10+
from feast import (
11+
Entity,
12+
FeatureView,
13+
Field,
14+
FileSource,
15+
OnDemandFeatureView,
16+
PushSource,
17+
StreamFeatureView,
18+
)
19+
from feast.data_source import PushMode
920
from feast.feature_store import FeatureStore
1021
from feast.permissions.action import AuthzedAction
1122
from feast.permissions.permission import Permission
1223
from feast.permissions.policy import RoleBasedPolicy
24+
from feast.types import Float32, Int64
25+
from feast.utils import _utc_now
1326
from tests.utils.auth_permissions_util import (
1427
PROJECT_NAME,
1528
default_store,
@@ -235,7 +248,6 @@ def _create_remote_client_feature_store(
235248
if is_tls_mode and ca_trust_store_path:
236249
# configure trust store path only when is_tls_mode and ca_trust_store_path exists.
237250
os.environ["FEAST_CA_CERT_FILE_PATH"] = ca_trust_store_path
238-
239251
return FeatureStore(repo_path=repo_path)
240252

241253

@@ -265,3 +277,139 @@ def _overwrite_remote_client_feature_store_yaml(
265277

266278
with open(repo_config, "w") as repo_config_file:
267279
repo_config_file.write(config_content)
280+
281+
282+
@pytest.mark.integration
@pytest.mark.rbac_remote_integration_test
@pytest.mark.parametrize(
    "tls_mode", [("True", "True"), ("True", "False"), ("False", "")], indirect=True
)
def test_remote_online_store_read_write(auth_config, tls_mode):
    """
    Push rows through a remote client store and read them back online.

    Spins up a feature server with read/write online permissions, creates a
    remote client store pointing at it, pushes two driver rows to the online
    store and asserts the values returned by ``get_online_features``.
    """
    # (permission name, feature view type, role, granted action)
    permission_specs = [
        ("online_list_fv_perm", FeatureView, "reader", AuthzedAction.READ_ONLINE),
        (
            "online_list_odfv_perm",
            OnDemandFeatureView,
            "reader",
            AuthzedAction.READ_ONLINE,
        ),
        (
            "online_list_sfv_perm",
            StreamFeatureView,
            "reader",
            AuthzedAction.READ_ONLINE,
        ),
        ("online_write_fv_perm", FeatureView, "writer", AuthzedAction.WRITE_ONLINE),
        (
            "online_write_odfv_perm",
            OnDemandFeatureView,
            "writer",
            AuthzedAction.WRITE_ONLINE,
        ),
        (
            "online_write_sfv_perm",
            StreamFeatureView,
            "writer",
            AuthzedAction.WRITE_ONLINE,
        ),
    ]

    with (
        tempfile.TemporaryDirectory() as remote_server_tmp_dir,
        tempfile.TemporaryDirectory() as remote_client_tmp_dir,
    ):
        permissions_list = [
            Permission(
                name=perm_name,
                types=view_type,
                policy=RoleBasedPolicy(roles=[role]),
                actions=[action],
            )
            for perm_name, view_type, role, action in permission_specs
        ]

        # Server side: store plus a running feature server.
        server_store, server_url, registry_path = (
            _create_server_store_spin_feature_server(
                temp_dir=remote_server_tmp_dir,
                auth_config=auth_config,
                permissions_list=permissions_list,
                tls_mode=tls_mode,
            )
        )
        assert None not in (server_store, server_url, registry_path)

        # Client side: remote store talking to the server above.
        client_store = _create_remote_client_feature_store(
            temp_dir=remote_client_tmp_dir,
            server_registry_path=str(registry_path),
            feature_server_url=server_url,
            auth_config=auth_config,
            tls_mode=tls_mode,
        )
        assert client_store is not None

        # Minimal entity / source / feature view used for the write path.
        driver = Entity(name="driver_id", description="Drivers id")

        driver_hourly_stats_source = FileSource(
            path="data/driver_stats.parquet",  # not read for online writes here
            timestamp_field="event_timestamp",
            created_timestamp_column="created",
        )

        # Constructed but not bound or applied, exactly as in the original test.
        PushSource(
            name="driver_stats_push_source",
            batch_source=driver_hourly_stats_source,
        )

        feature_schema = [
            Field(name="conv_rate", dtype=Float32),
            Field(name="acc_rate", dtype=Float32),
            Field(name="avg_daily_trips", dtype=Int64),
        ]
        driver_hourly_stats_fv = FeatureView(
            name="driver_hourly_stats",
            entities=[driver],
            ttl=timedelta(days=1),
            schema=feature_schema,
            source=driver_hourly_stats_source,
            tags={},
        )

        # Register the objects through the client store.
        client_store.apply([driver, driver_hourly_stats_fv])

        rows = {
            "driver_id": [1000, 1001],
            "conv_rate": [0.56, 0.74],
            "acc_rate": [0.95, 0.93],
            "avg_daily_trips": [50, 45],
            "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")] * 2,
            "created": [pd.Timestamp(_utc_now()).round("ms")] * 2,
        }
        event_df = pd.DataFrame(rows)

        # Write to the online store via the push API.
        client_store.push(
            push_source_name="driver_stats_push_source", df=event_df, to=PushMode.ONLINE
        )

        # Read the pushed rows back.
        read_features = [
            "driver_hourly_stats_fresh:conv_rate",
            "driver_hourly_stats_fresh:acc_rate",
            "driver_hourly_stats_fresh:avg_daily_trips",
        ]
        response = client_store.get_online_features(
            features=read_features,
            entity_rows=[{"driver_id": 1000}, {"driver_id": 1001}],
        )
        online_features = response.to_dict()

        # The values read back must match what was pushed.
        assert online_features is not None
        assert len(online_features["driver_id"]) == 2
        assert online_features["driver_id"] == [1000, 1001]
        assert [round(rate, 2) for rate in online_features["conv_rate"]] == [0.56, 0.74]
        assert [round(rate, 2) for rate in online_features["acc_rate"]] == [0.95, 0.93]
        assert online_features["avg_daily_trips"] == [50, 45]

        # Tear the server store down so later tests start from a clean state.
        server_store.teardown()

0 commit comments

Comments
 (0)