-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Added batching to feature server /push to offline store (#5683) #5729
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
1a3ccbd
49de0cb
fbcc467
bb299d9
2747405
02899c1
a65937a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,29 @@ | ||
| # Copyright 2025 The Feast Authors | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| import asyncio | ||
| import os | ||
| import sys | ||
| import threading | ||
| import time | ||
| import traceback | ||
| from collections import defaultdict | ||
| from contextlib import asynccontextmanager | ||
| from datetime import datetime | ||
| from importlib import resources as importlib_resources | ||
| from typing import Any, Dict, List, Optional, Union | ||
| from types import SimpleNamespace | ||
| from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional, Union | ||
|
|
||
| import pandas as pd | ||
| import psutil | ||
|
|
@@ -241,6 +257,41 @@ def get_app( | |
| registry_proto = None | ||
| shutting_down = False | ||
| active_timer: Optional[threading.Timer] = None | ||
| # --- Offline write batching config and batcher --- | ||
| fs_cfg = getattr(store.config, "feature_server", None) | ||
| batching_cfg = None | ||
| if fs_cfg is not None: | ||
| enabled = getattr(fs_cfg, "offline_push_batching_enabled", False) | ||
| batch_size = getattr(fs_cfg, "offline_push_batching_batch_size", None) | ||
| batch_interval_seconds = getattr( | ||
| fs_cfg, "offline_push_batching_batch_interval_seconds", None | ||
| ) | ||
|
|
||
| if enabled is True: | ||
| size_ok = isinstance(batch_size, int) and not isinstance(batch_size, bool) | ||
| interval_ok = isinstance(batch_interval_seconds, int) and not isinstance( | ||
| batch_interval_seconds, bool | ||
| ) | ||
| if size_ok and interval_ok: | ||
| batching_cfg = SimpleNamespace( | ||
| enabled=True, | ||
| batch_size=batch_size, | ||
| batch_interval_seconds=batch_interval_seconds, | ||
| ) | ||
| else: | ||
| logger.warning( | ||
| "Offline write batching enabled but missing or invalid numeric values; " | ||
| "disabling batching (batch_size=%r, batch_interval_seconds=%r)", | ||
| batch_size, | ||
| batch_interval_seconds, | ||
| ) | ||
|
|
||
| offline_batcher: Optional[OfflineWriteBatcher] = None | ||
| if batching_cfg is not None and batching_cfg.enabled is True: | ||
| offline_batcher = OfflineWriteBatcher(store=store, cfg=batching_cfg) | ||
| logger.debug("Offline write batching is ENABLED") | ||
| else: | ||
| logger.debug("Offline write batching is DISABLED") | ||
|
|
||
| def stop_refresh(): | ||
| nonlocal shutting_down | ||
|
|
@@ -268,9 +319,13 @@ async def lifespan(app: FastAPI): | |
|
|
||
| await store.initialize() | ||
| async_refresh() | ||
| yield | ||
| stop_refresh() | ||
| await store.close() | ||
| try: | ||
| yield | ||
| finally: | ||
| stop_refresh() | ||
| if offline_batcher is not None: | ||
| offline_batcher.shutdown() | ||
| await store.close() | ||
|
|
||
| app = FastAPI(lifespan=lifespan) | ||
|
|
||
|
|
@@ -375,22 +430,58 @@ async def push(request: PushFeaturesRequest) -> None: | |
| for feature_view in fvs_with_push_sources: | ||
| assert_permissions(resource=feature_view, actions=actions) | ||
|
|
||
| push_params = dict( | ||
| push_source_name=request.push_source_name, | ||
| df=df, | ||
| allow_registry_cache=request.allow_registry_cache, | ||
| to=to, | ||
| transform_on_write=request.transform_on_write, | ||
| ) | ||
| async def _push_with_to(push_to: PushMode) -> None: | ||
| """ | ||
| Helper for performing a single push operation. | ||
|
|
||
| NOTE: | ||
| - Feast providers **do not currently support async offline writes**. | ||
| - Therefore: | ||
| * ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write | ||
| * OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers. | ||
| - The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only. | ||
| """ | ||
| push_source_name = request.push_source_name | ||
| allow_registry_cache = request.allow_registry_cache | ||
| transform_on_write = request.transform_on_write | ||
|
|
||
| # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as there's no async for offline store | ||
| if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and ( | ||
| store._get_provider().async_supported.online.write | ||
| ): | ||
| await store.push_async( | ||
| push_source_name=push_source_name, | ||
| df=df, | ||
| allow_registry_cache=allow_registry_cache, | ||
| to=push_to, | ||
| transform_on_write=transform_on_write, | ||
| ) | ||
| else: | ||
| await run_in_threadpool( | ||
| lambda: store.push( | ||
| push_source_name=push_source_name, | ||
| df=df, | ||
| allow_registry_cache=allow_registry_cache, | ||
| to=push_to, | ||
| transform_on_write=transform_on_write, | ||
| ) | ||
| ) | ||
|
|
||
| should_push_async = ( | ||
| store._get_provider().async_supported.online.write | ||
| and to in [PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE] | ||
| ) | ||
| if should_push_async: | ||
| await store.push_async(**push_params) | ||
| needs_online = to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) | ||
| needs_offline = to in (PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE) | ||
|
|
||
| if offline_batcher is None or not needs_offline: | ||
| await _push_with_to(to) | ||
| else: | ||
| store.push(**push_params) | ||
| if needs_online: | ||
| await _push_with_to(PushMode.ONLINE) | ||
|
|
||
| offline_batcher.enqueue( | ||
| push_source_name=request.push_source_name, | ||
| df=df, | ||
| allow_registry_cache=request.allow_registry_cache, | ||
| transform_on_write=request.transform_on_write, | ||
| ) | ||
|
|
||
| async def _get_feast_object( | ||
| feature_view_name: str, allow_registry_cache: bool | ||
|
|
@@ -732,3 +823,170 @@ def start_server( | |
| ) | ||
| else: | ||
| uvicorn.run(app, host=host, port=port, access_log=(not no_access_log)) | ||
|
|
||
|
|
||
| class _OfflineBatchKey(NamedTuple): | ||
| push_source_name: str | ||
| allow_registry_cache: bool | ||
| transform_on_write: bool | ||
|
|
||
|
|
||
class OfflineWriteBatcher:
    """
    In-process offline write batcher for /push requests.

    - Buffers DataFrames per (push_source_name, allow_registry_cache, transform_on_write)
    - A buffer becomes due for flushing when either:
        * total rows in the buffer >= cfg.batch_size, or
        * time since the buffer's last flush >= cfg.batch_interval_seconds
    - Due buffers are written by a dedicated background thread. ``enqueue``
      only appends to the buffer and, when the size threshold is hit, wakes
      that thread; it never performs the offline write itself, so the HTTP
      event loop stays unblocked.
    - Buffers are drained while holding the lock, but ``store.push`` is always
      called with the lock released, so a slow offline write never stalls
      concurrent ``enqueue`` calls.
    - A batch whose write fails is put back at the front of its buffer and
      retried on a later flush. After ``_MAX_CONSECUTIVE_FAILURES`` failed
      attempts in a row for the same key the batch is dropped (and logged) so
      that a permanently failing write cannot grow memory without bound.
    """

    # Consecutive failed writes tolerated per key before its batch is dropped.
    _MAX_CONSECUTIVE_FAILURES = 3

    def __init__(self, store: "feast.FeatureStore", cfg: Any):
        """
        Create the batcher and start its background flusher thread.

        Args:
            store: Feature store whose ``push`` method performs offline writes.
            cfg: Object exposing integer ``batch_size`` and
                ``batch_interval_seconds`` attributes.
        """
        self._store = store
        self._cfg = cfg

        # Pending frames, their running row totals, the time of the last
        # drain, and the consecutive write-failure count, all keyed by batch
        # key and all guarded by self._lock.
        self._buffers: DefaultDict["_OfflineBatchKey", List[pd.DataFrame]] = (
            defaultdict(list)
        )
        self._buffered_rows: DefaultDict["_OfflineBatchKey", int] = defaultdict(int)
        self._last_flush: Dict["_OfflineBatchKey", float] = {}
        self._failures: DefaultDict["_OfflineBatchKey", int] = defaultdict(int)

        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        # Set by enqueue()/shutdown() to cut the worker's wait short.
        self._wake_event = threading.Event()

        # Start background flusher thread
        self._thread = threading.Thread(
            target=self._run, name="offline_write_batcher", daemon=True
        )
        self._thread.start()

        logger.debug(
            "OfflineWriteBatcher initialized: batch_size=%s, batch_interval_seconds=%s",
            getattr(cfg, "batch_size", None),
            getattr(cfg, "batch_interval_seconds", None),
        )

    # ---------- Public API ----------

    def enqueue(
        self,
        push_source_name: str,
        df: pd.DataFrame,
        allow_registry_cache: bool,
        transform_on_write: bool,
    ) -> None:
        """
        Enqueue a dataframe for offline write, grouped by push source + flags.

        Cheap and non-blocking: the frame is appended under the lock and, if
        the size threshold is reached, the background thread is woken up to do
        the write. Empty frames are ignored.
        """
        row_count = len(df)
        if row_count == 0:
            return

        key = _OfflineBatchKey(
            push_source_name=push_source_name,
            allow_registry_cache=allow_registry_cache,
            transform_on_write=transform_on_write,
        )

        with self._lock:
            self._buffers[key].append(df)
            self._buffered_rows[key] += row_count
            # Start the age clock when a key first receives data.
            self._last_flush.setdefault(key, time.time())
            total_rows = self._buffered_rows[key]

        # Size-based flush: hand the work to the background thread.
        if total_rows >= self._cfg.batch_size:
            logger.debug(
                "OfflineWriteBatcher size threshold reached for %s: %s rows",
                key,
                total_rows,
            )
            self._wake_event.set()

    def flush_all(self) -> None:
        """
        Flush all buffers synchronously. Intended for graceful shutdown.

        The writes run in the calling thread; write errors are logged, not
        raised.
        """
        for key, batch_df in self._drain(only_due=False).items():
            self._write_batch(key, batch_df)

    def shutdown(self, timeout: float = 5.0) -> None:
        """
        Stop the background thread and perform a best-effort flush.

        Args:
            timeout: Maximum number of seconds to wait for the background
                thread to exit before flushing from the calling thread.
        """
        logger.debug("Shutting down OfflineWriteBatcher")
        self._stop_event.set()
        # Interrupt the worker's wait so it sees the stop flag immediately.
        self._wake_event.set()
        try:
            self._thread.join(timeout=timeout)
        except Exception:
            logger.exception("Error joining OfflineWriteBatcher thread")

        # Best-effort final flush
        try:
            self.flush_all()
        except Exception:
            logger.exception("Error during final OfflineWriteBatcher flush")

        with self._lock:
            leftover_rows = sum(self._buffered_rows.values())
        if leftover_rows:
            logger.error(
                "OfflineWriteBatcher shut down with %s rows not written to the offline store",
                leftover_rows,
            )

    # ---------- Internal helpers ----------

    def _run(self) -> None:
        """
        Background loop: flushes every due buffer each time it is woken by
        ``enqueue`` (size threshold) or the poll interval elapses (time
        threshold), until ``shutdown`` sets the stop event.
        """
        interval = max(1, int(getattr(self._cfg, "batch_interval_seconds", 30)))
        logger.debug(
            "OfflineWriteBatcher background loop started with check interval=%s",
            interval,
        )

        while not self._stop_event.is_set():
            self._wake_event.wait(timeout=interval)
            # Clear before scanning: frames are appended before the event is
            # set, so anything that triggered this wake-up is visible to the
            # scan below, and a later set() survives for the next iteration.
            self._wake_event.clear()
            if self._stop_event.is_set():
                break
            try:
                for key, batch_df in self._drain(only_due=True).items():
                    self._write_batch(key, batch_df)
            except Exception:
                logger.exception("Error in OfflineWriteBatcher background loop")

        logger.debug("OfflineWriteBatcher background loop exiting")

    def _drain(self, only_due: bool) -> Dict["_OfflineBatchKey", pd.DataFrame]:
        """
        Remove buffered data under the lock and return it as one frame per key.

        Args:
            only_due: When True, only buffers that reached the size or time
                threshold are drained; when False, every non-empty buffer is.

        Returns:
            Mapping of batch key to the concatenated frame taken from its
            buffer. No offline I/O happens here.
        """
        now = time.time()
        drained: Dict["_OfflineBatchKey", pd.DataFrame] = {}
        with self._lock:
            for key, dfs in list(self._buffers.items()):
                if not dfs:
                    continue
                if only_due and not self._is_due_locked(key, now):
                    continue
                try:
                    batch_df = self._drain_locked(key)
                except Exception:
                    # Keep going so one bad buffer cannot discard the batches
                    # already drained for other keys.
                    logger.exception("Error assembling offline batch for %s", key)
                    continue
                if batch_df is not None:
                    drained[key] = batch_df
        return drained

    def _is_due_locked(self, key: "_OfflineBatchKey", now: float) -> bool:
        """
        Return True if the buffer for ``key`` hit the size or time threshold;
        caller must hold self._lock.
        """
        total_rows = self._buffered_rows[key]
        if total_rows >= self._cfg.batch_size:
            return True

        age = now - self._last_flush.get(key, now)
        if age >= self._cfg.batch_interval_seconds:
            logger.debug(
                "OfflineWriteBatcher time threshold reached for %s: age=%s",
                key,
                age,
            )
            return True
        return False

    def _drain_locked(self, key: "_OfflineBatchKey") -> Optional[pd.DataFrame]:
        """
        Concatenate and clear a single buffer; caller must hold self._lock.

        Returns None when the buffer is empty. The buffer is only cleared
        after the concatenation succeeded.
        """
        dfs = self._buffers.get(key)
        if not dfs:
            return None

        batch_df = pd.concat(dfs, ignore_index=True)
        dfs.clear()
        self._buffered_rows[key] = 0
        self._last_flush[key] = time.time()
        return batch_df

    def _write_batch(self, key: "_OfflineBatchKey", batch_df: pd.DataFrame) -> None:
        """
        Write one drained batch to the offline store; must be called WITHOUT
        holding self._lock. Errors are logged and the batch is re-queued or,
        after repeated failures, dropped.
        """
        logger.debug(
            "Flushing offline batch for push_source=%s with %s rows",
            key.push_source_name,
            len(batch_df),
        )

        # NOTE: offline writes are currently synchronous only, so we call directly
        try:
            self._store.push(
                push_source_name=key.push_source_name,
                df=batch_df,
                allow_registry_cache=key.allow_registry_cache,
                to=PushMode.OFFLINE,
                transform_on_write=key.transform_on_write,
            )
        except Exception:
            logger.exception("Error flushing offline batch for %s", key)
            self._handle_failed_batch(key, batch_df)
            return

        with self._lock:
            self._failures.pop(key, None)

    def _handle_failed_batch(
        self, key: "_OfflineBatchKey", batch_df: pd.DataFrame
    ) -> None:
        """
        Re-queue a batch whose write failed, or drop it once the key has
        failed ``_MAX_CONSECUTIVE_FAILURES`` times in a row.
        """
        with self._lock:
            self._failures[key] += 1
            attempts = self._failures[key]
            requeue = attempts < self._MAX_CONSECUTIVE_FAILURES
            if requeue:
                # Front of the buffer, so the rows keep their arrival order.
                self._buffers[key].insert(0, batch_df)
                self._buffered_rows[key] += len(batch_df)
            else:
                self._failures.pop(key, None)

        if requeue:
            logger.warning(
                "Re-queued %s rows for %s after failed offline write (attempt %s of %s)",
                len(batch_df),
                key,
                attempts,
                self._MAX_CONSECUTIVE_FAILURES,
            )
        else:
            logger.error(
                "Dropping %s rows for %s after %s consecutive failed offline writes",
                len(batch_df),
                key,
                attempts,
            )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected spelling: 'theres' should be 'there's'.