From c1ad4cbd8e16f5352d61404763cae6032615eb9f Mon Sep 17 00:00:00 2001 From: Leo Stewen Date: Tue, 31 Mar 2026 16:24:34 +0200 Subject: [PATCH 1/4] Implement From for CircuitConfig --- crates/dbsp/src/circuit/dbsp_handle.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 697da65591..5f8fad451a 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -22,6 +22,7 @@ pub use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOption use feldera_types::transaction::CommitProgressSummary; use itertools::Either; use std::collections::BTreeMap; +use std::num::NonZeroUsize; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -494,6 +495,12 @@ impl From for CircuitConfig { } } +impl From for CircuitConfig { + fn from(n_workers: NonZeroUsize) -> Self { + Self::with_workers(n_workers.get()) + } +} + impl From for CircuitConfig { fn from(layout: Layout) -> Self { Self { From f51e7b6270336850d2b0da7c0794c1c99955e646 Mon Sep 17 00:00:00 2001 From: "release-feldera-feldera[bot]" Date: Wed, 1 Apr 2026 07:54:41 +0000 Subject: [PATCH 2/4] ci: Prepare for v0.278.0 --- Cargo.lock | 40 ++++++++++++++++++++-------------------- Cargo.toml | 24 ++++++++++++------------ openapi.json | 2 +- python/pyproject.toml | 2 +- python/uv.lock | 2 +- 5 files changed, 35 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ae3bbd658..87207c90c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3761,7 +3761,7 @@ dependencies = [ [[package]] name = "dbsp" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "arc-swap", @@ -3849,7 +3849,7 @@ dependencies = [ [[package]] name = "dbsp_adapters" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix", "actix-codec", @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "dbsp_nexmark" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "ascii_table", @@ -4861,7 +4861,7 @@ dependencies = [ [[package]] name = "fda" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "arrow", @@ -4913,7 +4913,7 @@ dependencies = [ [[package]] name = "feldera-adapterlib" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-web", "anyhow", @@ -4944,7 +4944,7 @@ dependencies = [ [[package]] name = "feldera-buffer-cache" -version = "0.277.0" +version = "0.278.0" dependencies = [ "crossbeam-utils", "enum-map", @@ -4972,7 +4972,7 @@ dependencies = [ [[package]] name = "feldera-datagen" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "async-channel 2.5.0", @@ -4998,7 +4998,7 @@ dependencies = [ [[package]] name = "feldera-fxp" -version = "0.277.0" +version = "0.278.0" dependencies = [ "bytecheck", "dbsp", @@ -5018,7 +5018,7 @@ dependencies = [ [[package]] name = "feldera-iceberg" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "chrono", @@ -5038,7 +5038,7 @@ dependencies = [ [[package]] name = "feldera-ir" -version = "0.277.0" +version = "0.278.0" dependencies = [ "proptest", "proptest-derive", @@ -5050,7 +5050,7 @@ dependencies = [ [[package]] name = "feldera-macros" -version = "0.277.0" +version = "0.278.0" dependencies = [ "prettyplease", "proc-macro2", @@ -5060,7 +5060,7 @@ dependencies = [ [[package]] name = "feldera-observability" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-http", "awc", @@ -5075,7 +5075,7 @@ dependencies = [ [[package]] name = "feldera-rest-api" -version = "0.277.0" +version = "0.278.0" dependencies = [ "chrono", "feldera-observability", @@ -5109,7 +5109,7 @@ dependencies = [ [[package]] name = "feldera-sqllib" -version = "0.277.0" +version = "0.278.0" dependencies = [ "arcstr", "base58", @@ -5150,7 +5150,7 @@ dependencies = [ [[package]] name = "feldera-storage" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "crossbeam", @@ -5173,7 +5173,7 @@ dependencies = [ [[package]] name = "feldera-types" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-web", "anyhow", @@ -8094,7 +8094,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeline-manager" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-cors", "actix-files", @@ -9188,7 +9188,7 @@ dependencies = [ [[package]] name = "readers" -version = "0.277.0" +version = "0.278.0" dependencies = [ "async-std", "csv", @@ -10764,7 +10764,7 @@ dependencies = [ [[package]] name = "sltsqlvalue" -version = "0.277.0" +version = "0.278.0" dependencies = [ "dbsp", "feldera-sqllib", @@ -11067,7 +11067,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "storage-test-compat" -version = "0.277.0" +version = "0.278.0" dependencies = [ "dbsp", "derive_more 1.0.0", diff --git a/Cargo.toml b/Cargo.toml index 1c455e33f6..6fe1e37ec5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace.package] authors = ["Feldera Team "] -version = "0.277.0" +version = "0.278.0" license = "MIT OR Apache-2.0" homepage = "https://github.com/feldera/feldera" repository = "https://github.com/feldera/feldera" @@ -101,7 +101,7 @@ csv = "1.2.2" csv-core = "0.1.10" dashmap = "6.1.0" datafusion = "51.0" -dbsp = { path = "crates/dbsp", version = "0.277.0" } +dbsp = { path = "crates/dbsp", version = "0.278.0" } dbsp_nexmark = { path = "crates/nexmark" } deadpool-postgres = "0.14.1" #deltalake = "0.30.2" @@ -121,19 +121,19 @@ erased-serde = "0.3.31" fake = "2.10" fastbloom = "0.14.0" fdlimit = "0.3.0" -feldera-buffer-cache = { version = "0.277.0", path = "crates/buffer-cache" } +feldera-buffer-cache = { version = "0.278.0", path = "crates/buffer-cache" } feldera-cloud1-client = "0.1.2" feldera-datagen = { path = "crates/datagen" } -feldera-fxp = { version = "0.277.0", path = "crates/fxp", features = ["dbsp"] } +feldera-fxp = { version = "0.278.0", path = "crates/fxp", features = ["dbsp"] } feldera-iceberg = { path = "crates/iceberg" } -feldera-observability = { version = "0.277.0", path = "crates/feldera-observability" } -feldera-macros = { version = "0.277.0", path = "crates/feldera-macros" } -feldera-sqllib = { version = "0.277.0", path = "crates/sqllib" } -feldera-storage = { version = "0.277.0", path = "crates/storage" } -feldera-types = { version = "0.277.0", path = "crates/feldera-types" } -feldera-rest-api = { version = "0.277.0", path = "crates/rest-api" } -feldera-ir = { version = "0.277.0", path = "crates/ir" } -feldera-adapterlib = { version = "0.277.0", path = "crates/adapterlib" } +feldera-observability = { version = "0.278.0", path = "crates/feldera-observability" } +feldera-macros = { version = "0.278.0", path = "crates/feldera-macros" } +feldera-sqllib = { version = "0.278.0", path = "crates/sqllib" } +feldera-storage = { version = "0.278.0", path = "crates/storage" } +feldera-types = { version = "0.278.0", path = "crates/feldera-types" } +feldera-rest-api = { version = "0.278.0", path = "crates/rest-api" } +feldera-ir = { version = "0.278.0", path = "crates/ir" } +feldera-adapterlib = { version = "0.278.0", path = "crates/adapterlib" } flate2 = "1.1.0" form_urlencoded = "1.2.0" futures = "0.3.30" diff --git a/openapi.json b/openapi.json index 4c716d9fd2..d835becb4a 100644 --- a/openapi.json +++ b/openapi.json @@ -10,7 +10,7 @@ "license": { "name": "MIT OR Apache-2.0" }, - "version": "0.277.0" + "version": "0.278.0" }, "paths": { "/config/authentication": { diff --git a/python/pyproject.toml b/python/pyproject.toml index 7e0b5c5667..c8808d7978 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "feldera" readme = "README.md" description = "The feldera python client" -version = "0.277.0" +version = "0.278.0" license = "MIT" requires-python = ">=3.10" authors = [ diff --git a/python/uv.lock b/python/uv.lock index 8a04482e6e..e37650e602 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -169,7 +169,7 @@ wheels = [ [[package]] name = "feldera" -version = "0.277.0" +version = "0.278.0" source = { editable = "." } dependencies = [ { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, From 3d73ba50fa639e022022c42712818aae40aa8de5 Mon Sep 17 00:00:00 2001 From: Simon Kassing Date: Mon, 30 Mar 2026 19:11:11 +0200 Subject: [PATCH 3/4] cargo: MSRV from `1.91.1` to `1.93.1` Signed-off-by: Simon Kassing --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6fe1e37ec5..0d246a88f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ repository = "https://github.com/feldera/feldera" keywords = ["ivm", "analytics", "database", "incremental", "sql"] readme = "README.md" # Define Minimum Supported Rust Version (MSRV) -rust-version = "1.91.1" +rust-version = "1.93.1" edition = "2021" [workspace] From 714a79c1e5d1efd575924cb897e4070a89e06fae Mon Sep 17 00:00:00 2001 From: Swanand Mulay <73115739+swanandx@users.noreply.github.com> Date: Sun, 15 Mar 2026 13:42:50 +0530 Subject: [PATCH 4/4] [adapters]: parallel delta output encoder Use SplitCursor to split the batch and distribute it across tasks, each task retries encoding and writing to delta lake and then returns Add actions which main task retries to commit to delta lake Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com> --- crates/adapters/Cargo.toml | 4 + crates/adapters/benches/delta_encoder.rs | 78 + crates/adapters/src/integrated.rs | 2 +- .../src/integrated/delta_table/output.rs | 1318 ++++++++++++----- .../src/integrated/delta_table/test.rs | 22 +- .../src/transport/delta_table.rs | 19 + .../docs/connectors/sinks/delta.md | 1 + openapi.json | 6 + 8 files changed, 1103 insertions(+), 347 deletions(-) create mode 100644 crates/adapters/benches/delta_encoder.rs diff --git a/crates/adapters/Cargo.toml b/crates/adapters/Cargo.toml index 95eaeb15a8..02f0a32c82 100644 --- a/crates/adapters/Cargo.toml +++ b/crates/adapters/Cargo.toml @@ -246,3 +246,7 @@ required-features = ["with-avro"] name = "postgres_output" harness = false required-features = ["bench-mode"] + +[[bench]] +name = "delta_encoder" +harness = false diff --git a/crates/adapters/benches/delta_encoder.rs b/crates/adapters/benches/delta_encoder.rs new file mode 100644 index 0000000000..23f97bbda3 --- /dev/null +++ b/crates/adapters/benches/delta_encoder.rs @@ -0,0 +1,78 @@ +mod bench_common; + +use bench_common::{BenchKeyStruct, BenchTestStruct, build_indexed_batch, generate_test_data}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use dbsp_adapters::Encoder; +use dbsp_adapters::integrated::delta_table::DeltaTableWriter; +use feldera_types::transport::delta_table::{DeltaTableWriteMode, DeltaTableWriterConfig}; +use std::sync::Weak; +use tempfile::TempDir; + +// --------------------------------------------------------------------------- +// Delta-specific helpers +// --------------------------------------------------------------------------- + +fn create_indexed_writer(threads: usize, table_uri: &str) -> DeltaTableWriter { + let config = DeltaTableWriterConfig { + uri: table_uri.to_string(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(threads), + object_store_config: Default::default(), + }; + let key_schema = Some(BenchKeyStruct::relation_schema()); + let mut value_schema = BenchTestStruct::relation_schema(); + value_schema.materialized = true; + DeltaTableWriter::new( + Default::default(), + "bench_endpoint", + &config, + &key_schema, + &value_schema, + Weak::new(), + ) + .unwrap() +} + +// --------------------------------------------------------------------------- +// Benchmarks +// --------------------------------------------------------------------------- + +/// Benchmark parallel Delta table encoding with 100k records across 1/2/4/8 workers. +fn bench_indexed_encode(c: &mut Criterion) { + let num_records = 100_000; + let data = generate_test_data(num_records); + let batch = build_indexed_batch(&data); + + let mut group = c.benchmark_group("delta_indexed_encode"); + group.throughput(criterion::Throughput::Elements(num_records as u64)); + + for workers in [1, 2, 4, 8] { + group.bench_with_input( + BenchmarkId::new("workers", workers), + &workers, + |b, &workers| { + // Each iteration needs a fresh directory since the writer + // creates real Parquet files. + b.iter_with_setup( + || { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + let writer = create_indexed_writer(workers, &table_uri); + (writer, table_dir) + }, + |(mut writer, _table_dir)| { + writer.consumer().batch_start(0); + writer.encode(batch.clone().arc_as_batch_reader()).unwrap(); + writer.consumer().batch_end(); + }, + ); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_indexed_encode); +criterion_main!(benches); diff --git a/crates/adapters/src/integrated.rs b/crates/adapters/src/integrated.rs index 0bb3ad1446..aff0c2b100 100644 --- a/crates/adapters/src/integrated.rs +++ b/crates/adapters/src/integrated.rs @@ -6,7 +6,7 @@ use feldera_types::program_schema::Relation; use std::sync::Weak; #[cfg(feature = "with-deltalake")] -mod delta_table; +pub mod delta_table; mod postgres; use crate::integrated::postgres::PostgresInputEndpoint; diff --git a/crates/adapters/src/integrated/delta_table/output.rs b/crates/adapters/src/integrated/delta_table/output.rs index 3d011061a2..fc364063c7 100644 --- a/crates/adapters/src/integrated/delta_table/output.rs +++ b/crates/adapters/src/integrated/delta_table/output.rs @@ -1,4 +1,4 @@ -use crate::catalog::{CursorWithPolarity, SerBatchReader}; +use crate::catalog::{CursorWithPolarity, SerBatchReader, SplitCursorBuilder}; use crate::controller::{ControllerInner, EndpointId}; use crate::format::MAX_DUPLICATES; use crate::format::parquet::relation_to_arrow_fields; @@ -9,8 +9,7 @@ use crate::{ AsyncErrorCallback, ControllerError, Encoder, OutputConsumer, OutputEndpoint, RecordFormat, SerCursor, }; -use anyhow::{Error as AnyError, Result as AnyResult, anyhow, bail}; -use arrow::array::RecordBatch; +use anyhow::{Result as AnyResult, anyhow, bail}; use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use chrono::Utc; use dbsp::circuit::tokio::TOKIO; @@ -19,10 +18,10 @@ use delta_kernel::table_properties::DataSkippingNumIndexedCols; use deltalake::DeltaTable; use deltalake::kernel::transaction::{CommitBuilder, TableReference}; use deltalake::kernel::{Action, Add, DataType, StructField}; +use deltalake::logstore::ObjectStoreRef; use deltalake::operations::create::CreateBuilder; use deltalake::operations::write::writer::{DeltaWriter, WriterConfig}; use deltalake::protocol::{DeltaOperation, SaveMode}; -use feldera_types::program_schema::SqlIdentifier; use feldera_types::serde_with_context::serde_config::{ BinaryFormat, DecimalFormat, UuidFormat, VariantFormat, }; @@ -37,10 +36,8 @@ use serde_arrow::ArrayBuilder; use serde_arrow::schema::SerdeArrowSchema; use std::cmp::min; use std::sync::{Arc, Weak}; -use std::thread; -use tokio::sync::mpsc::{Receiver, Sender, channel}; use tokio::time::{Duration, sleep}; -use tracing::{info, trace, warn}; +use tracing::{Instrument, info, info_span, warn}; /// Arrow serde config for reading/writing Delta tables. pub const fn delta_arrow_serde_config() -> &'static SqlSerdeConfig { @@ -69,20 +66,15 @@ struct DeltaTableWriterInner { pub struct DeltaTableWriter { inner: Arc, - command_sender: Sender, - response_receiver: Receiver>, + object_store: ObjectStoreRef, + task: WriterTask, + threads: usize, + pending_actions: Vec, + num_rows: usize, } /// Limit on the number of records buffered in memory in the encoder. -static CHUNK_SIZE: usize = 100_000; - -/// Commands sent to the tokio runtime that performs the actual -/// delta table operations. -enum Command { - BatchStart, - Insert(RecordBatch), - BatchEnd, -} +const CHUNK_SIZE: usize = 100_000; impl DeltaTableWriter { pub fn new( @@ -93,6 +85,22 @@ impl DeltaTableWriter { value_schema: &Relation, controller: Weak, ) -> Result { + config.validate().map_err(|e| { + ControllerError::invalid_transport_configuration(endpoint_name, &e.to_string()) + })?; + + let threads = config.threads.unwrap_or(1); + + if threads > 1 && key_schema.is_none() { + return Err(ControllerError::invalid_transport_configuration( + endpoint_name, + "Parallel writes (threads > 1) require the view to have a unique key to \ + ensure correct ordering of inserts and deletes. Please specify the `index` \ + property in the connector configuration. For more details, see: \ + https://docs.feldera.com/connectors/unique_keys", + )); + } + register_storage_handlers(); // Create arrow schema @@ -134,126 +142,38 @@ impl DeltaTableWriter { value_schema: value_schema.clone(), controller, }); - let inner_clone = inner.clone(); - - let (command_sender, command_receiver) = channel::(1); - let (response_sender, mut response_receiver) = channel::>(1); - - // Start tokio runtime. - thread::Builder::new() - .name(format!("{endpoint_name}-delta-output-tokio-wrapper")) - .spawn(move || { - TOKIO.block_on(async { - let _ = Self::worker_task(inner_clone, command_receiver, response_sender).await; - }) - }) - .expect("failed to spawn output delta connector tokio wrapper thread"); - - response_receiver - .blocking_recv() - .ok_or_else(|| { + // Create or open the delta table. + // Panic safety: block_on() panics if called from a tokio async context. + // new() is called from sync controller code (connect_output), so this is fine. + let task = TOKIO + .block_on(WriterTask::create(inner.clone())) + .map_err(|e| { ControllerError::output_transport_error( endpoint_name, true, - anyhow!("worker thread terminated unexpectedly during initialization"), + anyhow!( + "error creating or opening delta table '{}': {e}", + &config.uri + ), ) - })? - .map_err(|(e, _)| ControllerError::output_transport_error(endpoint_name, true, e))?; - - let writer = Self { - inner, - command_sender, - response_receiver, - }; - - Ok(writer) - } - - fn view_name(&self) -> &SqlIdentifier { - &self.inner.value_schema.name - } - - fn command(&mut self, command: Command) -> Result<(), (AnyError, bool)> { - self.command_sender - .blocking_send(command) - .map_err(|_| (anyhow!("worker thread terminated unexpectedly"), true))?; - self.response_receiver - .blocking_recv() - .ok_or_else(|| (anyhow!("worker thread terminated unexpectedly"), true))? - } - - fn insert_record_batch(&mut self, builder: &mut ArrayBuilder) -> AnyResult<()> { - let batch = builder - .to_record_batch() - .map_err(|e| anyhow!("error generating arrow arrays: {e}"))?; - self.command(Command::Insert(batch)) - .map_err(|(e, _fatal)| e) - } + })?; - async fn worker_task( - inner: Arc, - mut command_receiver: Receiver, - response_sender: Sender>, - ) { - let mut task = match WriterTask::create(inner.clone()).await { - Ok(task) => { - let _ = response_sender.send(Ok(())).await; - task - } - Err(e) => { - let _ = response_sender - .send(Err(( - anyhow!( - "error creating or opening delta table '{}': {e}", - &inner.config.uri - ), - false, - ))) - .await; - return; - } - }; + let object_store = task.delta_table.object_store(); - loop { - match command_receiver.recv().await { - Some(Command::BatchStart) => { - task.batch_start().await; - // Ignore closed channel, we'll handle it at the next loop iteration. - let _ = response_sender.send(Ok(())).await; - } - Some(Command::BatchEnd) => match task.batch_end().await { - Ok(()) => { - let _ = response_sender.send(Ok(())).await; - } - Err(e) => { - let _ = response_sender.send(Err((e, false))).await; - } - }, - Some(Command::Insert(batch)) => match task.insert(batch).await { - Ok(()) => { - let _ = response_sender.send(Ok(())).await; - } - Err(e) => { - let _ = response_sender.send(Err((e, false))).await; - } - }, - None => { - trace!( - "delta_table {}: endpoint is shutting down", - &inner.endpoint_name - ); - return; - } - } - } + Ok(Self { + inner, + object_store, + task, + threads, + pending_actions: Vec::new(), + num_rows: 0, + }) } } struct WriterTask { inner: Arc, delta_table: DeltaTable, - writer: Option, - num_rows: usize, } /// Retry `op` with exponential backoff of up to 10 seconds until it succeeds or config.max_retries is reached. @@ -421,41 +341,7 @@ impl WriterTask { } ); - Ok(Self { - inner, - delta_table, - writer: None, - num_rows: 0, - }) - } - - async fn batch_start(&mut self) { - trace!( - "delta_table {}: starting a new output batch", - &self.inner.endpoint_name, - ); - - self.num_rows = 0; - - // TODO: make target_file_size configurable. - // TODO: configure WriterProperties, e.g., do we want to set WriterProperties::sorting_columns? - let writer_config = WriterConfig::new( - self.inner.arrow_schema.clone(), - vec![], - None, - None, - None, - DataSkippingNumIndexedCols::NumColumns(min( - 32, - self.inner.arrow_schema.fields.len() as u64, - )), - None, - ); - - self.writer = Some(DeltaWriter::new( - self.delta_table.object_store(), - writer_config, - )); + Ok(Self { inner, delta_table }) } async fn commit(&mut self, actions: &[Add]) -> AnyResult<()> { @@ -501,77 +387,246 @@ impl WriterTask { Ok(()) } - async fn batch_end(&mut self) -> AnyResult<()> { - trace!( - "delta_table {}: finished writing output records, committing (current table version: {})", - &self.inner.endpoint_name, - self.current_version() - ); - match self.writer.take() { - Some(writer) => { - // TODO: this is currently not retryable. - // See: https://github.com/delta-io/delta-rs/issues/4265 - // Another option is to retry the entire transaction, since it just happens that there's always exactly one - // insert between batch_start and batch_end, so we can hold on to it and retry the whole thing. - let actions = writer - .close() - .await - .map_err(|e| anyhow!("error flushing {} Parquet rows: {e:?}", self.num_rows))?; - - if actions.is_empty() { - return Ok(()); - } - - let num_bytes = actions.iter().map(|action| action.size as usize).sum(); + async fn commit_with_retry(&mut self, actions: &[Add]) -> AnyResult<()> { + retry!( + self, + "committing Delta table transaction", + self.commit(actions).await + ) + } +} - retry!( - self, - "committing Delta table transaction", - self.commit(&actions).await - )?; +/// Error classification for Delta table write operations. +/// +/// Separates deterministic failures (which will recur on every attempt) from +/// transient I/O failures (which may succeed on retry). +enum WriteError { + /// Data-dependent error that will recur identically on retry. + /// Examples: non-unique keys, schema mismatches, serialization failures. + Deterministic(anyhow::Error), + /// Transient I/O error that may resolve on retry. + /// Examples: object store timeouts, network failures. + Transient(anyhow::Error), +} - if let Some(controller) = self.inner.controller.upgrade() { - controller.status.output_buffer( - self.inner.endpoint_id, - num_bytes, - self.num_rows, - ) - }; - Ok(()) +/// Encode a key range and stream-write it to a `DeltaWriter`, retrying transient failures. +/// +/// On retry, a fresh cursor is rebuilt from `cursor_builder` and a new `DeltaWriter` +/// is created. Any Parquet files written by a failed attempt become orphans that +/// Delta `VACUUM` will clean up. +/// +/// Only transient I/O errors are retried; deterministic errors (e.g., non-unique keys, +/// serialization failures) are returned immediately. +async fn encode_and_write_range( + cursor_builder: SplitCursorBuilder, + inner: Arc, + object_store: ObjectStoreRef, + micros: i64, +) -> AnyResult<(Vec, usize)> { + // This function has its own retry loop instead of using the `retry!` macro because: + // Multiple ranges run in parallel; the `retry!` macro clears the connector + // health status on success, which would be incorrect here; a single range + // succeeding must not mask failures in other ranges. + let mut retry_count: u32 = 0; + let mut backoff = Duration::from_secs(1); + let max_backoff = Duration::from_secs(10); + + loop { + match stream_encode_and_write(&cursor_builder, &inner, object_store.clone(), micros).await { + Ok((ref actions, rows)) => { + if retry_count > 0 { + info!( + "delta_table {}: Delta table write succeeded after {retry_count} retries ({rows} rows, {} files)", + inner.endpoint_name, + actions.len(), + ); + } + return Ok((actions.clone(), rows)); } - _ => { - bail!( - "delta_table {}: received a BatchEnd without a matching BatchStart", - &self.inner.endpoint_name - ) + Err(WriteError::Deterministic(e)) => { + return Err(e); + } + Err(WriteError::Transient(e)) + if inner.config.max_retries.is_none() + || retry_count < inner.config.max_retries.unwrap() => + { + retry_count += 1; + let message = format!( + "Delta table write failed (attempt {retry_count}, retrying in {backoff:?}): {e:?}" + ); + if let Some(controller) = inner.controller.upgrade() { + controller.update_output_connector_health( + inner.endpoint_id, + ConnectorHealth::unhealthy(&message), + ); + } + warn!("delta_table {}: {message}", inner.endpoint_name); + sleep(backoff).await; + backoff = std::cmp::min(backoff * 2, max_backoff); + } + Err(WriteError::Transient(e)) => { + return Err(anyhow!( + "Delta table write failed after {retry_count} retries: {e}" + )); } } } +} - async fn insert(&mut self, batch: RecordBatch) -> AnyResult<()> { - let current_version = self.current_version(); +/// Build a `RecordBatch` from `builder` (deterministic), write it via `writer` (transient I/O), +/// and accumulate the row count in `total_rows`. +async fn flush_chunk( + builder: &mut ArrayBuilder, + writer: &mut DeltaWriter, + total_rows: &mut usize, +) -> Result<(), WriteError> { + let batch = builder + .to_record_batch() + .map_err(|e| WriteError::Deterministic(anyhow!("error generating arrow arrays: {e}")))?; + *total_rows += batch.num_rows(); + writer.write(&batch).await.map_err(|e| { + WriteError::Transient(anyhow!("error writing {} records: {e:?}", batch.num_rows())) + })?; + Ok(()) +} - if let Some(writer) = &mut self.writer { - self.num_rows += batch.num_rows(); - trace!( - "delta_table {}: writing {} records", - &self.inner.endpoint_name, self.num_rows, - ); +/// Single-attempt streaming encode + write for one key range. +/// +/// Encodes records from the cursor in chunks of `CHUNK_SIZE` and writes each chunk +/// to the `DeltaWriter` immediately, avoiding buffering all `RecordBatch`es in memory. +/// +/// Returns [`WriteError::Deterministic`] for data-dependent failures (serialization, +/// validation) and [`WriteError::Transient`] for I/O failures (object store writes). +async fn stream_encode_and_write( + cursor_builder: &SplitCursorBuilder, + inner: &DeltaTableWriterInner, + object_store: ObjectStoreRef, + micros: i64, +) -> Result<(Vec, usize), WriteError> { + let num_indexed_cols = min(32, inner.arrow_schema.fields.len() as u64); + let writer_config = WriterConfig::new( + inner.arrow_schema.clone(), + vec![], + None, + None, + None, + DataSkippingNumIndexedCols::NumColumns(num_indexed_cols), + None, + ); + let mut writer = DeltaWriter::new(object_store, writer_config); + let mut insert_builder = ArrayBuilder::new(inner.serde_arrow_schema.clone()) + .map_err(|e| WriteError::Deterministic(anyhow!("error creating array builder: {e}")))?; + let mut num_records = 0; + let mut total_rows = 0; + let index_name = inner.key_schema.as_ref().map(|s| &s.name); + + if let Some(index_name) = index_name { + let mut cursor = cursor_builder.build(); + + while cursor.key_valid() { + let op = indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor) + .map_err(WriteError::Deterministic)?; + + if let Some(op) = op { + cursor.rewind_vals(); + + match op { + IndexedOperationType::Insert => cursor + .serialize_val_to_arrow_with_metadata( + &Meta::new("i", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?, + IndexedOperationType::Delete => cursor + .serialize_val_to_arrow_with_metadata( + &Meta::new("d", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?, + IndexedOperationType::Upsert => { + assert!(cursor.val_valid()); - let description = format!( - "writing {} records to Delta table (current version: {current_version})", - batch.num_rows(), - ); - retry!(self, description, { writer.write(&batch).await })?; + if cursor.weight() < 0 { + cursor.step_val(); + } + assert!(cursor.val_valid()); - Ok(()) - } else { - bail!( - "delta_table {}: received Data without a matching BatchStart", - &self.inner.endpoint_name - ); + cursor + .serialize_val_to_arrow_with_metadata( + &Meta::new("u", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?; + } + }; + + num_records += 1; + + if num_records >= CHUNK_SIZE { + flush_chunk(&mut insert_builder, &mut writer, &mut total_rows).await?; + num_records = 0; + } + }; + + cursor.step_key(); + } + } else { + let cursor = cursor_builder.build(); + let mut cursor = CursorWithPolarity::new(Box::new(cursor)); + + while cursor.key_valid() { + if !cursor.val_valid() { + cursor.step_key(); + continue; + } + + let mut w = cursor.weight(); + if !(-MAX_DUPLICATES..=MAX_DUPLICATES).contains(&w) { + return Err(WriteError::Deterministic(anyhow!( + "Unable to output record with very large weight {w}. \ + Consider adjusting your SQL queries to avoid duplicate output records, \ + e.g., using 'SELECT DISTINCT'." + ))); + } + + while w != 0 { + if w > 0 { + cursor + .serialize_key_to_arrow_with_metadata( + &Meta::new("i", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?; + w -= 1; + } else { + cursor + .serialize_key_to_arrow_with_metadata( + &Meta::new("d", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?; + w += 1; + } + num_records += 1; + + if num_records >= CHUNK_SIZE { + flush_chunk(&mut insert_builder, &mut writer, &mut total_rows).await?; + num_records = 0; + } + } + cursor.step_key(); } } + + if num_records > 0 { + flush_chunk(&mut insert_builder, &mut writer, &mut total_rows).await?; + } + + let actions = writer + .close() + .await + .map_err(|e| WriteError::Transient(anyhow!("error closing writer: {e:?}")))?; + Ok((actions, total_rows)) } impl OutputConsumer for DeltaTableWriter { @@ -580,18 +635,8 @@ impl OutputConsumer for DeltaTableWriter { } fn batch_start(&mut self, _step: Step) { - self.command(Command::BatchStart) - .unwrap_or_else(|(e, fatal)| { - if let Some(controller) = self.inner.controller.upgrade() { - controller.output_transport_error( - self.inner.endpoint_id, - &self.inner.endpoint_name, - fatal, - e, - Some("delta_batch_start"), - ) - }; - }); + self.pending_actions.clear(); + self.num_rows = 0; } fn push_buffer(&mut self, _buffer: &[u8], _num_records: usize) { @@ -609,18 +654,44 @@ impl OutputConsumer for DeltaTableWriter { } fn batch_end(&mut self) { - self.command(Command::BatchEnd) - .unwrap_or_else(|(e, fatal)| { - if let Some(controller) = self.inner.controller.upgrade() { - controller.output_transport_error( - self.inner.endpoint_id, - &self.inner.endpoint_name, - fatal, - e, - Some("delta_batch_end"), - ) - }; - }); + if self.pending_actions.is_empty() { + return; + } + + let _span = info_span!( + "delta_output", + endpoint = &*self.inner.endpoint_name, + table = &*self.inner.config.uri, + ) + .entered(); + + let num_bytes: usize = self.pending_actions.iter().map(|a| a.size as usize).sum(); + let num_rows = self.num_rows; + let actions = std::mem::take(&mut self.pending_actions); + self.num_rows = 0; + + // Panic safety: block_on() panics if called from a tokio async context. + // batch_end() is called from the dedicated output thread (output_thread_func). + if let Err(e) = TOKIO.block_on(self.task.commit_with_retry(&actions)) { + if let Some(controller) = self.inner.controller.upgrade() { + controller.output_transport_error( + self.inner.endpoint_id, + &self.inner.endpoint_name, + false, + e, + Some("delta_batch_end"), + ) + }; + return; + } + + if let Some(controller) = self.inner.controller.upgrade() { + controller + .update_output_connector_health(self.inner.endpoint_id, ConnectorHealth::healthy()); + controller + .status + .output_buffer(self.inner.endpoint_id, num_bytes, num_rows); + } } } @@ -650,108 +721,89 @@ impl Encoder for DeltaTableWriter { } fn encode(&mut self, batch: Arc) -> AnyResult<()> { - let micros = Utc::now().timestamp_micros(); - let mut insert_builder = ArrayBuilder::new(self.inner.serde_arrow_schema.clone())?; - - let mut num_insert_records = 0; - - let index_name = &self.inner.key_schema.as_ref().map(|s| s.name.to_owned()); - - if let Some(index_name) = &index_name { - let mut cursor = - batch.cursor(RecordFormat::Parquet(delta_arrow_serde_config().clone()))?; - - while cursor.key_valid() { - if let Some(op) = - indexed_operation_type(self.view_name(), index_name, cursor.as_mut())? - { - cursor.rewind_vals(); - - match op { - IndexedOperationType::Insert => cursor - .serialize_val_to_arrow_with_metadata( - &Meta::new("i", micros), - &mut insert_builder, - )?, - IndexedOperationType::Delete => cursor - .serialize_val_to_arrow_with_metadata( - &Meta::new("d", micros), - &mut insert_builder, - )?, - IndexedOperationType::Upsert => { - assert!(cursor.val_valid()); - - if cursor.weight() < 0 { - cursor.step_val(); - } - assert!(cursor.val_valid()); - - cursor.serialize_val_to_arrow_with_metadata( - &Meta::new("u", micros), - &mut insert_builder, - )?; - } - }; + let threads = self.threads; + let mut bounds = batch.keys_factory().default_box(); + batch.partition_keys(threads, &mut *bounds); + + let mut cursor_builders = Vec::new(); + for i in 0..=bounds.len() { + let Some(cb) = SplitCursorBuilder::from_bounds( + batch.clone(), + &*bounds, + i, + RecordFormat::Parquet(delta_arrow_serde_config().clone()), + ) else { + continue; + }; + cursor_builders.push(cb); + } + if cursor_builders.is_empty() { + return Ok(()); + } - num_insert_records += 1; + let micros = Utc::now().timestamp_micros(); - // Split batch into chunks. This does not affect the number or size of generated - // parquet files, since that is controlled by the `DeltaWriter`, but it limits - // the amount of memory used by `builder`. - if num_insert_records >= CHUNK_SIZE { - self.insert_record_batch(&mut insert_builder)?; - num_insert_records = 0; - } - }; + let span = info_span!( + "delta_output", + endpoint = &*self.inner.endpoint_name, + table = &*self.inner.config.uri, + ); - cursor.step_key(); + // Panic safety: block_on() panics if called from a tokio async context. + // encode() is called from the dedicated output thread (output_thread_func). + let results = TOKIO.block_on(async { + let mut handles = Vec::with_capacity(cursor_builders.len()); + for cursor_builder in cursor_builders { + let inner = self.inner.clone(); + let object_store = self.object_store.clone(); + handles.push(tokio::spawn( + encode_and_write_range(cursor_builder, inner, object_store, micros) + .instrument(span.clone()), + )); } - } else { - let mut cursor = CursorWithPolarity::new( - batch.cursor(RecordFormat::Parquet(delta_arrow_serde_config().clone()))?, - ); - while cursor.key_valid() { - if !cursor.val_valid() { - cursor.step_key(); - continue; - } + let mut results = Vec::with_capacity(handles.len()); + for handle in handles { + results.push( + handle + .await + .unwrap_or_else(|e| Err(anyhow!("write task panicked: {e}"))), + ); + } + results + }); - let mut w = cursor.weight(); - if !(-MAX_DUPLICATES..=MAX_DUPLICATES).contains(&w) { - bail!( - "Unable to output record with very large weight {w}. Consider adjusting your SQL queries to avoid duplicate output records, e.g., using 'SELECT DISTINCT'." - ); + let mut errors = Vec::new(); + let mut succeeded_ranges = 0usize; + for result in results { + match result { + Ok((mut actions, rows)) => { + self.pending_actions.append(&mut actions); + self.num_rows += rows; + succeeded_ranges += 1; } - - while w != 0 { - if w > 0 { - cursor.serialize_key_to_arrow_with_metadata( - &Meta::new("i", micros), - &mut insert_builder, - )?; - w -= 1; - } else { - cursor.serialize_key_to_arrow_with_metadata( - &Meta::new("d", micros), - &mut insert_builder, - )?; - w += 1; - } - num_insert_records += 1; - // Split batch into chunks. This does not affect the number or size of generated - // parquet files, since that is controlled by the `DeltaWriter`, but it limits - // the amount of memory used by `builder`. - if num_insert_records >= CHUNK_SIZE { - self.insert_record_batch(&mut insert_builder)?; - num_insert_records = 0; - } - } - cursor.step_key(); + Err(e) => errors.push(e), } } - - if num_insert_records > 0 { - self.insert_record_batch(&mut insert_builder)?; + if !errors.is_empty() { + if succeeded_ranges > 0 { + warn!( + "delta_table {}: {} range(s) succeeded but {} failed; \ + dropping {} file action(s) from this commit (orphaned files will be cleaned up by VACUUM)", + self.inner.endpoint_name, + succeeded_ranges, + errors.len(), + self.pending_actions.len(), + ); + } + self.pending_actions.clear(); + self.num_rows = 0; + let msg = errors + .iter() + .map(|e| format!("{e:#}")) + .collect::>() + .join("; "); + + bail!("{} write task(s) failed: {msg}", errors.len()); } Ok(()) @@ -795,3 +847,593 @@ impl OutputEndpoint for DeltaTableWriter { false } } + +#[cfg(test)] +mod parallel { + use std::collections::BTreeMap; + use std::ffi::OsStr; + use std::os::unix::ffi::OsStrExt; + use std::os::unix::fs::PermissionsExt; + use std::path::Path; + use std::sync::{Arc, Weak}; + + use dbsp::utils::Tup2; + use dbsp::{OrdIndexedZSet, OrdZSet}; + use feldera_sqllib::{ + ByteArray, Date, F32, F64, SqlDecimal, SqlString, Timestamp, Uuid, Variant, + }; + use feldera_types::deserialize_table_record; + use feldera_types::program_schema::{ColumnType, Relation, SqlIdentifier}; + use feldera_types::transport::delta_table::{DeltaTableWriteMode, DeltaTableWriterConfig}; + use tempfile::TempDir; + + use crate::catalog::SerBatch; + use crate::controller::EndpointId; + use crate::format::Encoder; + use crate::format::parquet::test::load_parquet_file; + use crate::static_compile::seroutput::SerBatchImpl; + use crate::test::data::{DeltaTestKey, DeltaTestStruct, TestStruct}; + use crate::test::list_files_recursive; + + use super::DeltaTableWriter; + + // ── Output record type (DeltaTestStruct fields + metadata columns) ── + + #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] + struct OutputRecord { + bigint: i64, + binary: ByteArray, + boolean: bool, + date: Date, + decimal_10_3: SqlDecimal<10, 3>, + double: F64, + float: F32, + int: i32, + smallint: i16, + string: String, + unused: Option, + timestamp_ntz: Timestamp, + tinyint: i8, + string_array: Vec, + struct1: TestStruct, + struct_array: Vec, + string_string_map: BTreeMap, + string_struct_map: BTreeMap, + variant: Variant, + uuid: Uuid, + __feldera_op: String, + __feldera_ts: i64, + } + + deserialize_table_record!(OutputRecord["OutputRecord", Variant, 22] { + (bigint, "bigint", false, i64, |_| None), + (binary, "binary", false, ByteArray, |_| None), + (boolean, "boolean", false, bool, |_| None), + (date, "date", false, Date, |_| None), + (decimal_10_3, "decimal_10_3", false, SqlDecimal<10, 3>, |_| None), + (double, "double", false, F64, |_| None), + (float, "float", false, F32, |_| None), + (int, "int", false, i32, |_| None), + (smallint, "smallint", false, i16, |_| None), + (string, "string", false, String, |_| None), + (unused, "unused", false, Option, |_| Some(None)), + (timestamp_ntz, "timestamp_ntz", false, Timestamp, |_| None), + (tinyint, "tinyint", false, i8, |_| None), + (string_array, "string_array", false, Vec, |_| None), + (struct1, "struct1", false, TestStruct, |_| None), + (struct_array, "struct_array", false, Vec, |_| None), + (string_string_map, "string_string_map", false, BTreeMap, |_| None), + (string_struct_map, "string_struct_map", false, BTreeMap, |_| None), + (variant, "variant", false, Variant, |_| None), + (uuid, "uuid", false, Uuid, |_| None), + (__feldera_op, "__feldera_op", false, String, |_| None), + (__feldera_ts, "__feldera_ts", false, i64, |_| None) + }); + + impl OutputRecord { + fn to_data_record(&self) -> DeltaTestStruct { + DeltaTestStruct { + bigint: self.bigint, + binary: self.binary.clone(), + boolean: self.boolean, + date: self.date, + decimal_10_3: self.decimal_10_3, + double: self.double, + float: self.float, + int: self.int, + smallint: self.smallint, + string: self.string.clone(), + unused: self.unused.clone(), + timestamp_ntz: self.timestamp_ntz, + tinyint: self.tinyint, + string_array: self.string_array.clone(), + struct1: self.struct1.clone(), + struct_array: self.struct_array.clone(), + string_string_map: self.string_string_map.clone(), + string_struct_map: self.string_struct_map.clone(), + variant: self.variant.clone(), + uuid: self.uuid.clone(), + } + } + } + + // ── Helpers ──────────────────────────────────────────────────── + + fn key_relation() -> Relation { + Relation { + name: SqlIdentifier::new("test_idx", false), + fields: vec![feldera_types::program_schema::Field::new( + "bigint".into(), + ColumnType::bigint(false), + )], + materialized: false, + properties: BTreeMap::new(), + } + } + + fn value_relation() -> Relation { + let mut rel = DeltaTestStruct::relation_schema(); + rel.materialized = true; + rel + } + + fn make_endpoint(threads: usize, table_uri: &str, indexed: bool) -> DeltaTableWriter { + let key_schema = if indexed { Some(key_relation()) } else { None }; + DeltaTableWriter::new( + EndpointId::default(), + "test_endpoint", + &DeltaTableWriterConfig { + uri: table_uri.to_string(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(threads), + object_store_config: Default::default(), + }, + &key_schema, + &value_relation(), + Weak::new(), + ) + .expect("failed to create endpoint") + } + + fn build_insert_batch(records: &[DeltaTestStruct]) -> Arc { + let tuples: Vec<_> = records + .iter() + .map(|r| Tup2(Tup2(DeltaTestKey { bigint: r.bigint }, r.clone()), 1i64)) + .collect(); + let zset = OrdIndexedZSet::from_tuples((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestKey, DeltaTestStruct>::new(zset)) + } + + fn build_delete_batch(records: &[DeltaTestStruct]) -> Arc { + let tuples: Vec<_> = records + .iter() + .map(|r| Tup2(Tup2(DeltaTestKey { bigint: r.bigint }, r.clone()), -1i64)) + .collect(); + let zset = OrdIndexedZSet::from_tuples((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestKey, DeltaTestStruct>::new(zset)) + } + + fn build_upsert_batch(updates: &[(DeltaTestStruct, DeltaTestStruct)]) -> Arc { + let mut tuples = Vec::new(); + for (old, new) in updates { + assert_eq!(old.bigint, new.bigint); + tuples.push(Tup2( + Tup2(DeltaTestKey { bigint: old.bigint }, old.clone()), + -1i64, + )); + tuples.push(Tup2( + Tup2(DeltaTestKey { bigint: new.bigint }, new.clone()), + 1i64, + )); + } + let zset = OrdIndexedZSet::from_tuples((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestKey, DeltaTestStruct>::new(zset)) + } + + fn build_non_indexed_batch(records: &[DeltaTestStruct], weight: i64) -> Arc { + let tuples: Vec<_> = records.iter().map(|r| Tup2(r.clone(), weight)).collect(); + let zset = OrdZSet::from_keys((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestStruct, ()>::new(zset)) + } + + fn encode_batch(endpoint: &mut DeltaTableWriter, batch: &Arc) { + endpoint.consumer().batch_start(0); + endpoint + .encode(batch.clone().arc_as_batch_reader()) + .unwrap(); + endpoint.consumer().batch_end(); + } + + fn read_output(table_uri: &str) -> Vec { + let parquet_files = + list_files_recursive(Path::new(table_uri), OsStr::from_bytes(b"parquet")).unwrap(); + let mut records = Vec::new(); + for path in parquet_files { + let mut batch: Vec = load_parquet_file(&path); + records.append(&mut batch); + } + records + } + + fn make_record(i: usize) -> DeltaTestStruct { + DeltaTestStruct { + bigint: i as i64, + binary: ByteArray::from_vec(vec![i as u8, (i >> 8) as u8]), + boolean: i % 2 == 0, + date: Date::from_days(i as i32 % 100_000), + decimal_10_3: SqlDecimal::<10, 3>::new((i as i128 % 1_000_000) * 1000, 3).unwrap(), + double: F64::new((i as f64).trunc()), + float: F32::new((i as f32).trunc()), + int: i as i32, + smallint: (i % 32000) as i16, + string: format!("record_{i}"), + unused: if i % 3 == 0 { + None + } else { + Some(format!("unused_{i}")) + }, + timestamp_ntz: Timestamp::from_milliseconds(1704070800000 + i as i64 * 1000), + tinyint: (i % 120) as i8, + string_array: vec![format!("arr_{i}")], + struct1: TestStruct { + id: i as u32, + b: i % 2 == 0, + i: Some(i as i64), + s: format!("s_{i}"), + }, + struct_array: vec![TestStruct { + id: i as u32, + b: false, + i: None, + s: format!("sa_{i}"), + }], + string_string_map: BTreeMap::from([(format!("key_{i}"), format!("val_{i}"))]), + string_struct_map: BTreeMap::from([( + format!("sk_{i}"), + TestStruct { + id: i as u32, + b: true, + i: Some(i as i64 * 2), + s: format!("sm_{i}"), + }, + )]), + variant: Variant::Map( + std::iter::once(( + Variant::String(SqlString::from_ref("foo")), + Variant::String(SqlString::from(i.to_string())), + )) + .collect::>() + .into(), + ), + uuid: Uuid::from_bytes([i as u8; 16]), + } + } + + fn make_records(n: usize) -> Vec { + (0..n).map(make_record).collect() + } + + // ── Tests ────────────────────────────────────────────────────── + + fn insert_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(100); + let batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + encode_batch(&mut endpoint, &batch); + + let output = read_output(&table_uri); + assert_eq!(output.len(), 100); + for rec in &output { + assert_eq!(rec.__feldera_op, "i"); + } + // Verify data fields match + let mut output_data: Vec = + output.iter().map(|r| r.to_data_record()).collect(); + output_data.sort(); + let mut expected = records.clone(); + expected.sort(); + assert_eq!(output_data, expected); + } + + #[test] + fn test_insert_single_thread() { + insert_test(1); + } + + #[test] + fn test_insert_multi_thread() { + insert_test(4); + } + + fn upsert_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(50); + let insert_batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + encode_batch(&mut endpoint, &insert_batch); + + // Upsert: update records 0..10 + let updates: Vec<_> = (0..10) + .map(|i| { + let old = records[i].clone(); + let new = DeltaTestStruct { + boolean: !old.boolean, + int: old.int + 1000, + string: format!("updated_{}", old.bigint), + ..old.clone() + }; + (old, new) + }) + .collect(); + let upsert_batch = build_upsert_batch(&updates); + encode_batch(&mut endpoint, &upsert_batch); + + let output = read_output(&table_uri); + // First batch: 50 inserts, second batch: 10 upserts + let inserts: Vec<_> = output.iter().filter(|r| r.__feldera_op == "i").collect(); + let upserts: Vec<_> = output.iter().filter(|r| r.__feldera_op == "u").collect(); + assert_eq!(inserts.len(), 50); + assert_eq!(upserts.len(), 10); + } + + #[test] + fn test_upsert_single_thread() { + upsert_test(1); + } + + #[test] + fn test_upsert_multi_thread() { + upsert_test(4); + } + + fn delete_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(50); + let insert_batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + encode_batch(&mut endpoint, &insert_batch); + + // Delete records 0..10 + let to_delete: Vec<_> = records[0..10].to_vec(); + let delete_batch = build_delete_batch(&to_delete); + encode_batch(&mut endpoint, &delete_batch); + + let output = read_output(&table_uri); + let inserts: Vec<_> = output.iter().filter(|r| r.__feldera_op == "i").collect(); + let deletes: Vec<_> = output.iter().filter(|r| r.__feldera_op == "d").collect(); + assert_eq!(inserts.len(), 50); + assert_eq!(deletes.len(), 10); + } + + #[test] + fn test_delete_single_thread() { + delete_test(1); + } + + #[test] + fn test_delete_multi_thread() { + delete_test(4); + } + + fn non_indexed_insert_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(100); + let batch = build_non_indexed_batch(&records, 1); + let mut endpoint = make_endpoint(threads, &table_uri, false); + + encode_batch(&mut endpoint, &batch); + + let output = read_output(&table_uri); + assert_eq!(output.len(), 100); + for rec in &output { + assert_eq!(rec.__feldera_op, "i"); + } + let mut output_data: Vec = + output.iter().map(|r| r.to_data_record()).collect(); + output_data.sort(); + let mut expected = records; + expected.sort(); + assert_eq!(output_data, expected); + } + + #[test] + fn test_non_indexed_insert_single_thread() { + non_indexed_insert_test(1); + } + + #[test] + fn test_non_indexed_rejects_multi_thread() { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + let key_schema = None; + let result = DeltaTableWriter::new( + EndpointId::default(), + "test_endpoint", + &DeltaTableWriterConfig { + uri: table_uri, + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(4), + object_store_config: Default::default(), + }, + &key_schema, + &value_relation(), + Weak::new(), + ); + assert!( + result.is_err(), + "threads > 1 without key_schema should be rejected" + ); + } + + fn empty_batch_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let batch = build_insert_batch(&[]); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + // Should not crash on empty batch. + encode_batch(&mut endpoint, &batch); + + let output = read_output(&table_uri); + assert_eq!(output.len(), 0); + } + + #[test] + fn test_empty_batch_single_thread() { + empty_batch_test(1); + } + + #[test] + fn test_empty_batch_multi_thread() { + empty_batch_test(4); + } + + fn multiple_batches_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let mut endpoint = make_endpoint(threads, &table_uri, true); + + // Batch 1: insert 50 records + let records = make_records(50); + let insert_batch = build_insert_batch(&records); + encode_batch(&mut endpoint, &insert_batch); + + // Batch 2: insert 50 more records (ids 50..100) + let more_records: Vec = (50..100).map(make_record).collect(); + let insert_batch2 = build_insert_batch(&more_records); + encode_batch(&mut endpoint, &insert_batch2); + + // Batch 3: upsert records 0..5 + let updates: Vec<_> = (0..5) + .map(|i| { + let old = records[i].clone(); + let new = DeltaTestStruct { + boolean: !old.boolean, + int: old.int + 1000, + string: format!("updated_{}", old.bigint), + ..old.clone() + }; + (old, new) + }) + .collect(); + let upsert_batch = build_upsert_batch(&updates); + encode_batch(&mut endpoint, &upsert_batch); + + // Batch 4: delete records 90..100 + let to_delete: Vec<_> = more_records[40..50].to_vec(); + let delete_batch = build_delete_batch(&to_delete); + encode_batch(&mut endpoint, &delete_batch); + + let output = read_output(&table_uri); + let inserts = output.iter().filter(|r| r.__feldera_op == "i").count(); + let upserts = output.iter().filter(|r| r.__feldera_op == "u").count(); + let deletes = output.iter().filter(|r| r.__feldera_op == "d").count(); + + assert_eq!(inserts, 100); // 50 + 50 + assert_eq!(upserts, 5); + assert_eq!(deletes, 10); + } + + #[test] + fn test_multiple_batches_single_thread() { + multiple_batches_test(1); + } + + #[test] + fn test_multiple_batches_multi_thread() { + multiple_batches_test(4); + } + + // ── Failure scenario tests ──────────────────────────────────── + + /// Write to a read-only directory should fail with no retries. + #[test] + fn test_write_failure_readonly_dir() { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + // Create the endpoint first (needs writable dir to create the table). + let records = make_records(10); + let batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(1, &table_uri, true); + + // Make directory read-only to trigger write failure. + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap(); + + endpoint.consumer().batch_start(0); + let result = endpoint.encode(batch.arc_as_batch_reader()); + + // Restore permissions before asserting (so TempDir cleanup succeeds). + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o755)).unwrap(); + + assert!(result.is_err(), "write to read-only dir should fail"); + } + + /// Exhausting max_retries should propagate the error. + #[test] + fn test_retry_exhaustion() { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(10); + let batch = build_insert_batch(&records); + + // Create endpoint with max_retries=1. + let key_schema = Some(key_relation()); + let mut endpoint = DeltaTableWriter::new( + EndpointId::default(), + "test_endpoint", + &DeltaTableWriterConfig { + uri: table_uri.clone(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(1), + threads: Some(1), + object_store_config: Default::default(), + }, + &key_schema, + &value_relation(), + Weak::new(), + ) + .expect("failed to create endpoint"); + + // Make directory read-only to trigger write failure. + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap(); + + endpoint.consumer().batch_start(0); + let result = endpoint.encode(batch.arc_as_batch_reader()); + + // Restore permissions. + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o755)).unwrap(); + + assert!(result.is_err(), "should fail after exhausting retries"); + } + + /// Verify that threads=0 is rejected in config validation. + #[test] + fn test_threads_zero_rejected() { + let config = DeltaTableWriterConfig { + uri: "/tmp/test".to_string(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(0), + object_store_config: Default::default(), + }; + assert!(config.validate().is_err()); + } +} diff --git a/crates/adapters/src/integrated/delta_table/test.rs b/crates/adapters/src/integrated/delta_table/test.rs index 7f12d9baa8..ef9d574274 100644 --- a/crates/adapters/src/integrated/delta_table/test.rs +++ b/crates/adapters/src/integrated/delta_table/test.rs @@ -546,6 +546,7 @@ fn delta_table_output_test( table_uri: &str, object_store_config: &HashMap, verify: bool, + threads: Option, ) { init_logging(); @@ -570,12 +571,15 @@ fn delta_table_output_test( input_file.write_all(b"\n").unwrap(); } - let mut storage_options = object_store_config.clone(); - storage_options.insert("uri".into(), table_uri.into()); - storage_options.insert("mode".into(), "truncate".into()); + let mut storage_options: serde_json::Value = serde_json::to_value(object_store_config).unwrap(); + storage_options["uri"] = json!(table_uri); + storage_options["mode"] = json!("truncate"); + if let Some(threads) = threads { + storage_options["threads"] = json!(threads); + } println!( - "delta_table_output_test: {} records, input file: {}, table uri: {table_uri}", + "delta_table_output_test: {} records, input file: {}, table uri: {table_uri}, threads: {threads:?}", data.len(), input_file.path().display(), ); @@ -616,9 +620,11 @@ fn delta_table_output_test( let controller = Controller::with_test_config( |workers| { - Ok(test_circuit::( + Ok(test_circuit_with_index::( workers, &DeltaTestStruct::schema(), + &[SqlIdentifier::from("bigint")], + |x: &DeltaTestStruct| DeltaTestKey { bigint: x.bigint }, &[None], )) }, @@ -1593,7 +1599,7 @@ proptest! { // Uncomment to inspect output parquet files produced by the test. forget(table_dir); - delta_table_output_test(data.clone(), &table_uri, &HashMap::new(), true); + delta_table_output_test(data.clone(), &table_uri, &HashMap::new(), true, None); // Read delta table unordered. @@ -1754,8 +1760,8 @@ proptest! { let table_uri = format!("s3://feldera-delta-table-test/{uuid}/"); // TODO: enable verification when it's supported for S3. - delta_table_output_test(data.clone(), &table_uri, &object_store_config, false); - //delta_table_output_test(data.clone(), &table_uri, &object_store_config, false); + delta_table_output_test(data.clone(), &table_uri, &object_store_config, false, None); + //delta_table_output_test(data.clone(), &table_uri, &object_store_config, false, None); let mut json_file = delta_table_snapshot_to_json::( &table_uri, diff --git a/crates/feldera-types/src/transport/delta_table.rs b/crates/feldera-types/src/transport/delta_table.rs index ae77d38cc2..c75cf58be3 100644 --- a/crates/feldera-types/src/transport/delta_table.rs +++ b/crates/feldera-types/src/transport/delta_table.rs @@ -47,6 +47,16 @@ pub struct DeltaTableWriterConfig { #[serde(default, skip_serializing_if = "Option::is_none")] pub max_retries: Option, + /// Number of parallel threads used by the connector. + /// + /// Increasing this value can improve Delta Lake write throughput + /// by enabling concurrent writes. + /// + /// Default: 1. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[schema(minimum = 1)] + pub threads: Option, + /// Storage options for configuring backend object store. /// /// For specific options available for different storage backends, see: @@ -57,6 +67,15 @@ pub struct DeltaTableWriterConfig { pub object_store_config: HashMap, } +impl DeltaTableWriterConfig { + pub fn validate(&self) -> Result<(), String> { + if self.threads.is_some_and(|t| t == 0) { + return Err("threads must be greater than 0".to_string()); + } + Ok(()) + } +} + /// Delta table read mode. /// /// Three options are available: diff --git a/docs.feldera.com/docs/connectors/sinks/delta.md b/docs.feldera.com/docs/connectors/sinks/delta.md index 62b7de9c2c..d5c19bc643 100644 --- a/docs.feldera.com/docs/connectors/sinks/delta.md +++ b/docs.feldera.com/docs/connectors/sinks/delta.md @@ -77,6 +77,7 @@ MERGE INTO {target_table} AS target | | - `truncate`: Existing table at the specified location will be truncated. The connector achieves this by outputting delete actions for all files in the latest snapshot of the table. | | | - `error_if_exists`: If a table exists at the specified location, the operation will fail. | | `max_retries`|

Maximum number of retries for failed Delta Lake operations like writing Parquet files and committing transactions.

The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits, and overall operation retries. This setting controls the overall operation retries. When a write to the table fails, because of an S3 timeout or any other reason that was not resolved by lower-level retries, the connector will retry the entire operation.

When not specified, the connector performs infinite retries. When set to 0, the connector doesn't retry failed operations.

| +| `threads` | Number of parallel threads used by the connector. Increasing this value can improve Delta Lake write throughput by enabling concurrent writes. Default: `1`. | [*]: Required fields diff --git a/openapi.json b/openapi.json index d835becb4a..acddda4248 100644 --- a/openapi.json +++ b/openapi.json @@ -8007,6 +8007,12 @@ "mode": { "$ref": "#/components/schemas/DeltaTableWriteMode" }, + "threads": { + "type": "integer", + "description": "Number of parallel threads used by the connector.\n\nIncreasing this value can improve Delta Lake write throughput\nby enabling concurrent writes.\n\nDefault: 1.", + "nullable": true, + "minimum": 1 + }, "uri": { "type": "string", "description": "Table URI."