Skip to content

Commit 1a3ccbd

Browse files
jfw-ppintkathole
authored and committed
feat: added batching to feature server /push to offline store (#5683)
Signed-off-by: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com> fix: formatting, lint errors (#5683) Signed-off-by: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com>
1 parent 66aa7b8 commit 1a3ccbd

File tree

5 files changed

+588
-18
lines changed

5 files changed

+588
-18
lines changed

docs/reference/feature-servers/python-feature-server.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,26 @@ requests.post(
199199
"http://localhost:6566/push",
200200
data=json.dumps(push_data))
201201
```
202+
#### Offline write batching for `/push`
203+
204+
The Python feature server supports configurable batching for the **offline**
205+
portion of writes executed via the `/push` endpoint.
206+
207+
Only the offline part of a push is affected:
208+
209+
- `to: "offline"` → **fully batched**
210+
- `to: "online_and_offline"` → **online written immediately**, **offline batched**
211+
- `to: "online"` → unaffected, always immediate
212+
213+
Enable batching in your `feature_store.yaml`:
214+
215+
```yaml
216+
feature_server:
217+
type: local
218+
offline_push_batching_enabled: true
219+
offline_push_batching_batch_size: 1000
220+
offline_push_batching_batch_interval_seconds: 10
221+
```
202222
203223
### Materializing features
204224

docs/reference/feature-store-yaml.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@ online_store:
2525
* **project\_id** — Optional parameter for the datastore online store. Sets the GCP project id used by Feast, if not set Feast will use the default GCP project id in the local environment.
2626
* **project** — Defines a namespace for the entire feature store. Can be used to isolate multiple deployments in a single installation of Feast.
2727
28+
### feature_server
29+
30+
The `feature_server` block configures the Python Feature Server when it is used
31+
to serve online features and handle `/push` requests. This section is optional
32+
and only applies when running the Python feature server.
33+
34+
An example configuration:
35+
36+
```yaml
37+
feature_server:
38+
type: local
39+
offline_push_batching_enabled: true # Enables batching of offline writes processed by /push. Online writes are unaffected.
40+
offline_push_batching_batch_size: 100 # Maximum number of buffered rows before writing to the offline store.
41+
offline_push_batching_batch_interval_seconds: 5 # Maximum time rows may remain buffered before a forced flush.
42+
```
43+
2844
## Providers
2945

3046
The `provider` field defines the environment in which Feast will execute data flows. As a result, it also determines the default values for other fields.

sdk/python/feast/feature_server.py

Lines changed: 276 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
1+
# Copyright 2025 The Feast Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
115
import asyncio
216
import os
317
import sys
418
import threading
519
import time
620
import traceback
21+
from collections import defaultdict
722
from contextlib import asynccontextmanager
823
from datetime import datetime
924
from importlib import resources as importlib_resources
10-
from typing import Any, Dict, List, Optional, Union
25+
from types import SimpleNamespace
26+
from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional, Union
1127

1228
import pandas as pd
1329
import psutil
@@ -195,6 +211,41 @@ def get_app(
195211
registry_proto = None
196212
shutting_down = False
197213
active_timer: Optional[threading.Timer] = None
214+
# --- Offline write batching config and batcher ---
215+
fs_cfg = getattr(store.config, "feature_server", None)
216+
batching_cfg = None
217+
if fs_cfg is not None:
218+
enabled = getattr(fs_cfg, "offline_push_batching_enabled", False)
219+
batch_size = getattr(fs_cfg, "offline_push_batching_batch_size", None)
220+
batch_interval_seconds = getattr(
221+
fs_cfg, "offline_push_batching_batch_interval_seconds", None
222+
)
223+
224+
if enabled is True:
225+
size_ok = isinstance(batch_size, int) and not isinstance(batch_size, bool)
226+
interval_ok = isinstance(batch_interval_seconds, int) and not isinstance(
227+
batch_interval_seconds, bool
228+
)
229+
if size_ok and interval_ok:
230+
batching_cfg = SimpleNamespace(
231+
enabled=True,
232+
batch_size=batch_size,
233+
batch_interval_seconds=batch_interval_seconds,
234+
)
235+
else:
236+
logger.warning(
237+
"Offline write batching enabled but missing or invalid numeric values; "
238+
"disabling batching (batch_size=%r, batch_interval_seconds=%r)",
239+
batch_size,
240+
batch_interval_seconds,
241+
)
242+
243+
offline_batcher: Optional[OfflineWriteBatcher] = None
244+
if batching_cfg is not None and batching_cfg.enabled is True:
245+
offline_batcher = OfflineWriteBatcher(store=store, cfg=batching_cfg)
246+
logger.debug("Offline write batching is ENABLED")
247+
else:
248+
logger.debug("Offline write batching is DISABLED")
198249

199250
def stop_refresh():
200251
nonlocal shutting_down
@@ -219,9 +270,13 @@ def async_refresh():
219270
async def lifespan(app: FastAPI):
220271
await store.initialize()
221272
async_refresh()
222-
yield
223-
stop_refresh()
224-
await store.close()
273+
try:
274+
yield
275+
finally:
276+
stop_refresh()
277+
if offline_batcher is not None:
278+
offline_batcher.shutdown()
279+
await store.close()
225280

226281
app = FastAPI(lifespan=lifespan)
227282

@@ -326,22 +381,58 @@ async def push(request: PushFeaturesRequest) -> None:
326381
for feature_view in fvs_with_push_sources:
327382
assert_permissions(resource=feature_view, actions=actions)
328383

329-
push_params = dict(
330-
push_source_name=request.push_source_name,
331-
df=df,
332-
allow_registry_cache=request.allow_registry_cache,
333-
to=to,
334-
transform_on_write=request.transform_on_write,
335-
)
384+
async def _push_with_to(push_to: PushMode) -> None:
385+
"""
386+
Helper for performing a single push operation.
387+
388+
NOTE:
389+
- Feast providers **do not currently support async offline writes**.
390+
- Therefore:
391+
* ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write
392+
* OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers.
393+
- The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only.
394+
"""
395+
push_source_name = request.push_source_name
396+
allow_registry_cache = request.allow_registry_cache
397+
transform_on_write = request.transform_on_write
398+
399+
# Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store
400+
if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and (
401+
store._get_provider().async_supported.online.write
402+
):
403+
await store.push_async(
404+
push_source_name=push_source_name,
405+
df=df,
406+
allow_registry_cache=allow_registry_cache,
407+
to=push_to,
408+
transform_on_write=transform_on_write,
409+
)
410+
else:
411+
await run_in_threadpool(
412+
lambda: store.push(
413+
push_source_name=push_source_name,
414+
df=df,
415+
allow_registry_cache=allow_registry_cache,
416+
to=push_to,
417+
transform_on_write=transform_on_write,
418+
)
419+
)
336420

337-
should_push_async = (
338-
store._get_provider().async_supported.online.write
339-
and to in [PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE]
340-
)
341-
if should_push_async:
342-
await store.push_async(**push_params)
421+
needs_online = to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE)
422+
needs_offline = to in (PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE)
423+
424+
if offline_batcher is None or not needs_offline:
425+
await _push_with_to(to)
343426
else:
344-
store.push(**push_params)
427+
if needs_online:
428+
await _push_with_to(PushMode.ONLINE)
429+
430+
offline_batcher.enqueue(
431+
push_source_name=request.push_source_name,
432+
df=df,
433+
allow_registry_cache=request.allow_registry_cache,
434+
transform_on_write=request.transform_on_write,
435+
)
345436

346437
async def _get_feast_object(
347438
feature_view_name: str, allow_registry_cache: bool
@@ -683,3 +774,170 @@ def start_server(
683774
)
684775
else:
685776
uvicorn.run(app, host=host, port=port, access_log=(not no_access_log))
777+
778+
779+
class _OfflineBatchKey(NamedTuple):
780+
push_source_name: str
781+
allow_registry_cache: bool
782+
transform_on_write: bool
783+
784+
785+
class OfflineWriteBatcher:
    """
    In-process offline write batcher for /push requests.

    - Buffers DataFrames per (push_source_name, allow_registry_cache, transform_on_write)
    - Flushes when either:
        * total rows in a buffer >= cfg.batch_size, or
        * time since the buffer first received data >= cfg.batch_interval_seconds
    - The offline store write itself (``store.push``) is always performed with
      the internal lock RELEASED, so enqueueing threads are never blocked
      behind offline-store I/O.

    NOTE(review): ordering between a size-triggered flush and a concurrent
    timer-triggered flush of different batches is not guaranteed; offline
    pushes carried no ordering guarantee before batching either.
    """

    def __init__(self, store: "feast.FeatureStore", cfg: Any):
        """
        Args:
            store: FeatureStore used to perform offline ``push`` writes.
            cfg: Object exposing integer ``batch_size`` and
                ``batch_interval_seconds`` attributes (validated by caller).
        """
        self._store = store
        self._cfg = cfg

        # Buffers and "interval clock" timestamps keyed by batch key. The
        # timestamp records when the buffer last became non-empty (set in
        # enqueue) or was last drained, so the time-based flush measures the
        # age of the oldest buffered rows.
        self._buffers: DefaultDict[_OfflineBatchKey, List[pd.DataFrame]] = defaultdict(
            list
        )
        self._last_flush: DefaultDict[_OfflineBatchKey, float] = defaultdict(time.time)

        self._lock = threading.Lock()
        self._stop_event = threading.Event()

        # Start background flusher thread (daemon so it never blocks exit;
        # shutdown() performs the final best-effort flush).
        self._thread = threading.Thread(
            target=self._run, name="offline_write_batcher", daemon=True
        )
        self._thread.start()

        logger.debug(
            "OfflineWriteBatcher initialized: batch_size=%s, batch_interval_seconds=%s",
            getattr(cfg, "batch_size", None),
            getattr(cfg, "batch_interval_seconds", None),
        )

    # ---------- Public API ----------

    def enqueue(
        self,
        push_source_name: str,
        df: pd.DataFrame,
        allow_registry_cache: bool,
        transform_on_write: bool,
    ) -> None:
        """
        Enqueue a dataframe for offline write, grouped by push source + flags.

        Appending is cheap. If this enqueue crosses the size threshold the
        resulting batch is written from this thread, but only AFTER the lock
        has been released, so concurrent enqueues are never blocked behind
        offline-store I/O.
        """
        key = _OfflineBatchKey(
            push_source_name=push_source_name,
            allow_registry_cache=allow_registry_cache,
            transform_on_write=transform_on_write,
        )

        batch_df: Optional[pd.DataFrame] = None
        with self._lock:
            if not self._buffers[key]:
                # Buffer transitions empty -> non-empty: start the interval
                # clock now so the time-based flush measures from the first
                # enqueue, not from the background loop's first inspection.
                self._last_flush[key] = time.time()
            self._buffers[key].append(df)
            total_rows = sum(len(d) for d in self._buffers[key])

            # Size-based flush: drain under the lock, write outside it.
            if total_rows >= self._cfg.batch_size:
                logger.debug(
                    "OfflineWriteBatcher size threshold reached for %s: %s rows",
                    key,
                    total_rows,
                )
                batch_df = self._drain_locked(key)

        if batch_df is not None:
            self._write_batch(key, batch_df)

    def flush_all(self) -> None:
        """
        Flush all buffers synchronously. Intended for graceful shutdown.
        """
        drained: List[Any] = []
        with self._lock:
            for key in list(self._buffers.keys()):
                batch_df = self._drain_locked(key)
                if batch_df is not None:
                    drained.append((key, batch_df))
        # Writes happen with the lock released.
        for key, batch_df in drained:
            self._write_batch(key, batch_df)

    def shutdown(self, timeout: float = 5.0) -> None:
        """
        Stop the background thread and perform a best-effort final flush.

        Args:
            timeout: Maximum seconds to wait for the background thread to exit.
        """
        logger.debug("Shutting down OfflineWriteBatcher")
        self._stop_event.set()
        try:
            self._thread.join(timeout=timeout)
        except Exception:
            logger.exception("Error joining OfflineWriteBatcher thread")

        # Best-effort final flush of anything still buffered.
        try:
            self.flush_all()
        except Exception:
            logger.exception("Error during final OfflineWriteBatcher flush")

    # ---------- Internal helpers ----------

    def _run(self) -> None:
        """
        Background loop: periodically drains buffers whose rows have been
        waiting at least ``batch_interval_seconds`` and writes them.
        """
        interval = max(1, int(getattr(self._cfg, "batch_interval_seconds", 30)))
        logger.debug(
            "OfflineWriteBatcher background loop started with check interval=%s",
            interval,
        )

        while not self._stop_event.wait(timeout=interval):
            try:
                now = time.time()
                due: List[Any] = []
                with self._lock:
                    for key, dfs in list(self._buffers.items()):
                        if not dfs:
                            continue
                        age = now - self._last_flush[key]
                        if age >= self._cfg.batch_interval_seconds:
                            logger.debug(
                                "OfflineWriteBatcher time threshold reached for %s: age=%s",
                                key,
                                age,
                            )
                            batch_df = self._drain_locked(key)
                            if batch_df is not None:
                                due.append((key, batch_df))
                # Perform the (synchronous) offline writes with the lock
                # released so enqueue() stays responsive during I/O.
                for key, batch_df in due:
                    self._write_batch(key, batch_df)
            except Exception:
                logger.exception("Error in OfflineWriteBatcher background loop")

        logger.debug("OfflineWriteBatcher background loop exiting")

    def _drain_locked(self, key: _OfflineBatchKey) -> Optional[pd.DataFrame]:
        """
        Remove and return the concatenated contents of one buffer, or None if
        it is empty. Caller MUST hold ``self._lock``.
        """
        dfs = self._buffers.get(key)
        if not dfs:
            return None
        batch_df = pd.concat(dfs, ignore_index=True)
        self._buffers[key].clear()
        self._last_flush[key] = time.time()
        return batch_df

    def _write_batch(self, key: _OfflineBatchKey, batch_df: pd.DataFrame) -> None:
        """
        Write one drained batch to the offline store. Must be called WITHOUT
        holding ``self._lock``. Errors are logged, never raised, so a failing
        offline store cannot take down the server or the flusher thread.
        """
        logger.debug(
            "Flushing offline batch for push_source=%s with %s rows",
            key.push_source_name,
            len(batch_df),
        )

        # NOTE: offline writes are currently synchronous only, so we call
        # store.push() directly from this (non-event-loop) thread.
        try:
            self._store.push(
                push_source_name=key.push_source_name,
                df=batch_df,
                allow_registry_cache=key.allow_registry_cache,
                to=PushMode.OFFLINE,
                transform_on_write=key.transform_on_write,
            )
        except Exception:
            logger.exception("Error flushing offline batch for %s", key)

0 commit comments

Comments
 (0)