@@ -2074,6 +2074,149 @@ async def write_to_online_store_async(
20742074 provider = self ._get_provider ()
20752075 await provider .ingest_df_async (feature_view , df )
20762076
2077+ def update_online_store (
2078+ self ,
2079+ feature_view_name : str ,
2080+ df : pd .DataFrame ,
2081+ update_expressions : Dict [str , str ],
2082+ allow_registry_cache : bool = True ,
2083+ ) -> None :
2084+ """
2085+ Update features using DynamoDB-specific list operations.
2086+
2087+ This method provides efficient in-place list updates using DynamoDB's native
2088+ UpdateItem operations with list_append and other expressions. This is more
2089+ efficient than the read-modify-write pattern for array-based features.
2090+
2091+ Args:
2092+ feature_view_name: The feature view to update.
2093+ df: DataFrame with new values to append/prepend to existing lists.
2094+ update_expressions: Dict mapping feature names to DynamoDB update expressions.
2095+ Examples:
2096+ - {"transactions": "list_append(transactions, :new_val)"} # append
2097+ - {"recent_items": "list_append(:new_val, recent_items)"} # prepend
2098+ - {"sliding_window": "list_append(if_not_exists(sliding_window, :empty_list), :new_val)[:10]"} # sliding window
2099+ allow_registry_cache: Whether to allow cached registry.
2100+
2101+ Raises:
2102+ NotImplementedError: If online store doesn't support update expressions.
2103+ ValueError: If the feature view or update expressions are invalid.
2104+
2105+ Example:
2106+ # Append new transactions to existing transaction history
2107+ store.update_online_store(
2108+ feature_view_name="user_transactions",
2109+ df=new_transactions_df,
2110+ update_expressions={
2111+ "transaction_history": "list_append(transaction_history, :new_val)",
2112+ "recent_amounts": "list_append(:new_val, recent_amounts)[:10]"
2113+ }
2114+ )
2115+ """
2116+ # Check if online store supports update expressions
2117+ provider = self ._get_provider ()
2118+ if not hasattr (provider .online_store , "update_online_store" ):
2119+ raise NotImplementedError (
2120+ f"Online store { type (provider .online_store ).__name__ } "
2121+ "does not support update expressions. This feature is only available "
2122+ "with DynamoDB online store."
2123+ )
2124+
2125+ feature_view , df = self ._get_feature_view_and_df_for_online_write (
2126+ feature_view_name = feature_view_name ,
2127+ df = df ,
2128+ allow_registry_cache = allow_registry_cache ,
2129+ transform_on_write = False , # Don't transform for updates
2130+ )
2131+
2132+ # Validate that the dataframe has meaningful feature data
2133+ if df is not None :
2134+ if df .empty :
2135+ warnings .warn ("Cannot update with empty dataframe" )
2136+ return
2137+
2138+ # Check if feature columns are empty
2139+ feature_column_names = [f .name for f in feature_view .features ]
2140+ if feature_column_names :
2141+ feature_df = df [feature_column_names ]
2142+ if feature_df .empty or feature_df .isnull ().all ().all ():
2143+ warnings .warn ("Cannot update with empty feature columns" )
2144+ return
2145+
2146+ # Prepare data for online store
2147+ from feast .infra .passthrough_provider import PassthroughProvider
2148+
2149+ rows_to_write = PassthroughProvider ._prep_rows_to_write_for_ingestion (
2150+ feature_view = feature_view ,
2151+ df = df ,
2152+ )
2153+
2154+ # Call DynamoDB-specific method
2155+ provider .online_store .update_online_store (
2156+ config = self .config ,
2157+ table = feature_view ,
2158+ data = rows_to_write ,
2159+ update_expressions = update_expressions ,
2160+ progress = None ,
2161+ )
2162+
2163+ async def update_online_store_async (
2164+ self ,
2165+ feature_view_name : str ,
2166+ df : pd .DataFrame ,
2167+ update_expressions : Dict [str , str ],
2168+ allow_registry_cache : bool = True ,
2169+ ) -> None :
2170+ """
2171+ Async version of update_online_store.
2172+ """
2173+ # Check if online store supports update expressions
2174+ provider = self ._get_provider ()
2175+ if not hasattr (provider .online_store , "update_online_store_async" ):
2176+ raise NotImplementedError (
2177+ f"Online store { type (provider .online_store ).__name__ } "
2178+ "does not support async update expressions. This feature is only available "
2179+ "with DynamoDB online store."
2180+ )
2181+
2182+ feature_view , df = self ._get_feature_view_and_df_for_online_write (
2183+ feature_view_name = feature_view_name ,
2184+ df = df ,
2185+ allow_registry_cache = allow_registry_cache ,
2186+ transform_on_write = False , # Don't transform for updates
2187+ )
2188+
2189+ # Validate that the dataframe has meaningful feature data
2190+ if df is not None :
2191+ if df .empty :
2192+ warnings .warn ("Cannot update with empty dataframe" )
2193+ return
2194+
2195+ # Check if feature columns are empty
2196+ feature_column_names = [f .name for f in feature_view .features ]
2197+ if feature_column_names :
2198+ feature_df = df [feature_column_names ]
2199+ if feature_df .empty or feature_df .isnull ().all ().all ():
2200+ warnings .warn ("Cannot update with empty feature columns" )
2201+ return
2202+
2203+ # Prepare data for online store
2204+ from feast .infra .passthrough_provider import PassthroughProvider
2205+
2206+ rows_to_write = PassthroughProvider ._prep_rows_to_write_for_ingestion (
2207+ feature_view = feature_view ,
2208+ df = df ,
2209+ )
2210+
2211+ # Call DynamoDB-specific async method
2212+ await provider .online_store .update_online_store_async (
2213+ config = self .config ,
2214+ table = feature_view ,
2215+ data = rows_to_write ,
2216+ update_expressions = update_expressions ,
2217+ progress = None ,
2218+ )
2219+
20772220 def write_to_offline_store (
20782221 self ,
20792222 feature_view_name : str ,
0 commit comments