From 9a97f052169162356b4e84b2c6e7516c72532b7c Mon Sep 17 00:00:00 2001
From: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
Date: Fri, 1 May 2026 14:44:54 +0530
Subject: [PATCH] [connectors] configure storage for datafusion instances used by connectors

Delta Lake and Iceberg connectors built bare SessionContexts with no
memory pool or spill path, so large scans (e.g. ORDER BY in Delta CDC)
could OOM the pipeline.

Build one Arc<RuntimeEnv> per pipeline with a FairSpillPool and a spill
directory at {storage.path}/datafusion-tmp/, shared by the ad-hoc engine
and every DataFusion-using connector.

Add RuntimeConfig.datafusion_memory_mb (default: 5% of the effective
memory budget, capped at 2 GB) and subtract it from the DBSP circuit's
RSS limit so the two no longer double-book RAM (e.g. with max_rss_mb =
16000, DataFusion gets 800 MB and the circuit keeps 15200 MB).

Rename ADHOC_TEMP_DIR -> DATAFUSION_TEMP_DIR ("datafusion-tmp") so
checkpointer::gc_startup keeps the new directory. Stale adhoc-tmp/ from
prior releases is removed by the existing GC sweep.

Expose create_runtime_env / create_session_context[_with] in
feldera_adapterlib::utils::datafusion; the _with variant preserves
Delta's schema_force_view_types override.

Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
---
 crates/adapterlib/src/errors/controller.rs  |  18 +
 crates/adapterlib/src/utils/datafusion.rs   | 382 +++++++++++++++++-
 crates/adapters/src/adhoc.rs                |  55 +--
 crates/adapters/src/controller.rs           |  43 +-
 crates/adapters/src/integrated.rs           |  31 +-
 .../src/integrated/delta_table/input.rs     | 139 ++++---
 crates/dbsp/src/circuit/checkpointer.rs     |   4 +-
 crates/feldera-types/src/config.rs          | 124 +++++-
 crates/feldera-types/src/constants.rs       |  11 +-
 crates/iceberg/src/input.rs                 |  16 +-
 crates/pipeline-manager/src/api/examples.rs |   1 +
 crates/pipeline-manager/src/db/test.rs      |   3 +
 openapi.json                                |  26 ++
 13 files changed, 727 insertions(+), 126 deletions(-)

diff --git a/crates/adapterlib/src/errors/controller.rs b/crates/adapterlib/src/errors/controller.rs
index a121f57e527..b94a79df9c6 100644
--- a/crates/adapterlib/src/errors/controller.rs
+++ b/crates/adapterlib/src/errors/controller.rs
@@ -168,6 +168,14 @@ pub enum ConfigError {
     FtRequiresStorage,
     FtRequiresFtInput,
 
+    /// `datafusion_memory_mb` was set to a value greater than or equal to
+    /// the pipeline's effective memory budget, which would leave no memory
+    /// for the DBSP circuit.
+    DatafusionMemoryExceedsBudget {
+        datafusion_memory_mb: u64,
+        max_rss_mb: u64,
+    },
+
     InvalidLayout(LayoutError),
 }
 
@@ -203,6 +211,9 @@ impl DbspDetailedError for ConfigError {
             Self::FtRequiresFtInput => Cow::from("FtWithNonFtInput"),
             Self::CyclicDependency { .. } => Cow::from("CyclicDependency"),
             Self::EmptyStartAfter { .. } => Cow::from("EmptyStartAfter"),
+            Self::DatafusionMemoryExceedsBudget { .. } => {
+                Cow::from("DatafusionMemoryExceedsBudget")
+            }
             Self::InvalidLayout(_) => Cow::from("LayoutError"),
         }
     }
@@ -417,6 +428,13 @@ impl Display for ConfigError {
                 f,
                 "Fault tolerance is configured, but it cannot be enabled because the pipeline has at least one non-fault-tolerant input adapter"
             ),
+            Self::DatafusionMemoryExceedsBudget {
+                datafusion_memory_mb,
+                max_rss_mb,
+            } => write!(
+                f,
+                "'datafusion_memory_mb' ({datafusion_memory_mb} MB) must be less than the pipeline's memory budget ({max_rss_mb} MB); the difference is the budget available to the DBSP circuit"
+            ),
             Self::InvalidLayout(e) => write!(f, "Multihost layout error: {e}"),
         }
     }
diff --git a/crates/adapterlib/src/utils/datafusion.rs b/crates/adapterlib/src/utils/datafusion.rs
index 7180af51b9e..a40f750eee4 100644
--- a/crates/adapterlib/src/utils/datafusion.rs
+++ b/crates/adapterlib/src/utils/datafusion.rs
@@ -1,12 +1,149 @@
 use crate::errors::journal::ControllerError;
 use anyhow::{Error as AnyError, anyhow};
 use arrow::array::Array;
+use datafusion::common::ScalarValue;
 use datafusion::common::arrow::array::{AsArray, RecordBatch};
+use datafusion::execution::SessionStateBuilder;
+use datafusion::execution::memory_pool::FairSpillPool;
+use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
 use datafusion::logical_expr::sqlparser::parser::ParserError;
-use datafusion::prelude::{SQLOptions, SessionContext};
+use datafusion::prelude::{SQLOptions, SessionConfig, SessionContext};
 use datafusion::sql::sqlparser::dialect::GenericDialect;
 use datafusion::sql::sqlparser::parser::Parser;
+use feldera_types::config::PipelineConfig;
+use feldera_types::constants::DATAFUSION_TEMP_DIR;
 use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
+use std::fs::{create_dir_all, read_dir, remove_dir_all, remove_file};
+use std::io::Error as IoError;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tracing::warn;
+
+/// In-memory sort threshold; above this, sorts spill to disk.
+const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 64 * 1024 * 1024;
+
+/// Memory withheld from the sort phase for the merge phase to use.
+const SORT_SPILL_RESERVATION_BYTES: usize = 64 * 1024 * 1024;
+
+/// Build the shared datafusion [`RuntimeEnv`] for a pipeline.
+///
+/// Build once per pipeline and share the `Arc<RuntimeEnv>` across every
+/// `SessionContext`. A separate `RuntimeEnv` per context would give each its
+/// own pool, multiplying the effective memory budget by `(1 + #connectors)`.
+pub fn create_runtime_env(
+    pipeline_config: &PipelineConfig,
+) -> Result<Arc<RuntimeEnv>, ControllerError> {
+    let mut builder = RuntimeEnvBuilder::new();
+    if let Some(datafusion_memory_mb) = pipeline_config.global.resolved_datafusion_memory_mb() {
+        let memory_bytes_max = datafusion_memory_mb * 1_000_000;
+        builder = builder.with_memory_pool(Arc::new(FairSpillPool::new(memory_bytes_max as usize)));
+    }
+    if let Some(storage) = &pipeline_config.storage_config {
+        let path = PathBuf::from(storage.path.clone()).join(DATAFUSION_TEMP_DIR);
+        create_dir_all(&path).map_err(|error| {
+            ControllerError::io_error(
+                format!(
+                    "unable to create datafusion scratch space directory '{}'",
+                    path.display()
+                ),
+                error,
+            )
+        })?;
+        clean_stale_scratch_entries(&path);
+        builder = builder.with_temp_file_path(path);
+    }
+    builder.build_arc().map_err(|error| {
+        ControllerError::io_error(
+            "unable to build datafusion runtime environment",
+            IoError::other(error.to_string()),
+        )
+    })
+}
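+
+// Editor's illustration, not part of the original patch: the intended wiring
+// is one `RuntimeEnv` per pipeline with any number of contexts drawing on
+// it. The variable names below are hypothetical:
+//
+//     let env = create_runtime_env(&pipeline_config)?; // pool + spill dir
+//     let adhoc_ctx = create_session_context(&pipeline_config, env.clone());
+//     let delta_ctx =
+//         create_session_context_with(&pipeline_config, env.clone(), |cfg| {
+//             cfg.set_bool(
+//                 "datafusion.execution.parquet.schema_force_view_types",
+//                 false,
+//             )
+//         });
+//     // Both contexts now share one FairSpillPool and one scratch dir.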
+
+/// Remove leftovers from a previous process inside the scratch directory.
+///
+/// DataFusion's `DiskManager` leaks its `datafusion-XXXXXX/` subdir if the
+/// process is killed before `tempfile::TempDir::drop` runs. The previous
+/// process is gone by the time we get here, so anything still in the dir is
+/// orphaned. Spill files are per-query and never need to survive a restart.
+/// Errors are only logged: a stuck file should not block startup.
+fn clean_stale_scratch_entries(scratch_dir: &Path) {
+    let entries = match read_dir(scratch_dir) {
+        Ok(entries) => entries,
+        Err(error) => {
+            warn!(
+                "unable to read datafusion scratch directory '{}' for startup cleanup: {error}",
+                scratch_dir.display(),
+            );
+            return;
+        }
+    };
+    for entry in entries.flatten() {
+        let path = entry.path();
+        let file_type = match entry.file_type() {
+            Ok(ft) => ft,
+            Err(error) => {
+                warn!(
+                    "unable to stat stale datafusion scratch entry '{}': {error}",
+                    path.display(),
+                );
+                continue;
+            }
+        };
+        let result = if file_type.is_dir() {
+            remove_dir_all(&path)
+        } else {
+            remove_file(&path)
+        };
+        if let Err(error) = result {
+            warn!(
+                "unable to remove stale datafusion scratch entry '{}': {error}",
+                path.display(),
+            );
+        }
+    }
+}
+
+/// `SessionContext` bound to the shared [`RuntimeEnv`], configured with the
+/// pipeline's worker count and feldera's sort-spill thresholds.
+pub fn create_session_context(
+    pipeline_config: &PipelineConfig,
+    runtime_env: Arc<RuntimeEnv>,
+) -> SessionContext {
+    create_session_context_with(pipeline_config, runtime_env, |cfg| cfg)
+}
+
+/// Like [`create_session_context`], with a hook to override individual
+/// datafusion settings (e.g. parquet decoding) before the context is built.
+pub fn create_session_context_with<F>(
+    pipeline_config: &PipelineConfig,
+    runtime_env: Arc<RuntimeEnv>,
+    customize_config: F,
+) -> SessionContext
+where
+    F: FnOnce(SessionConfig) -> SessionConfig,
+{
+    let workers = pipeline_config
+        .global
+        .io_workers
+        .unwrap_or(pipeline_config.global.workers as u64);
+    let session_config = SessionConfig::new()
+        .with_target_partitions(workers as usize)
+        .with_sort_in_place_threshold_bytes(SORT_IN_PLACE_THRESHOLD_BYTES)
+        .with_sort_spill_reservation_bytes(SORT_SPILL_RESERVATION_BYTES)
+        .set(
+            "datafusion.execution.planning_concurrency",
+            &ScalarValue::UInt64(Some(workers)),
+        );
+    let session_config = customize_config(session_config);
+
+    let state = SessionStateBuilder::new()
+        .with_config(session_config)
+        .with_runtime_env(runtime_env)
+        .with_default_features()
+        .build();
+    SessionContext::from(state)
+}
 
 /// Execute a SQL query and collect all results in a vector of `RecordBatch`'s.
 pub async fn execute_query_collect(
@@ -172,3 +309,246 @@ pub async fn validate_timestamp_column(
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::{create_runtime_env, create_session_context};
+    use datafusion::execution::memory_pool::MemoryLimit;
+    use feldera_types::config::{PipelineConfig, ResourceConfig, RuntimeConfig, StorageConfig};
+    use feldera_types::constants::DATAFUSION_TEMP_DIR;
+    use std::fs;
+    use std::path::{Path, PathBuf};
+
+    /// Drop guard so a failing test does not leak temp directories.
+    struct TempStorage {
+        path: PathBuf,
+    }
+
+    impl TempStorage {
+        fn new(name: &str) -> Self {
+            let path = std::env::temp_dir().join(name);
+            let _ = fs::remove_dir_all(&path);
+            fs::create_dir_all(&path).unwrap();
+            Self { path }
+        }
+
+        fn path(&self) -> &Path {
+            &self.path
+        }
+    }
+
+    impl Drop for TempStorage {
+        fn drop(&mut self) {
+            let _ = fs::remove_dir_all(&self.path);
+        }
+    }
+
+    fn pipeline_config(global: RuntimeConfig, storage: Option<&Path>) -> PipelineConfig {
+        PipelineConfig {
+            global,
+            multihost: None,
+            name: None,
+            given_name: None,
+            storage_config: storage.map(|p| StorageConfig {
+                path: p.to_string_lossy().into(),
+                cache: Default::default(),
+            }),
+            secrets_dir: None,
+            inputs: Default::default(),
+            outputs: Default::default(),
+            program_ir: None,
+        }
+    }
+
+    #[test]
+    fn create_runtime_env_creates_tmp_dir_under_storage() {
+        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-tmp-dir-test");
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 1,
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+
+        create_runtime_env(&cfg).unwrap();
+
+        let expected = storage.path().join(DATAFUSION_TEMP_DIR);
+        assert!(
+            expected.is_dir(),
+            "expected scratch directory at {}",
+            expected.display(),
+        );
+    }
+
+    /// Must match the value `checkpointer::gc_startup` allowlists, or the
+    /// scratch dir is wiped on every restart.
+    #[test]
+    fn scratch_dir_name_matches_gc_allowlist_constant() {
+        assert_eq!(DATAFUSION_TEMP_DIR, "datafusion-tmp");
+    }
+
+    #[test]
+    fn create_runtime_env_without_storage_succeeds() {
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 1,
+                ..Default::default()
+            },
+            None,
+        );
+        create_runtime_env(&cfg).unwrap();
+    }
+
+    #[test]
+    fn create_runtime_env_applies_memory_pool_when_budget_set() {
+        // 5% of 16 GB = 800 MB; below the 2 GB ceiling.
+        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-pool-test");
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 1,
+                max_rss_mb: Some(16_000),
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+
+        let env = create_runtime_env(&cfg).unwrap();
+        match env.memory_pool.memory_limit() {
+            MemoryLimit::Finite(bytes) => assert_eq!(bytes, 800 * 1_000_000),
+            MemoryLimit::Infinite => panic!("expected a bounded memory pool, got Infinite"),
+            MemoryLimit::Unknown => panic!("expected a bounded memory pool, got Unknown"),
+        }
+    }
+
+    #[test]
+    fn create_runtime_env_no_memory_limit_when_budget_unset() {
+        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-unbounded-test");
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 1,
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+
+        let env = create_runtime_env(&cfg).unwrap();
+        // Anything other than `Finite(_)` proves no FairSpillPool was wired in.
+        match env.memory_pool.memory_limit() {
+            MemoryLimit::Finite(bytes) => {
+                panic!("expected an unbounded pool, got finite limit of {bytes} bytes");
+            }
+            _ => {}
+        }
+    }
+
+    #[test]
+    fn create_runtime_env_uses_resources_memory_mb_max_fallback() {
+        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-resources-test");
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 1,
+                max_rss_mb: None,
+                resources: ResourceConfig {
+                    memory_mb_max: Some(16_000),
+                    ..Default::default()
+                },
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+
+        let env = create_runtime_env(&cfg).unwrap();
+        match env.memory_pool.memory_limit() {
+            MemoryLimit::Finite(bytes) => assert_eq!(bytes, 800 * 1_000_000),
+            MemoryLimit::Infinite => panic!("expected a bounded memory pool, got Infinite"),
+            MemoryLimit::Unknown => panic!("expected a bounded memory pool, got Unknown"),
+        }
+    }
+
+    #[test]
+    fn create_runtime_env_wipes_stale_scratch_entries() {
+        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-wipe-test");
+        let scratch = storage.path().join(DATAFUSION_TEMP_DIR);
+        fs::create_dir_all(&scratch).unwrap();
+
+        // Simulate leftovers from a prior crashed process.
+        let stale_subdir = scratch.join("datafusion-stale1");
+        fs::create_dir_all(&stale_subdir).unwrap();
+        fs::write(stale_subdir.join("orphan.arrow"), b"stale").unwrap();
+        let stale_file = scratch.join("loose.tmp");
+        fs::write(&stale_file, b"stale").unwrap();
+
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 1,
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+        create_runtime_env(&cfg).unwrap();
+
+        assert!(
+            scratch.is_dir(),
+            "scratch root must survive cleanup; gc_startup keeps it on the allowlist",
+        );
+        assert!(
+            !stale_subdir.exists(),
+            "stale per-DiskManager subdir should be removed on startup",
+        );
+        assert!(
+            !stale_file.exists(),
+            "stale loose file should be removed on startup",
+        );
+    }
+
+    #[test]
+    fn create_session_context_target_partitions_match_workers() {
+        let storage = TempStorage::new("feldera-datafusion-create-session-context-workers-test");
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 7,
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+        let env = create_runtime_env(&cfg).unwrap();
+        let ctx = create_session_context(&cfg, env);
+        assert_eq!(ctx.copied_config().target_partitions(), 7);
+    }
+
+    #[test]
+    fn create_session_context_target_partitions_prefer_io_workers() {
+        let storage = TempStorage::new("feldera-datafusion-create-session-context-io-workers-test");
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 4,
+                io_workers: Some(12),
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+        let env = create_runtime_env(&cfg).unwrap();
+        let ctx = create_session_context(&cfg, env);
+        assert_eq!(ctx.copied_config().target_partitions(), 12);
+    }
+
+    #[test]
+    fn create_session_context_with_customise_overrides_defaults() {
+        use super::create_session_context_with;
+        let storage = TempStorage::new("feldera-datafusion-create-session-context-override-test");
+        let cfg = pipeline_config(
+            RuntimeConfig {
+                workers: 4,
+                ..Default::default()
+            },
+            Some(storage.path()),
+        );
+        let env = create_runtime_env(&cfg).unwrap();
+        // Customise hook must win over the worker-derived defaults.
+        let ctx = create_session_context_with(&cfg, env, |c| {
+            c.set_usize("datafusion.execution.target_partitions", 99)
+        });
+        assert_eq!(ctx.copied_config().target_partitions(), 99);
+    }
+}
diff --git a/crates/adapters/src/adhoc.rs b/crates/adapters/src/adhoc.rs
index 8e71be879f7..026fe8c0f36 100644
--- a/crates/adapters/src/adhoc.rs
+++ b/crates/adapters/src/adhoc.rs
@@ -3,10 +3,8 @@ use crate::{Controller, PipelineError};
 use actix_web::{HttpRequest, HttpResponse, http::header, web::Payload};
 use actix_ws::{AggregatedMessage, CloseCode, CloseReason, Closed, Session as WsSession};
 use datafusion::common::metadata::ScalarAndMetadata;
-use datafusion::common::{DFSchema, ParamValues, ScalarValue};
-use datafusion::execution::memory_pool::FairSpillPool;
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use datafusion::execution::{SessionState, SessionStateBuilder};
+use datafusion::common::{DFSchema, ParamValues};
+use datafusion::execution::SessionState;
 use datafusion::logical_expr::{EmptyRelation, Execute, LogicalPlan, Prepare, Statement};
 use datafusion::prelude::*;
 use datafusion::sql::parser::{DFParserBuilder, Statement as DFStatement};
@@ -15,15 +13,11 @@ use executor::{
     hash_query_result, infallible_from_bytestring, stream_arrow_query, stream_json_query,
     stream_parquet_query, stream_text_query,
 };
-use feldera_adapterlib::errors::journal::ControllerError;
-use feldera_types::config::PipelineConfig;
 use feldera_types::query::{AdHocResultFormat, AdhocQueryArgs, MAX_WS_FRAME_SIZE};
 use futures_util::StreamExt;
 use serde_json::json;
 use std::collections::{HashMap, VecDeque};
 use std::convert::Infallible;
-use std::fs::create_dir_all;
-use std::path::PathBuf;
 use std::sync::Arc;
 use tracing::warn;
 
@@ -31,49 +25,6 @@ mod executor;
 mod format;
 pub(crate) mod table;
 
-pub(crate) fn create_session_context(
-    config: &PipelineConfig,
-) -> Result<SessionContext, ControllerError> {
-    const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 64 * 1024 * 1024;
-    const SORT_SPILL_RESERVATION_BYTES: usize = 64 * 1024 * 1024;
-    let session_config = SessionConfig::new()
-        .with_target_partitions(config.global.workers as usize)
-        .with_sort_in_place_threshold_bytes(SORT_IN_PLACE_THRESHOLD_BYTES)
-        .with_sort_spill_reservation_bytes(SORT_SPILL_RESERVATION_BYTES)
-        .set(
-            "datafusion.execution.planning_concurrency",
-            &ScalarValue::UInt64(Some(config.global.workers as u64)),
-        );
-    // Initialize datafusion memory limits
-    let mut runtime_env_builder = RuntimeEnvBuilder::new();
-    if let Some(memory_mb_max) = config.global.resources.memory_mb_max {
-        let memory_bytes_max = memory_mb_max * 1_000_000;
-        runtime_env_builder = runtime_env_builder
-            .with_memory_pool(Arc::new(FairSpillPool::new(memory_bytes_max as usize)));
-    }
-    // Initialize datafusion spill-to-disk directory
-    if let Some(storage) = &config.storage_config {
-        let path = PathBuf::from(storage.path.clone()).join("adhoc-tmp");
-        if !path.exists() {
-            create_dir_all(&path).map_err(|error| {
-                ControllerError::io_error(
-                    "unable to create ad-hoc scratch space directory during startup",
-                    error,
-                )
-            })?;
-        }
-        runtime_env_builder = runtime_env_builder.with_temp_file_path(path);
-    }
-
-    let runtime_env = runtime_env_builder.build_arc().unwrap();
-    let state = SessionStateBuilder::new()
-        .with_config(session_config)
-        .with_runtime_env(runtime_env)
-        .with_default_features()
-        .build();
-    Ok(SessionContext::from(state))
-}
-
 /// Helper for closing the websocket session
 ///
 /// Note that adding a `description` to the `CloseReason` is currently
@@ -462,6 +413,8 @@ mod tests {
     use super::*;
     use datafusion::arrow;
     use datafusion::arrow::record_batch::RecordBatch;
+    use datafusion::common::ScalarValue;
+    use datafusion::execution::SessionStateBuilder;
 
     fn test_state() -> SessionState {
         SessionStateBuilder::new().with_default_features().build()
diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs
index 231a7413d76..7cbdefdafb9 100644
--- a/crates/adapters/src/controller.rs
+++ b/crates/adapters/src/controller.rs
@@ -71,6 +71,7 @@ use enum_map::EnumMap;
 use feldera_adapterlib::format::BufferSize;
 use feldera_adapterlib::metrics::{ConnectorMetrics, ValueType};
 use feldera_adapterlib::transport::{CommandHandler, InputReader, Resume, Watermark};
+use feldera_adapterlib::utils::datafusion::{create_runtime_env, create_session_context};
 use feldera_ir::LirCircuit;
 use feldera_samply::{AnnotationOptions, CaptureOptions, Span};
 use feldera_storage::fbuf::slab::FBufSlabsStats;
@@ -146,8 +147,8 @@ mod stats;
 mod sync;
 mod validate;
 
+use crate::adhoc::execute_sql;
 use crate::adhoc::table::AdHocTable;
-use crate::adhoc::{create_session_context, execute_sql};
 use crate::catalog::{SerBatchReader, SerTrace};
 use crate::format::parquet::relation_to_arrow_fields;
 use crate::format::{MessageOrientedPreprocessedParser, StreamingPreprocessedParser};
@@ -4476,6 +4477,7 @@ impl ControllerInit {
             hosts: checkpoint_config.global.hosts,
             workers: checkpoint_config.global.workers,
             max_rss_mb: checkpoint_config.global.max_rss_mb,
+            datafusion_memory_mb: checkpoint_config.global.datafusion_memory_mb,
 
             // The checkpoint determines the fault tolerance model, but the
             // pipeline manager can override the details of the
@@ -4599,10 +4601,37 @@ Using the Kubernetes limit as the RSS memory limit."
             );
         }
 
+        // Carve out a slice of the pipeline's memory budget for DataFusion's
+        // memory pool (see `RuntimeConfig::datafusion_memory_mb`) and pass
+        // only the remainder to the DBSP circuit. Without this, the circuit
+        // and DataFusion would each independently grow to the full budget,
+        // which would double-book RAM.
+        let datafusion_memory_mb = pipeline_config.global.resolved_datafusion_memory_mb();
+        let circuit_max_rss_mb = match (max_rss_mb, datafusion_memory_mb) {
+            (Some(rss), Some(df)) => {
+                if df >= rss {
+                    return Err(ControllerError::Config {
+                        config_error: Box::new(ConfigError::DatafusionMemoryExceedsBudget {
+                            datafusion_memory_mb: df,
+                            max_rss_mb: rss,
+                        }),
+                    });
+                }
+                let remainder = rss - df;
+                info!(
+                    "memory budget split: circuit RSS {remainder} MB, datafusion {df} MB \
+(total {rss} MB)"
+                );
+                Some(remainder)
+            }
+            (Some(rss), None) => Some(rss),
+            (None, _) => None,
+        };
+
         let layout =
             layout.unwrap_or_else(|| Layout::new_solo(pipeline_config.global.workers as usize));
         Ok(CircuitConfig::from(layout)
-            .with_max_rss_bytes(max_rss_mb.map(|mb| mb * 1_000_000))
+            .with_max_rss_bytes(circuit_max_rss_mb.map(|mb| mb * 1_000_000))
             .with_pin_cpus(pipeline_config.global.pin_cpus.clone())
             .with_storage(storage)
             .with_mode(Mode::Persistent)
@@ -5597,6 +5626,10 @@ pub struct ControllerInner {
     backpressure_thread_unparker: Unparker,
     error_cb: Box, Option) + Send + Sync>,
     session_ctxt: SessionContext,
+    /// Shared datafusion runtime environment. Owns the pipeline-wide
+    /// `FairSpillPool` and spill-to-disk path so that every `SessionContext`
+    /// (ad-hoc + integrated connectors) draws from a single bounded budget.
+    datafusion_runtime_env: Arc<RuntimeEnv>,
     adhoc_tables: HashMap>,
     fault_tolerance: Option<FtModel>,
     step_receiver: tokio::sync::watch::Receiver,
@@ -5669,7 +5702,8 @@ impl ControllerInner {
         let (command_sender, command_receiver) = channel();
         let (transaction_sender, transaction_receiver) =
             tokio::sync::watch::channel(TransactionCoordination::default());
-        let session_ctxt = create_session_context(&config)?;
+        let datafusion_runtime_env = create_runtime_env(&config)?;
+        let session_ctxt = create_session_context(&config, datafusion_runtime_env.clone());
         let controller = Arc::new_cyclic(|weak| {
             let adhoc_tables = Self::initialize_adhoc_queries(&session_ctxt, &*catalog, weak);
             Self {
@@ -5691,6 +5725,7 @@ impl ControllerInner {
                 backpressure_thread_unparker: backpressure_thread_parker.unparker().clone(),
                 error_cb,
                 session_ctxt,
+                datafusion_runtime_env,
                 adhoc_tables,
                 fault_tolerance: config.global.fault_tolerance.model,
                 transaction_info: Mutex::new(TransactionInfo::new(
@@ -6101,6 +6136,8 @@ impl ControllerInner {
             let endpoint = create_integrated_input_endpoint(
                 endpoint_name,
                 &resolved_connector_config,
+                &self.status.pipeline_config,
+                self.datafusion_runtime_env.clone(),
                 probe,
             )?;
diff --git a/crates/adapters/src/integrated.rs b/crates/adapters/src/integrated.rs
index 7ea87c6be76..6e57e993180 100644
--- a/crates/adapters/src/integrated.rs
+++ b/crates/adapters/src/integrated.rs
@@ -1,9 +1,10 @@
 use crate::controller::{ControllerInner, EndpointId};
 use crate::transport::IntegratedInputEndpoint;
 use crate::{ControllerError, Encoder, InputConsumer, OutputEndpoint};
-use feldera_types::config::{ConnectorConfig, TransportConfig};
+use datafusion::execution::runtime_env::RuntimeEnv;
+use feldera_types::config::{ConnectorConfig, PipelineConfig, TransportConfig};
 use feldera_types::program_schema::Relation;
-use std::sync::Weak;
+use std::sync::{Arc, Weak};
 
 #[cfg(feature = "with-deltalake")]
 pub mod delta_table;
@@ -90,17 +91,31 @@ pub fn create_integrated_output_endpoint(
 pub fn create_integrated_input_endpoint(
     endpoint_name: &str,
     config: &ConnectorConfig,
+    pipeline_config: &PipelineConfig,
+    runtime_env: Arc<RuntimeEnv>,
     consumer: Box<dyn InputConsumer>,
) -> Result<Box<dyn IntegratedInputEndpoint>, ControllerError> {
     let ep: Box<dyn IntegratedInputEndpoint> = match &config.transport {
         #[cfg(feature = "with-deltalake")]
-        TransportConfig::DeltaTableInput(config) => Box::new(
-            delta_table::DeltaTableInputEndpoint::new(endpoint_name, config, consumer),
-        ),
+        TransportConfig::DeltaTableInput(config) => {
+            Box::new(delta_table::DeltaTableInputEndpoint::new(
+                endpoint_name,
+                config,
+                pipeline_config,
+                runtime_env,
+                consumer,
+            ))
+        }
         #[cfg(feature = "with-iceberg")]
-        TransportConfig::IcebergInput(config) => Box::new(
-            feldera_iceberg::IcebergInputEndpoint::new(endpoint_name, config, consumer),
-        ),
+        TransportConfig::IcebergInput(config) => {
+            Box::new(feldera_iceberg::IcebergInputEndpoint::new(
+                endpoint_name,
+                config,
+                pipeline_config,
+                runtime_env,
+                consumer,
+            ))
+        }
         TransportConfig::PostgresInput(config) => {
             Box::new(PostgresInputEndpoint::new(endpoint_name, config, consumer))
         }
diff --git a/crates/adapters/src/integrated/delta_table/input.rs b/crates/adapters/src/integrated/delta_table/input.rs
index 31eecfdd6ed..2f7aeec7346 100644
--- a/crates/adapters/src/integrated/delta_table/input.rs
+++ b/crates/adapters/src/integrated/delta_table/input.rs
@@ -16,7 +16,6 @@ use datafusion::datasource::listing::{
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::{PhysicalExpr, displayable};
 use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
-use datafusion::prelude::SessionConfig;
 use dbsp::circuit::tokio::TOKIO;
 use deltalake::datafusion::dataframe::DataFrame;
 use deltalake::datafusion::execution::context::SQLOptions;
@@ -29,12 +28,12 @@ use feldera_adapterlib::format::{ParseError, StagedInputBuffer};
 use feldera_adapterlib::metrics::{ConnectorMetrics, ValueType};
 use feldera_adapterlib::transport::{InputQueueEntry, Resume, Watermark, parse_resume_info};
 use feldera_adapterlib::utils::datafusion::{
-    array_to_string, execute_query_collect, execute_singleton_query, timestamp_to_sql_expression,
-    validate_sql_expression, validate_timestamp_column,
+    array_to_string, create_session_context_with, execute_query_collect, execute_singleton_query,
+    timestamp_to_sql_expression, validate_sql_expression, validate_timestamp_column,
 };
 use feldera_storage::tokio::TOKIO_DEDICATED_IO;
 use feldera_types::adapter_stats::ConnectorHealth;
-use feldera_types::config::FtModel;
+use feldera_types::config::{FtModel, PipelineConfig};
 use feldera_types::program_schema::Relation;
 use feldera_types::transport::delta_table::{DeltaTableReaderConfig, DeltaTableTransactionMode};
 use futures_util::StreamExt;
@@ -109,6 +108,7 @@ fn quote_sql_identifier<S: AsRef<str>>(ident: S) -> String {
 pub struct DeltaTableInputEndpoint {
     endpoint_name: String,
     config: DeltaTableReaderConfig,
+    datafusion: SessionContext,
     consumer: Box<dyn InputConsumer>,
 }
 
@@ -116,13 +116,87 @@ impl DeltaTableInputEndpoint {
     pub fn new(
         endpoint_name: &str,
         config: &DeltaTableReaderConfig,
+        pipeline_config: &PipelineConfig,
+        runtime_env: Arc<RuntimeEnv>,
         consumer: Box<dyn InputConsumer>,
     ) -> Self {
         register_storage_handlers();
 
+        // Number of parallel scan partitions DataFusion plans for the
+        // snapshot read. Resolution order:
+        //
+        // 1. `DELTA_DF_TARGET_PARTITIONS` env var (process-wide override).
+        // 2. `max_concurrent_readers` from the connector config (which
+        //    also drives the global `DELTA_READER_SEMAPHORE`); using the
+        //    same value here keeps the per-connector parallelism aligned
+        //    with the process-wide concurrent-reader cap.
+        // 3. `DEFAULT_MAX_CONCURRENT_READERS` (6), matching the semaphore
+        //    default.
+        let env_target_partitions = match std::env::var("DELTA_DF_TARGET_PARTITIONS").ok() {
+            None => None,
+            Some(s) => match s.parse::<usize>() {
+                Ok(n) if n > 0 => Some(n),
+                _ => {
+                    warn!(
+                        "delta_table {endpoint_name}: ignoring DELTA_DF_TARGET_PARTITIONS={s:?}; expected a positive integer"
+                    );
+                    None
+                }
+            },
+        };
+        let target_partitions = env_target_partitions
+            .or_else(|| {
+                config
+                    .max_concurrent_readers
+                    .map(|n| n as usize)
+                    .filter(|n| *n > 0)
+            })
+            .unwrap_or(DEFAULT_MAX_CONCURRENT_READERS);
+        info!("delta_table {endpoint_name}: target_partitions={target_partitions}");
+
+        let env_batch_size = match std::env::var("DELTA_DF_BATCH_SIZE").ok() {
+            None => None,
+            Some(s) => match s.parse::<usize>() {
+                Ok(n) if n > 0 => {
+                    info!("delta_table {endpoint_name}: applying DELTA_DF_BATCH_SIZE={n}");
+                    Some(n)
+                }
+                _ => {
+                    warn!(
+                        "delta_table {endpoint_name}: ignoring DELTA_DF_BATCH_SIZE={s:?}; expected a positive integer"
+                    );
+                    None
+                }
+            },
+        };
+
+        // Configure datafusion not to generate Utf8View arrow types, which are
+        // not yet supported by the `serde_arrow` crate. The `SessionContext`
+        // shares the pipeline-wide `RuntimeEnv` so that the CDC-mode ORDER BY
+        // query spills to the same bounded memory pool and on-disk scratch
+        // dir as every other datafusion user in the pipeline.
+        //
+        // The `target_partitions` and (optional) `batch_size` overrides are
+        // applied after `create_session_context_with` populates its own
+        // worker-derived defaults, so the Delta connector's resolution above
+        // wins.
+        let datafusion = create_session_context_with(pipeline_config, runtime_env, |cfg| {
+            let cfg = cfg
+                .set_bool(
+                    "datafusion.execution.parquet.schema_force_view_types",
+                    false,
+                )
+                .set_usize("datafusion.execution.target_partitions", target_partitions);
+            match env_batch_size {
+                Some(n) => cfg.set_usize("datafusion.execution.batch_size", n),
+                None => cfg,
+            }
+        });
+
         Self {
             endpoint_name: endpoint_name.to_string(),
             config: config.clone(),
+            datafusion,
             consumer,
         }
     }
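+
+    // Editor's illustration, not part of the original patch: the precedence
+    // implemented in `new` above, as a standalone sketch (`resolve` is a
+    // hypothetical helper):
+    //
+    //     fn resolve(env: Option<usize>, cfg: Option<usize>) -> usize {
+    //         env.filter(|n| *n > 0)
+    //             .or(cfg.filter(|n| *n > 0))
+    //             .unwrap_or(DEFAULT_MAX_CONCURRENT_READERS)
+    //     }
+    //
+    //     assert_eq!(resolve(Some(16), Some(4)), 16); // env var wins
+    //     assert_eq!(resolve(None, Some(4)), 4);      // connector config next
+    //     assert_eq!(resolve(None, None), 6);         // semaphore default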
@@ -142,6 +216,7 @@ impl IntegratedInputEndpoint for DeltaTableInputEndpoint {
         Ok(Box::new(DeltaTableInputReader::new(
             self.endpoint_name,
             self.config,
+            self.datafusion,
             self.consumer,
             input_handle,
             resume_info,
@@ -158,6 +233,7 @@ impl DeltaTableInputReader {
     fn new(
         endpoint_name: String,
         mut config: DeltaTableReaderConfig,
+        datafusion: SessionContext,
         consumer: Box<dyn InputConsumer>,
         input_handle: &InputCollectionHandle,
         resume_info: Option,
@@ -281,6 +357,7 @@ impl DeltaTableInputReader {
         let endpoint = Arc::new(DeltaTableInputEndpointInner::new(
             &endpoint_name,
             config,
+            datafusion,
             consumer,
             schema,
             resume_info.clone(),
@@ -611,62 +688,12 @@ impl DeltaTableInputEndpointInner {
     fn new(
         endpoint_name: &str,
         config: DeltaTableReaderConfig,
+        datafusion: SessionContext,
         consumer: Box<dyn InputConsumer>,
         schema: Relation,
         resume_info: Option,
     ) -> Self {
         let queue = Arc::new(InputQueue::new(consumer.clone()));
-        // Configure datafusion not to generate Utf8View arrow types, which are not yet
-        // supported by the `serde_arrow` crate.
-        let mut session_config = SessionConfig::new().set_bool(
-            "datafusion.execution.parquet.schema_force_view_types",
-            false,
-        );
-        // Number of parallel scan partitions DataFusion plans for the
-        // snapshot read. Resolution order:
-        //
-        // 1. `DELTA_DF_TARGET_PARTITIONS` env var (process-wide override).
-        // 2. `max_concurrent_readers` from the connector config (which
-        //    also drives the global `DELTA_READER_SEMAPHORE`); using the
-        //    same value here keeps the per-connector parallelism aligned
-        //    with the process-wide concurrent-reader cap.
-        // 3. `DEFAULT_MAX_CONCURRENT_READERS` (6), matching the semaphore
-        //    default.
-        let env_target_partitions = match std::env::var("DELTA_DF_TARGET_PARTITIONS").ok() {
-            None => None,
-            Some(s) => match s.parse::<usize>() {
-                Ok(n) if n > 0 => Some(n),
-                _ => {
-                    warn!(
-                        "delta_table {endpoint_name}: ignoring DELTA_DF_TARGET_PARTITIONS={s:?}; expected a positive integer"
-                    );
-                    None
-                }
-            },
-        };
-        let target_partitions = env_target_partitions
-            .or_else(|| {
-                config
-                    .max_concurrent_readers
-                    .map(|n| n as usize)
-                    .filter(|n| *n > 0)
-            })
-            .unwrap_or(DEFAULT_MAX_CONCURRENT_READERS);
-        session_config =
-            session_config.set_usize("datafusion.execution.target_partitions", target_partitions);
-        info!("delta_table {endpoint_name}: target_partitions={target_partitions}");
-
-        if let Ok(s) = std::env::var("DELTA_DF_BATCH_SIZE") {
-            match s.parse::<usize>() {
-                Ok(n) if n > 0 => {
-                    session_config = session_config.set_usize("datafusion.execution.batch_size", n);
-                    info!("delta_table {endpoint_name}: applying DELTA_DF_BATCH_SIZE={n}");
-                }
-                _ => warn!(
-                    "delta_table {endpoint_name}: ignoring DELTA_DF_BATCH_SIZE={s:?}; expected a positive integer"
-                ),
-            }
-        }
 
         let metrics = Arc::new(DeltaTableMetrics::new());
         consumer.set_custom_metrics(Arc::clone(&metrics) as Arc<dyn ConnectorMetrics>);
@@ -678,7 +705,7 @@ impl DeltaTableInputEndpointInner {
             schema,
             config,
             consumer,
-            datafusion: SessionContext::new_with_config(session_config),
+            datafusion,
             transaction_index: AtomicUsize::new(0),
 
             // Set version to None by default so that the connector is checkpointable in the initial state.
diff --git a/crates/dbsp/src/circuit/checkpointer.rs b/crates/dbsp/src/circuit/checkpointer.rs
index a118a768ee0..4976151f12f 100644
--- a/crates/dbsp/src/circuit/checkpointer.rs
+++ b/crates/dbsp/src/circuit/checkpointer.rs
@@ -5,7 +5,7 @@ use crate::storage::file::SerializerInner;
 use crate::{Error, NumEntries, TypedBox};
 use feldera_types::checkpoint::CheckpointMetadata;
 use feldera_types::constants::{
-    ACTIVATION_MARKER_FILE, ADHOC_TEMP_DIR, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME,
+    ACTIVATION_MARKER_FILE, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME, DATAFUSION_TEMP_DIR,
     DBSP_FILE_EXTENSION, STATE_FILE, STATUS_FILE, STEPS_FILE,
 };
 use itertools::Itertools;
@@ -112,7 +112,7 @@ impl Checkpointer {
         // it.
         in_use_paths.insert(STATUS_FILE.into());
         in_use_paths.insert(format!("{}.mut", STATUS_FILE).into());
-        in_use_paths.insert(ADHOC_TEMP_DIR.into());
+        in_use_paths.insert(DATAFUSION_TEMP_DIR.into());
         in_use_paths.insert(ACTIVATION_MARKER_FILE.into());
         for cpm in self.checkpoint_list.iter() {
             in_use_paths.insert(cpm.uuid.to_string().into());
diff --git a/crates/feldera-types/src/config.rs b/crates/feldera-types/src/config.rs
index 68117df7d42..19abe7dbfb9 100644
--- a/crates/feldera-types/src/config.rs
+++ b/crates/feldera-types/src/config.rs
@@ -803,6 +803,25 @@ pub struct RuntimeConfig {
     /// for more details.
     pub max_rss_mb: Option<u64>,
 
+    /// DataFusion memory pool size, in MB, shared by the ad-hoc query
+    /// engine and the Delta Lake / Iceberg connectors.
+    ///
+    /// Carved out of `max_rss_mb` (falling back to
+    /// `resources.memory_mb_max`); the remainder goes to the DBSP circuit,
+    /// so the two do not double-book RAM.
+    ///
+    /// Unset: defaults to 5% of the effective budget, capped at 2 GB.
+    /// Pipelines that don't run heavy ad-hoc / Delta / Iceberg workloads
+    /// can leave this unset.
+    ///
+    /// Sort/aggregate-heavy ad-hoc queries (especially at high `workers`
+    /// counts) should set this explicitly. An under-sized pool surfaces as
+    /// `ResourcesExhausted` on the failing query — the pipeline keeps
+    /// running and only that query fails.
+    ///
+    /// No pool limit applied if no overall budget is configured.
+    pub datafusion_memory_mb: Option<u64>,
+
     /// Number of DBSP hosts.
     ///
     /// The worker threads are evenly divided among the hosts. For single-host
@@ -1065,6 +1084,7 @@ impl Default for RuntimeConfig {
         Self {
             workers: 8,
             max_rss_mb: None,
+            datafusion_memory_mb: None,
             hosts: 1,
             storage: Some(StorageOptions::default()),
             fault_tolerance: FtConfig::default(),
@@ -1094,6 +1114,35 @@ impl Default for RuntimeConfig {
     }
 }
 
+/// Upper bound on the default DataFusion pool size, in MB.
+///
+/// Spill-to-disk handles overflow; reserving more starves the circuit.
+pub const DEFAULT_DATAFUSION_MEMORY_MB_CEILING: u64 = 2048;
+
+/// Divisor for the default DataFusion pool size: the default is
+/// `effective / 20`, i.e. 5% of the effective memory budget.
+pub const DEFAULT_DATAFUSION_MEMORY_FRACTION_DIVISOR: u64 = 20;
+
+impl RuntimeConfig {
+    /// Pipeline's effective memory budget in MB: `max_rss_mb`, falling back
+    /// to `resources.memory_mb_max` (the k8s pod limit).
+    pub fn effective_memory_mb(&self) -> Option<u64> {
+        self.max_rss_mb.or(self.resources.memory_mb_max)
+    }
+
+    /// Resolved DataFusion pool size in MB: explicit `datafusion_memory_mb`
+    /// if set, else 5% of the effective budget capped at
+    /// `DEFAULT_DATAFUSION_MEMORY_MB_CEILING`. `None` if no budget is
+    /// configured.
+    pub fn resolved_datafusion_memory_mb(&self) -> Option<u64> {
+        if let Some(explicit) = self.datafusion_memory_mb {
+            return Some(explicit);
+        }
+        let effective = self.effective_memory_mb()?;
+        let fraction = effective / DEFAULT_DATAFUSION_MEMORY_FRACTION_DIVISOR;
+        Some(fraction.min(DEFAULT_DATAFUSION_MEMORY_MB_CEILING))
+    }
+}
+
 /// Fault-tolerance configuration.
 ///
 /// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance,
@@ -1143,9 +1192,82 @@ impl Default for FtConfig {
 #[cfg(test)]
 mod test {
     use super::deserialize_fault_tolerance;
-    use crate::config::{FtConfig, FtModel};
+    use crate::config::{
+        DEFAULT_DATAFUSION_MEMORY_MB_CEILING, FtConfig, FtModel, ResourceConfig, RuntimeConfig,
+    };
     use serde::{Deserialize, Serialize};
 
+    #[test]
+    fn resolved_datafusion_memory_explicit_passes_through() {
+        let config = RuntimeConfig {
+            max_rss_mb: Some(8_000),
+            datafusion_memory_mb: Some(1_500),
+            ..Default::default()
+        };
+        assert_eq!(config.resolved_datafusion_memory_mb(), Some(1_500));
+    }
+
+    #[test]
+    fn resolved_datafusion_memory_unconfigured_returns_none() {
+        let config = RuntimeConfig::default();
+        assert!(config.max_rss_mb.is_none());
+        assert!(config.resources.memory_mb_max.is_none());
+        assert_eq!(config.resolved_datafusion_memory_mb(), None);
+    }
+
+    #[test]
+    fn resolved_datafusion_memory_small_budget_scales_down() {
+        // Small pipelines must provision cleanly; the default just shrinks.
+        let config = RuntimeConfig {
+            max_rss_mb: Some(256),
+            ..Default::default()
+        };
+        assert_eq!(config.resolved_datafusion_memory_mb(), Some(12));
+
+        let config = RuntimeConfig {
+            max_rss_mb: Some(512),
+            ..Default::default()
+        };
+        assert_eq!(config.resolved_datafusion_memory_mb(), Some(25));
+    }
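+
+    /// Editor's addition, not in the original patch: a worked example of the
+    /// budget split the resolution rule implies. With a 16 GB budget the
+    /// pool takes 5% (800 MB) and the circuit keeps the remaining 15_200 MB.
+    #[test]
+    fn resolved_datafusion_memory_worked_example() {
+        let config = RuntimeConfig {
+            max_rss_mb: Some(16_000),
+            ..Default::default()
+        };
+        let df = config.resolved_datafusion_memory_mb().unwrap();
+        assert_eq!(df, 800); // 16_000 / 20
+        assert_eq!(config.max_rss_mb.unwrap() - df, 15_200); // circuit's share
+    }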
+
+    #[test]
+    fn resolved_datafusion_memory_clamps_to_ceiling_for_large_budgets() {
+        // 5% of 64 GB = 3.2 GB, above the 2 GB ceiling.
+        let config = RuntimeConfig {
+            max_rss_mb: Some(64_000),
+            ..Default::default()
+        };
+        assert_eq!(
+            config.resolved_datafusion_memory_mb(),
+            Some(DEFAULT_DATAFUSION_MEMORY_MB_CEILING),
+        );
+    }
+
+    #[test]
+    fn resolved_datafusion_memory_midrange_uses_five_percent() {
+        // 5% of 16 GB = 800 MB, inside the clamp range.
+        let config = RuntimeConfig {
+            max_rss_mb: Some(16_000),
+            ..Default::default()
+        };
+        assert_eq!(config.resolved_datafusion_memory_mb(), Some(800));
+    }
+
+    #[test]
+    fn resolved_datafusion_memory_falls_back_to_resources() {
+        // No max_rss_mb, but resources.memory_mb_max is set.
+        let config = RuntimeConfig {
+            max_rss_mb: None,
+            resources: ResourceConfig {
+                memory_mb_max: Some(16_000),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+        assert_eq!(config.resolved_datafusion_memory_mb(), Some(800));
+    }
+
     #[test]
     fn ft_config() {
         #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)]
diff --git a/crates/feldera-types/src/constants.rs b/crates/feldera-types/src/constants.rs
index ccf3af80d41..1f973b18046 100644
--- a/crates/feldera-types/src/constants.rs
+++ b/crates/feldera-types/src/constants.rs
@@ -14,7 +14,16 @@ pub const STEPS_FILE: &str = "steps.bin";
 
 pub const CHECKPOINT_DEPENDENCIES: &str = "dependencies.json";
 
-pub const ADHOC_TEMP_DIR: &str = "adhoc-tmp";
+/// Subdirectory under the pipeline's storage path where DataFusion writes
+/// spill files for the ad-hoc query engine and every integrated connector
+/// that uses DataFusion (Delta Lake, Iceberg).
+///
+/// One pipeline-wide directory keeps the on-disk layout discoverable in a
+/// single place and lets `gc_startup` allowlist it as a single entry. If
+/// you change this value, audit `gc_startup` in `dbsp::circuit::checkpointer`
+/// — the GC's allowlist must keep matching the directory the runtime
+/// actually creates.
+pub const DATAFUSION_TEMP_DIR: &str = "datafusion-tmp";
 
 /// A slice of all file extensions the system can create.
 pub const DBSP_FILE_EXTENSION: &[&str] = &["mut", "feldera"];
diff --git a/crates/iceberg/src/input.rs b/crates/iceberg/src/input.rs
index ea0fea0cb1a..2da00d4ffb0 100644
--- a/crates/iceberg/src/input.rs
+++ b/crates/iceberg/src/input.rs
@@ -12,13 +12,13 @@ use feldera_adapterlib::{
         IntegratedInputEndpoint, NonFtInputReaderCommand,
     },
     utils::datafusion::{
-        array_to_string, execute_query_collect, execute_singleton_query,
+        array_to_string, create_session_context, execute_query_collect, execute_singleton_query,
         timestamp_to_sql_expression, validate_sql_expression, validate_timestamp_column,
     },
     PipelineState,
};
 use feldera_types::{
-    config::FtModel,
+    config::{FtModel, PipelineConfig},
     program_schema::Relation,
     transport::iceberg::{IcebergCatalogType, IcebergReaderConfig},
 };
@@ -62,12 +62,16 @@ impl IcebergInputEndpoint {
     pub fn new(
         endpoint_name: &str,
         config: &IcebergReaderConfig,
+        pipeline_config: &PipelineConfig,
+        runtime_env: Arc<RuntimeEnv>,
         consumer: Box<dyn InputConsumer>,
     ) -> Self {
         Self {
             inner: Arc::new(IcebergInputEndpointInner::new(
                 endpoint_name,
                 config.clone(),
+                pipeline_config,
+                runtime_env,
                 consumer,
             )),
         }
@@ -183,14 +187,20 @@ impl IcebergInputEndpointInner {
     fn new(
         endpoint_name: &str,
         config: IcebergReaderConfig,
+        pipeline_config: &PipelineConfig,
+        runtime_env: Arc<RuntimeEnv>,
         consumer: Box<dyn InputConsumer>,
     ) -> Self {
         let queue = InputQueue::new(consumer.clone());
+        // Share the pipeline-wide `RuntimeEnv` so that scans against the
+        // iceberg table spill to the bounded memory pool and on-disk scratch
+        // dir alongside every other datafusion user in the pipeline.
+        let datafusion = create_session_context(pipeline_config, runtime_env);
         Self {
             endpoint_name: endpoint_name.to_string(),
             config,
             consumer,
-            datafusion: SessionContext::new(),
+            datafusion,
             queue,
         }
     }
diff --git a/crates/pipeline-manager/src/api/examples.rs b/crates/pipeline-manager/src/api/examples.rs
index 7839f8557fe..00410b72b03 100644
--- a/crates/pipeline-manager/src/api/examples.rs
+++ b/crates/pipeline-manager/src/api/examples.rs
@@ -96,6 +96,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
         runtime_config: serde_json::to_value(RuntimeConfig {
             workers: 10,
             max_rss_mb: None,
+            datafusion_memory_mb: None,
             hosts: 1,
             storage: Some(StorageOptions::default()),
             fault_tolerance: FtConfig::default(),
diff --git a/crates/pipeline-manager/src/db/test.rs b/crates/pipeline-manager/src/db/test.rs
index ae20859af54..d38de455b0e 100644
--- a/crates/pipeline-manager/src/db/test.rs
+++ b/crates/pipeline-manager/src/db/test.rs
@@ -300,6 +300,7 @@ fn map_val_to_limited_runtime_config(val: RuntimeConfigPropVal) -> serde_json::Value {
     serde_json::to_value(RuntimeConfig {
         workers: val.val0,
         max_rss_mb: val.val21,
+        datafusion_memory_mb: None,
        hosts: val.val20,
         cpu_profiler: val.val1,
         min_batch_size_records: val.val2,
@@ -1192,6 +1193,7 @@ async fn pipeline_versioning() {
     let new_runtime_config = serde_json::to_value(RuntimeConfig {
         workers: 100,
         max_rss_mb: None,
+        datafusion_memory_mb: None,
         hosts: 1,
         storage: None,
         fault_tolerance: FtConfig::default(),
@@ -2484,6 +2486,7 @@ async fn pipeline_provision_version_guard() {
         serde_json::to_value(RuntimeConfig {
             workers: 10,
             max_rss_mb: None,
+            datafusion_memory_mb: None,
             hosts: 1,
             storage: None,
             fault_tolerance: FtConfig::default(),
diff --git a/openapi.json b/openapi.json
index d1d3336cac3..687eeee8ce6 100644
--- a/openapi.json
+++ b/openapi.json
@@ -584,6 +584,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -674,6 +675,7 @@
           "runtime_config": {
             "workers": 10,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -794,6 +796,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -867,6 +870,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -1050,6 +1054,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -1197,6 +1202,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -1270,6 +1276,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -1370,6 +1377,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -1639,6 +1647,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -6473,6 +6482,7 @@
           "runtime_config": {
             "workers": 16,
             "max_rss_mb": null,
+            "datafusion_memory_mb": null,
             "hosts": 1,
             "storage": {
               "backend": {
@@ -10623,6 +10633,14 @@
           "description": "Enable CPU profiler.\n\nThe default value is `true`.",
           "default": true
         },
+        "datafusion_memory_mb": {
+          "type": "integer",
+          "format": "int64",
+          "description": "DataFusion memory pool size, in MB, shared by the ad-hoc query\nengine and the Delta Lake / Iceberg connectors.\n\nCarved out of `max_rss_mb` (falling back to\n`resources.memory_mb_max`); the remainder goes to the DBSP circuit,\nso the two do not double-book RAM.\n\nUnset: defaults to 5% of the effective budget, capped at 2 GB.\nPipelines that don't run heavy ad-hoc / Delta / Iceberg workloads\ncan leave this unset.\n\nSort/aggregate-heavy ad-hoc queries (especially at high `workers`\ncounts) should set this explicitly. An under-sized pool surfaces as\n`ResourcesExhausted` on the failing query — the pipeline keeps\nrunning and only that query fails.\n\nNo pool limit applied if no overall budget is configured.",
+          "default": null,
+          "nullable": true,
+          "minimum": 0
+        },
         "dev_tweaks": {
           "allOf": [
             {
@@ -12497,6 +12515,14 @@
           "description": "Enable CPU profiler.\n\nThe default value is `true`.",
           "default": true
         },
+        "datafusion_memory_mb": {
+          "type": "integer",
+          "format": "int64",
+          "description": "DataFusion memory pool size, in MB, shared by the ad-hoc query\nengine and the Delta Lake / Iceberg connectors.\n\nCarved out of `max_rss_mb` (falling back to\n`resources.memory_mb_max`); the remainder goes to the DBSP circuit,\nso the two do not double-book RAM.\n\nUnset: defaults to 5% of the effective budget, capped at 2 GB.\nPipelines that don't run heavy ad-hoc / Delta / Iceberg workloads\ncan leave this unset.\n\nSort/aggregate-heavy ad-hoc queries (especially at high `workers`\ncounts) should set this explicitly. An under-sized pool surfaces as\n`ResourcesExhausted` on the failing query — the pipeline keeps\nrunning and only that query fails.\n\nNo pool limit applied if no overall budget is configured.",
+          "default": null,
+          "nullable": true,
+          "minimum": 0
+        },
         "dev_tweaks": {
           "allOf": [
             {