Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions sdk/python/feast/expediagroup/vectordb/milvus_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import numpy as np
from pydantic.typing import Literal
from pymilvus import (
Collection,
Expand Down Expand Up @@ -91,9 +92,15 @@ def online_write_batch(
],
progress: Optional[Callable[[int], Any]],
) -> None:
raise NotImplementedError(
"to be implemented in https://jira.expedia.biz/browse/EAPC-7971"
)
with MilvusConnectionManager(config.online_store):
try:
rows = self._format_data_for_milvus(data)
collection_to_load_data = Collection(table.name)
collection_to_load_data.insert(rows)
# The flush call will seal any remaining segments and send them for indexing
collection_to_load_data.flush()
except Exception as e:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that we shouldn't put a try / except around the entire block; we should rather catch errors in a more targeted way, where they actually occur. We can address it later, but if you see ways to do it now, please do so.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

logger.error(f"Batch writing data failed due to {e}")

def online_read(
self,
Expand Down Expand Up @@ -123,6 +130,7 @@ def update(
if collection_available:
logger.info(f"Collection {table_to_keep.name} already exists.")
else:
# TODO: Enable dynamic schema option
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this mean?

Copy link
Copy Markdown
Collaborator

@piket piket Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Milvus allows arbitrary additional fields, including JSON documents, when `enable_dynamic_field=True` is set on the CollectionSchema.
That would make adding new fields to a collection very easy, but I don't know whether it fits the way Feast works. I believe that if the FeatureView schema changed, the old collection would be flagged for deletion and a new one would be created.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the explanation. Probably better to remove the to-do for now and have a spike in the future if that functionality is needed

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I had left that note in as a reminder to think about how we could potentially incorporate it, but a spike ticket seems fine.

schema = self._convert_featureview_schema_to_milvus_readable(
table_to_keep.schema,
table_to_keep.vector_field,
Expand Down Expand Up @@ -230,3 +238,31 @@ def _feast_to_milvus_data_type(self, feast_type: FeastType) -> DataType:
# TODO: Need to think about list of binaries and list of bytes
# FeastType.BYTES_LIST: DataType.BINARY_VECTOR
}.get(feast_type, None)

def _format_data_for_milvus(self, feast_data):
"""
Data stored into Milvus takes the grouped representation approach where each feature value is grouped together:
[[1,2], [1,3]], [John, Lucy], [3,4]]

Parameters:
feast_data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]: Data represented for batch write in Feast

Returns:
List[List]: transformed_data: Data that can be directly written into Milvus
"""

milvus_data = []
for entity_key, values, timestamp, created_ts in feast_data:
feature = []
for feature_name, val in values.items():
val_type = val.WhichOneof("val")
value = getattr(val, val_type)
if val_type == "float_list_val":
value = np.array(value.val)
# TODO: Check binary vector conversion
feature.append(value)
milvus_data.append(feature)

transformed_data = [list(item) for item in zip(*milvus_data)]
return transformed_data
78 changes: 78 additions & 0 deletions sdk/python/tests/expediagroup/test_milvus_online_store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime

import pytest
from pymilvus import (
Expand All @@ -20,6 +21,9 @@
from feast.field import Field
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.offline_stores.file_source import FileSource
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import FloatList
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.types import Array, Float32, Int64
from tests.expediagroup.milvus_online_store_creator import MilvusOnlineStoreCreator
Expand Down Expand Up @@ -136,6 +140,27 @@ def setup_method(self, milvus_online_setup):
utility.drop_collection(self.collection_to_write)
# Closing the temporary collection to do this

def create_n_customer_test_samples_milvus(self, n=10):
    """
    Utility method to create sample data.

    Builds ``n`` rows, each a tuple of (entity key, feature dict, event timestamp,
    created timestamp). Row ``i`` is keyed by the "customer" join key with the string
    value ``str(i)``; the feature values are the same for every row and the created
    timestamp is always None.
    """
    samples = []
    for customer_id in range(n):
        entity_key = EntityKeyProto(
            join_keys=["customer"],
            entity_values=[ValueProto(string_val=str(customer_id))],
        )
        features = {
            "avg_orders_day": ValueProto(
                float_list_val=FloatList(val=[1.0, 2.1, 3.3, 4.0, 5.0])
            ),
            "name": ValueProto(string_val="John"),
            "age": ValueProto(int64_val=3),
        }
        event_timestamp = datetime.utcnow()
        samples.append((entity_key, features, event_timestamp, None))
    return samples

def test_milvus_update_add_collection(
self, repo_config, milvus_online_setup, caplog
):
Expand Down Expand Up @@ -377,3 +402,56 @@ def test_milvus_update_delete_unavailable_collection(

with PymilvusConnectionContext():
assert len(utility.list_collections()) == 0

def test_milvus_online_write_batch(self, repo_config, caplog, milvus_online_setup):
    """
    Write a batch of sample rows through MilvusOnlineStore.online_write_batch and
    check that Milvus reports the same number of rows for the target collection.
    """
    expected_row_count = 100
    sample_rows = self.create_n_customer_test_samples_milvus(n=expected_row_count)

    # Common schema used to create the collection directly in Milvus;
    # the field order is kept as vector, name, age.
    vector_field = FieldSchema(
        "avg_orders_day", DataType.FLOAT_VECTOR, is_primary=False, dim=5
    )
    name_field = FieldSchema(
        "name",
        DataType.VARCHAR,
        description="string",
        is_primary=True,
        max_length=256,
    )
    age_field = FieldSchema("age", DataType.INT64, is_primary=False)
    schema = CollectionSchema(fields=[vector_field, name_field, age_field])

    vector_index_params = {
        "metric_type": "L2",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 1024},
    }

    with PymilvusConnectionContext():
        # Create the collection, drop any existing index, then index the vector field
        target_collection = Collection(name=self.collection_to_write, schema=schema)
        target_collection.drop_index()
        target_collection.create_index("avg_orders_day", vector_index_params)

    feature_view = VectorFeatureView(
        name=self.collection_to_write,
        source=SOURCE,
        vector_field="avg_orders_day",
        dimensions=DIMENSIONS,
        index_algorithm=IndexType.flat,
    )

    online_store = MilvusOnlineStore()
    online_store.online_write_batch(
        config=repo_config, table=feature_view, data=sample_rows, progress=None
    )

    with PymilvusConnectionContext():
        written_collection = Collection(name=self.collection_to_write)
        build_progress = utility.index_building_progress(
            collection_name=written_collection.name
        )
        assert build_progress["total_rows"] == expected_row_count