Skip to content

Commit 45b218f

Browse files
author
Anshi Shrivastava
committed
feat: add DynamoDB in-place list update support for array-based features
- Add update_online_store() and update_online_store_async() methods to DynamoDBOnlineStore
- Support DynamoDB UpdateItem with list_append expressions for efficient array operations
- Add FeatureStore integration methods with the same names
- Implement proper list serialization for all ValueProto list types
- Add comprehensive unit tests for both DynamoDB online store and FeatureStore methods
- Maintain 100% backward compatibility with existing write_to_online_store behavior
- Keep DynamoDB-specific functionality separate from the core OnlineStore interface

Fixes #5687

Signed-off-by: Anshi Shrivastava <anshishr@amazon.com>
1 parent 6a07491 commit 45b218f

File tree

4 files changed

+1030
-1
lines changed

4 files changed

+1030
-1
lines changed

sdk/python/feast/feature_store.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2074,6 +2074,149 @@ async def write_to_online_store_async(
20742074
provider = self._get_provider()
20752075
await provider.ingest_df_async(feature_view, df)
20762076

2077+
def update_online_store(
    self,
    feature_view_name: str,
    df: pd.DataFrame,
    update_expressions: Dict[str, str],
    allow_registry_cache: bool = True,
) -> None:
    """
    Update features in place through the online store's update expressions.

    The dataframe is converted to online-store rows and handed, together with
    ``update_expressions``, to the online store's ``update_online_store``
    method. This is intended for DynamoDB-style in-place list updates
    (``list_append`` and friends) so that array-based features do not need a
    read-modify-write round trip.

    Args:
        feature_view_name: The feature view to update.
        df: DataFrame with new values to append/prepend to existing lists.
        update_expressions: Dict mapping feature names to DynamoDB update expressions.
            Examples:
            - {"transactions": "list_append(transactions, :new_val)"}  # append
            - {"recent_items": "list_append(:new_val, recent_items)"}  # prepend
            - {"sliding_window": "list_append(if_not_exists(sliding_window, :empty_list), :new_val)[:10]"}  # sliding window
            NOTE(review): the trailing ``[:10]`` slice is not native DynamoDB
            UpdateExpression syntax; presumably it is interpreted by the
            online store implementation -- confirm.
        allow_registry_cache: Whether to allow cached registry.

    Returns:
        None. If the dataframe is empty, or every feature column is entirely
        null, a warning is emitted and nothing is sent to the online store.

    Raises:
        NotImplementedError: If the provider has no online store, or its online
            store does not implement ``update_online_store``.
        ValueError: Not raised directly here; presumably raised by the feature
            view lookup or by the online store for invalid input -- confirm.

    Example:
        # Append new transactions to existing transaction history
        store.update_online_store(
            feature_view_name="user_transactions",
            df=new_transactions_df,
            update_expressions={
                "transaction_history": "list_append(transaction_history, :new_val)",
                "recent_amounts": "list_append(:new_val, recent_amounts)[:10]"
            }
        )
    """
    provider = self._get_provider()

    # Resolve the store defensively: a provider that exposes no `online_store`
    # attribute must produce the documented NotImplementedError rather than an
    # AttributeError from the capability check itself.
    online_store = getattr(provider, "online_store", None)
    if not hasattr(online_store, "update_online_store"):
        raise NotImplementedError(
            f"Online store {type(online_store).__name__} "
            "does not support update expressions. This feature is only available "
            "with DynamoDB online store."
        )

    feature_view, df = self._get_feature_view_and_df_for_online_write(
        feature_view_name=feature_view_name,
        df=df,
        allow_registry_cache=allow_registry_cache,
        transform_on_write=False,  # Don't transform for updates
    )

    # Skip the update (with a warning) when there is nothing meaningful to send.
    if df is not None:
        if df.empty:
            warnings.warn("Cannot update with empty dataframe")
            return

        # Entity columns may be populated while every feature column is null.
        feature_column_names = [f.name for f in feature_view.features]
        if feature_column_names:
            feature_df = df[feature_column_names]
            if feature_df.empty or feature_df.isnull().all().all():
                warnings.warn("Cannot update with empty feature columns")
                return

    # Imported at call time, as in the original code (presumably to avoid an
    # import cycle -- confirm).
    from feast.infra.passthrough_provider import PassthroughProvider

    rows_to_write = PassthroughProvider._prep_rows_to_write_for_ingestion(
        feature_view=feature_view,
        df=df,
    )

    # Delegate to the store-specific implementation.
    online_store.update_online_store(
        config=self.config,
        table=feature_view,
        data=rows_to_write,
        update_expressions=update_expressions,
        progress=None,
    )
2162+
2163+
async def update_online_store_async(
    self,
    feature_view_name: str,
    df: pd.DataFrame,
    update_expressions: Dict[str, str],
    allow_registry_cache: bool = True,
) -> None:
    """
    Asynchronously update features through the online store's update expressions.

    The dataframe is converted to online-store rows and handed, together with
    ``update_expressions``, to the online store's ``update_online_store_async``
    coroutine, which is awaited.

    Args:
        feature_view_name: The feature view to update.
        df: DataFrame with the new values for the update.
        update_expressions: Dict mapping feature names to update expressions,
            e.g. {"transactions": "list_append(transactions, :new_val)"}.
        allow_registry_cache: Whether to allow cached registry.

    Returns:
        None. If the dataframe is empty, or every feature column is entirely
        null, a warning is emitted and nothing is sent to the online store.

    Raises:
        NotImplementedError: If the provider has no online store, or its online
            store does not implement ``update_online_store_async``.
    """
    provider = self._get_provider()

    # Resolve the store defensively: a provider that exposes no `online_store`
    # attribute must produce NotImplementedError rather than an AttributeError
    # from the capability check itself.
    online_store = getattr(provider, "online_store", None)
    if not hasattr(online_store, "update_online_store_async"):
        raise NotImplementedError(
            f"Online store {type(online_store).__name__} "
            "does not support async update expressions. This feature is only available "
            "with DynamoDB online store."
        )

    feature_view, df = self._get_feature_view_and_df_for_online_write(
        feature_view_name=feature_view_name,
        df=df,
        allow_registry_cache=allow_registry_cache,
        transform_on_write=False,  # Don't transform for updates
    )

    # Skip the update (with a warning) when there is nothing meaningful to send.
    if df is not None:
        if df.empty:
            warnings.warn("Cannot update with empty dataframe")
            return

        # Entity columns may be populated while every feature column is null.
        feature_column_names = [f.name for f in feature_view.features]
        if feature_column_names:
            feature_df = df[feature_column_names]
            if feature_df.empty or feature_df.isnull().all().all():
                warnings.warn("Cannot update with empty feature columns")
                return

    # Imported at call time, as in the original code (presumably to avoid an
    # import cycle -- confirm).
    from feast.infra.passthrough_provider import PassthroughProvider

    rows_to_write = PassthroughProvider._prep_rows_to_write_for_ingestion(
        feature_view=feature_view,
        df=df,
    )

    # Delegate to the store-specific async implementation.
    await online_store.update_online_store_async(
        config=self.config,
        table=feature_view,
        data=rows_to_write,
        update_expressions=update_expressions,
        progress=None,
    )
2219+
20772220
def write_to_offline_store(
20782221
self,
20792222
feature_view_name: str,

0 commit comments

Comments
 (0)