|
1 | 1 | import logging |
2 | 2 | import os |
3 | 3 | import tempfile |
| 4 | +from datetime import timedelta |
4 | 5 | from textwrap import dedent |
5 | 6 |
|
| 7 | +import pandas as pd |
6 | 8 | import pytest |
7 | 9 |
|
8 | | -from feast import FeatureView, OnDemandFeatureView, StreamFeatureView |
| 10 | +from feast import ( |
| 11 | + Entity, |
| 12 | + FeatureView, |
| 13 | + Field, |
| 14 | + FileSource, |
| 15 | + OnDemandFeatureView, |
| 16 | + PushSource, |
| 17 | + StreamFeatureView, |
| 18 | +) |
| 19 | +from feast.data_source import PushMode |
9 | 20 | from feast.feature_store import FeatureStore |
10 | 21 | from feast.permissions.action import AuthzedAction |
11 | 22 | from feast.permissions.permission import Permission |
12 | 23 | from feast.permissions.policy import RoleBasedPolicy |
| 24 | +from feast.types import Float32, Int64 |
| 25 | +from feast.utils import _utc_now |
13 | 26 | from tests.utils.auth_permissions_util import ( |
14 | 27 | PROJECT_NAME, |
15 | 28 | default_store, |
@@ -235,7 +248,6 @@ def _create_remote_client_feature_store( |
235 | 248 | if is_tls_mode and ca_trust_store_path: |
236 | 249 | # configure trust store path only when is_tls_mode and ca_trust_store_path exists. |
237 | 250 | os.environ["FEAST_CA_CERT_FILE_PATH"] = ca_trust_store_path |
238 | | - |
239 | 251 | return FeatureStore(repo_path=repo_path) |
240 | 252 |
|
241 | 253 |
|
@@ -265,3 +277,139 @@ def _overwrite_remote_client_feature_store_yaml( |
265 | 277 |
|
266 | 278 | with open(repo_config, "w") as repo_config_file: |
267 | 279 | repo_config_file.write(config_content) |
| 280 | + |
| 281 | + |
| 282 | +@pytest.mark.integration |
| 283 | +@pytest.mark.rbac_remote_integration_test |
| 284 | +@pytest.mark.parametrize( |
| 285 | + "tls_mode", [("True", "True"), ("True", "False"), ("False", "")], indirect=True |
| 286 | +) |
| 287 | +def test_remote_online_store_read_write(auth_config, tls_mode): |
| 288 | + with ( |
| 289 | + tempfile.TemporaryDirectory() as remote_server_tmp_dir, |
| 290 | + tempfile.TemporaryDirectory() as remote_client_tmp_dir, |
| 291 | + ): |
| 292 | + permissions_list = [ |
| 293 | + Permission( |
| 294 | + name="online_list_fv_perm", |
| 295 | + types=FeatureView, |
| 296 | + policy=RoleBasedPolicy(roles=["reader"]), |
| 297 | + actions=[AuthzedAction.READ_ONLINE], |
| 298 | + ), |
| 299 | + Permission( |
| 300 | + name="online_list_odfv_perm", |
| 301 | + types=OnDemandFeatureView, |
| 302 | + policy=RoleBasedPolicy(roles=["reader"]), |
| 303 | + actions=[AuthzedAction.READ_ONLINE], |
| 304 | + ), |
| 305 | + Permission( |
| 306 | + name="online_list_sfv_perm", |
| 307 | + types=StreamFeatureView, |
| 308 | + policy=RoleBasedPolicy(roles=["reader"]), |
| 309 | + actions=[AuthzedAction.READ_ONLINE], |
| 310 | + ), |
| 311 | + Permission( |
| 312 | + name="online_write_fv_perm", |
| 313 | + types=FeatureView, |
| 314 | + policy=RoleBasedPolicy(roles=["writer"]), |
| 315 | + actions=[AuthzedAction.WRITE_ONLINE], |
| 316 | + ), |
| 317 | + Permission( |
| 318 | + name="online_write_odfv_perm", |
| 319 | + types=OnDemandFeatureView, |
| 320 | + policy=RoleBasedPolicy(roles=["writer"]), |
| 321 | + actions=[AuthzedAction.WRITE_ONLINE], |
| 322 | + ), |
| 323 | + Permission( |
| 324 | + name="online_write_sfv_perm", |
| 325 | + types=StreamFeatureView, |
| 326 | + policy=RoleBasedPolicy(roles=["writer"]), |
| 327 | + actions=[AuthzedAction.WRITE_ONLINE], |
| 328 | + ), |
| 329 | + ] |
| 330 | + server_store, server_url, registry_path = ( |
| 331 | + _create_server_store_spin_feature_server( |
| 332 | + temp_dir=remote_server_tmp_dir, |
| 333 | + auth_config=auth_config, |
| 334 | + permissions_list=permissions_list, |
| 335 | + tls_mode=tls_mode, |
| 336 | + ) |
| 337 | + ) |
| 338 | + assert None not in (server_store, server_url, registry_path) |
| 339 | + |
| 340 | + client_store = _create_remote_client_feature_store( |
| 341 | + temp_dir=remote_client_tmp_dir, |
| 342 | + server_registry_path=str(registry_path), |
| 343 | + feature_server_url=server_url, |
| 344 | + auth_config=auth_config, |
| 345 | + tls_mode=tls_mode, |
| 346 | + ) |
| 347 | + assert client_store is not None |
| 348 | + |
| 349 | + # Define a simple FeatureView for testing write operations |
| 350 | + driver = Entity(name="driver_id", description="Drivers id") |
| 351 | + |
| 352 | + driver_hourly_stats_source = FileSource( |
| 353 | + path="data/driver_stats.parquet", # Path is not used for online writes in this context |
| 354 | + timestamp_field="event_timestamp", |
| 355 | + created_timestamp_column="created", |
| 356 | + ) |
| 357 | + |
| 358 | + PushSource( |
| 359 | + name="driver_stats_push_source", |
| 360 | + batch_source=driver_hourly_stats_source, |
| 361 | + ) |
| 362 | + |
| 363 | + driver_hourly_stats_fv = FeatureView( |
| 364 | + name="driver_hourly_stats", |
| 365 | + entities=[driver], |
| 366 | + ttl=timedelta(days=1), |
| 367 | + schema=[ |
| 368 | + Field(name="conv_rate", dtype=Float32), |
| 369 | + Field(name="acc_rate", dtype=Float32), |
| 370 | + Field(name="avg_daily_trips", dtype=Int64), |
| 371 | + ], |
| 372 | + source=driver_hourly_stats_source, |
| 373 | + tags={}, |
| 374 | + ) |
| 375 | + |
| 376 | + # Apply the feature view to the client store |
| 377 | + client_store.apply([driver, driver_hourly_stats_fv]) |
| 378 | + event_df = pd.DataFrame( |
| 379 | + { |
| 380 | + "driver_id": [1000, 1001], |
| 381 | + "conv_rate": [0.56, 0.74], |
| 382 | + "acc_rate": [0.95, 0.93], |
| 383 | + "avg_daily_trips": [50, 45], |
| 384 | + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")] * 2, |
| 385 | + "created": [pd.Timestamp(_utc_now()).round("ms")] * 2, |
| 386 | + } |
| 387 | + ) |
| 388 | + |
| 389 | + # Perform the online write |
| 390 | + client_store.push( |
| 391 | + push_source_name="driver_stats_push_source", df=event_df, to=PushMode.ONLINE |
| 392 | + ) |
| 393 | + |
| 394 | + # Verify the data by reading it back |
| 395 | + # read_entity_keys = [entity_key_1, entity_key_2] |
| 396 | + read_features = [ |
| 397 | + "driver_hourly_stats_fresh:conv_rate", |
| 398 | + "driver_hourly_stats_fresh:acc_rate", |
| 399 | + "driver_hourly_stats_fresh:avg_daily_trips", |
| 400 | + ] |
| 401 | + online_features = client_store.get_online_features( |
| 402 | + features=read_features, |
| 403 | + entity_rows=[{"driver_id": 1000}, {"driver_id": 1001}], |
| 404 | + ).to_dict() |
| 405 | + |
| 406 | + # Assertions for read data |
| 407 | + assert online_features is not None |
| 408 | + assert len(online_features["driver_id"]) == 2 |
| 409 | + assert online_features["driver_id"] == [1000, 1001] |
| 410 | + assert [round(val, 2) for val in online_features["conv_rate"]] == [0.56, 0.74] |
| 411 | + assert [round(val, 2) for val in online_features["acc_rate"]] == [0.95, 0.93] |
| 412 | + assert online_features["avg_daily_trips"] == [50, 45] |
| 413 | + |
| 414 | + # Clean up the applied feature view from the server store to avoid interference with other tests |
| 415 | + server_store.teardown() |
0 commit comments