Skip to content

Commit 8221600

Browse files
author
Anshi Shrivastava
committed
feat: add DynamoDB in-place list update support for array-based features
This PR adds support for in-place list updates (append/prepend) for array-based features in DynamoDB online store, addressing issue #5687. Key changes: - Add update_online_store() and update_online_store_async() methods to FeatureStore for list operations on existing feature values - Implement read-modify-write pattern in DynamoDB store to handle list operations while maintaining compatibility with existing protobuf serialization format - Add comprehensive tests for list append, prepend, mixed operations, and new entity handling The implementation uses a read-modify-write approach because existing data is stored as serialized protobuf bytes, not native DynamoDB lists. This maintains backward compatibility with existing online_read methods. Signed-off-by: Anshi Shrivastava <anshishr@amazon.com>
1 parent d66358f commit 8221600

File tree

4 files changed

+933
-1
lines changed

4 files changed

+933
-1
lines changed

sdk/python/feast/feature_store.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2213,6 +2213,148 @@ async def write_to_online_store_async(
22132213
provider = self._get_provider()
22142214
await provider.ingest_df_async(feature_view, df)
22152215

2216+
def update_online_store(
2217+
self,
2218+
feature_view_name: str,
2219+
df: pd.DataFrame,
2220+
update_expressions: Dict[str, str],
2221+
allow_registry_cache: bool = True,
2222+
) -> None:
2223+
"""
2224+
Update features using DynamoDB-specific list operations.
2225+
2226+
This method provides efficient in-place list updates using DynamoDB's native
2227+
UpdateItem operations with list_append and other expressions. This is more
2228+
efficient than the read-modify-write pattern for array-based features.
2229+
2230+
Args:
2231+
feature_view_name: The feature view to update.
2232+
df: DataFrame with new values to append/prepend to existing lists.
2233+
update_expressions: Dict mapping feature names to DynamoDB update expressions.
2234+
Examples:
2235+
- {"transactions": "list_append(transactions, :new_val)"} # append
2236+
- {"recent_items": "list_append(:new_val, recent_items)"} # prepend
2237+
allow_registry_cache: Whether to allow cached registry.
2238+
2239+
Raises:
2240+
NotImplementedError: If online store doesn't support update expressions.
2241+
ValueError: If the feature view or update expressions are invalid.
2242+
2243+
Example:
2244+
# Append new transactions to existing transaction history
2245+
store.update_online_store(
2246+
feature_view_name="user_transactions",
2247+
df=new_transactions_df,
2248+
update_expressions={
2249+
"transaction_history": "list_append(transaction_history, :new_val)",
2250+
"recent_amounts": "list_append(:new_val, recent_amounts)" # prepend
2251+
}
2252+
)
2253+
"""
2254+
# Check if online store supports update expressions
2255+
provider = self._get_provider()
2256+
if not hasattr(provider.online_store, "update_online_store"):
2257+
raise NotImplementedError(
2258+
f"Online store {type(provider.online_store).__name__} "
2259+
"does not support update expressions. This feature is only available "
2260+
"with DynamoDB online store."
2261+
)
2262+
2263+
feature_view, df = self._get_feature_view_and_df_for_online_write(
2264+
feature_view_name=feature_view_name,
2265+
df=df,
2266+
allow_registry_cache=allow_registry_cache,
2267+
transform_on_write=False, # Don't transform for updates
2268+
)
2269+
2270+
# Validate that the dataframe has meaningful feature data
2271+
if df is not None:
2272+
if df.empty:
2273+
warnings.warn("Cannot update with empty dataframe")
2274+
return
2275+
2276+
# Check if feature columns are empty
2277+
feature_column_names = [f.name for f in feature_view.features]
2278+
if feature_column_names:
2279+
feature_df = df[feature_column_names]
2280+
if feature_df.empty or feature_df.isnull().all().all():
2281+
warnings.warn("Cannot update with empty feature columns")
2282+
return
2283+
2284+
# Prepare data for online store
2285+
from feast.infra.passthrough_provider import PassthroughProvider
2286+
2287+
rows_to_write = PassthroughProvider._prep_rows_to_write_for_ingestion(
2288+
feature_view=feature_view,
2289+
df=df,
2290+
)
2291+
2292+
# Call DynamoDB-specific method
2293+
provider.online_store.update_online_store(
2294+
config=self.config,
2295+
table=feature_view,
2296+
data=rows_to_write,
2297+
update_expressions=update_expressions,
2298+
progress=None,
2299+
)
2300+
2301+
async def update_online_store_async(
2302+
self,
2303+
feature_view_name: str,
2304+
df: pd.DataFrame,
2305+
update_expressions: Dict[str, str],
2306+
allow_registry_cache: bool = True,
2307+
) -> None:
2308+
"""
2309+
Async version of update_online_store.
2310+
"""
2311+
# Check if online store supports update expressions
2312+
provider = self._get_provider()
2313+
if not hasattr(provider.online_store, "update_online_store_async"):
2314+
raise NotImplementedError(
2315+
f"Online store {type(provider.online_store).__name__} "
2316+
"does not support async update expressions. This feature is only available "
2317+
"with DynamoDB online store."
2318+
)
2319+
2320+
feature_view, df = self._get_feature_view_and_df_for_online_write(
2321+
feature_view_name=feature_view_name,
2322+
df=df,
2323+
allow_registry_cache=allow_registry_cache,
2324+
transform_on_write=False, # Don't transform for updates
2325+
)
2326+
2327+
# Validate that the dataframe has meaningful feature data
2328+
if df is not None:
2329+
if df.empty:
2330+
warnings.warn("Cannot update with empty dataframe")
2331+
return
2332+
2333+
# Check if feature columns are empty
2334+
feature_column_names = [f.name for f in feature_view.features]
2335+
if feature_column_names:
2336+
feature_df = df[feature_column_names]
2337+
if feature_df.empty or feature_df.isnull().all().all():
2338+
warnings.warn("Cannot update with empty feature columns")
2339+
return
2340+
2341+
# Prepare data for online store
2342+
from feast.infra.passthrough_provider import PassthroughProvider
2343+
2344+
rows_to_write = PassthroughProvider._prep_rows_to_write_for_ingestion(
2345+
feature_view=feature_view,
2346+
df=df,
2347+
)
2348+
2349+
# Call DynamoDB-specific async method
2350+
await provider.online_store.update_online_store_async(
2351+
config=self.config,
2352+
table=feature_view,
2353+
data=rows_to_write,
2354+
update_expressions=update_expressions,
2355+
progress=None,
2356+
)
2357+
22162358
def write_to_offline_store(
22172359
self,
22182360
feature_view_name: str,

0 commit comments

Comments
 (0)