Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add DynamoDB in-place list update support for array-based features
This PR adds support for in-place list updates (append/prepend) for
array-based features in DynamoDB online store, addressing issue #5687.

Key changes:
- Add update_online_store() and update_online_store_async() methods to
  FeatureStore for list operations on existing feature values
- Implement read-modify-write pattern in DynamoDB store to handle list
  operations while maintaining compatibility with existing protobuf
  serialization format
- Add comprehensive tests for list append, prepend, mixed operations,
  and new entity handling

The implementation uses a read-modify-write approach because existing
data is stored as serialized protobuf bytes, not native DynamoDB lists.
This maintains backward compatibility with existing online_read methods.

Signed-off-by: Anshi Shrivastava <anshishr@amazon.com>
  • Loading branch information
Anshi Shrivastava committed Jan 30, 2026
commit aca958951af844b32f36bc1cac9b923e08357202
142 changes: 142 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2253,6 +2253,148 @@ async def write_to_online_store_async(
provider = self._get_provider()
await provider.ingest_df_async(feature_view, df)

def update_online_store(
Comment thread
anshishrivastava marked this conversation as resolved.
Outdated
self,
feature_view_name: str,
df: pd.DataFrame,
update_expressions: Dict[str, str],
allow_registry_cache: bool = True,
) -> None:
"""
Update features using DynamoDB-specific list operations.

This method provides efficient in-place list updates using DynamoDB's native
UpdateItem operations with list_append and other expressions. This is more
efficient than the read-modify-write pattern for array-based features.

Args:
feature_view_name: The feature view to update.
df: DataFrame with new values to append/prepend to existing lists.
update_expressions: Dict mapping feature names to DynamoDB update expressions.
Examples:
- {"transactions": "list_append(transactions, :new_val)"} # append
- {"recent_items": "list_append(:new_val, recent_items)"} # prepend
allow_registry_cache: Whether to allow cached registry.

Raises:
NotImplementedError: If online store doesn't support update expressions.
ValueError: If the feature view or update expressions are invalid.

Example:
# Append new transactions to existing transaction history
store.update_online_store(
feature_view_name="user_transactions",
df=new_transactions_df,
update_expressions={
"transaction_history": "list_append(transaction_history, :new_val)",
"recent_amounts": "list_append(:new_val, recent_amounts)" # prepend
}
)
"""
# Check if online store supports update expressions
provider = self._get_provider()
if not hasattr(provider.online_store, "update_online_store"):
raise NotImplementedError(
f"Online store {type(provider.online_store).__name__} "
"does not support update expressions. This feature is only available "
"with DynamoDB online store."
)

feature_view, df = self._get_feature_view_and_df_for_online_write(
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
transform_on_write=False, # Don't transform for updates
)

# Validate that the dataframe has meaningful feature data
if df is not None:
if df.empty:
warnings.warn("Cannot update with empty dataframe")
return

# Check if feature columns are empty
feature_column_names = [f.name for f in feature_view.features]
if feature_column_names:
feature_df = df[feature_column_names]
if feature_df.empty or feature_df.isnull().all().all():
warnings.warn("Cannot update with empty feature columns")
return

# Prepare data for online store
from feast.infra.passthrough_provider import PassthroughProvider

rows_to_write = PassthroughProvider._prep_rows_to_write_for_ingestion(
feature_view=feature_view,
df=df,
)

# Call DynamoDB-specific method
provider.online_store.update_online_store(
config=self.config,
table=feature_view,
data=rows_to_write,
update_expressions=update_expressions,
progress=None,
)

async def update_online_store_async(
self,
feature_view_name: str,
df: pd.DataFrame,
update_expressions: Dict[str, str],
allow_registry_cache: bool = True,
) -> None:
"""
Async version of update_online_store.
"""
# Check if online store supports update expressions
provider = self._get_provider()
if not hasattr(provider.online_store, "update_online_store_async"):
raise NotImplementedError(
f"Online store {type(provider.online_store).__name__} "
"does not support async update expressions. This feature is only available "
"with DynamoDB online store."
)

feature_view, df = self._get_feature_view_and_df_for_online_write(
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
transform_on_write=False, # Don't transform for updates
)

# Validate that the dataframe has meaningful feature data
if df is not None:
if df.empty:
warnings.warn("Cannot update with empty dataframe")
return

# Check if feature columns are empty
feature_column_names = [f.name for f in feature_view.features]
if feature_column_names:
feature_df = df[feature_column_names]
if feature_df.empty or feature_df.isnull().all().all():
warnings.warn("Cannot update with empty feature columns")
return

# Prepare data for online store
from feast.infra.passthrough_provider import PassthroughProvider

rows_to_write = PassthroughProvider._prep_rows_to_write_for_ingestion(
feature_view=feature_view,
df=df,
)

# Call DynamoDB-specific async method
await provider.online_store.update_online_store_async(
config=self.config,
table=feature_view,
data=rows_to_write,
update_expressions=update_expressions,
progress=None,
)

def write_to_offline_store(
self,
feature_view_name: str,
Expand Down
Loading