Skip to content

Commit 9550904

Browse files
committed
Comments Resolved and code shortened
Signed-off-by: jyejare <jyejare@redhat.com>
1 parent dcc0b1c commit 9550904

File tree

1 file changed

+19
-36
lines changed
  • sdk/python/feast/infra/online_stores

1 file changed

+19
-36
lines changed

sdk/python/feast/infra/online_stores/remote.py

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,24 @@
1313
# limitations under the License.
1414
import json
1515
import logging
16+
from collections import defaultdict
1617
from datetime import datetime
1718
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
1819

1920
import requests
2021
from pydantic import StrictStr
2122

2223
from feast import Entity, FeatureView, RepoConfig
24+
from feast.infra.online_stores.helpers import _to_naive_utc
2325
from feast.infra.online_stores.online_store import OnlineStore
2426
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2527
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
2628
from feast.repo_config import FeastConfigBaseModel
2729
from feast.rest_error_handler import rest_error_handling_decorator
28-
from feast.type_map import python_values_to_proto_values
30+
from feast.type_map import (
31+
feast_value_type_to_python_type,
32+
python_values_to_proto_values,
33+
)
2934
from feast.value_type import ValueType
3035

3136
logger = logging.getLogger(__name__)
@@ -66,46 +71,29 @@ def online_write_batch(
6671
assert isinstance(config.online_store, RemoteOnlineStoreConfig)
6772
config.online_store.__class__ = RemoteOnlineStoreConfig
6873

69-
# Restructure data into a columnar dictionary format for the 'df' key
70-
columnar_data: Dict[str, List[Any]] = {}
74+
columnar_data: Dict[str, List[Any]] = defaultdict(list)
7175

72-
# Collect all unique entity key names and feature names
73-
all_keys = set()
74-
for entity_key_proto, feature_values_proto, _, _ in data:
75-
for join_key in entity_key_proto.join_keys:
76-
all_keys.add(join_key)
77-
for feature_name in feature_values_proto.keys():
78-
all_keys.add(feature_name)
79-
all_keys.add("event_timestamp")
80-
if data and data[0][3] is not None: # Check if created_ts is present
81-
all_keys.add("created")
82-
83-
# Initialize columnar data dictionary with empty lists
84-
for key in all_keys:
85-
columnar_data[key] = []
86-
87-
# Populate the columnar data
76+
# Iterate through each row to populate columnar data directly
8877
for entity_key_proto, feature_values_proto, event_ts, created_ts in data:
8978
# Populate entity key values
9079
for join_key, entity_value_proto in zip(
9180
entity_key_proto.join_keys, entity_key_proto.entity_values
9281
):
9382
columnar_data[join_key].append(
94-
self.value_proto_to_python(entity_value_proto)
83+
feast_value_type_to_python_type(entity_value_proto)
9584
)
9685

9786
# Populate feature values
9887
for feature_name, feature_value_proto in feature_values_proto.items():
9988
columnar_data[feature_name].append(
100-
self.value_proto_to_python(feature_value_proto)
89+
feast_value_type_to_python_type(feature_value_proto)
10190
)
10291

10392
# Populate timestamps
104-
columnar_data["event_timestamp"].append(event_ts.isoformat())
105-
if "created" in columnar_data:
106-
columnar_data["created"].append(
107-
created_ts.isoformat() if created_ts else None
108-
)
93+
columnar_data["event_timestamp"].append(_to_naive_utc(event_ts).isoformat())
94+
columnar_data["created"].append(
95+
_to_naive_utc(event_ts).isoformat() if created_ts else None
96+
)
10997

11098
req_body = {
11199
"feature_view_name": table.name,
@@ -121,16 +109,11 @@ def online_write_batch(
121109
raise RuntimeError(error_msg)
122110

123111
if progress:
124-
progress(len(data))
125-
126-
def value_proto_to_python(self, value_proto: ValueProto):
127-
"""
128-
Convert a ValueProto to a native Python value for JSON serialization.
129-
"""
130-
kind = value_proto.WhichOneof("val")
131-
if kind is None:
132-
return None
133-
return getattr(value_proto, kind)
112+
data_length = len(data)
113+
logger.info(
114+
f"Writing {data_length} rows to the remote store for feature view {table.name}."
115+
)
116+
progress(data_length)
134117

135118
def online_read(
136119
self,

0 commit comments

Comments
 (0)