Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ab7d4fb
merged rebase
franciscojavierarceo Dec 9, 2024
a6c9005
adding configuration
franciscojavierarceo Nov 11, 2024
b61b965
updated
franciscojavierarceo Nov 12, 2024
88bca02
changed things and linted
franciscojavierarceo Nov 15, 2024
81bd84a
adding updated builds
franciscojavierarceo Nov 15, 2024
083fe67
adding repo config
franciscojavierarceo Nov 20, 2024
fcf32d9
updated
franciscojavierarceo Nov 25, 2024
662119a
renaming test and adding milvus to integration test
franciscojavierarceo Nov 27, 2024
4a7edd8
not quite working but have milvus talking
franciscojavierarceo Nov 27, 2024
4c16c0b
updated tests
franciscojavierarceo Dec 1, 2024
fabac18
apply() method now works
franciscojavierarceo Dec 2, 2024
2ee74c5
updated setup
franciscojavierarceo Dec 9, 2024
1859e76
checking in progresss...getting there
franciscojavierarceo Dec 13, 2024
35557ce
making some progress
franciscojavierarceo Dec 14, 2024
c54309a
partially running
franciscojavierarceo Dec 15, 2024
0094e27
checking in progress...finding issues still
franciscojavierarceo Dec 16, 2024
25a065e
have the apply working
franciscojavierarceo Dec 16, 2024
f1a92e0
adjusting some type issues
franciscojavierarceo Dec 16, 2024
3282a05
updated and removed test
franciscojavierarceo Dec 16, 2024
2dee462
have things behaving with the enviornment arg
franciscojavierarceo Dec 17, 2024
be8c5bc
updated milvus
franciscojavierarceo Dec 17, 2024
dbc11e2
almost have retrieval working, having to make a lot of changes to onl…
franciscojavierarceo Dec 18, 2024
37d93f1
almost have deserialization from the search results done
franciscojavierarceo Dec 19, 2024
67ad688
got the retrieval working now too :D
franciscojavierarceo Dec 19, 2024
d1c15cf
updates to fix linter and new signature for all implementations
franciscojavierarceo Dec 20, 2024
6d94d7a
linter
franciscojavierarceo Dec 20, 2024
fe518ed
more linting
franciscojavierarceo Dec 20, 2024
a639013
Removing some unnecessary code
franciscojavierarceo Dec 20, 2024
c91681f
removing change to setup
franciscojavierarceo Dec 20, 2024
bd5ff48
adding sphinx docs
franciscojavierarceo Dec 20, 2024
32c9a9f
adjusting workflow
franciscojavierarceo Dec 23, 2024
22bac8b
adding logging to debug
franciscojavierarceo Dec 23, 2024
134b908
changing to vectordb environment
franciscojavierarceo Dec 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
almost have retrieval working, having to make a lot of changes to onl…
…ine retrieval. long term this can all go in the FeatureView class and in get_online_features

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Dec 22, 2024
commit dbc11e29d98f863e9ce15aec94778a92a8424031
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
)

PROTO_TO_MILVUS_TYPE_MAPPING: Dict[ValueType, DataType] = {
PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.STRING,
PROTO_VALUE_TO_VALUE_TYPE_MAP["bytes_val"]: DataType.VARCHAR,
PROTO_VALUE_TO_VALUE_TYPE_MAP["bool_val"]: DataType.BOOL,
PROTO_VALUE_TO_VALUE_TYPE_MAP["string_val"]: DataType.STRING,
PROTO_VALUE_TO_VALUE_TYPE_MAP["string_val"]: DataType.VARCHAR,
PROTO_VALUE_TO_VALUE_TYPE_MAP["float_val"]: DataType.FLOAT,
PROTO_VALUE_TO_VALUE_TYPE_MAP["double_val"]: DataType.DOUBLE,
PROTO_VALUE_TO_VALUE_TYPE_MAP["int32_val"]: DataType.INT32,
Expand Down Expand Up @@ -71,6 +71,8 @@
ValueType.DOUBLE,
]:
FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.FLOAT_VECTOR
elif base_value_type == ValueType.STRING:
FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.VARCHAR
elif base_value_type == ValueType.BOOL:
FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.BINARY_VECTOR

Expand Down Expand Up @@ -149,7 +151,14 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Collection:
dim=config.online_store.embedding_dim,
)
)

elif dtype == DataType.VARCHAR:
fields.append(
FieldSchema(
name=field.name,
dtype=dtype,
max_length=512,
)
)
else:
fields.append(FieldSchema(name=field.name, dtype=dtype))

Expand Down Expand Up @@ -210,17 +219,14 @@ def online_write_batch(
int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0
)
for feature_name in values_dict:
for vector_list_type_name in numeric_vector_list_types:
vector_list = getattr(
values_dict[feature_name], vector_list_type_name, None
)
if vector_list:
vector_values = getattr(
values_dict[feature_name], vector_list_type_name
).val
if vector_values != []:
# Note here we are over-writing the feature and collapsing the list into a single value
values_dict[feature_name] = vector_values
feature_values = values_dict[feature_name]
for proto_val_type in PROTO_VALUE_TO_VALUE_TYPE_MAP:
if feature_values.HasField(proto_val_type):
if proto_val_type in numeric_vector_list_types:
vector_values = getattr(feature_values, proto_val_type).val
else:
vector_values = getattr(feature_values, proto_val_type)
values_dict[feature_name] = vector_values

single_entity_record = {
composite_key_name: entity_key_str,
Expand All @@ -243,40 +249,7 @@ def online_read(
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
collection = self._get_collection(config, table)
results = []

for entity_key in entity_keys:
entity_key_str = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
expr = f"entity_key == '{entity_key_str}'"
if requested_features:
features_str = ", ".join([f"'{f}'" for f in requested_features])
expr += f" && feature_name in [{features_str}]"

res = collection.query(
expr,
output_fields=["feature_name", "value", "event_ts"],
consistency_level="Strong",
)

res_dict = {}
res_ts = None
for r in res:
feature_name = r["feature_name"]
val_bin = r["value"]
val = ValueProto()
val.ParseFromString(val_bin)
res_dict[feature_name] = val
res_ts = datetime.fromtimestamp(r["event_ts"] / 1e6)
if not res_dict:
results.append((None, None))
else:
results.append((res_ts, res_dict))

return results
raise NotImplementedError

def update(
self,
Expand Down Expand Up @@ -320,6 +293,7 @@ def retrieve_online_documents(
config: RepoConfig,
table: FeatureView,
requested_feature: str,
requested_features: List[str],
embedding: List[float],
top_k: int,
distance_metric: Optional[str] = None,
Expand All @@ -342,13 +316,22 @@ def retrieve_online_documents(
}
expr = f"feature_name == '{requested_feature}'"

composite_key_name = (
"_".join([str(value) for value in table.entity_columns]) + "_pk"
)
if requested_features:
features_str = ", ".join([f"'{f}'" for f in requested_features])
expr += f" && feature_name in [{features_str}]"

results = collection.search(
data=[embedding],
anns_field="vector_value",
param=search_params,
limit=top_k,
expr=expr,
output_fields=["entity_key", "value", "event_ts"],
output_fields=[composite_key_name]
+ requested_features
+ ["created_ts", "event_ts"],
consistency_level="Strong",
)

Expand Down
5 changes: 1 addition & 4 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@
driver,
location,
)
from tests.integration.feature_repos.universal.online_store.milvus import (
MilvusOnlineStoreCreator,
)
from tests.utils.auth_permissions_util import default_store
from tests.utils.http_server import check_port_open, free_port # noqa: E402
from tests.utils.ssl_certifcates_util import (
Expand Down Expand Up @@ -208,7 +205,6 @@ def environment(request, worker_id):
e.teardown()



@pytest.fixture
def vectordb_environment(request, worker_id):
db_config = IntegrationTestRepoConfig(
Expand All @@ -235,6 +231,7 @@ def vectordb_environment(request, worker_id):

e.teardown()


_config_cache: Any = {}


Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def retrieve_online_documents(
config: RepoConfig,
table: FeatureView,
requested_feature: str,
requested_features: Optional[List[str]],
query: List[float],
top_k: int,
distance_metric: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,12 @@ def test_retrieve_online_documents2(environment, fake_document_data):
fs.apply([item_embeddings_feature_view, item()])
fs.write_to_online_store("item_embeddings", df)
documents = fs.retrieve_online_documents(
feature="item_embeddings:embedding_float",
feature=None,
features=[
"item_embeddings:embedding_float",
"item_embeddings:item_id",
"item_embeddings:string_feature",
],
query=[1.0, 2.0],
top_k=2,
distance_metric="L2",
Expand Down
3 changes: 0 additions & 3 deletions sdk/python/tests/unit/online_store/test_online_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

from feast import FeatureStore, RepoConfig
from feast.errors import FeatureViewNotFoundException
from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStoreConfig
from feast.infra.provider import Provider
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
Expand Down Expand Up @@ -563,4 +561,3 @@ def test_sqlite_vec_import() -> None:
""").fetchall()
result = [(rowid, round(distance, 2)) for rowid, distance in result]
assert result == [(2, 2.39), (1, 2.39)]