1313# limitations under the License.
1414import json
1515import logging
16+ from collections import defaultdict
1617from datetime import datetime
1718from typing import Any , Callable , Dict , List , Literal , Optional , Sequence , Tuple
1819
1920import requests
2021from pydantic import StrictStr
2122
2223from feast import Entity , FeatureView , RepoConfig
24+ from feast .infra .online_stores .helpers import _to_naive_utc
2325from feast .infra .online_stores .online_store import OnlineStore
2426from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
2527from feast .protos .feast .types .Value_pb2 import Value as ValueProto
2628from feast .repo_config import FeastConfigBaseModel
2729from feast .rest_error_handler import rest_error_handling_decorator
28- from feast .type_map import python_values_to_proto_values
30+ from feast .type_map import (
31+ feast_value_type_to_python_type ,
32+ python_values_to_proto_values ,
33+ )
2934from feast .value_type import ValueType
3035
3136logger = logging .getLogger (__name__ )
@@ -66,46 +71,29 @@ def online_write_batch(
6671 assert isinstance (config .online_store , RemoteOnlineStoreConfig )
6772 config .online_store .__class__ = RemoteOnlineStoreConfig
6873
69- # Restructure data into a columnar dictionary format for the 'df' key
70- columnar_data : Dict [str , List [Any ]] = {}
74+ columnar_data : Dict [str , List [Any ]] = defaultdict (list )
7175
72- # Collect all unique entity key names and feature names
73- all_keys = set ()
74- for entity_key_proto , feature_values_proto , _ , _ in data :
75- for join_key in entity_key_proto .join_keys :
76- all_keys .add (join_key )
77- for feature_name in feature_values_proto .keys ():
78- all_keys .add (feature_name )
79- all_keys .add ("event_timestamp" )
80- if data and data [0 ][3 ] is not None : # Check if created_ts is present
81- all_keys .add ("created" )
82-
83- # Initialize columnar data dictionary with empty lists
84- for key in all_keys :
85- columnar_data [key ] = []
86-
87- # Populate the columnar data
76+ # Iterate through each row to populate columnar data directly
8877 for entity_key_proto , feature_values_proto , event_ts , created_ts in data :
8978 # Populate entity key values
9079 for join_key , entity_value_proto in zip (
9180 entity_key_proto .join_keys , entity_key_proto .entity_values
9281 ):
9382 columnar_data [join_key ].append (
94- self . value_proto_to_python (entity_value_proto )
83+ feast_value_type_to_python_type (entity_value_proto )
9584 )
9685
9786 # Populate feature values
9887 for feature_name , feature_value_proto in feature_values_proto .items ():
9988 columnar_data [feature_name ].append (
100- self . value_proto_to_python (feature_value_proto )
89+ feast_value_type_to_python_type (feature_value_proto )
10190 )
10291
10392 # Populate timestamps
104- columnar_data ["event_timestamp" ].append (event_ts .isoformat ())
105- if "created" in columnar_data :
106- columnar_data ["created" ].append (
107- created_ts .isoformat () if created_ts else None
108- )
93+ columnar_data ["event_timestamp" ].append (_to_naive_utc (event_ts ).isoformat ())
94+ columnar_data ["created" ].append (
95+ _to_naive_utc (event_ts ).isoformat () if created_ts else None
96+ )
10997
11098 req_body = {
11199 "feature_view_name" : table .name ,
@@ -121,16 +109,11 @@ def online_write_batch(
121109 raise RuntimeError (error_msg )
122110
123111 if progress :
124- progress (len (data ))
125-
126- def value_proto_to_python (self , value_proto : ValueProto ):
127- """
128- Convert a ValueProto to a native Python value for JSON serialization.
129- """
130- kind = value_proto .WhichOneof ("val" )
131- if kind is None :
132- return None
133- return getattr (value_proto , kind )
112+ data_length = len (data )
113+ logger .info (
114+ f"Writing { data_length } rows to the remote store for feature view { table .name } ."
115+ )
116+ progress (data_length )
134117
135118 def online_read (
136119 self ,
0 commit comments