@@ -94,9 +94,9 @@ def online_write_batch(
9494 ) -> None :
9595 with MilvusConnectionManager (config .online_store ):
9696 try :
97- entities = self ._format_data_for_milvus (data )
97+ rows = self ._format_data_for_milvus (data )
9898 collection_to_load_data = Collection (table .name )
99- collection_to_load_data .insert (entities )
99+ collection_to_load_data .insert (rows )
100100 # The flush call will seal any remaining segments and send them for indexing
101101 collection_to_load_data .flush ()
102102 except Exception as e :
@@ -257,29 +257,11 @@ def _format_data_for_milvus(self, feast_data):
257257 feature = []
258258 for feature_name , val in values .items ():
259259 val_type = val .WhichOneof ("val" )
260+ value = getattr (val , val_type )
260261 if val_type == "float_list_val" :
261- float_list = val .float_list_val .val
262- final_float_list = np .array (float_list )
263- feature .append (final_float_list )
264- # TODO: Test out binary lists
265- if val_type == "biinary_list_val" :
266- binary_list = val .binary_list_val .val
267- feature .append (binary_list )
268- if val_type == "string_val" :
269- string_val = val .string_val
270- feature .append (string_val )
271- if val_type == "int32_val" :
272- int32_val = val .int32_val
273- feature .append (int32_val )
274- if val_type == "int64_val" :
275- int64_val = val .int64_val
276- feature .append (int64_val )
277- if val_type == "bytes_val" :
278- bytes_val = val .bytes_val
279- feature .append (bytes_val )
280- if val_type == "float_val" :
281- float_val = val .float_val
282- feature .append (float_val )
262+ value = np .array (value .val )
263+ # TODO: Check binary vector conversion
264+ feature .append (value )
283265 milvus_data .append (feature )
284266
285267 transformed_data = [list (item ) for item in zip (* milvus_data )]
0 commit comments