@@ -2213,6 +2213,148 @@ async def write_to_online_store_async(
22132213 provider = self ._get_provider ()
22142214 await provider .ingest_df_async (feature_view , df )
22152215
2216+ def update_online_store (
2217+ self ,
2218+ feature_view_name : str ,
2219+ df : pd .DataFrame ,
2220+ update_expressions : Dict [str , str ],
2221+ allow_registry_cache : bool = True ,
2222+ ) -> None :
2223+ """
2224+ Update features using DynamoDB-specific list operations.
2225+
2226+ This method provides efficient in-place list updates using DynamoDB's native
2227+ UpdateItem operations with list_append and other expressions. This is more
2228+ efficient than the read-modify-write pattern for array-based features.
2229+
2230+ Args:
2231+ feature_view_name: The feature view to update.
2232+ df: DataFrame with new values to append/prepend to existing lists.
2233+ update_expressions: Dict mapping feature names to DynamoDB update expressions.
2234+ Examples:
2235+ - {"transactions": "list_append(transactions, :new_val)"} # append
2236+ - {"recent_items": "list_append(:new_val, recent_items)"} # prepend
2237+ allow_registry_cache: Whether to allow cached registry.
2238+
2239+ Raises:
2240+ NotImplementedError: If online store doesn't support update expressions.
2241+ ValueError: If the feature view or update expressions are invalid.
2242+
2243+ Example:
2244+ # Append new transactions to existing transaction history
2245+ store.update_online_store(
2246+ feature_view_name="user_transactions",
2247+ df=new_transactions_df,
2248+ update_expressions={
2249+ "transaction_history": "list_append(transaction_history, :new_val)",
2250+ "recent_amounts": "list_append(:new_val, recent_amounts)" # prepend
2251+ }
2252+ )
2253+ """
2254+ # Check if online store supports update expressions
2255+ provider = self ._get_provider ()
2256+ if not hasattr (provider .online_store , "update_online_store" ):
2257+ raise NotImplementedError (
2258+ f"Online store { type (provider .online_store ).__name__ } "
2259+ "does not support update expressions. This feature is only available "
2260+ "with DynamoDB online store."
2261+ )
2262+
2263+ feature_view , df = self ._get_feature_view_and_df_for_online_write (
2264+ feature_view_name = feature_view_name ,
2265+ df = df ,
2266+ allow_registry_cache = allow_registry_cache ,
2267+ transform_on_write = False , # Don't transform for updates
2268+ )
2269+
2270+ # Validate that the dataframe has meaningful feature data
2271+ if df is not None :
2272+ if df .empty :
2273+ warnings .warn ("Cannot update with empty dataframe" )
2274+ return
2275+
2276+ # Check if feature columns are empty
2277+ feature_column_names = [f .name for f in feature_view .features ]
2278+ if feature_column_names :
2279+ feature_df = df [feature_column_names ]
2280+ if feature_df .empty or feature_df .isnull ().all ().all ():
2281+ warnings .warn ("Cannot update with empty feature columns" )
2282+ return
2283+
2284+ # Prepare data for online store
2285+ from feast .infra .passthrough_provider import PassthroughProvider
2286+
2287+ rows_to_write = PassthroughProvider ._prep_rows_to_write_for_ingestion (
2288+ feature_view = feature_view ,
2289+ df = df ,
2290+ )
2291+
2292+ # Call DynamoDB-specific method
2293+ provider .online_store .update_online_store (
2294+ config = self .config ,
2295+ table = feature_view ,
2296+ data = rows_to_write ,
2297+ update_expressions = update_expressions ,
2298+ progress = None ,
2299+ )
2300+
2301+ async def update_online_store_async (
2302+ self ,
2303+ feature_view_name : str ,
2304+ df : pd .DataFrame ,
2305+ update_expressions : Dict [str , str ],
2306+ allow_registry_cache : bool = True ,
2307+ ) -> None :
2308+ """
2309+ Async version of update_online_store.
2310+ """
2311+ # Check if online store supports update expressions
2312+ provider = self ._get_provider ()
2313+ if not hasattr (provider .online_store , "update_online_store_async" ):
2314+ raise NotImplementedError (
2315+ f"Online store { type (provider .online_store ).__name__ } "
2316+ "does not support async update expressions. This feature is only available "
2317+ "with DynamoDB online store."
2318+ )
2319+
2320+ feature_view , df = self ._get_feature_view_and_df_for_online_write (
2321+ feature_view_name = feature_view_name ,
2322+ df = df ,
2323+ allow_registry_cache = allow_registry_cache ,
2324+ transform_on_write = False , # Don't transform for updates
2325+ )
2326+
2327+ # Validate that the dataframe has meaningful feature data
2328+ if df is not None :
2329+ if df .empty :
2330+ warnings .warn ("Cannot update with empty dataframe" )
2331+ return
2332+
2333+ # Check if feature columns are empty
2334+ feature_column_names = [f .name for f in feature_view .features ]
2335+ if feature_column_names :
2336+ feature_df = df [feature_column_names ]
2337+ if feature_df .empty or feature_df .isnull ().all ().all ():
2338+ warnings .warn ("Cannot update with empty feature columns" )
2339+ return
2340+
2341+ # Prepare data for online store
2342+ from feast .infra .passthrough_provider import PassthroughProvider
2343+
2344+ rows_to_write = PassthroughProvider ._prep_rows_to_write_for_ingestion (
2345+ feature_view = feature_view ,
2346+ df = df ,
2347+ )
2348+
2349+ # Call DynamoDB-specific async method
2350+ await provider .online_store .update_online_store_async (
2351+ config = self .config ,
2352+ table = feature_view ,
2353+ data = rows_to_write ,
2354+ update_expressions = update_expressions ,
2355+ progress = None ,
2356+ )
2357+
22162358 def write_to_offline_store (
22172359 self ,
22182360 feature_view_name : str ,
0 commit comments