Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Comments Resolved and code shortened
Signed-off-by: jyejare <jyejare@redhat.com>
  • Loading branch information
jyejare committed Jun 9, 2025
commit bab674be75f46e9f74f41338f8c4a8b79d4dea6b
55 changes: 19 additions & 36 deletions sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,24 @@
# limitations under the License.
import json
import logging
from collections import defaultdict
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import requests
from pydantic import StrictStr

from feast import Entity, FeatureView, RepoConfig
from feast.infra.online_stores.helpers import _to_naive_utc
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.rest_error_handler import rest_error_handling_decorator
from feast.type_map import python_values_to_proto_values
from feast.type_map import (
feast_value_type_to_python_type,
python_values_to_proto_values,
)
from feast.value_type import ValueType

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -66,46 +71,29 @@ def online_write_batch(
assert isinstance(config.online_store, RemoteOnlineStoreConfig)
config.online_store.__class__ = RemoteOnlineStoreConfig
Comment thread
jyejare marked this conversation as resolved.

# Restructure data into a columnar dictionary format for the 'df' key
columnar_data: Dict[str, List[Any]] = {}
columnar_data: Dict[str, List[Any]] = defaultdict(list)

# Collect all unique entity key names and feature names
all_keys = set()
for entity_key_proto, feature_values_proto, _, _ in data:
for join_key in entity_key_proto.join_keys:
all_keys.add(join_key)
for feature_name in feature_values_proto.keys():
all_keys.add(feature_name)
all_keys.add("event_timestamp")
if data and data[0][3] is not None: # Check if created_ts is present
all_keys.add("created")

# Initialize columnar data dictionary with empty lists
for key in all_keys:
columnar_data[key] = []

# Populate the columnar data
# Iterate through each row to populate columnar data directly
for entity_key_proto, feature_values_proto, event_ts, created_ts in data:
# Populate entity key values
for join_key, entity_value_proto in zip(
entity_key_proto.join_keys, entity_key_proto.entity_values
):
columnar_data[join_key].append(
self.value_proto_to_python(entity_value_proto)
feast_value_type_to_python_type(entity_value_proto)
)

# Populate feature values
for feature_name, feature_value_proto in feature_values_proto.items():
columnar_data[feature_name].append(
self.value_proto_to_python(feature_value_proto)
feast_value_type_to_python_type(feature_value_proto)
)

# Populate timestamps
columnar_data["event_timestamp"].append(event_ts.isoformat())
if "created" in columnar_data:
columnar_data["created"].append(
created_ts.isoformat() if created_ts else None
)
columnar_data["event_timestamp"].append(_to_naive_utc(event_ts).isoformat())
columnar_data["created"].append(
_to_naive_utc(created_ts).isoformat() if created_ts else None
)

req_body = {
"feature_view_name": table.name,
Expand All @@ -121,16 +109,11 @@ def online_write_batch(
raise RuntimeError(error_msg)

if progress:
Comment thread
jyejare marked this conversation as resolved.
progress(len(data))

def value_proto_to_python(self, value_proto: ValueProto):
"""
Convert a ValueProto to a native Python value for JSON serialization.
"""
kind = value_proto.WhichOneof("val")
if kind is None:
return None
return getattr(value_proto, kind)
data_length = len(data)
logger.info(
f"Writing {data_length} rows to the remote store for feature view {table.name}."
)
progress(data_length)

def online_read(
self,
Expand Down
Loading