Skip to content

Commit 88629a1

Browse files
authored
Merge branch 'master' into feat/mysql-registry-store-go
2 parents 60e45df + aa5973f commit 88629a1

File tree

4 files changed

+777
-1
lines changed

4 files changed

+777
-1
lines changed

sdk/python/feast/feature_store.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,6 +2253,91 @@ async def write_to_online_store_async(
22532253
provider = self._get_provider()
22542254
await provider.ingest_df_async(feature_view, df)
22552255

2256+
async def update_online_store(
2257+
self,
2258+
feature_view_name: str,
2259+
df: pd.DataFrame,
2260+
update_expressions: Dict[str, str],
2261+
allow_registry_cache: bool = True,
2262+
) -> None:
2263+
"""
2264+
Update features using DynamoDB-specific list operations.
2265+
2266+
This method provides efficient in-place list updates using DynamoDB's native
2267+
UpdateItem operations with list_append and other expressions. This is more
2268+
efficient than the read-modify-write pattern for array-based features.
2269+
2270+
Args:
2271+
feature_view_name: The feature view to update.
2272+
df: DataFrame with new values to append/prepend to existing lists.
2273+
update_expressions: Dict mapping feature names to DynamoDB update expressions.
2274+
Examples:
2275+
- {"transactions": "list_append(transactions, :new_val)"} # append
2276+
- {"recent_items": "list_append(:new_val, recent_items)"} # prepend
2277+
allow_registry_cache: Whether to allow cached registry.
2278+
2279+
Raises:
2280+
NotImplementedError: If online store doesn't support update expressions.
2281+
ValueError: If the feature view or update expressions are invalid.
2282+
2283+
Example:
2284+
# Append new transactions to existing transaction history
2285+
await store.update_online_store(
2286+
feature_view_name="user_transactions",
2287+
df=new_transactions_df,
2288+
update_expressions={
2289+
"transaction_history": "list_append(transaction_history, :new_val)",
2290+
"recent_amounts": "list_append(:new_val, recent_amounts)" # prepend
2291+
}
2292+
)
2293+
"""
2294+
# Check if online store supports update expressions
2295+
provider = self._get_provider()
2296+
if not hasattr(provider.online_store, "update_online_store_async"):
2297+
raise NotImplementedError(
2298+
f"Online store {type(provider.online_store).__name__} "
2299+
"does not support async update expressions. This feature is only available "
2300+
"with DynamoDB online store."
2301+
)
2302+
2303+
feature_view, df = self._get_feature_view_and_df_for_online_write(
2304+
feature_view_name=feature_view_name,
2305+
df=df,
2306+
allow_registry_cache=allow_registry_cache,
2307+
transform_on_write=False, # Don't transform for updates
2308+
)
2309+
2310+
# Validate that the dataframe has meaningful feature data
2311+
if df is not None:
2312+
if df.empty:
2313+
warnings.warn("Cannot update with empty dataframe")
2314+
return
2315+
2316+
# Check if feature columns are empty
2317+
feature_column_names = [f.name for f in feature_view.features]
2318+
if feature_column_names:
2319+
feature_df = df[feature_column_names]
2320+
if feature_df.empty or feature_df.isnull().all().all():
2321+
warnings.warn("Cannot update with empty feature columns")
2322+
return
2323+
2324+
# Prepare data for online store
2325+
from feast.infra.passthrough_provider import PassthroughProvider
2326+
2327+
rows_to_write = PassthroughProvider._prep_rows_to_write_for_ingestion(
2328+
feature_view=feature_view,
2329+
df=df,
2330+
)
2331+
2332+
# Call DynamoDB-specific async method
2333+
await provider.online_store.update_online_store_async(
2334+
config=self.config,
2335+
table=feature_view,
2336+
data=rows_to_write,
2337+
update_expressions=update_expressions,
2338+
progress=None,
2339+
)
2340+
22562341
def write_to_offline_store(
22572342
self,
22582343
feature_view_name: str,

0 commit comments

Comments
 (0)