Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix Milvus integration for Set types
- Fix IndexParams len() check by tracking indices_added flag
- Fix Unicode decode error for Set types by including them in numeric_vector_list_types
- Fix Unicode decode error for complex types by using base64 encoding instead of decode()
- Fix duplicate timestamp fields in Milvus insertion by filtering them from values_dict
- Add missing timestamp field names to exclusion list in schema creation

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
  • Loading branch information
franciscojavierarceo and claude committed Jan 27, 2026
commit 8ba622e330ff5c28e7b3e027bf81900f7cb2fc3b
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def _get_or_create_collection(
fields_to_exclude = [
"event_ts",
"created_ts",
"event_timestamp",
"created_timestamp",
]
fields_to_add = [f for f in table.schema if f.name not in fields_to_exclude]
for field in fields_to_add:
Expand Down Expand Up @@ -202,6 +204,7 @@ def _get_or_create_collection(
schema=schema,
)
index_params = self.client.prepare_index_params()
indices_added = False
for vector_field in schema.fields:
if (
vector_field.dtype
Expand All @@ -222,7 +225,8 @@ def _get_or_create_collection(
index_name=f"vector_index_{vector_field.name}",
params={"nlist": config.online_store.nlist},
)
if len(index_params) > 0:
indices_added = True
if indices_added:
self.client.create_index(
collection_name=collection_name,
index_params=index_params,
Expand Down Expand Up @@ -281,6 +285,11 @@ def online_write_batch(
serialize_to_string=True,
)

# Remove timestamp fields that are handled separately to avoid conflicts
timestamp_fields = ["event_timestamp", "created_timestamp", "event_ts", "created_ts"]
for field in timestamp_fields:
values_dict.pop(field, None)

single_entity_record = {
composite_key_name: entity_key_str,
"event_ts": timestamp_int,
Expand Down Expand Up @@ -722,7 +731,7 @@ def _extract_proto_values_to_dict(
numeric_vector_list_types = [
k
for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys()
if k is not None and "list" in k and "string" not in k
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Special handling because Milvus is difficult.

if k is not None and ("list" in k or "set" in k) and "string" not in k
]
numeric_types = [
"double_val",
Expand All @@ -747,9 +756,10 @@ def _extract_proto_values_to_dict(
if (
serialize_to_string
and proto_val_type
not in ["string_val", "bytes_val"] + numeric_types
not in ["string_val", "bytes_val", "unix_timestamp_val"] + numeric_types
):
vector_values = feature_values.SerializeToString().decode()
# For complex types, use base64 encoding instead of decode
vector_values = base64.b64encode(feature_values.SerializeToString()).decode("utf-8")
elif proto_val_type == "bytes_val":
byte_data = getattr(feature_values, proto_val_type)
vector_values = base64.b64encode(byte_data).decode("utf-8")
Expand Down
1 change: 1 addition & 0 deletions test_registry
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1"$40dec8fa-7d8a-49af-b301-be3db7b879e3* ��������