Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updated and fixed bug
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Jan 29, 2025
commit 7c006b1d008886fffc980c3a4268963de6af29c0
36 changes: 18 additions & 18 deletions docs/reference/online-stores/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ Details for each specific online store, such as how to configure it in a `featur

Below is a matrix indicating which online stores support what functionality.

| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) |
| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |
| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no |
| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Java | no | yes | no | no | no | no | no | no | no |
| readable by Go | yes | yes | no | no | no | no | no | no | no |
| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes |
| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no |
| support for deleting expired data | no | yes | no | no | no | no | no | no | no |
| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no |
| collocated by feature service | no | no | no | no | no | no | no | no | no |
| collocated by entity key | no | yes | no | no | no | no | no | no | yes |
| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) | Milvus |
| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:-------|
| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | no |
| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Java | no | yes | no | no | no | no | no | no | no | no |
| readable by Go | yes | yes | no | no | no | no | no | no | no | no |
| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | no |
| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | no |
| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | no |
| support for deleting expired data | no | yes | no | no | no | no | no | no | no | no |
| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | no |
| collocated by feature service | no | no | no | no | no | no | no | no | no | no |
| collocated by entity key | no | yes | no | no | no | no | no | no | yes | no |
3 changes: 1 addition & 2 deletions examples/rag/feature_repo/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ def run_demo():
query = query_embedding.detach().cpu().numpy().tolist()[0]

# Retrieve top k documents
features = store.retrieve_online_documents(
feature=None,
features = store.retrieve_online_documents_v2(
features=[
"city_embeddings:vector",
"city_embeddings:item_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ def _connect(self, config: RepoConfig) -> MilvusClient:
if not self.client:
if config.provider == "local":
db_path = self._get_db_path(config)
print(f"Connecting to Milvus in local mode using {db_path}")
self.client = MilvusClient(db_path)
else:
self.client = MilvusClient(
Expand All @@ -138,8 +137,11 @@ def _connect(self, config: RepoConfig) -> MilvusClient:
)
return self.client

def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, Any]:
def _get_or_create_collection(
self, config: RepoConfig, table: FeatureView
) -> Dict[str, Any]:
self.client = self._connect(config)
vector_field_dict = {k.name: k for k in table.schema if k.vector_index}
collection_name = _table_id(config.project, table)
if collection_name not in self._collections:
# Create a composite key by combining entity fields
Expand Down Expand Up @@ -200,10 +202,13 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, A
DataType.FLOAT_VECTOR,
DataType.BINARY_VECTOR,
]:
metric = vector_field_dict[
vector_field.name
].vector_search_metric
index_params.add_index(
collection_name=collection_name,
field_name=vector_field.name,
metric_type=config.online_store.metric_type,
metric_type=metric or config.online_store.metric_type,
index_type=config.online_store.index_type,
index_name=f"vector_index_{vector_field.name}",
params={"nlist": config.online_store.nlist},
Expand Down Expand Up @@ -234,7 +239,7 @@ def online_write_batch(
progress: Optional[Callable[[int], Any]],
) -> None:
self.client = self._connect(config)
collection = self._get_collection(config, table)
collection = self._get_or_create_collection(config, table)
vector_cols = [f.name for f in table.features if f.vector_index]
entity_batch_to_insert = []
for entity_key, values_dict, timestamp, created_ts in data:
Expand Down Expand Up @@ -301,7 +306,7 @@ def update(
):
self.client = self._connect(config)
for table in tables_to_keep:
self._collections = self._get_collection(config, table)
self._collections = self._get_or_create_collection(config, table)

for table in tables_to_delete:
collection_name = _table_id(config.project, table)
Expand Down Expand Up @@ -347,7 +352,7 @@ def retrieve_online_documents_v2(
}
self.client = self._connect(config)
collection_name = _table_id(config.project, table)
collection = self._get_collection(config, table)
collection = self._get_or_create_collection(config, table)
if not config.online_store.vector_enabled:
raise ValueError("Vector search is not enabled in the online store config")

Expand Down Expand Up @@ -408,11 +413,10 @@ def retrieve_online_documents_v2(
)
for field in output_fields:
val = ValueProto()
field_value = hit.get("entity", {}).get(field, None)
# entity_key_proto = None
if field in ["created_ts", "event_ts"]:
res_ts = datetime.fromtimestamp(
hit.get("entity", {}).get(field) / 1e6
)
res_ts = datetime.fromtimestamp(field_value / 1e6)
elif field == ann_search_field:
serialized_embedding = _serialize_vector_to_float_list(
embedding
Expand All @@ -426,15 +430,14 @@ def retrieve_online_documents_v2(
PrimitiveFeastType.INT32,
PrimitiveFeastType.BYTES,
]:
res[field] = ValueProto(
string_val=hit.get("entity", {}).get(field, "")
)
res[field] = ValueProto(string_val=field_value)
elif field == composite_key_name:
pass
elif isinstance(field_value, bytes):
val.ParseFromString(field_value)
res[field] = val
else:
val.ParseFromString(
bytes(hit.get("entity", {}).get(field, b"").encode())
)
val.string_val = field_value
res[field] = val
distance = hit.get("distance", None)
res["distance"] = (
Expand Down Expand Up @@ -471,7 +474,7 @@ def _extract_proto_values_to_dict(
else:
vector_values = getattr(feature_values, proto_val_type).val
else:
if serialize_to_string:
if serialize_to_string and proto_val_type != "string_val":
vector_values = feature_values.SerializeToString().decode()
else:
vector_values = getattr(feature_values, proto_val_type)
Expand Down