@@ -2253,6 +2253,91 @@ async def write_to_online_store_async(
22532253 provider = self ._get_provider ()
22542254 await provider .ingest_df_async (feature_view , df )
22552255
2256+ async def update_online_store (
2257+ self ,
2258+ feature_view_name : str ,
2259+ df : pd .DataFrame ,
2260+ update_expressions : Dict [str , str ],
2261+ allow_registry_cache : bool = True ,
2262+ ) -> None :
2263+ """
2264+ Update features using DynamoDB-specific list operations.
2265+
2266+ This method provides efficient in-place list updates using DynamoDB's native
2267+ UpdateItem operations with list_append and other expressions. This is more
2268+ efficient than the read-modify-write pattern for array-based features.
2269+
2270+ Args:
2271+ feature_view_name: The feature view to update.
2272+ df: DataFrame with new values to append/prepend to existing lists.
2273+ update_expressions: Dict mapping feature names to DynamoDB update expressions.
2274+ Examples:
2275+ - {"transactions": "list_append(transactions, :new_val)"} # append
2276+ - {"recent_items": "list_append(:new_val, recent_items)"} # prepend
2277+ allow_registry_cache: Whether to allow cached registry.
2278+
2279+ Raises:
2280+ NotImplementedError: If online store doesn't support update expressions.
2281+ ValueError: If the feature view or update expressions are invalid.
2282+
2283+ Example:
2284+ # Append new transactions to existing transaction history
2285+ await store.update_online_store(
2286+ feature_view_name="user_transactions",
2287+ df=new_transactions_df,
2288+ update_expressions={
2289+ "transaction_history": "list_append(transaction_history, :new_val)",
2290+ "recent_amounts": "list_append(:new_val, recent_amounts)" # prepend
2291+ }
2292+ )
2293+ """
2294+ # Check if online store supports update expressions
2295+ provider = self ._get_provider ()
2296+ if not hasattr (provider .online_store , "update_online_store_async" ):
2297+ raise NotImplementedError (
2298+ f"Online store { type (provider .online_store ).__name__ } "
2299+ "does not support async update expressions. This feature is only available "
2300+ "with DynamoDB online store."
2301+ )
2302+
2303+ feature_view , df = self ._get_feature_view_and_df_for_online_write (
2304+ feature_view_name = feature_view_name ,
2305+ df = df ,
2306+ allow_registry_cache = allow_registry_cache ,
2307+ transform_on_write = False , # Don't transform for updates
2308+ )
2309+
2310+ # Validate that the dataframe has meaningful feature data
2311+ if df is not None :
2312+ if df .empty :
2313+ warnings .warn ("Cannot update with empty dataframe" )
2314+ return
2315+
2316+ # Check if feature columns are empty
2317+ feature_column_names = [f .name for f in feature_view .features ]
2318+ if feature_column_names :
2319+ feature_df = df [feature_column_names ]
2320+ if feature_df .empty or feature_df .isnull ().all ().all ():
2321+ warnings .warn ("Cannot update with empty feature columns" )
2322+ return
2323+
2324+ # Prepare data for online store
2325+ from feast .infra .passthrough_provider import PassthroughProvider
2326+
2327+ rows_to_write = PassthroughProvider ._prep_rows_to_write_for_ingestion (
2328+ feature_view = feature_view ,
2329+ df = df ,
2330+ )
2331+
2332+ # Call DynamoDB-specific async method
2333+ await provider .online_store .update_online_store_async (
2334+ config = self .config ,
2335+ table = feature_view ,
2336+ data = rows_to_write ,
2337+ update_expressions = update_expressions ,
2338+ progress = None ,
2339+ )
2340+
22562341 def write_to_offline_store (
22572342 self ,
22582343 feature_view_name : str ,
0 commit comments