Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Implementing online_read for MilvusOnlineStore
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Jan 31, 2025
commit f448b2c1476195135a336c6be1e5f172c605f571
7 changes: 3 additions & 4 deletions examples/rag/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ The RAG architecture combines retrieval of documents (using vector search) with

3. Materialize features into the online store:

```bash
python -c "from datetime import datetime; from feast import FeatureStore; store = FeatureStore(repo_path='.')"
python -c "store.materialize_incremental(datetime.utcnow())"
```python
store.write_to_online_store(feature_view_name='city_embeddings', df=df)
```
4. Run a query:

Expand All @@ -61,7 +60,7 @@ feast apply
store.write_to_online_store(feature_view_name='city_embeddings', df=df)
```

-Inspect retrieved features using Python:
- Inspect retrieved features using Python:
```python
context_data = store.retrieve_online_documents_v2(
features=[
Expand Down
2 changes: 1 addition & 1 deletion examples/rag/milvus-quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@
}
],
"source": [
"! feast apply "
"! feast apply"
]
},
{
Expand Down
182 changes: 130 additions & 52 deletions sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from pydantic import StrictStr
from pymilvus import (
Collection,
CollectionSchema,
DataType,
FieldSchema,
Expand All @@ -20,13 +19,13 @@
)
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.online_stores.vector_store import VectorStoreConfig
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.type_map import (
PROTO_VALUE_TO_VALUE_TYPE_MAP,
VALUE_TYPE_TO_PROTO_VALUE_MAP,
feast_value_type_to_python_type,
)
from feast.types import (
Expand All @@ -35,6 +34,7 @@
ComplexFeastType,
PrimitiveFeastType,
ValueType,
from_feast_type,
)
from feast.utils import (
_serialize_vector_to_float_list,
Expand Down Expand Up @@ -146,9 +146,7 @@ def _get_or_create_collection(
collection_name = _table_id(config.project, table)
if collection_name not in self._collections:
# Create a composite key by combining entity fields
composite_key_name = (
"_".join([field.name for field in table.entity_columns]) + "_pk"
)
composite_key_name = _get_composite_key_name(table)

fields = [
FieldSchema(
Expand Down Expand Up @@ -251,9 +249,8 @@ def online_write_batch(
).hex()
# to recover the entity key just run:
# deserialize_entity_key(bytes.fromhex(entity_key_str), entity_key_serialization_version=3)
composite_key_name = (
"_".join([str(value) for value in entity_key.join_keys]) + "_pk"
)
composite_key_name = _get_composite_key_name(table)

timestamp_int = int(to_naive_utc(timestamp).timestamp() * 1e6)
created_ts_int = (
int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0
Expand Down Expand Up @@ -294,7 +291,106 @@ def online_read(
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
raise NotImplementedError
self.client = self._connect(config)
collection_name = _table_id(config.project, table)
collection = self._get_or_create_collection(config, table)

composite_key_name = _get_composite_key_name(table)

output_fields = (
[composite_key_name]
+ (requested_features if requested_features else [])
+ ["created_ts", "event_ts"]
)
assert all(
field in [f["name"] for f in collection["fields"]]
for field in output_fields
), (
f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema"
)
composite_entities = []
for entity_key in entity_keys:
entity_key_str = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
composite_entities.append(entity_key_str)

query_filter_for_entities = (
f"{composite_key_name} in ["
+ ", ".join([f"'{e}'" for e in composite_entities])
+ "]"
)
self.client.load_collection(collection_name)
results = self.client.query(
collection_name=collection_name,
filter=query_filter_for_entities,
output_fields=output_fields,
)

feature_name_feast_primitive_type_map = {
f.name: f.dtype for f in table.features
}
# here we need to map the data stored as characters back into the protobuf value
result_list = []
for hit in results:
res = {}
res_ts = None
for field in output_fields:
val = ValueProto()
field_value = hit.get(field, None)
if field in ["created_ts", "event_ts"]:
res_ts = datetime.fromtimestamp(field_value / 1e6)
elif field == composite_key_name:
# We do not return the composite key value
pass
else:
feature_feast_primitive_type = (
feature_name_feast_primitive_type_map.get(
field, PrimitiveFeastType.INVALID
)
)
feature_fv_dtype = from_feast_type(feature_feast_primitive_type)
proto_attr = VALUE_TYPE_TO_PROTO_VALUE_MAP.get(feature_fv_dtype)
if proto_attr:
if proto_attr == "bytes_val":
setattr(val, proto_attr, field_value.encode())
elif proto_attr in [
"int32_val",
"int64_val",
"float_val",
"double_val",
]:
setattr(
val,
proto_attr,
type(getattr(val, proto_attr))(field_value),
)
elif proto_attr in [
"int32_list_val",
"int64_list_val",
"float_list_val",
"double_list_val",
]:
setattr(
val,
proto_attr,
list(
map(
type(getattr(val, proto_attr)).__args__[0],
field_value,
)
),
)
else:
setattr(val, proto_attr, field_value)
else:
raise ValueError(
f"Unsupported ValueType: {feature_feast_primitive_type} with feature view value {field_value} for feature {field} with value {field_value}"
)
res[field] = val
result_list.append((res_ts, res if res else None))
return result_list

def update(
self,
Expand Down Expand Up @@ -362,11 +458,7 @@ def retrieve_online_documents_v2(
"params": {"nprobe": 10},
}

composite_key_name = (
"_".join([str(field.name) for field in table.entity_columns]) + "_pk"
)
# features_str = ", ".join([f"'{f}'" for f in requested_features])
# expr = f" && feature_name in [{features_str}]"
composite_key_name = _get_composite_key_name(table)

output_fields = (
[composite_key_name]
Expand Down Expand Up @@ -452,6 +544,10 @@ def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _get_composite_key_name(table: FeatureView) -> str:
    """
    Build the name of the composite primary-key field for a feature view.

    The name is every entity column name of ``table`` joined with
    underscores, followed by the ``_pk`` suffix (a single entity column
    named ``item_id`` yields ``item_id_pk``).

    Args:
        table: The feature view whose ``entity_columns`` supply the name parts.

    Returns:
        The composite key field name.
    """
    entity_column_names = (column.name for column in table.entity_columns)
    return f"{'_'.join(entity_column_names)}_pk"


def _extract_proto_values_to_dict(
input_dict: Dict[str, Any],
vector_cols: List[str],
Expand All @@ -462,6 +558,13 @@ def _extract_proto_values_to_dict(
for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys()
if k is not None and "list" in k and "string" not in k
]
numeric_types = [
"double_val",
"float_val",
"int32_val",
"int64_val",
"bool_val",
]
output_dict = {}
for feature_name, feature_values in input_dict.items():
for proto_val_type in PROTO_VALUE_TO_VALUE_TYPE_MAP:
Expand All @@ -475,51 +578,26 @@ def _extract_proto_values_to_dict(
else:
vector_values = getattr(feature_values, proto_val_type).val
else:
if serialize_to_string and proto_val_type != "string_val":
if (
serialize_to_string
and proto_val_type not in ["string_val"] + numeric_types
):
vector_values = feature_values.SerializeToString().decode()
else:
vector_values = getattr(feature_values, proto_val_type)
if not isinstance(feature_values, str):
vector_values = str(
getattr(feature_values, proto_val_type)
)
else:
vector_values = getattr(feature_values, proto_val_type)
output_dict[feature_name] = vector_values
else:
if serialize_to_string:
if not isinstance(feature_values, str):
print(
f"converting {feature_name} with value = {feature_values} to string"
)
feature_values = str(feature_values)
output_dict[feature_name] = feature_values

return output_dict


class MilvusTable(InfraObject):
    """
    Infra object representing a Milvus collection managed by Feast.

    NOTE(review): ``__init__`` calls ``_connect()``, which currently raises
    ``NotImplementedError``, so constructing an instance fails until
    ``_connect`` is implemented.

    Attributes:
        host: The host of the Milvus server.
        port: The port of the Milvus server.
        name: The name of the collection.
    """

    host: str
    port: int

    def __init__(self, host: str, port: int, name: str):
        """Store the connection settings, then attempt to connect."""
        super().__init__(name)
        self.host, self.port = host, port
        self._connect()

    def _connect(self):
        """Connect to the Milvus server (not implemented)."""
        raise NotImplementedError

    def to_infra_object_proto(self) -> InfraObjectProto:
        """Serialize this object to its protobuf form (not implemented)."""
        # Implement serialization if needed
        raise NotImplementedError

    def update(self):
        """Apply updates to the collection (not implemented)."""
        # Implement update logic if needed
        raise NotImplementedError

    def teardown(self):
        """Drop the collection named ``self.name`` if it reports that it exists."""
        collection = Collection(name=self.name)
        # NOTE(review): confirm that pymilvus ``Collection`` exposes ``exists()``.
        if not collection.exists():
            return
        collection.drop()
4 changes: 4 additions & 0 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,10 @@ def python_values_to_proto_values(
"bool_list_val": ValueType.BOOL_LIST,
}

# Reverse lookup of PROTO_VALUE_TO_VALUE_TYPE_MAP: ValueType -> proto value
# field name (e.g. ValueType.BOOL_LIST -> "bool_list_val").
# NOTE(review): if several proto fields share a ValueType, the last one
# iterated wins; a None key in the source map (if present) would become a
# None value here despite the ``str`` annotation -- confirm.
VALUE_TYPE_TO_PROTO_VALUE_MAP: Dict[ValueType, str] = {
    value_type: proto_field
    for proto_field, value_type in PROTO_VALUE_TO_VALUE_TYPE_MAP.items()
}


def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType:
"""
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,23 @@ def from_value_type(
return VALUE_TYPES_TO_FEAST_TYPES[value_type]

raise ValueError(f"Could not convert value type {value_type} to FeastType.")


def from_feast_type(
    feast_type: FeastType,
) -> ValueType:
    """
    Converts a Feast type to a ValueType enum.

    This is the reverse lookup of VALUE_TYPES_TO_FEAST_TYPES: the first
    ValueType (in mapping order) whose Feast type equals ``feast_type`` is
    returned.

    Args:
        feast_type: The Feast type to be converted.

    Returns:
        The ValueType whose mapped Feast type equals ``feast_type``.

    Raises:
        ValueError: The conversion could not be performed.
    """
    # One pass over the mapping instead of a membership scan followed by
    # materializing separate key and value lists to find the position.
    for value_type, mapped_feast_type in VALUE_TYPES_TO_FEAST_TYPES.items():
        if mapped_feast_type == feast_type:
            return value_type

    raise ValueError(f"Could not convert feast type {feast_type} to ValueType.")
7 changes: 6 additions & 1 deletion sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@
name="document_embeddings",
entities=[item],
schema=[
Field(name="Embeddings", dtype=Array(Float32)),
Field(
name="Embeddings",
dtype=Array(Float32),
vector_index=True,
vector_search_metric="L2",
),
Field(name="item_id", dtype=String),
],
source=rag_documents_source,
Expand Down
Loading