Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d372a09
ci: Add bigtable cleanup script
adchia Jul 3, 2023
f6d3caf
fix: Missing Catalog argument in athena connector (#3661)
GyuminJack Jul 3, 2023
d4f9158
ci: Disable flaky lambda materialization test
adchia Jul 3, 2023
4861af0
fix: Broken non-root path with projects-list.json (#3665)
bjfletcher Jul 3, 2023
48e0971
fix: Manage redis pipe's context (#3655)
j1wonpark Jul 4, 2023
315073f
chore: Bump tough-cookie from 4.0.0 to 4.1.3 in /sdk/python/feast/ui …
dependabot[bot] Jul 11, 2023
870762a
chore: Bump tough-cookie from 4.0.0 to 4.1.3 in /ui (#3676)
dependabot[bot] Jul 11, 2023
478caec
fix: For SQL registry, increase max data_source_name length to 255 (#…
radonnachie Jul 13, 2023
1c01035
fix: Optimize bytes processed when retrieving entity df schema to 0 (…
sudohainguyen Jul 13, 2023
ef4ef32
fix: Entityless fv breaks with `KeyError: __dummy` applying feature_s…
wfoschiera Jul 13, 2023
0ad2d62
chore: Bump protobufjs from 7.1.1 to 7.2.4 in /ui (#3674)
dependabot[bot] Jul 17, 2023
e4c0c9b
chore: Bump protobufjs from 7.1.2 to 7.2.4 in /sdk/python/feast/ui (#…
dependabot[bot] Jul 17, 2023
bef5791
chore: Bump semver from 6.3.0 to 6.3.1 in /ui (#3678)
dependabot[bot] Jul 17, 2023
928be7b
chore: Bump semver from 6.3.0 to 6.3.1 in /sdk/python/feast/ui (#3679)
dependabot[bot] Jul 17, 2023
12f57a9
chore: Bump google.golang.org/grpc from 1.47.0 to 1.53.0 (#3670)
dependabot[bot] Jul 17, 2023
9527183
chore(release): release 0.32.0
feast-ci-bot Jul 17, 2023
76270f6
fix: Redshift push ignores schema (#3671)
metavee Jul 24, 2023
c75a01f
fix: Add aws-sts dependency in java sdk so that S3 client acquires IR…
harmeet-singh-discovery Aug 1, 2023
0578b9b
Adding initial update changes
Aug 7, 2023
8487678
Merge branch 'feast-dev:master' into msudhir/add-vector-update-functi…
Manisha4 Aug 7, 2023
5828891
Added formatting changes
Aug 7, 2023
4a29d33
Revert "Merge branch 'feast-dev:master' into msudhir/add-vector-updat…
Aug 7, 2023
e209770
Added more tests and functionality
Aug 8, 2023
ebe1e32
updating tests
Aug 8, 2023
62692e0
updated functionality and added more tests
Aug 9, 2023
0680c94
correcting a test case
Aug 9, 2023
5c5490d
Making formatting corrections and changing log
Aug 9, 2023
cdadb87
Improved tests and added functionality to convert feast schema to mil…
Aug 10, 2023
e1fd230
Added PR Review comments
Aug 11, 2023
d0c4269
Fixed failing test
Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adding initial update changes
  • Loading branch information
Manisha Sudhir committed Aug 7, 2023
commit 0578b9b2bf5efbbc6029e79ecb699b0c51bb931e
16 changes: 16 additions & 0 deletions sdk/python/docs/source/feast.protos.feast.core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,22 @@ feast.protos.feast.core.FeatureView\_pb2\_grpc module
:undoc-members:
:show-inheritance:

feast.protos.feast.core.VectorFeatureView\_pb2 module
-----------------------------------------------------

.. automodule:: feast.protos.feast.core.VectorFeatureView_pb2
:members:
:undoc-members:
:show-inheritance:

feast.protos.feast.core.VectorFeatureView\_pb2\_grpc module
-----------------------------------------------------------

.. automodule:: feast.protos.feast.core.VectorFeatureView_pb2_grpc
:members:
:undoc-members:
:show-inheritance:

feast.protos.feast.core.Feature\_pb2 module
-------------------------------------------

Expand Down
30 changes: 27 additions & 3 deletions sdk/python/feast/expediagroup/vectordb/milvus_online_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

Expand All @@ -10,6 +11,10 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel

from pymilvus import Collection, FieldSchema, CollectionSchema, DataType, connections, utility

logger = logging.getLogger(__name__)


class MilvusOnlineStoreConfig(FeastConfigBaseModel):
"""Online store config for the Milvus online store"""
Expand All @@ -23,6 +28,11 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel):
port: int = 19530
""" the port to connect to a Milvus instance. Should be the one used for GRPC (default: 19530) """

def __init__(self, *args, **kwargs):
    """Validate the config fields, then open the Milvus connection.

    All positional and keyword arguments are forwarded untouched to the
    parent model's constructor.
    """
    super().__init__(*args, **kwargs)
    # The connection is opened as soon as the config object exists, using the
    # host and port that were just populated on this instance.
    milvus_endpoint = {"host": self.host, "port": self.port}
    connections.connect(**milvus_endpoint, use_secure=True)


class MilvusOnlineStore(VectorOnlineStore):
def online_write_batch(
Expand Down Expand Up @@ -58,9 +68,23 @@ def update(
entities_to_keep: Sequence[Entity],
partial: bool,
):
raise NotImplementedError(
"to be implemented in https://jira.expedia.biz/browse/EAPC-7970"
)
# Make sure every feature view that is kept is backed by a Milvus collection.
# Failures are logged per collection so one bad table does not stop the rest.
for table_to_keep in tables_to_keep:
    try:
        Collection(name=table_to_keep.name, schema=table_to_keep.schema)
        logger.info(f"Collection {table_to_keep.name} has been updated successfully.")
    except Exception as e:
        logger.error(f"Collection update failed due to {e}")

# Drop the collection behind every feature view that is being removed.
for table_to_delete in tables_to_delete:
    try:
        # The existence check lives inside the try so that a failure while
        # querying Milvus is logged like any other deletion failure instead
        # of escaping from update().
        if utility.has_collection(table_to_delete.name):
            utility.drop_collection(table_to_delete.name)
            # Report the collection that was actually dropped; the loop
            # variable of the previous loop may be stale or undefined here.
            logger.info(f"Collection {table_to_delete.name} has been deleted successfully.")
        else:
            # Log and move on: returning here would silently skip every
            # remaining entry in tables_to_delete.
            logger.error("Collection does not exist or is already deleted.")
    except Exception as e:
        logger.error(f"Collection deletion failed due to {e}")

def teardown(
self,
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ def __init__(self, **data: Any):
self._online_config = "dynamodb"
elif data["provider"] == "rockset":
self._online_config = "rockset"
elif data["provider"] == "milvus":
self._online_config = "milvus"

self._batch_engine = None
if "batch_engine" in data:
Expand Down
159 changes: 159 additions & 0 deletions sdk/python/tests/expediagroup/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import logging
from typing import List, Optional
from dataclasses import dataclass
import pytest
from pymilvus import (
Collection, FieldSchema, CollectionSchema, DataType, utility
)
from tests.expediagroup.milvus_online_store_creator import MilvusOnlineStoreCreator
from feast.repo_config import RepoConfig
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.expediagroup.vectordb.milvus_online_store import (
MilvusOnlineStoreConfig, MilvusOnlineStore
)
from feast.field import Field

logging.basicConfig(level=logging.INFO)

REGISTRY = "s3://test_registry/registry.db"
PROJECT = "test_aws"
PROVIDER = "aws"
TABLE_NAME = "milvus_online_store"
REGION = "us-west-2"
HOST = "localhost"


@dataclass
class MockFeatureView:
    """Minimal stand-in for a feature view, carrying only a name and a schema."""

    # Used by the tests in this file as the Milvus collection name.
    name: str
    # NOTE(review): annotated as a list of feast Fields, but the tests in this
    # file pass a pymilvus CollectionSchema (or None) — confirm the intended type.
    schema: Optional[List[Field]]


@pytest.fixture
def repo_config():
    """Provide a RepoConfig wired to the Milvus online store and a file offline store."""
    online_store = MilvusOnlineStoreConfig(host=HOST, region=REGION)
    offline_store = FileOfflineStoreConfig()
    config = RepoConfig(
        registry=REGISTRY,
        project=PROJECT,
        provider=PROVIDER,
        online_store=online_store,
        offline_store=offline_store,
        entity_key_serialization_version=2,
    )
    return config


@pytest.fixture
def milvus_online_store():
    """Provide a fresh MilvusOnlineStore instance for each test."""
    store = MilvusOnlineStore()
    return store


@pytest.fixture(scope="class")
def milvus_online_setup():
    """Start one Milvus online store per test class and tear it down afterwards."""
    # Set-up: a single store is shared by every test in the class.
    creator = MilvusOnlineStoreCreator("milvus")
    creator.create_online_store()

    yield creator

    # Clean-up: runs once, after the last test of the class has finished.
    creator.teardown()


class TestMilvusOnlineStore:
    """Tests for MilvusOnlineStore.update run against the class-scoped Milvus instance."""

    def test_milvus_update(self, milvus_online_setup):
        """update() creates the collections to keep and drops the ones to delete."""
        first_collection = "Collection1"
        second_collection = "Collection2"
        # Constructing the config is what opens the Milvus connection.
        MilvusOnlineStoreConfig(host=HOST)

        # One schema shared by both collections: an int64 primary key plus a
        # 128-dimensional float vector.
        primary_key = FieldSchema(
            "int64", DataType.INT64, description="int64", is_primary=True
        )
        embedding = FieldSchema(
            "float_vector", DataType.FLOAT_VECTOR, is_primary=False, dim=128
        )
        schema = CollectionSchema(fields=[primary_key, embedding])

        # Start from a clean slate so the collection counts below are exact.
        for stale_name in (first_collection, second_collection):
            utility.drop_collection(stale_name)

        # NOTE(review): `repo_config` is not requested as a fixture parameter of
        # this test, so the name resolves to the module-level fixture function
        # rather than a RepoConfig instance — confirm this is intended.
        keep_first = [MockFeatureView(name=first_collection, schema=schema)]
        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=[],
            tables_to_keep=keep_first,
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )

        existing = utility.list_collections()
        assert len(existing) == 1

        # Second pass: swap the first collection for the second one.
        drop_first = [MockFeatureView(name=first_collection, schema=None)]
        keep_second = [MockFeatureView(name=second_collection, schema=schema)]
        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=drop_first,
            tables_to_keep=keep_second,
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )

        logging.info(utility.list_collections())
        assert utility.has_collection(second_collection) is True
        assert utility.has_collection(first_collection) is False
        assert len(utility.list_collections()) == 1


def connect_from_connections():
    """Manually exercise MilvusOnlineStore.update against a throwaway Milvus.

    Starts an online store through MilvusOnlineStoreCreator, runs three
    update() calls (create a collection, delete a non-existent one while
    keeping the first, then re-apply the first with a different schema) and
    logs the collection list along the way. The store is always torn down,
    even if one of the steps raises.
    """

    def apply_update(tables_to_delete, tables_to_keep):
        # NOTE(review): `repo_config` here is the module-level pytest fixture
        # function, not a RepoConfig instance — confirm this is intended.
        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=tables_to_delete,
            tables_to_keep=tables_to_keep,
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )

    online_store_creator = MilvusOnlineStoreCreator("milvus")
    online_store_creator.create_online_store()

    # Everything after start-up is wrapped so that a failure in any step does
    # not leave the Milvus instance running.
    try:
        schema2 = CollectionSchema(
            fields=[
                FieldSchema("int64", DataType.INT64, description="int64", is_primary=True),
                FieldSchema("float_vector", DataType.FLOAT_VECTOR, is_primary=False, dim=128),
            ]
        )

        # Constructing the config is what opens the Milvus connection.
        MilvusOnlineStoreConfig(host=HOST)

        logging.info(utility.list_collections())

        # Step 1: create "Collection1".
        apply_update(
            tables_to_delete=[],
            tables_to_keep=[MockFeatureView(name="Collection1", schema=schema2)],
        )

        logging.info(utility.list_collections())

        # Step 2: ask for deletion of a collection that was never created.
        apply_update(
            tables_to_delete=[MockFeatureView(name="ew", schema=None)],
            tables_to_keep=[MockFeatureView(name="Collection1", schema=schema2)],
        )

        schema3 = CollectionSchema(
            fields=[
                FieldSchema("int64", DataType.INT64, description="int is new", is_primary=False),
                FieldSchema("varchar_vector", DataType.VARCHAR, is_primary=True, dim=128),
            ]
        )

        # Step 3: re-apply "Collection1" with a schema that differs from step 1.
        apply_update(
            tables_to_delete=[],
            tables_to_keep=[MockFeatureView(name="Collection1", schema=schema3)],
        )
    finally:
        online_store_creator.teardown()


# NOTE(review): this call executes whenever the module is imported, not only
# when it is run as a script — consider an `if __name__ == "__main__":` guard.
connect_from_connections()
7 changes: 6 additions & 1 deletion sdk/python/tests/expediagroup/test_milvus_online_store.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from pymilvus.client.stub import Milvus
from pymilvus.client.types import DataType
from dataclasses import dataclass

from tests.expediagroup.milvus_online_store_creator import MilvusOnlineStoreCreator


@dataclass
class MockFeatureView:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it really necessary to use a mock? Can you use a VectorFeatureView instead?

name: str


def test_milvus_start_stop():
# this is just an example how to start / stop Milvus. Once a real test is implemented this test can be deleted
online_store_creator = MilvusOnlineStoreCreator("milvus")
Expand Down Expand Up @@ -33,5 +39,4 @@ def test_milvus_start_stop():

collection = milvus.describe_collection(collection_name)
assert collection.get("collection_name") == collection_name

online_store_creator.teardown()