diff --git a/crates/adapterlib/src/errors/controller.rs b/crates/adapterlib/src/errors/controller.rs index a121f57e527..b94a79df9c6 100644 --- a/crates/adapterlib/src/errors/controller.rs +++ b/crates/adapterlib/src/errors/controller.rs @@ -168,6 +168,14 @@ pub enum ConfigError { FtRequiresStorage, FtRequiresFtInput, + /// `datafusion_memory_mb` was set to a value greater than or equal to + /// the pipeline's effective memory budget, which would leave no memory + /// for the DBSP circuit. + DatafusionMemoryExceedsBudget { + datafusion_memory_mb: u64, + max_rss_mb: u64, + }, + InvalidLayout(LayoutError), } @@ -203,6 +211,9 @@ impl DbspDetailedError for ConfigError { Self::FtRequiresFtInput => Cow::from("FtWithNonFtInput"), Self::CyclicDependency { .. } => Cow::from("CyclicDependency"), Self::EmptyStartAfter { .. } => Cow::from("EmptyStartAfter"), + Self::DatafusionMemoryExceedsBudget { .. } => { + Cow::from("DatafusionMemoryExceedsBudget") + } Self::InvalidLayout(_) => Cow::from("LayoutError"), } } @@ -417,6 +428,13 @@ impl Display for ConfigError { f, "Fault tolerance is configured, but it cannot be enabled because the pipeline has at least one non-fault-tolerant input adapter" ), + Self::DatafusionMemoryExceedsBudget { + datafusion_memory_mb, + max_rss_mb, + } => write!( + f, + "'datafusion_memory_mb' ({datafusion_memory_mb} MB) must be less than the pipeline's memory budget ({max_rss_mb} MB); the difference is the budget available to the DBSP circuit" + ), Self::InvalidLayout(e) => write!(f, "Multihost layout error: {e}"), } } diff --git a/crates/adapterlib/src/utils/datafusion.rs b/crates/adapterlib/src/utils/datafusion.rs index 7180af51b9e..a40f750eee4 100644 --- a/crates/adapterlib/src/utils/datafusion.rs +++ b/crates/adapterlib/src/utils/datafusion.rs @@ -1,12 +1,149 @@ use crate::errors::journal::ControllerError; use anyhow::{Error as AnyError, anyhow}; use arrow::array::Array; +use datafusion::common::ScalarValue; use datafusion::common::arrow::array::{AsArray, RecordBatch}; +use datafusion::execution::SessionStateBuilder; +use datafusion::execution::memory_pool::FairSpillPool; +use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use datafusion::logical_expr::sqlparser::parser::ParserError; -use datafusion::prelude::{SQLOptions, SessionContext}; +use datafusion::prelude::{SQLOptions, SessionConfig, SessionContext}; use datafusion::sql::sqlparser::dialect::GenericDialect; use datafusion::sql::sqlparser::parser::Parser; +use feldera_types::config::PipelineConfig; +use feldera_types::constants::DATAFUSION_TEMP_DIR; use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType}; +use std::fs::{create_dir_all, read_dir, remove_dir_all, remove_file}; +use std::io::Error as IoError; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tracing::warn; + +/// In-memory sort threshold; above this, sorts spill to disk. +const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 64 * 1024 * 1024; + +/// Memory withheld from the sort phase for the merge phase to use. +const SORT_SPILL_RESERVATION_BYTES: usize = 64 * 1024 * 1024; + +/// Build the shared datafusion [`RuntimeEnv`] for a pipeline. +/// +/// Build once per pipeline and share the `Arc` across every +/// `SessionContext`. A separate `RuntimeEnv` per context would give each its +/// own pool, multiplying the effective memory budget by `(1 + #connectors)`. +pub fn create_runtime_env( + pipeline_config: &PipelineConfig, +) -> Result, ControllerError> { + let mut builder = RuntimeEnvBuilder::new(); + if let Some(datafusion_memory_mb) = pipeline_config.global.resolved_datafusion_memory_mb() { + let memory_bytes_max = datafusion_memory_mb * 1_000_000; + builder = builder.with_memory_pool(Arc::new(FairSpillPool::new(memory_bytes_max as usize))); + } + if let Some(storage) = &pipeline_config.storage_config { + let path = PathBuf::from(storage.path.clone()).join(DATAFUSION_TEMP_DIR); + create_dir_all(&path).map_err(|error| { + ControllerError::io_error( + format!( + "unable to create datafusion scratch space directory '{}'", + path.display() + ), + error, + ) + })?; + clean_stale_scratch_entries(&path); + builder = builder.with_temp_file_path(path); + } + builder.build_arc().map_err(|error| { + ControllerError::io_error( + "unable to build datafusion runtime environment", + IoError::other(error.to_string()), + ) + }) +} + +/// Remove leftovers from a previous process inside the scratch directory. +/// +/// DataFusion's `DiskManager` leaks its `datafusion-XXXXXX/` subdir if the +/// process is killed before `tempfile::TempDir::drop` runs. The previous +/// process is gone by the time we get here, so anything still in the dir is +/// orphaned. Spill files are per-query and never need to survive a restart. +/// Errors only logged: a stuck file should not block startup. +fn clean_stale_scratch_entries(scratch_dir: &Path) { + let entries = match read_dir(scratch_dir) { + Ok(entries) => entries, + Err(error) => { + warn!( + "unable to read datafusion scratch directory '{}' for startup cleanup: {error}", + scratch_dir.display(), + ); + return; + } + }; + for entry in entries.flatten() { + let path = entry.path(); + let file_type = match entry.file_type() { + Ok(ft) => ft, + Err(error) => { + warn!( + "unable to stat stale datafusion scratch entry '{}': {error}", + path.display(), + ); + continue; + } + }; + let result = if file_type.is_dir() { + remove_dir_all(&path) + } else { + remove_file(&path) + }; + if let Err(error) = result { + warn!( + "unable to remove stale datafusion scratch entry '{}': {error}", + path.display(), + ); + } + } +} + +/// `SessionContext` bound to the shared [`RuntimeEnv`], configured with the +/// pipeline's worker count and feldera's sort-spill thresholds. +pub fn create_session_context( + pipeline_config: &PipelineConfig, + runtime_env: Arc, +) -> SessionContext { + create_session_context_with(pipeline_config, runtime_env, |cfg| cfg) +} + +/// Like [`create_session_context`], with a hook to override individual +/// datafusion settings (e.g. parquet decoding) before the context is built. +pub fn create_session_context_with( + pipeline_config: &PipelineConfig, + runtime_env: Arc, + customize_config: F, +) -> SessionContext +where + F: FnOnce(SessionConfig) -> SessionConfig, +{ + let workers = pipeline_config + .global + .io_workers + .unwrap_or(pipeline_config.global.workers as u64); + let session_config = SessionConfig::new() + .with_target_partitions(workers as usize) + .with_sort_in_place_threshold_bytes(SORT_IN_PLACE_THRESHOLD_BYTES) + .with_sort_spill_reservation_bytes(SORT_SPILL_RESERVATION_BYTES) + .set( + "datafusion.execution.planning_concurrency", + &ScalarValue::UInt64(Some(workers)), + ); + let session_config = customize_config(session_config); + + let state = SessionStateBuilder::new() + .with_config(session_config) + .with_runtime_env(runtime_env) + .with_default_features() + .build(); + SessionContext::from(state) +} /// Execute a SQL query and collect all results in a vector of `RecordBatch`'s. pub async fn execute_query_collect( @@ -172,3 +309,246 @@ pub async fn validate_timestamp_column( Ok(()) } + +#[cfg(test)] +mod tests { + use super::{create_runtime_env, create_session_context}; + use datafusion::execution::memory_pool::MemoryLimit; + use feldera_types::config::{PipelineConfig, ResourceConfig, RuntimeConfig, StorageConfig}; + use feldera_types::constants::DATAFUSION_TEMP_DIR; + use std::fs; + use std::path::{Path, PathBuf}; + + /// Drop guard so a failing test does not leak temp directories. + struct TempStorage { + path: PathBuf, + } + + impl TempStorage { + fn new(name: &str) -> Self { + let path = std::env::temp_dir().join(name); + let _ = fs::remove_dir_all(&path); + fs::create_dir_all(&path).unwrap(); + Self { path } + } + + fn path(&self) -> &Path { + &self.path + } + } + + impl Drop for TempStorage { + fn drop(&mut self) { + let _ = fs::remove_dir_all(&self.path); + } + } + + fn pipeline_config(global: RuntimeConfig, storage: Option<&Path>) -> PipelineConfig { + PipelineConfig { + global, + multihost: None, + name: None, + given_name: None, + storage_config: storage.map(|p| StorageConfig { + path: p.to_string_lossy().into(), + cache: Default::default(), + }), + secrets_dir: None, + inputs: Default::default(), + outputs: Default::default(), + program_ir: None, + } + } + + #[test] + fn create_runtime_env_creates_tmp_dir_under_storage() { + let storage = TempStorage::new("feldera-datafusion-create-runtime-env-tmp-dir-test"); + let cfg = pipeline_config( + RuntimeConfig { + workers: 1, + ..Default::default() + }, + Some(storage.path()), + ); + + create_runtime_env(&cfg).unwrap(); + + let expected = storage.path().join(DATAFUSION_TEMP_DIR); + assert!( + expected.is_dir(), + "expected scratch directory at {}", + expected.display(), + ); + } + + /// Must match the value `checkpointer::gc_startup` allowlists, or the + /// scratch dir is wiped on every restart. + #[test] + fn scratch_dir_name_matches_gc_allowlist_constant() { + assert_eq!(DATAFUSION_TEMP_DIR, "datafusion-tmp"); + } + + #[test] + fn create_runtime_env_without_storage_succeeds() { + let cfg = pipeline_config( + RuntimeConfig { + workers: 1, + ..Default::default() + }, + None, + ); + create_runtime_env(&cfg).unwrap(); + } + + #[test] + fn create_runtime_env_applies_memory_pool_when_budget_set() { + // 5% of 16 GB = 800 MB; below the 2 GB ceiling. + let storage = TempStorage::new("feldera-datafusion-create-runtime-env-pool-test"); + let cfg = pipeline_config( + RuntimeConfig { + workers: 1, + max_rss_mb: Some(16_000), + ..Default::default() + }, + Some(storage.path()), + ); + + let env = create_runtime_env(&cfg).unwrap(); + match env.memory_pool.memory_limit() { + MemoryLimit::Finite(bytes) => assert_eq!(bytes, 800 * 1_000_000), + MemoryLimit::Infinite => panic!("expected a bounded memory pool, got Infinite"), + MemoryLimit::Unknown => panic!("expected a bounded memory pool, got Unknown"), + } + } + + #[test] + fn create_runtime_env_no_memory_limit_when_budget_unset() { + let storage = TempStorage::new("feldera-datafusion-create-runtime-env-unbounded-test"); + let cfg = pipeline_config( + RuntimeConfig { + workers: 1, + ..Default::default() + }, + Some(storage.path()), + ); + + let env = create_runtime_env(&cfg).unwrap(); + // Anything other than `Finite(_)` proves no FairSpillPool was wired in. + match env.memory_pool.memory_limit() { + MemoryLimit::Finite(bytes) => { + panic!("expected an unbounded pool, got finite limit of {bytes} bytes"); + } + _ => {} + } + } + + #[test] + fn create_runtime_env_uses_resources_memory_mb_max_fallback() { + let storage = TempStorage::new("feldera-datafusion-create-runtime-env-resources-test"); + let cfg = pipeline_config( + RuntimeConfig { + workers: 1, + max_rss_mb: None, + resources: ResourceConfig { + memory_mb_max: Some(16_000), + ..Default::default() + }, + ..Default::default() + }, + Some(storage.path()), + ); + + let env = create_runtime_env(&cfg).unwrap(); + match env.memory_pool.memory_limit() { + MemoryLimit::Finite(bytes) => assert_eq!(bytes, 800 * 1_000_000), + MemoryLimit::Infinite => panic!("expected a bounded memory pool, got Infinite"), + MemoryLimit::Unknown => panic!("expected a bounded memory pool, got Unknown"), + } + } + + #[test] + fn create_runtime_env_wipes_stale_scratch_entries() { + let storage = TempStorage::new("feldera-datafusion-create-runtime-env-wipe-test"); + let scratch = storage.path().join(DATAFUSION_TEMP_DIR); + fs::create_dir_all(&scratch).unwrap(); + + // Simulate leftovers from a prior crashed process. + let stale_subdir = scratch.join("datafusion-stale1"); + fs::create_dir_all(&stale_subdir).unwrap(); + fs::write(stale_subdir.join("orphan.arrow"), b"stale").unwrap(); + let stale_file = scratch.join("loose.tmp"); + fs::write(&stale_file, b"stale").unwrap(); + + let cfg = pipeline_config( + RuntimeConfig { + workers: 1, + ..Default::default() + }, + Some(storage.path()), + ); + create_runtime_env(&cfg).unwrap(); + + assert!( + scratch.is_dir(), + "scratch root must survive cleanup; gc_startup keeps it on the allowlist", + ); + assert!( + !stale_subdir.exists(), + "stale per-DiskManager subdir should be removed on startup", + ); + assert!( + !stale_file.exists(), + "stale loose file should be removed on startup", + ); + } + + #[test] + fn create_session_context_target_partitions_match_workers() { + let storage = TempStorage::new("feldera-datafusion-create-session-context-workers-test"); + let cfg = pipeline_config( + RuntimeConfig { + workers: 7, + ..Default::default() + }, + Some(storage.path()), + ); + let env = create_runtime_env(&cfg).unwrap(); + let ctx = create_session_context(&cfg, env); + assert_eq!(ctx.copied_config().target_partitions(), 7); + } + + #[test] + fn create_session_context_target_partitions_prefer_io_workers() { + let storage = TempStorage::new("feldera-datafusion-create-session-context-io-workers-test"); + let cfg = pipeline_config( + RuntimeConfig { + workers: 4, + io_workers: Some(12), + ..Default::default() + }, + Some(storage.path()), + ); + let env = create_runtime_env(&cfg).unwrap(); + let ctx = create_session_context(&cfg, env); + assert_eq!(ctx.copied_config().target_partitions(), 12); + } + + #[test] + fn create_session_context_with_customise_overrides_defaults() { + use super::create_session_context_with; + let storage = TempStorage::new("feldera-datafusion-create-session-context-override-test"); + let cfg = pipeline_config( + RuntimeConfig { + workers: 4, + ..Default::default() + }, + Some(storage.path()), + ); + let env = create_runtime_env(&cfg).unwrap(); + // Customise hook must win over the worker-derived defaults. + let ctx = create_session_context_with(&cfg, env, |c| { + c.set_usize("datafusion.execution.target_partitions", 99) + }); + assert_eq!(ctx.copied_config().target_partitions(), 99); + } +} diff --git a/crates/adapters/src/adhoc.rs b/crates/adapters/src/adhoc.rs index 8e71be879f7..026fe8c0f36 100644 --- a/crates/adapters/src/adhoc.rs +++ b/crates/adapters/src/adhoc.rs @@ -3,10 +3,8 @@ use crate::{Controller, PipelineError}; use actix_web::{HttpRequest, HttpResponse, http::header, web::Payload}; use actix_ws::{AggregatedMessage, CloseCode, CloseReason, Closed, Session as WsSession}; use datafusion::common::metadata::ScalarAndMetadata; -use datafusion::common::{DFSchema, ParamValues, ScalarValue}; -use datafusion::execution::memory_pool::FairSpillPool; -use datafusion::execution::runtime_env::RuntimeEnvBuilder; -use datafusion::execution::{SessionState, SessionStateBuilder}; +use datafusion::common::{DFSchema, ParamValues}; +use datafusion::execution::SessionState; use datafusion::logical_expr::{EmptyRelation, Execute, LogicalPlan, Prepare, Statement}; use datafusion::prelude::*; use datafusion::sql::parser::{DFParserBuilder, Statement as DFStatement}; @@ -15,15 +13,11 @@ use executor::{ hash_query_result, infallible_from_bytestring, stream_arrow_query, stream_json_query, stream_parquet_query, stream_text_query, }; -use feldera_adapterlib::errors::journal::ControllerError; -use feldera_types::config::PipelineConfig; use feldera_types::query::{AdHocResultFormat, AdhocQueryArgs, MAX_WS_FRAME_SIZE}; use futures_util::StreamExt; use serde_json::json; use std::collections::{HashMap, VecDeque}; use std::convert::Infallible; -use std::fs::create_dir_all; -use std::path::PathBuf; use std::sync::Arc; use tracing::warn; @@ -31,49 +25,6 @@ mod executor; mod format; pub(crate) mod table; -pub(crate) fn create_session_context( - config: &PipelineConfig, -) -> Result { - const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 64 * 1024 * 1024; - const SORT_SPILL_RESERVATION_BYTES: usize = 64 * 1024 * 1024; - let session_config = SessionConfig::new() - .with_target_partitions(config.global.workers as usize) - .with_sort_in_place_threshold_bytes(SORT_IN_PLACE_THRESHOLD_BYTES) - .with_sort_spill_reservation_bytes(SORT_SPILL_RESERVATION_BYTES) - .set( - "datafusion.execution.planning_concurrency", - &ScalarValue::UInt64(Some(config.global.workers as u64)), - ); - // Initialize datafusion memory limits - let mut runtime_env_builder = RuntimeEnvBuilder::new(); - if let Some(memory_mb_max) = config.global.resources.memory_mb_max { - let memory_bytes_max = memory_mb_max * 1_000_000; - runtime_env_builder = runtime_env_builder - .with_memory_pool(Arc::new(FairSpillPool::new(memory_bytes_max as usize))); - } - // Initialize datafusion spill-to-disk directory - if let Some(storage) = &config.storage_config { - let path = PathBuf::from(storage.path.clone()).join("adhoc-tmp"); - if !path.exists() { - create_dir_all(&path).map_err(|error| { - ControllerError::io_error( - "unable to create ad-hoc scratch space directory during startup", - error, - ) - })?; - } - runtime_env_builder = runtime_env_builder.with_temp_file_path(path); - } - - let runtime_env = runtime_env_builder.build_arc().unwrap(); - let state = SessionStateBuilder::new() - .with_config(session_config) - .with_runtime_env(runtime_env) - .with_default_features() - .build(); - Ok(SessionContext::from(state)) -} - /// Helper for for closing the websocket session /// /// Note that adding a `description` to the `CloseReason` is currently @@ -462,6 +413,8 @@ mod tests { use super::*; use datafusion::arrow; use datafusion::arrow::record_batch::RecordBatch; + use datafusion::common::ScalarValue; + use datafusion::execution::SessionStateBuilder; fn test_state() -> SessionState { SessionStateBuilder::new().with_default_features().build() diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 231a7413d76..7cbdefdafb9 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -71,6 +71,7 @@ use enum_map::EnumMap; use feldera_adapterlib::format::BufferSize; use feldera_adapterlib::metrics::{ConnectorMetrics, ValueType}; use feldera_adapterlib::transport::{CommandHandler, InputReader, Resume, Watermark}; +use feldera_adapterlib::utils::datafusion::{create_runtime_env, create_session_context}; use feldera_ir::LirCircuit; use feldera_samply::{AnnotationOptions, CaptureOptions, Span}; use feldera_storage::fbuf::slab::FBufSlabsStats; @@ -146,8 +147,8 @@ mod stats; mod sync; mod validate; +use crate::adhoc::execute_sql; use crate::adhoc::table::AdHocTable; -use crate::adhoc::{create_session_context, execute_sql}; use crate::catalog::{SerBatchReader, SerTrace}; use crate::format::parquet::relation_to_arrow_fields; use crate::format::{MessageOrientedPreprocessedParser, StreamingPreprocessedParser}; @@ -4476,6 +4477,7 @@ impl ControllerInit { hosts: checkpoint_config.global.hosts, workers: checkpoint_config.global.workers, max_rss_mb: checkpoint_config.global.max_rss_mb, + datafusion_memory_mb: checkpoint_config.global.datafusion_memory_mb, // The checkpoint determines the fault tolerance model, but the // pipeline manager can override the details of the @@ -4599,10 +4601,37 @@ Using the Kubernetes limit as the RSS memory limit." ); } + // Carve out a slice of the pipeline's memory budget for DataFusion's + // memory pool (see `RuntimeConfig::datafusion_memory_mb`) and pass + // only the remainder to the DBSP circuit. Without this, the circuit + // and DataFusion would each independently grow to the full budget, + // which would double-book RAM. + let datafusion_memory_mb = pipeline_config.global.resolved_datafusion_memory_mb(); + let circuit_max_rss_mb = match (max_rss_mb, datafusion_memory_mb) { + (Some(rss), Some(df)) => { + if df >= rss { + return Err(ControllerError::Config { + config_error: Box::new(ConfigError::DatafusionMemoryExceedsBudget { + datafusion_memory_mb: df, + max_rss_mb: rss, + }), + }); + } + let remainder = rss - df; + info!( + "memory budget split: circuit RSS {remainder} MB, datafusion {df} MB \ +(total {rss} MB)" + ); + Some(remainder) + } + (Some(rss), None) => Some(rss), + (None, _) => None, + }; + let layout = layout.unwrap_or_else(|| Layout::new_solo(pipeline_config.global.workers as usize)); Ok(CircuitConfig::from(layout) - .with_max_rss_bytes(max_rss_mb.map(|mb| mb * 1_000_000)) + .with_max_rss_bytes(circuit_max_rss_mb.map(|mb| mb * 1_000_000)) .with_pin_cpus(pipeline_config.global.pin_cpus.clone()) .with_storage(storage) .with_mode(Mode::Persistent) @@ -5597,6 +5626,10 @@ pub struct ControllerInner { backpressure_thread_unparker: Unparker, error_cb: Box, Option) + Send + Sync>, session_ctxt: SessionContext, + /// Shared datafusion runtime environment. Owns the pipeline-wide + /// `FairSpillPool` and spill-to-disk path so that every `SessionContext` + /// (ad-hoc + integrated connectors) draws from a single bounded budget. + datafusion_runtime_env: Arc, adhoc_tables: HashMap>, fault_tolerance: Option, step_receiver: tokio::sync::watch::Receiver, @@ -5669,7 +5702,8 @@ impl ControllerInner { let (command_sender, command_receiver) = channel(); let (transaction_sender, transaction_receiver) = tokio::sync::watch::channel(TransactionCoordination::default()); - let session_ctxt = create_session_context(&config)?; + let datafusion_runtime_env = create_runtime_env(&config)?; + let session_ctxt = create_session_context(&config, datafusion_runtime_env.clone()); let controller = Arc::new_cyclic(|weak| { let adhoc_tables = Self::initialize_adhoc_queries(&session_ctxt, &*catalog, weak); Self { @@ -5691,6 +5725,7 @@ impl ControllerInner { backpressure_thread_unparker: backpressure_thread_parker.unparker().clone(), error_cb, session_ctxt, + datafusion_runtime_env, adhoc_tables, fault_tolerance: config.global.fault_tolerance.model, transaction_info: Mutex::new(TransactionInfo::new( @@ -6101,6 +6136,8 @@ impl ControllerInner { let endpoint = create_integrated_input_endpoint( endpoint_name, &resolved_connector_config, + &self.status.pipeline_config, + self.datafusion_runtime_env.clone(), probe, )?; diff --git a/crates/adapters/src/integrated.rs b/crates/adapters/src/integrated.rs index 7ea87c6be76..6e57e993180 100644 --- a/crates/adapters/src/integrated.rs +++ b/crates/adapters/src/integrated.rs @@ -1,9 +1,10 @@ use crate::controller::{ControllerInner, EndpointId}; use crate::transport::IntegratedInputEndpoint; use crate::{ControllerError, Encoder, InputConsumer, OutputEndpoint}; -use feldera_types::config::{ConnectorConfig, TransportConfig}; +use datafusion::execution::runtime_env::RuntimeEnv; +use feldera_types::config::{ConnectorConfig, PipelineConfig, TransportConfig}; use feldera_types::program_schema::Relation; -use std::sync::Weak; +use std::sync::{Arc, Weak}; #[cfg(feature = "with-deltalake")] pub mod delta_table; @@ -90,17 +91,31 @@ pub fn create_integrated_output_endpoint( pub fn create_integrated_input_endpoint( endpoint_name: &str, config: &ConnectorConfig, + pipeline_config: &PipelineConfig, + runtime_env: Arc, consumer: Box, ) -> Result, ControllerError> { let ep: Box = match &config.transport { #[cfg(feature = "with-deltalake")] - TransportConfig::DeltaTableInput(config) => Box::new( - delta_table::DeltaTableInputEndpoint::new(endpoint_name, config, consumer), - ), + TransportConfig::DeltaTableInput(config) => { + Box::new(delta_table::DeltaTableInputEndpoint::new( + endpoint_name, + config, + pipeline_config, + runtime_env, + consumer, + )) + } #[cfg(feature = "with-iceberg")] - TransportConfig::IcebergInput(config) => Box::new( - feldera_iceberg::IcebergInputEndpoint::new(endpoint_name, config, consumer), - ), + TransportConfig::IcebergInput(config) => { + Box::new(feldera_iceberg::IcebergInputEndpoint::new( + endpoint_name, + config, + pipeline_config, + runtime_env, + consumer, + )) + } TransportConfig::PostgresInput(config) => { Box::new(PostgresInputEndpoint::new(endpoint_name, config, consumer)) } diff --git a/crates/adapters/src/integrated/delta_table/input.rs b/crates/adapters/src/integrated/delta_table/input.rs index 31eecfdd6ed..2f7aeec7346 100644 --- a/crates/adapters/src/integrated/delta_table/input.rs +++ b/crates/adapters/src/integrated/delta_table/input.rs @@ -16,7 +16,6 @@ use datafusion::datasource::listing::{ use datafusion::logical_expr::LogicalPlan; use datafusion::physical_plan::{PhysicalExpr, displayable}; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; -use datafusion::prelude::SessionConfig; use dbsp::circuit::tokio::TOKIO; use deltalake::datafusion::dataframe::DataFrame; use deltalake::datafusion::execution::context::SQLOptions; @@ -29,12 +28,12 @@ use feldera_adapterlib::format::{ParseError, StagedInputBuffer}; use feldera_adapterlib::metrics::{ConnectorMetrics, ValueType}; use feldera_adapterlib::transport::{InputQueueEntry, Resume, Watermark, parse_resume_info}; use feldera_adapterlib::utils::datafusion::{ - array_to_string, execute_query_collect, execute_singleton_query, timestamp_to_sql_expression, - validate_sql_expression, validate_timestamp_column, + array_to_string, create_session_context_with, execute_query_collect, execute_singleton_query, + timestamp_to_sql_expression, validate_sql_expression, validate_timestamp_column, }; use feldera_storage::tokio::TOKIO_DEDICATED_IO; use feldera_types::adapter_stats::ConnectorHealth; -use feldera_types::config::FtModel; +use feldera_types::config::{FtModel, PipelineConfig}; use feldera_types::program_schema::Relation; use feldera_types::transport::delta_table::{DeltaTableReaderConfig, DeltaTableTransactionMode}; use futures_util::StreamExt; @@ -109,6 +108,7 @@ fn quote_sql_identifier>(ident: S) -> String { pub struct DeltaTableInputEndpoint { endpoint_name: String, config: DeltaTableReaderConfig, + datafusion: SessionContext, consumer: Box, } @@ -116,13 +116,87 @@ impl DeltaTableInputEndpoint { pub fn new( endpoint_name: &str, config: &DeltaTableReaderConfig, + pipeline_config: &PipelineConfig, + runtime_env: Arc, consumer: Box, ) -> Self { register_storage_handlers(); + // Number of parallel scan partitions DataFusion plans for the + // snapshot read. Resolution order: + // + // 1. `DELTA_DF_TARGET_PARTITIONS` env var (process-wide override). + // 2. `max_concurrent_readers` from the connector config (which + // also drives the global `DELTA_READER_SEMAPHORE`); using the + // same value here keeps the per-connector parallelism aligned + // with the process-wide concurrent-reader cap. + // 3. `DEFAULT_MAX_CONCURRENT_READERS` (6), matching the semaphore + // default. + let env_target_partitions = match std::env::var("DELTA_DF_TARGET_PARTITIONS").ok() { + None => None, + Some(s) => match s.parse::() { + Ok(n) if n > 0 => Some(n), + _ => { + warn!( + "delta_table {endpoint_name}: ignoring DELTA_DF_TARGET_PARTITIONS={s:?}; expected a positive integer" + ); + None + } + }, + }; + let target_partitions = env_target_partitions + .or_else(|| { + config + .max_concurrent_readers + .map(|n| n as usize) + .filter(|n| *n > 0) + }) + .unwrap_or(DEFAULT_MAX_CONCURRENT_READERS); + info!("delta_table {endpoint_name}: target_partitions={target_partitions}"); + + let env_batch_size = match std::env::var("DELTA_DF_BATCH_SIZE").ok() { + None => None, + Some(s) => match s.parse::() { + Ok(n) if n > 0 => { + info!("delta_table {endpoint_name}: applying DELTA_DF_BATCH_SIZE={n}"); + Some(n) + } + _ => { + warn!( + "delta_table {endpoint_name}: ignoring DELTA_DF_BATCH_SIZE={s:?}; expected a positive integer" + ); + None + } + }, + }; + + // Configure datafusion not to generate Utf8View arrow types, which are + // not yet supported by the `serde_arrow` crate. The `SessionContext` + // shares the pipeline-wide `RuntimeEnv` so that the CDC-mode ORDER BY + // query spills to the same bounded memory pool and on-disk scratch + // dir as every other datafusion user in the pipeline. + // + // The `target_partitions` and (optional) `batch_size` overrides are + // applied after `create_session_context_with` populates its own + // worker-derived defaults, so the Delta connector's resolution above + // wins. + let datafusion = create_session_context_with(pipeline_config, runtime_env, |cfg| { + let cfg = cfg + .set_bool( + "datafusion.execution.parquet.schema_force_view_types", + false, + ) + .set_usize("datafusion.execution.target_partitions", target_partitions); + match env_batch_size { + Some(n) => cfg.set_usize("datafusion.execution.batch_size", n), + None => cfg, + } + }); + Self { endpoint_name: endpoint_name.to_string(), config: config.clone(), + datafusion, consumer, } } @@ -142,6 +216,7 @@ impl IntegratedInputEndpoint for DeltaTableInputEndpoint { Ok(Box::new(DeltaTableInputReader::new( self.endpoint_name, self.config, + self.datafusion, self.consumer, input_handle, resume_info, @@ -158,6 +233,7 @@ impl DeltaTableInputReader { fn new( endpoint_name: String, mut config: DeltaTableReaderConfig, + datafusion: SessionContext, consumer: Box, input_handle: &InputCollectionHandle, resume_info: Option, @@ -281,6 +357,7 @@ impl DeltaTableInputReader { let endpoint = Arc::new(DeltaTableInputEndpointInner::new( &endpoint_name, config, + datafusion, consumer, schema, resume_info.clone(), @@ -611,62 +688,12 @@ impl DeltaTableInputEndpointInner { fn new( endpoint_name: &str, config: DeltaTableReaderConfig, + datafusion: SessionContext, consumer: Box, schema: Relation, resume_info: Option, ) -> Self { let queue = Arc::new(InputQueue::new(consumer.clone())); - // Configure datafusion not to generate Utf8View arrow types, which are not yet - // supported by the `serde_arrow` crate. - let mut session_config = SessionConfig::new().set_bool( - "datafusion.execution.parquet.schema_force_view_types", - false, - ); - // Number of parallel scan partitions DataFusion plans for the - // snapshot read. Resolution order: - // - // 1. `DELTA_DF_TARGET_PARTITIONS` env var (process-wide override). - // 2. `max_concurrent_readers` from the connector config (which - // also drives the global `DELTA_READER_SEMAPHORE`); using the - // same value here keeps the per-connector parallelism aligned - // with the process-wide concurrent-reader cap. - // 3. `DEFAULT_MAX_CONCURRENT_READERS` (6), matching the semaphore - // default. - let env_target_partitions = match std::env::var("DELTA_DF_TARGET_PARTITIONS").ok() { - None => None, - Some(s) => match s.parse::() { - Ok(n) if n > 0 => Some(n), - _ => { - warn!( - "delta_table {endpoint_name}: ignoring DELTA_DF_TARGET_PARTITIONS={s:?}; expected a positive integer" - ); - None - } - }, - }; - let target_partitions = env_target_partitions - .or_else(|| { - config - .max_concurrent_readers - .map(|n| n as usize) - .filter(|n| *n > 0) - }) - .unwrap_or(DEFAULT_MAX_CONCURRENT_READERS); - session_config = - session_config.set_usize("datafusion.execution.target_partitions", target_partitions); - info!("delta_table {endpoint_name}: target_partitions={target_partitions}"); - - if let Ok(s) = std::env::var("DELTA_DF_BATCH_SIZE") { - match s.parse::() { - Ok(n) if n > 0 => { - session_config = session_config.set_usize("datafusion.execution.batch_size", n); - info!("delta_table {endpoint_name}: applying DELTA_DF_BATCH_SIZE={n}"); - } - _ => warn!( - "delta_table {endpoint_name}: ignoring DELTA_DF_BATCH_SIZE={s:?}; expected a positive integer" - ), - } - } let metrics = Arc::new(DeltaTableMetrics::new()); consumer.set_custom_metrics(Arc::clone(&metrics) as Arc); @@ -678,7 +705,7 @@ impl DeltaTableInputEndpointInner { schema, config, consumer, - datafusion: SessionContext::new_with_config(session_config), + datafusion, transaction_index: AtomicUsize::new(0), // Set version to None by default so that the connector is checkpointable in the initial state. diff --git a/crates/dbsp/src/circuit/checkpointer.rs b/crates/dbsp/src/circuit/checkpointer.rs index a118a768ee0..4976151f12f 100644 --- a/crates/dbsp/src/circuit/checkpointer.rs +++ b/crates/dbsp/src/circuit/checkpointer.rs @@ -5,7 +5,7 @@ use crate::storage::file::SerializerInner; use crate::{Error, NumEntries, TypedBox}; use feldera_types::checkpoint::CheckpointMetadata; use feldera_types::constants::{ - ACTIVATION_MARKER_FILE, ADHOC_TEMP_DIR, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME, + ACTIVATION_MARKER_FILE, CHECKPOINT_DEPENDENCIES, CHECKPOINT_FILE_NAME, DATAFUSION_TEMP_DIR, DBSP_FILE_EXTENSION, STATE_FILE, STATUS_FILE, STEPS_FILE, }; use itertools::Itertools; @@ -112,7 +112,7 @@ impl Checkpointer { // it. in_use_paths.insert(STATUS_FILE.into()); in_use_paths.insert(format!("{}.mut", STATUS_FILE).into()); - in_use_paths.insert(ADHOC_TEMP_DIR.into()); + in_use_paths.insert(DATAFUSION_TEMP_DIR.into()); in_use_paths.insert(ACTIVATION_MARKER_FILE.into()); for cpm in self.checkpoint_list.iter() { in_use_paths.insert(cpm.uuid.to_string().into()); diff --git a/crates/feldera-types/src/config.rs b/crates/feldera-types/src/config.rs index 68117df7d42..19abe7dbfb9 100644 --- a/crates/feldera-types/src/config.rs +++ b/crates/feldera-types/src/config.rs @@ -803,6 +803,25 @@ pub struct RuntimeConfig { /// for more details. pub max_rss_mb: Option, + /// DataFusion memory pool size, in MB, shared by the ad-hoc query + /// engine and the Delta Lake / Iceberg connectors. + /// + /// Carved out of `max_rss_mb` (falling back to + /// `resources.memory_mb_max`); the remainder goes to the DBSP circuit, + /// so the two do not double-book RAM. + /// + /// Unset: defaults to 5% of the effective budget, capped at 2 GB. + /// Pipelines that don't run heavy ad-hoc / Delta / Iceberg workloads + /// can leave this unset. + /// + /// Sort/aggregate-heavy ad-hoc queries (especially at high `workers` + /// counts) should set this explicitly. An under-sized pool surfaces as + /// `ResourcesExhausted` on the failing query — the pipeline keeps + /// running and only that query fails. + /// + /// No pool limit applied if no overall budget is configured. + pub datafusion_memory_mb: Option, + /// Number of DBSP hosts. /// /// The worker threads are evenly divided among the hosts. For single-host @@ -1065,6 +1084,7 @@ impl Default for RuntimeConfig { Self { workers: 8, max_rss_mb: None, + datafusion_memory_mb: None, hosts: 1, storage: Some(StorageOptions::default()), fault_tolerance: FtConfig::default(), @@ -1094,6 +1114,35 @@ impl Default for RuntimeConfig { } } +/// Upper bound on the default DataFusion pool size, in MB. +/// +/// Spill-to-disk handles overflow; reserving more starves the circuit. +pub const DEFAULT_DATAFUSION_MEMORY_MB_CEILING: u64 = 2048; + +/// Default DataFusion pool sizing fraction, as the divisor `effective / N`. +pub const DEFAULT_DATAFUSION_MEMORY_FRACTION_DIVISOR: u64 = 20; + +impl RuntimeConfig { + /// Pipeline's effective memory budget in MB: `max_rss_mb`, falling back + /// to `resources.memory_mb_max` (the k8s pod limit). + pub fn effective_memory_mb(&self) -> Option { + self.max_rss_mb.or(self.resources.memory_mb_max) + } + + /// Resolved DataFusion pool size in MB: explicit `datafusion_memory_mb` + /// if set, else 5% of the effective budget capped at + /// `DEFAULT_DATAFUSION_MEMORY_MB_CEILING`. `None` if no budget is + /// configured. + pub fn resolved_datafusion_memory_mb(&self) -> Option { + if let Some(explicit) = self.datafusion_memory_mb { + return Some(explicit); + } + let effective = self.effective_memory_mb()?; + let fraction = effective / DEFAULT_DATAFUSION_MEMORY_FRACTION_DIVISOR; + Some(fraction.min(DEFAULT_DATAFUSION_MEMORY_MB_CEILING)) + } +} + /// Fault-tolerance configuration. /// /// The default [FtConfig] (via [FtConfig::default]) disables fault tolerance, @@ -1143,9 +1192,82 @@ impl Default for FtConfig { #[cfg(test)] mod test { use super::deserialize_fault_tolerance; - use crate::config::{FtConfig, FtModel}; + use crate::config::{ + DEFAULT_DATAFUSION_MEMORY_MB_CEILING, FtConfig, FtModel, ResourceConfig, RuntimeConfig, + }; use serde::{Deserialize, Serialize}; + #[test] + fn resolved_datafusion_memory_explicit_passes_through() { + let config = RuntimeConfig { + max_rss_mb: Some(8_000), + datafusion_memory_mb: Some(1_500), + ..Default::default() + }; + assert_eq!(config.resolved_datafusion_memory_mb(), Some(1_500)); + } + + #[test] + fn resolved_datafusion_memory_unconfigured_returns_none() { + let config = RuntimeConfig::default(); + assert!(config.max_rss_mb.is_none()); + assert!(config.resources.memory_mb_max.is_none()); + assert_eq!(config.resolved_datafusion_memory_mb(), None); + } + + #[test] + fn resolved_datafusion_memory_small_budget_scales_down() { + // Small pipelines must provision cleanly; the default just shrinks. + let config = RuntimeConfig { + max_rss_mb: Some(256), + ..Default::default() + }; + assert_eq!(config.resolved_datafusion_memory_mb(), Some(12)); + + let config = RuntimeConfig { + max_rss_mb: Some(512), + ..Default::default() + }; + assert_eq!(config.resolved_datafusion_memory_mb(), Some(25)); + } + + #[test] + fn resolved_datafusion_memory_clamps_to_ceiling_for_large_budgets() { + // 5% of 64 GB = 3.2 GB, above the 2 GB ceiling. + let config = RuntimeConfig { + max_rss_mb: Some(64_000), + ..Default::default() + }; + assert_eq!( + config.resolved_datafusion_memory_mb(), + Some(DEFAULT_DATAFUSION_MEMORY_MB_CEILING), + ); + } + + #[test] + fn resolved_datafusion_memory_midrange_uses_five_percent() { + // 5% of 16 GB = 800 MB, inside the clamp range. + let config = RuntimeConfig { + max_rss_mb: Some(16_000), + ..Default::default() + }; + assert_eq!(config.resolved_datafusion_memory_mb(), Some(800)); + } + + #[test] + fn resolved_datafusion_memory_falls_back_to_resources() { + // No max_rss_mb, but resources.memory_mb_max is set. + let config = RuntimeConfig { + max_rss_mb: None, + resources: ResourceConfig { + memory_mb_max: Some(16_000), + ..Default::default() + }, + ..Default::default() + }; + assert_eq!(config.resolved_datafusion_memory_mb(), Some(800)); + } + #[test] fn ft_config() { #[derive(Serialize, Deserialize, Default, PartialEq, Eq, Debug)] diff --git a/crates/feldera-types/src/constants.rs b/crates/feldera-types/src/constants.rs index ccf3af80d41..1f973b18046 100644 --- a/crates/feldera-types/src/constants.rs +++ b/crates/feldera-types/src/constants.rs @@ -14,7 +14,16 @@ pub const STEPS_FILE: &str = "steps.bin"; pub const CHECKPOINT_DEPENDENCIES: &str = "dependencies.json"; -pub const ADHOC_TEMP_DIR: &str = "adhoc-tmp"; +/// Subdirectory under the pipeline's storage path where DataFusion writes +/// spill files for the ad-hoc query engine and every integrated connector +/// that uses DataFusion (Delta Lake, Iceberg). +/// +/// One pipeline-wide directory keeps the on-disk layout discoverable in a +/// single place and lets `gc_startup` allowlist it as a single entry. If +/// you change this value, audit `gc_startup` in `dbsp::circuit::checkpointer` +/// — the GC's allowlist must keep matching the directory the runtime +/// actually creates. +pub const DATAFUSION_TEMP_DIR: &str = "datafusion-tmp"; /// A slice of all file-extension the system can create. pub const DBSP_FILE_EXTENSION: &[&str] = &["mut", "feldera"]; diff --git a/crates/iceberg/src/input.rs b/crates/iceberg/src/input.rs index ea0fea0cb1a..2da00d4ffb0 100644 --- a/crates/iceberg/src/input.rs +++ b/crates/iceberg/src/input.rs @@ -12,13 +12,13 @@ use feldera_adapterlib::{ IntegratedInputEndpoint, NonFtInputReaderCommand, }, utils::datafusion::{ - array_to_string, execute_query_collect, execute_singleton_query, + array_to_string, create_session_context, execute_query_collect, execute_singleton_query, timestamp_to_sql_expression, validate_sql_expression, validate_timestamp_column, }, PipelineState, }; use feldera_types::{ - config::FtModel, + config::{FtModel, PipelineConfig}, program_schema::Relation, transport::iceberg::{IcebergCatalogType, IcebergReaderConfig}, }; @@ -62,12 +62,16 @@ impl IcebergInputEndpoint { pub fn new( endpoint_name: &str, config: &IcebergReaderConfig, + pipeline_config: &PipelineConfig, + runtime_env: Arc, consumer: Box, ) -> Self { Self { inner: Arc::new(IcebergInputEndpointInner::new( endpoint_name, config.clone(), + pipeline_config, + runtime_env, consumer, )), } @@ -183,14 +187,20 @@ impl IcebergInputEndpointInner { fn new( endpoint_name: &str, config: IcebergReaderConfig, + pipeline_config: &PipelineConfig, + runtime_env: Arc, consumer: Box, ) -> Self { let queue = InputQueue::new(consumer.clone()); + // Share the pipeline-wide `RuntimeEnv` so that scans against the + // iceberg table spill to the bounded memory pool and on-disk scratch + // dir alongside every other datafusion user in the pipeline. + let datafusion = create_session_context(pipeline_config, runtime_env); Self { endpoint_name: endpoint_name.to_string(), config, consumer, - datafusion: SessionContext::new(), + datafusion, queue, } } diff --git a/crates/pipeline-manager/src/api/examples.rs b/crates/pipeline-manager/src/api/examples.rs index 7839f8557fe..00410b72b03 100644 --- a/crates/pipeline-manager/src/api/examples.rs +++ b/crates/pipeline-manager/src/api/examples.rs @@ -96,6 +96,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr { runtime_config: serde_json::to_value(RuntimeConfig { workers: 10, max_rss_mb: None, + datafusion_memory_mb: None, hosts: 1, storage: Some(StorageOptions::default()), fault_tolerance: FtConfig::default(), diff --git a/crates/pipeline-manager/src/db/test.rs b/crates/pipeline-manager/src/db/test.rs index ae20859af54..d38de455b0e 100644 --- a/crates/pipeline-manager/src/db/test.rs +++ b/crates/pipeline-manager/src/db/test.rs @@ -300,6 +300,7 @@ fn map_val_to_limited_runtime_config(val: RuntimeConfigPropVal) -> serde_json::V serde_json::to_value(RuntimeConfig { workers: val.val0, max_rss_mb: val.val21, + datafusion_memory_mb: None, hosts: val.val20, cpu_profiler: val.val1, min_batch_size_records: val.val2, @@ -1192,6 +1193,7 @@ async fn pipeline_versioning() { let new_runtime_config = serde_json::to_value(RuntimeConfig { workers: 100, max_rss_mb: None, + datafusion_memory_mb: None, hosts: 1, storage: None, fault_tolerance: FtConfig::default(), @@ -2484,6 +2486,7 @@ async fn pipeline_provision_version_guard() { serde_json::to_value(RuntimeConfig { workers: 10, max_rss_mb: None, + datafusion_memory_mb: None, hosts: 1, storage: None, fault_tolerance: FtConfig::default(), diff --git a/openapi.json b/openapi.json index d1d3336cac3..687eeee8ce6 100644 --- a/openapi.json +++ b/openapi.json @@ -584,6 +584,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -674,6 +675,7 @@ "runtime_config": { "workers": 10, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -794,6 +796,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -867,6 +870,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -1050,6 +1054,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -1197,6 +1202,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -1270,6 +1276,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -1370,6 +1377,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -1639,6 +1647,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -6473,6 +6482,7 @@ "runtime_config": { "workers": 16, "max_rss_mb": null, + "datafusion_memory_mb": null, "hosts": 1, "storage": { "backend": { @@ -10623,6 +10633,14 @@ "description": "Enable CPU profiler.\n\nThe default value is `true`.", "default": true }, + "datafusion_memory_mb": { + "type": "integer", + "format": "int64", + "description": "DataFusion memory pool size, in MB, shared by the ad-hoc query\nengine and the Delta Lake / Iceberg connectors.\n\nCarved out of `max_rss_mb` (falling back to\n`resources.memory_mb_max`); the remainder goes to the DBSP circuit,\nso the two do not double-book RAM.\n\nUnset: defaults to 5% of the effective budget, capped at 2 GB.\nPipelines that don't run heavy ad-hoc / Delta / Iceberg workloads\ncan leave this unset.\n\nSort/aggregate-heavy ad-hoc queries (especially at high `workers`\ncounts) should set this explicitly. An under-sized pool surfaces as\n`ResourcesExhausted` on the failing query — the pipeline keeps\nrunning and only that query fails.\n\nNo pool limit applied if no overall budget is configured.", + "default": null, + "nullable": true, + "minimum": 0 + }, "dev_tweaks": { "allOf": [ { @@ -12497,6 +12515,14 @@ "description": "Enable CPU profiler.\n\nThe default value is `true`.", "default": true }, + "datafusion_memory_mb": { + "type": "integer", + "format": "int64", + "description": "DataFusion memory pool size, in MB, shared by the ad-hoc query\nengine and the Delta Lake / Iceberg connectors.\n\nCarved out of `max_rss_mb` (falling back to\n`resources.memory_mb_max`); the remainder goes to the DBSP circuit,\nso the two do not double-book RAM.\n\nUnset: defaults to 5% of the effective budget, capped at 2 GB.\nPipelines that don't run heavy ad-hoc / Delta / Iceberg workloads\ncan leave this unset.\n\nSort/aggregate-heavy ad-hoc queries (especially at high `workers`\ncounts) should set this explicitly. An under-sized pool surfaces as\n`ResourcesExhausted` on the failing query — the pipeline keeps\nrunning and only that query fails.\n\nNo pool limit applied if no overall budget is configured.", + "default": null, + "nullable": true, + "minimum": 0 + }, "dev_tweaks": { "allOf": [ {