Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions examples/rag/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ The RAG architecture combines retrieval of documents (using vector search) with

3. Materialize features into the online store:

```bash
python -c "from datetime import datetime; from feast import FeatureStore; store = FeatureStore(repo_path='.')"
python -c "store.materialize_incremental(datetime.utcnow())"
```python
store.write_to_online_store(feature_view_name='city_embeddings', df=df)
```
4. Run a query:

Expand All @@ -61,7 +60,7 @@ feast apply
store.write_to_online_store(feature_view_name='city_embeddings', df=df)
```

-Inspect retrieved features using Python:
- Inspect retrieved features using Python:
```python
context_data = store.retrieve_online_documents_v2(
features=[
Expand Down
2 changes: 1 addition & 1 deletion examples/rag/milvus-quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@
}
],
"source": [
"! feast apply "
"! feast apply"
]
},
{
Expand Down
205 changes: 153 additions & 52 deletions sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from pydantic import StrictStr
from pymilvus import (
Collection,
CollectionSchema,
DataType,
FieldSchema,
Expand All @@ -20,13 +19,13 @@
)
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.online_stores.vector_store import VectorStoreConfig
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.type_map import (
PROTO_VALUE_TO_VALUE_TYPE_MAP,
VALUE_TYPE_TO_PROTO_VALUE_MAP,
feast_value_type_to_python_type,
)
from feast.types import (
Expand All @@ -35,6 +34,7 @@
ComplexFeastType,
PrimitiveFeastType,
ValueType,
from_feast_type,
)
from feast.utils import (
_serialize_vector_to_float_list,
Expand Down Expand Up @@ -146,9 +146,7 @@ def _get_or_create_collection(
collection_name = _table_id(config.project, table)
if collection_name not in self._collections:
# Create a composite key by combining entity fields
composite_key_name = (
"_".join([field.name for field in table.entity_columns]) + "_pk"
)
composite_key_name = _get_composite_key_name(table)

fields = [
FieldSchema(
Expand Down Expand Up @@ -251,9 +249,8 @@ def online_write_batch(
).hex()
# to recover the entity key just run:
# deserialize_entity_key(bytes.fromhex(entity_key_str), entity_key_serialization_version=3)
composite_key_name = (
"_".join([str(value) for value in entity_key.join_keys]) + "_pk"
)
composite_key_name = _get_composite_key_name(table)

timestamp_int = int(to_naive_utc(timestamp).timestamp() * 1e6)
created_ts_int = (
int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0
Expand Down Expand Up @@ -293,8 +290,133 @@ def online_read(
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
full_feature_names: bool = False,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
raise NotImplementedError
self.client = self._connect(config)
collection_name = _table_id(config.project, table)
collection = self._get_or_create_collection(config, table)

composite_key_name = _get_composite_key_name(table)

output_fields = (
[composite_key_name]
+ (requested_features if requested_features else [])
+ ["created_ts", "event_ts"]
)
assert all(
field in [f["name"] for f in collection["fields"]]
for field in output_fields
), (
f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema"
)
composite_entities = []
for entity_key in entity_keys:
entity_key_str = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
composite_entities.append(entity_key_str)

query_filter_for_entities = (
f"{composite_key_name} in ["
+ ", ".join([f"'{e}'" for e in composite_entities])
+ "]"
)
self.client.load_collection(collection_name)
results = self.client.query(
collection_name=collection_name,
filter=query_filter_for_entities,
output_fields=output_fields,
)
# Group hits by composite key.
grouped_hits: Dict[str, Any] = {}
for hit in results:
key = hit.get(composite_key_name)
grouped_hits.setdefault(key, []).append(hit)

# Map the features to their Feast types.
feature_name_feast_primitive_type_map = {
f.name: f.dtype for f in table.features
}
# Build a dictionary mapping composite key -> (res_ts, res)
results_dict: Dict[
str, Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
] = {}

# here we need to map the data stored as characters back into the protobuf value
for hit in results:
key = hit.get(composite_key_name)
# Only take one hit per composite key (adjust if you need aggregation)
if key not in results_dict:
res = {}
res_ts = None
for field in output_fields:
val = ValueProto()
field_value = hit.get(field, None)
if field_value is None and ":" in field:
_, field_short = field.split(":", 1)
field_value = hit.get(field_short)

if field in ["created_ts", "event_ts"]:
res_ts = datetime.fromtimestamp(field_value / 1e6)
elif field == composite_key_name:
# We do not return the composite key value
pass
else:
feature_feast_primitive_type = (
feature_name_feast_primitive_type_map.get(
field, PrimitiveFeastType.INVALID
)
)
feature_fv_dtype = from_feast_type(feature_feast_primitive_type)
proto_attr = VALUE_TYPE_TO_PROTO_VALUE_MAP.get(feature_fv_dtype)
if proto_attr:
if proto_attr == "bytes_val":
setattr(val, proto_attr, field_value.encode())
elif proto_attr in [
"int32_val",
"int64_val",
"float_val",
"double_val",
]:
setattr(
val,
proto_attr,
type(getattr(val, proto_attr))(field_value),
)
elif proto_attr in [
"int32_list_val",
"int64_list_val",
"float_list_val",
"double_list_val",
]:
setattr(
val,
proto_attr,
list(
map(
type(getattr(val, proto_attr)).__args__[0],
field_value,
)
),
)
else:
setattr(val, proto_attr, field_value)
else:
raise ValueError(
f"Unsupported ValueType: {feature_feast_primitive_type} with feature view value {field_value} for feature {field} with value {field_value}"
)
# res[field] = val
key_to_use = field.split(":", 1)[-1] if ":" in field else field
res[key_to_use] = val
results_dict[key] = (res_ts, res if res else None)

# Map the results back into a list matching the original order of composite_keys.
result_list = [
results_dict.get(key, (None, None)) for key in composite_entities
]

return result_list

def update(
self,
Expand Down Expand Up @@ -362,11 +484,7 @@ def retrieve_online_documents_v2(
"params": {"nprobe": 10},
}

composite_key_name = (
"_".join([str(field.name) for field in table.entity_columns]) + "_pk"
)
# features_str = ", ".join([f"'{f}'" for f in requested_features])
# expr = f" && feature_name in [{features_str}]"
composite_key_name = _get_composite_key_name(table)

output_fields = (
[composite_key_name]
Expand Down Expand Up @@ -452,6 +570,10 @@ def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _get_composite_key_name(table: FeatureView) -> str:
    """Build the Milvus primary-key field name for a feature view.

    The composite key is every entity column name of the view joined with
    underscores, suffixed with "_pk".
    """
    entity_field_names = [field.name for field in table.entity_columns]
    return f"{'_'.join(entity_field_names)}_pk"


def _extract_proto_values_to_dict(
input_dict: Dict[str, Any],
vector_cols: List[str],
Expand All @@ -462,6 +584,13 @@ def _extract_proto_values_to_dict(
for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys()
if k is not None and "list" in k and "string" not in k
]
numeric_types = [
"double_val",
"float_val",
"int32_val",
"int64_val",
"bool_val",
]
output_dict = {}
for feature_name, feature_values in input_dict.items():
for proto_val_type in PROTO_VALUE_TO_VALUE_TYPE_MAP:
Expand All @@ -475,10 +604,18 @@ def _extract_proto_values_to_dict(
else:
vector_values = getattr(feature_values, proto_val_type).val
else:
if serialize_to_string and proto_val_type != "string_val":
if (
serialize_to_string
and proto_val_type not in ["string_val"] + numeric_types
):
vector_values = feature_values.SerializeToString().decode()
else:
vector_values = getattr(feature_values, proto_val_type)
if not isinstance(feature_values, str):
vector_values = str(
getattr(feature_values, proto_val_type)
)
else:
vector_values = getattr(feature_values, proto_val_type)
output_dict[feature_name] = vector_values
else:
if serialize_to_string:
Expand All @@ -487,39 +624,3 @@ def _extract_proto_values_to_dict(
output_dict[feature_name] = feature_values

return output_dict


class MilvusTable(InfraObject):
    """
    A Milvus collection managed by Feast as an InfraObject.

    Attributes:
        host: The host of the Milvus server.
        port: The port of the Milvus server.
        name: The name of the collection.
    """

    host: str
    port: int

    def __init__(self, host: str, port: int, name: str):
        super().__init__(name)
        self.host = host
        self.port = port
        # NOTE(review): _connect() below raises NotImplementedError, so
        # constructing a MilvusTable currently always fails — confirm whether
        # this class is still in use or slated for removal.
        self._connect()

    def _connect(self):
        # Connection handling for this InfraObject wrapper is not implemented.
        raise NotImplementedError

    def to_infra_object_proto(self) -> InfraObjectProto:
        # Implement serialization if needed
        raise NotImplementedError

    def update(self):
        # Implement update logic if needed
        raise NotImplementedError

    def teardown(self):
        # Drop the backing Milvus collection when tearing down infrastructure.
        # NOTE(review): recent pymilvus Collection objects do not expose an
        # exists() method — verify this call against the pinned pymilvus
        # version before relying on teardown.
        collection = Collection(name=self.name)
        if collection.exists():
            collection.drop()
4 changes: 4 additions & 0 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,10 @@ def python_values_to_proto_values(
"bool_list_val": ValueType.BOOL_LIST,
}

# Reverse lookup of PROTO_VALUE_TO_VALUE_TYPE_MAP: maps a ValueType enum
# member to the name of its proto `Value` field (e.g. "int64_val").
# If several proto fields mapped to the same ValueType, the last one
# encountered during iteration would win.
VALUE_TYPE_TO_PROTO_VALUE_MAP: Dict[ValueType, str] = {
    value_type: proto_field_name
    for proto_field_name, value_type in PROTO_VALUE_TO_VALUE_TYPE_MAP.items()
}


def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType:
"""
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,23 @@ def from_value_type(
return VALUE_TYPES_TO_FEAST_TYPES[value_type]

raise ValueError(f"Could not convert value type {value_type} to FeastType.")


def from_feast_type(
    feast_type: FeastType,
) -> ValueType:
    """
    Converts a Feast type to a ValueType enum.

    Args:
        feast_type: The Feast type to be converted.

    Returns:
        The ValueType whose mapping in VALUE_TYPES_TO_FEAST_TYPES matches
        the given Feast type (first match in iteration order).

    Raises:
        ValueError: The conversion could not be performed.
    """
    # Single linear scan instead of materializing two lists and calling
    # .index(); semantics are identical (first equal value wins).
    for value_type, mapped_feast_type in VALUE_TYPES_TO_FEAST_TYPES.items():
        if mapped_feast_type == feast_type:
            return value_type

    raise ValueError(f"Could not convert feast type {feast_type} to ValueType.")
7 changes: 6 additions & 1 deletion sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@
name="document_embeddings",
entities=[item],
schema=[
Field(name="Embeddings", dtype=Array(Float32)),
Field(
name="Embeddings",
dtype=Array(Float32),
vector_index=True,
vector_search_metric="L2",
),
Field(name="item_id", dtype=String),
],
source=rag_documents_source,
Expand Down
Loading