Skip to content
Prev Previous commit
Next Next commit
fix: fix five bugs in MilvusOnlineStore
1. update() overwrote self._collections with the raw describe_collection()
   dict from the last table instead of updating the keyed cache entry.
   _get_or_create_collection() already updates self._collections as a side
   effect, so the assignment is simply dropped.

2. plan() raised NotImplementedError instead of returning [] like the base
   class default.  This caused `feast plan` to crash for Milvus stores.

3. online_read() carried an extra full_feature_names parameter not present
   in the OnlineStore base class, violating the interface contract.  The
   parameter was unused and is removed.

4. retrieve_online_documents_v2() passed the raw hit.get() result (which
   can be None when the composite key is absent) directly to bytes.fromhex(),
   raising TypeError.  Guard added: only call bytes.fromhex() when the value
   is non-None.

5. Replace print() calls in _connect() with logger.info() so connection
   messages respect standard logging configuration instead of always writing
   to stdout.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Alex Korbonits <alex@korbonits.com>
  • Loading branch information
korbonits and claude committed Apr 14, 2026
commit 30ff5021e8860faf8c90331f5660582be9583d35
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Falsy check on distance drops valid 0.0 distance values

At line 746, if distance is used to decide whether to wrap the distance in a ValueProto(float_val=...). When distance is 0.0 (a valid distance meaning an exact match for L2 or cosine metrics), Python evaluates 0.0 as falsy, so the code returns an empty ValueProto() instead of ValueProto(float_val=0.0). Because float_val is inside a oneof val in the proto definition (protos/feast/types/Value.proto:78), ValueProto() leaves the oneof unset (WhichOneof("val")None), while ValueProto(float_val=0.0) correctly sets it to "float_val". This is the same bug pattern the PR fixes for raw_key at line 694. The postgres online store handles this correctly with if distance is not None: (sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py:777).

(Refers to line 746)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -42,6 +43,8 @@
to_naive_utc,
)

logger = logging.getLogger(__name__)

PROTO_TO_MILVUS_TYPE_MAPPING: Dict[ValueType, DataType] = {
PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.VARCHAR,
ValueType.IMAGE_BYTES: DataType.VARCHAR,
Expand Down Expand Up @@ -140,11 +143,13 @@ def _connect(self, config: RepoConfig) -> MilvusClient:
if not self.client:
if config.provider == "local" and config.online_store.path:
db_path = self._get_db_path(config)
print(f"Connecting to Milvus in local mode using {db_path}")
logger.info("Connecting to Milvus in local mode using %s", db_path)
self.client = MilvusClient(db_path)
else:
print(
f"Connecting to Milvus remotely at {config.online_store.host}:{config.online_store.port}"
logger.info(
"Connecting to Milvus remotely at %s:%s",
config.online_store.host,
config.online_store.port,
)
self.client = MilvusClient(
uri=f"{config.online_store.host}:{config.online_store.port}",
Expand Down Expand Up @@ -339,7 +344,6 @@ def online_read(
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
full_feature_names: bool = False,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
self.client = self._connect(config)
collection_name = _table_id(config.project, table)
Expand Down Expand Up @@ -487,7 +491,7 @@ def update(
):
self.client = self._connect(config)
for table in tables_to_keep:
self._collections = self._get_or_create_collection(config, table)
self._get_or_create_collection(config, table)

for table in tables_to_delete:
collection_name = _table_id(config.project, table)
Expand All @@ -498,7 +502,7 @@ def update(
def plan(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> List[InfraObject]:
raise NotImplementedError
return []

def teardown(
self,
Expand Down Expand Up @@ -686,9 +690,8 @@ def retrieve_online_documents_v2(
for hit in hits:
res = {}
res_ts = None
entity_key_bytes = bytes.fromhex(
hit.get("entity", {}).get(composite_key_name, None)
)
raw_key = hit.get("entity", {}).get(composite_key_name)
entity_key_bytes = bytes.fromhex(raw_key) if raw_key else None
entity_key_proto = (
deserialize_entity_key(entity_key_bytes)
if entity_key_bytes
Expand Down
Loading