Added milvus write batch functionality #17
Changes from 3 commits
cfde2f7
a8e5c79
97c4789
f3d5af8
a6518ce
1e375b4
423ca81
64396e2
880efed
cf9a21f
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| from datetime import datetime | ||
| from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple | ||
|
|
||
| import numpy as np | ||
| from pydantic.typing import Literal | ||
| from pymilvus import ( | ||
| Collection, | ||
|
|
@@ -91,9 +92,15 @@ def online_write_batch( | |
| ], | ||
| progress: Optional[Callable[[int], Any]], | ||
| ) -> None: | ||
| raise NotImplementedError( | ||
| "to be implemented in https://jira.expedia.biz/browse/EAPC-7971" | ||
| ) | ||
| with MilvusConnectionManager(config.online_store): | ||
| try: | ||
| rows = self._format_data_for_milvus(data) | ||
| collection_to_load_data = Collection(table.name) | ||
| collection_to_load_data.insert(rows) | ||
| # The flush call will seal any remaining segments and send them for indexing | ||
| collection_to_load_data.flush() | ||
| except Exception as e: | ||
| logger.error(f"Batch writing data failed due to {e}") | ||
|
|
||
| def online_read( | ||
| self, | ||
|
|
@@ -123,6 +130,7 @@ def update( | |
| if collection_available: | ||
| logger.info(f"Collection {table_to_keep.name} already exists.") | ||
| else: | ||
| # TODO: Enable dynamic schema option | ||
|
Collaborator
What does this mean?
Collaborator
Milvus allows arbitrary additional fields including JSON documents with a
Collaborator
Thanks for the explanation. It's probably better to remove the to-do for now and have a spike in the future if that functionality is needed.
Collaborator
Author
Sure, I had that note in to think about how we could potentially incorporate it, but a spike ticket seems fine. |
||
| schema = self._convert_featureview_schema_to_milvus_readable( | ||
| table_to_keep.schema, | ||
| table_to_keep.vector_field, | ||
|
|
@@ -230,3 +238,31 @@ def _feast_to_milvus_data_type(self, feast_type: FeastType) -> DataType: | |
| # TODO: Need to think about list of binaries and list of bytes | ||
| # FeastType.BYTES_LIST: DataType.BINARY_VECTOR | ||
| }.get(feast_type, None) | ||
|
|
||
| def _format_data_for_milvus(self, feast_data): | ||
| """ | ||
| Data stored into Milvus takes the grouped representation approach where each feature value is grouped together: | ||
| [[[1,2], [1,3]], [John, Lucy], [3,4]] | ||
|
|
||
| Parameters: | ||
| feast_data: List[ | ||
| Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: Data represented for batch write in Feast | ||
|
|
||
| Returns: | ||
| List[List]: transformed_data: Data that can be directly written into Milvus | ||
| """ | ||
|
|
||
| milvus_data = [] | ||
| for entity_key, values, timestamp, created_ts in feast_data: | ||
| feature = [] | ||
| for feature_name, val in values.items(): | ||
| val_type = val.WhichOneof("val") | ||
| value = getattr(val, val_type) | ||
| if val_type == "float_list_val": | ||
| value = np.array(value.val) | ||
| # TODO: Check binary vector conversion | ||
| feature.append(value) | ||
| milvus_data.append(feature) | ||
|
|
||
| transformed_data = [list(item) for item in zip(*milvus_data)] | ||
| return transformed_data | ||
I feel that we shouldn't put a try / except around the entire code, but should rather catch errors in a more targeted way, where they actually occur. We can address it later, but if you see ways to do it now, please do so.
Sure
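One possible shape for the targeted handling suggested above is sketched here. It is not the code from this PR: `BatchWriteError`, `write_batch`, and the `format_rows` parameter are hypothetical names, and `collection` is assumed to be any object exposing `insert(rows)` and `flush()` as the `Collection` in the diff does. Each stage gets its own `try` block, and failures are re-raised rather than only logged, so the caller can see that the write did not complete.

```python
import logging
from typing import Any, Callable, List, Sequence

logger = logging.getLogger(__name__)


class BatchWriteError(RuntimeError):
    """Hypothetical error type that records which stage of the write failed."""

    def __init__(self, stage: str, cause: Exception) -> None:
        super().__init__(f"Milvus batch write failed during {stage}: {cause}")
        self.stage = stage


def write_batch(
    collection: Any,
    data: Sequence[Any],
    format_rows: Callable[[Sequence[Any]], List[list]],
) -> None:
    """Format, insert, and flush, handling each stage separately."""
    # Stage 1: formatting is pure Python, so only data-shape errors are expected.
    try:
        rows = format_rows(data)
    except (AttributeError, TypeError, ValueError) as exc:
        logger.error("Could not format batch for Milvus: %s", exc)
        raise BatchWriteError("format", exc) from exc

    # Stage 2: insert. Assumption: real code would catch the client
    # library's own exception type here instead of Exception.
    try:
        collection.insert(rows)
    except Exception as exc:
        logger.error("Insert into collection failed: %s", exc)
        raise BatchWriteError("insert", exc) from exc

    # Stage 3: flush, kept separate so a flush failure is not mistaken
    # for a failed insert.
    try:
        collection.flush()
    except Exception as exc:
        logger.error("Flush after insert failed: %s", exc)
        raise BatchWriteError("flush", exc) from exc
```

Whether to re-raise or keep the log-and-continue behaviour from the diff is a design choice for the maintainers; the sketch re-raises so that a failed batch is not reported as a success.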