1+ # Copyright 2025 The Feast Authors
2+ #
3+ # Licensed under the Apache License, Version 2.0 (the "License");
4+ # you may not use this file except in compliance with the License.
5+ # You may obtain a copy of the License at
6+ #
7+ # https://www.apache.org/licenses/LICENSE-2.0
8+ #
9+ # Unless required by applicable law or agreed to in writing, software
10+ # distributed under the License is distributed on an "AS IS" BASIS,
11+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+ # See the License for the specific language governing permissions and
13+ # limitations under the License.
14+
115import asyncio
216import os
317import sys
418import threading
519import time
620import traceback
21+ from collections import defaultdict
722from contextlib import asynccontextmanager
823from datetime import datetime
924from importlib import resources as importlib_resources
10- from typing import Any , Dict , List , Optional , Union
25+ from types import SimpleNamespace
26+ from typing import Any , DefaultDict , Dict , List , NamedTuple , Optional , Union
1127
1228import pandas as pd
1329import psutil
@@ -195,6 +211,41 @@ def get_app(
195211 registry_proto = None
196212 shutting_down = False
197213 active_timer : Optional [threading .Timer ] = None
214+ # --- Offline write batching config and batcher ---
215+ fs_cfg = getattr (store .config , "feature_server" , None )
216+ batching_cfg = None
217+ if fs_cfg is not None :
218+ enabled = getattr (fs_cfg , "offline_push_batching_enabled" , False )
219+ batch_size = getattr (fs_cfg , "offline_push_batching_batch_size" , None )
220+ batch_interval_seconds = getattr (
221+ fs_cfg , "offline_push_batching_batch_interval_seconds" , None
222+ )
223+
224+ if enabled is True :
225+ size_ok = isinstance (batch_size , int ) and not isinstance (batch_size , bool )
226+ interval_ok = isinstance (batch_interval_seconds , int ) and not isinstance (
227+ batch_interval_seconds , bool
228+ )
229+ if size_ok and interval_ok :
230+ batching_cfg = SimpleNamespace (
231+ enabled = True ,
232+ batch_size = batch_size ,
233+ batch_interval_seconds = batch_interval_seconds ,
234+ )
235+ else :
236+ logger .warning (
237+ "Offline write batching enabled but missing or invalid numeric values; "
238+ "disabling batching (batch_size=%r, batch_interval_seconds=%r)" ,
239+ batch_size ,
240+ batch_interval_seconds ,
241+ )
242+
243+ offline_batcher : Optional [OfflineWriteBatcher ] = None
244+ if batching_cfg is not None and batching_cfg .enabled is True :
245+ offline_batcher = OfflineWriteBatcher (store = store , cfg = batching_cfg )
246+ logger .debug ("Offline write batching is ENABLED" )
247+ else :
248+ logger .debug ("Offline write batching is DISABLED" )
198249
199250 def stop_refresh ():
200251 nonlocal shutting_down
@@ -219,9 +270,13 @@ def async_refresh():
219270 async def lifespan (app : FastAPI ):
220271 await store .initialize ()
221272 async_refresh ()
222- yield
223- stop_refresh ()
224- await store .close ()
273+ try :
274+ yield
275+ finally :
276+ stop_refresh ()
277+ if offline_batcher is not None :
278+ offline_batcher .shutdown ()
279+ await store .close ()
225280
226281 app = FastAPI (lifespan = lifespan )
227282
@@ -326,22 +381,58 @@ async def push(request: PushFeaturesRequest) -> None:
326381 for feature_view in fvs_with_push_sources :
327382 assert_permissions (resource = feature_view , actions = actions )
328383
329- push_params = dict (
330- push_source_name = request .push_source_name ,
331- df = df ,
332- allow_registry_cache = request .allow_registry_cache ,
333- to = to ,
334- transform_on_write = request .transform_on_write ,
335- )
384+ async def _push_with_to (push_to : PushMode ) -> None :
385+ """
386+ Helper for performing a single push operation.
387+
388+ NOTE:
389+ - Feast providers **do not currently support async offline writes**.
390+ - Therefore:
391+ * ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write
392+ * OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers.
393+ - The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only.
394+ """
395+ push_source_name = request .push_source_name
396+ allow_registry_cache = request .allow_registry_cache
397+ transform_on_write = request .transform_on_write
398+
399+ # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store
400+ if push_to in (PushMode .ONLINE , PushMode .ONLINE_AND_OFFLINE ) and (
401+ store ._get_provider ().async_supported .online .write
402+ ):
403+ await store .push_async (
404+ push_source_name = push_source_name ,
405+ df = df ,
406+ allow_registry_cache = allow_registry_cache ,
407+ to = push_to ,
408+ transform_on_write = transform_on_write ,
409+ )
410+ else :
411+ await run_in_threadpool (
412+ lambda : store .push (
413+ push_source_name = push_source_name ,
414+ df = df ,
415+ allow_registry_cache = allow_registry_cache ,
416+ to = push_to ,
417+ transform_on_write = transform_on_write ,
418+ )
419+ )
336420
337- should_push_async = (
338- store ._get_provider ().async_supported .online .write
339- and to in [PushMode .ONLINE , PushMode .ONLINE_AND_OFFLINE ]
340- )
341- if should_push_async :
342- await store .push_async (** push_params )
421+ needs_online = to in (PushMode .ONLINE , PushMode .ONLINE_AND_OFFLINE )
422+ needs_offline = to in (PushMode .OFFLINE , PushMode .ONLINE_AND_OFFLINE )
423+
424+ if offline_batcher is None or not needs_offline :
425+ await _push_with_to (to )
343426 else :
344- store .push (** push_params )
427+ if needs_online :
428+ await _push_with_to (PushMode .ONLINE )
429+
430+ offline_batcher .enqueue (
431+ push_source_name = request .push_source_name ,
432+ df = df ,
433+ allow_registry_cache = request .allow_registry_cache ,
434+ transform_on_write = request .transform_on_write ,
435+ )
345436
346437 async def _get_feast_object (
347438 feature_view_name : str , allow_registry_cache : bool
@@ -683,3 +774,170 @@ def start_server(
683774 )
684775 else :
685776 uvicorn .run (app , host = host , port = port , access_log = (not no_access_log ))
777+
778+
class _OfflineBatchKey(NamedTuple):
    """Grouping key for buffered offline writes.

    Frames are batched per (push_source_name, allow_registry_cache,
    transform_on_write) so that every frame flushed in one batch shares
    identical push semantics.
    """

    # Name of the push source the buffered frames belong to.
    push_source_name: str
    # Whether registry lookups performed during the flush may use the cache.
    allow_registry_cache: bool
    # Whether on-write transformations are applied when the batch is pushed.
    transform_on_write: bool
783+
784+
class OfflineWriteBatcher:
    """
    In-process offline write batcher for /push requests.

    - Buffers DataFrames per (push_source_name, allow_registry_cache, transform_on_write)
    - Flushes when either:
        * total rows in a buffer >= batch_size, or
        * time since last flush >= batch_interval_seconds
    - Time-based flushes run in a dedicated background thread; a size-based
      flush runs on the enqueueing caller's thread.
    - The actual offline store write (``store.push``) is ALWAYS performed
      outside ``self._lock`` so that a slow offline write never blocks
      concurrent ``enqueue`` calls from other HTTP requests.
    """

    def __init__(self, store: "feast.FeatureStore", cfg: Any):
        """
        Args:
            store: Feature store used to perform the offline pushes.
            cfg: Object exposing ``batch_size`` (int, rows) and
                ``batch_interval_seconds`` (int) attributes.
        """
        self._store = store
        self._cfg = cfg

        # Buffers and last-flush timestamps keyed by batch key.
        # defaultdict(time.time) stamps a key the first time it is seen,
        # so "age" is measured from when the key first received data.
        self._buffers: DefaultDict[_OfflineBatchKey, List[pd.DataFrame]] = defaultdict(
            list
        )
        self._last_flush: DefaultDict[_OfflineBatchKey, float] = defaultdict(time.time)

        self._lock = threading.Lock()
        self._stop_event = threading.Event()

        # Start background flusher thread (daemon: never blocks interpreter exit).
        self._thread = threading.Thread(
            target=self._run, name="offline_write_batcher", daemon=True
        )
        self._thread.start()

        logger.debug(
            "OfflineWriteBatcher initialized: batch_size=%s, batch_interval_seconds=%s",
            getattr(cfg, "batch_size", None),
            getattr(cfg, "batch_interval_seconds", None),
        )

    # ---------- Public API ----------

    def enqueue(
        self,
        push_source_name: str,
        df: pd.DataFrame,
        allow_registry_cache: bool,
        transform_on_write: bool,
    ) -> None:
        """
        Enqueue a dataframe for offline write, grouped by push source + flags.

        Appending is cheap; if the size threshold is reached, the batch is
        drained under the lock and pushed on THIS thread, but with the lock
        released so other enqueues are never blocked by the store write.
        """
        key = _OfflineBatchKey(
            push_source_name=push_source_name,
            allow_registry_cache=allow_registry_cache,
            transform_on_write=transform_on_write,
        )

        batch_df: Optional[pd.DataFrame] = None
        with self._lock:
            self._buffers[key].append(df)
            total_rows = sum(len(d) for d in self._buffers[key])

            # Size-based flush: drain under the lock, write after releasing it.
            if total_rows >= self._cfg.batch_size:
                logger.debug(
                    "OfflineWriteBatcher size threshold reached for %s: %s rows",
                    key,
                    total_rows,
                )
                batch_df = self._drain_locked(key)

        if batch_df is not None:
            self._push_batch(key, batch_df)

    def flush_all(self) -> None:
        """
        Flush all buffers synchronously. Intended for graceful shutdown.
        """
        drained: List[tuple] = []
        with self._lock:
            for key in list(self._buffers.keys()):
                batch_df = self._drain_locked(key)
                if batch_df is not None:
                    drained.append((key, batch_df))
        # Store writes happen after the lock is released.
        for key, batch_df in drained:
            self._push_batch(key, batch_df)

    def shutdown(self, timeout: float = 5.0) -> None:
        """
        Stop the background thread and perform a best-effort final flush.

        Args:
            timeout: Seconds to wait for the background thread to exit.
        """
        logger.debug("Shutting down OfflineWriteBatcher")
        self._stop_event.set()
        try:
            self._thread.join(timeout=timeout)
        except Exception:
            logger.exception("Error joining OfflineWriteBatcher thread")

        # Best-effort final flush of anything still buffered.
        try:
            self.flush_all()
        except Exception:
            logger.exception("Error during final OfflineWriteBatcher flush")

    # ---------- Internal helpers ----------

    def _run(self) -> None:
        """
        Background loop: periodically drains buffers whose age since last
        flush exceeds batch_interval_seconds, then writes them lock-free.
        """
        interval = max(1, int(getattr(self._cfg, "batch_interval_seconds", 30)))
        logger.debug(
            "OfflineWriteBatcher background loop started with check interval=%s",
            interval,
        )

        while not self._stop_event.wait(timeout=interval):
            now = time.time()
            try:
                due: List[tuple] = []
                with self._lock:
                    for key, dfs in list(self._buffers.items()):
                        if not dfs:
                            continue
                        # Accessing _last_flush also initializes the default
                        # timestamp for keys seen here for the first time.
                        age = now - self._last_flush[key]
                        if age >= self._cfg.batch_interval_seconds:
                            logger.debug(
                                "OfflineWriteBatcher time threshold reached for %s: age=%s",
                                key,
                                age,
                            )
                            batch_df = self._drain_locked(key)
                            if batch_df is not None:
                                due.append((key, batch_df))
                # Offline store I/O runs outside the lock.
                for key, batch_df in due:
                    self._push_batch(key, batch_df)
            except Exception:
                logger.exception("Error in OfflineWriteBatcher background loop")

        logger.debug("OfflineWriteBatcher background loop exiting")

    def _drain_locked(self, key: _OfflineBatchKey) -> Optional[pd.DataFrame]:
        """
        Remove and concatenate a key's buffered frames; caller MUST hold
        self._lock. Returns None when there is nothing buffered.
        """
        dfs = self._buffers.get(key)
        if not dfs:
            return None

        batch_df = pd.concat(dfs, ignore_index=True)
        # Delete the entry entirely so idle keys are not re-scanned forever.
        del self._buffers[key]
        self._last_flush[key] = time.time()
        return batch_df

    def _push_batch(self, key: _OfflineBatchKey, batch_df: pd.DataFrame) -> None:
        """
        Write one concatenated batch to the offline store. Must be called
        WITHOUT holding self._lock — this is the slow I/O path.
        """
        logger.debug(
            "Flushing offline batch for push_source=%s with %s rows",
            key.push_source_name,
            len(batch_df),
        )

        # NOTE: offline writes are currently synchronous only, so we call directly
        try:
            self._store.push(
                push_source_name=key.push_source_name,
                df=batch_df,
                allow_registry_cache=key.allow_registry_cache,
                to=PushMode.OFFLINE,
                transform_on_write=key.transform_on_write,
            )
        except Exception:
            logger.exception("Error flushing offline batch for %s", key)