diff --git a/Cargo.lock b/Cargo.lock index 6ae3bbd6580..87207c90c78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3761,7 +3761,7 @@ dependencies = [ [[package]] name = "dbsp" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "arc-swap", @@ -3849,7 +3849,7 @@ dependencies = [ [[package]] name = "dbsp_adapters" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix", "actix-codec", @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "dbsp_nexmark" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "ascii_table", @@ -4861,7 +4861,7 @@ dependencies = [ [[package]] name = "fda" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "arrow", @@ -4913,7 +4913,7 @@ dependencies = [ [[package]] name = "feldera-adapterlib" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-web", "anyhow", @@ -4944,7 +4944,7 @@ dependencies = [ [[package]] name = "feldera-buffer-cache" -version = "0.277.0" +version = "0.278.0" dependencies = [ "crossbeam-utils", "enum-map", @@ -4972,7 +4972,7 @@ dependencies = [ [[package]] name = "feldera-datagen" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "async-channel 2.5.0", @@ -4998,7 +4998,7 @@ dependencies = [ [[package]] name = "feldera-fxp" -version = "0.277.0" +version = "0.278.0" dependencies = [ "bytecheck", "dbsp", @@ -5018,7 +5018,7 @@ dependencies = [ [[package]] name = "feldera-iceberg" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "chrono", @@ -5038,7 +5038,7 @@ dependencies = [ [[package]] name = "feldera-ir" -version = "0.277.0" +version = "0.278.0" dependencies = [ "proptest", "proptest-derive", @@ -5050,7 +5050,7 @@ dependencies = [ [[package]] name = "feldera-macros" -version = "0.277.0" +version = "0.278.0" dependencies = [ "prettyplease", "proc-macro2", @@ -5060,7 +5060,7 @@ dependencies = [ [[package]] name = "feldera-observability" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-http", "awc", @@ -5075,7 +5075,7 @@ dependencies = [ [[package]] name = "feldera-rest-api" -version = "0.277.0" +version = "0.278.0" dependencies = [ "chrono", "feldera-observability", @@ -5109,7 +5109,7 @@ dependencies = [ [[package]] name = "feldera-sqllib" -version = "0.277.0" +version = "0.278.0" dependencies = [ "arcstr", "base58", @@ -5150,7 +5150,7 @@ dependencies = [ [[package]] name = "feldera-storage" -version = "0.277.0" +version = "0.278.0" dependencies = [ "anyhow", "crossbeam", @@ -5173,7 +5173,7 @@ dependencies = [ [[package]] name = "feldera-types" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-web", "anyhow", @@ -8094,7 +8094,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeline-manager" -version = "0.277.0" +version = "0.278.0" dependencies = [ "actix-cors", "actix-files", @@ -9188,7 +9188,7 @@ dependencies = [ [[package]] name = "readers" -version = "0.277.0" +version = "0.278.0" dependencies = [ "async-std", "csv", @@ -10764,7 +10764,7 @@ dependencies = [ [[package]] name = "sltsqlvalue" -version = "0.277.0" +version = "0.278.0" dependencies = [ "dbsp", "feldera-sqllib", @@ -11067,7 +11067,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "storage-test-compat" -version = "0.277.0" +version = "0.278.0" dependencies = [ "dbsp", "derive_more 1.0.0", diff --git a/Cargo.toml b/Cargo.toml index 1c455e33f64..0d246a88f5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,13 @@ [workspace.package] authors = ["Feldera Team "] -version = "0.277.0" +version = "0.278.0" license = "MIT OR Apache-2.0" homepage = "https://github.com/feldera/feldera" repository = "https://github.com/feldera/feldera" keywords = ["ivm", "analytics", "database", "incremental", "sql"] readme = "README.md" # Define Minimum Supported Rust Version (MSRV) -rust-version = "1.91.1" +rust-version = "1.93.1" edition = "2021" [workspace] @@ -101,7 +101,7 @@ csv = "1.2.2" csv-core = "0.1.10" dashmap = "6.1.0" datafusion = "51.0" -dbsp = { path = "crates/dbsp", version = "0.277.0" } +dbsp = { path = "crates/dbsp", version = "0.278.0" } dbsp_nexmark = { path = "crates/nexmark" } deadpool-postgres = "0.14.1" #deltalake = "0.30.2" @@ -121,19 +121,19 @@ erased-serde = "0.3.31" fake = "2.10" fastbloom = "0.14.0" fdlimit = "0.3.0" -feldera-buffer-cache = { version = "0.277.0", path = "crates/buffer-cache" } +feldera-buffer-cache = { version = "0.278.0", path = "crates/buffer-cache" } feldera-cloud1-client = "0.1.2" feldera-datagen = { path = "crates/datagen" } -feldera-fxp = { version = "0.277.0", path = "crates/fxp", features = ["dbsp"] } +feldera-fxp = { version = "0.278.0", path = "crates/fxp", features = ["dbsp"] } feldera-iceberg = { path = "crates/iceberg" } -feldera-observability = { version = "0.277.0", path = "crates/feldera-observability" } -feldera-macros = { version = "0.277.0", path = "crates/feldera-macros" } -feldera-sqllib = { version = "0.277.0", path = "crates/sqllib" } -feldera-storage = { version = "0.277.0", path = "crates/storage" } -feldera-types = { version = "0.277.0", path = "crates/feldera-types" } -feldera-rest-api = { version = "0.277.0", path = "crates/rest-api" } -feldera-ir = { version = "0.277.0", path = "crates/ir" } -feldera-adapterlib = { version = "0.277.0", path = "crates/adapterlib" } +feldera-observability = { version = "0.278.0", path = "crates/feldera-observability" } +feldera-macros = { version = "0.278.0", path = "crates/feldera-macros" } +feldera-sqllib = { version = "0.278.0", path = "crates/sqllib" } +feldera-storage = { version = "0.278.0", path = "crates/storage" } +feldera-types = { version = "0.278.0", path = "crates/feldera-types" } +feldera-rest-api = { version = "0.278.0", path = "crates/rest-api" } +feldera-ir = { version = "0.278.0", path = "crates/ir" } +feldera-adapterlib = { version = "0.278.0", path = "crates/adapterlib" } flate2 = "1.1.0" form_urlencoded = "1.2.0" futures = "0.3.30" diff --git a/crates/adapters/Cargo.toml b/crates/adapters/Cargo.toml index 95eaeb15a8c..02f0a32c827 100644 --- a/crates/adapters/Cargo.toml +++ b/crates/adapters/Cargo.toml @@ -246,3 +246,7 @@ required-features = ["with-avro"] name = "postgres_output" harness = false required-features = ["bench-mode"] + +[[bench]] +name = "delta_encoder" +harness = false diff --git a/crates/adapters/benches/delta_encoder.rs b/crates/adapters/benches/delta_encoder.rs new file mode 100644 index 00000000000..23f97bbda35 --- /dev/null +++ b/crates/adapters/benches/delta_encoder.rs @@ -0,0 +1,78 @@ +mod bench_common; + +use bench_common::{BenchKeyStruct, BenchTestStruct, build_indexed_batch, generate_test_data}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use dbsp_adapters::Encoder; +use dbsp_adapters::integrated::delta_table::DeltaTableWriter; +use feldera_types::transport::delta_table::{DeltaTableWriteMode, DeltaTableWriterConfig}; +use std::sync::Weak; +use tempfile::TempDir; + +// --------------------------------------------------------------------------- +// Delta-specific helpers +// --------------------------------------------------------------------------- + +fn create_indexed_writer(threads: usize, table_uri: &str) -> DeltaTableWriter { + let config = DeltaTableWriterConfig { + uri: table_uri.to_string(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(threads), + object_store_config: Default::default(), + }; + let key_schema = Some(BenchKeyStruct::relation_schema()); + let mut value_schema = BenchTestStruct::relation_schema(); + value_schema.materialized = true; + DeltaTableWriter::new( + Default::default(), + "bench_endpoint", + &config, + &key_schema, + &value_schema, + Weak::new(), + ) + .unwrap() +} + +// --------------------------------------------------------------------------- +// Benchmarks +// --------------------------------------------------------------------------- + +/// Benchmark parallel Delta table encoding with 100k records across 1/2/4/8 workers. +fn bench_indexed_encode(c: &mut Criterion) { + let num_records = 100_000; + let data = generate_test_data(num_records); + let batch = build_indexed_batch(&data); + + let mut group = c.benchmark_group("delta_indexed_encode"); + group.throughput(criterion::Throughput::Elements(num_records as u64)); + + for workers in [1, 2, 4, 8] { + group.bench_with_input( + BenchmarkId::new("workers", workers), + &workers, + |b, &workers| { + // Each iteration needs a fresh directory since the writer + // creates real Parquet files. + b.iter_with_setup( + || { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + let writer = create_indexed_writer(workers, &table_uri); + (writer, table_dir) + }, + |(mut writer, _table_dir)| { + writer.consumer().batch_start(0); + writer.encode(batch.clone().arc_as_batch_reader()).unwrap(); + writer.consumer().batch_end(); + }, + ); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_indexed_encode); +criterion_main!(benches); diff --git a/crates/adapters/src/integrated.rs b/crates/adapters/src/integrated.rs index 0bb3ad14463..aff0c2b1002 100644 --- a/crates/adapters/src/integrated.rs +++ b/crates/adapters/src/integrated.rs @@ -6,7 +6,7 @@ use feldera_types::program_schema::Relation; use std::sync::Weak; #[cfg(feature = "with-deltalake")] -mod delta_table; +pub mod delta_table; mod postgres; use crate::integrated::postgres::PostgresInputEndpoint; diff --git a/crates/adapters/src/integrated/delta_table/output.rs b/crates/adapters/src/integrated/delta_table/output.rs index 3d011061a2d..fc364063c7d 100644 --- a/crates/adapters/src/integrated/delta_table/output.rs +++ b/crates/adapters/src/integrated/delta_table/output.rs @@ -1,4 +1,4 @@ -use crate::catalog::{CursorWithPolarity, SerBatchReader}; +use crate::catalog::{CursorWithPolarity, SerBatchReader, SplitCursorBuilder}; use crate::controller::{ControllerInner, EndpointId}; use crate::format::MAX_DUPLICATES; use crate::format::parquet::relation_to_arrow_fields; @@ -9,8 +9,7 @@ use crate::{ AsyncErrorCallback, ControllerError, Encoder, OutputConsumer, OutputEndpoint, RecordFormat, SerCursor, }; -use anyhow::{Error as AnyError, Result as AnyResult, anyhow, bail}; -use arrow::array::RecordBatch; +use anyhow::{Result as AnyResult, anyhow, bail}; use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use chrono::Utc; use dbsp::circuit::tokio::TOKIO; @@ -19,10 +18,10 @@ use delta_kernel::table_properties::DataSkippingNumIndexedCols; use deltalake::DeltaTable; use deltalake::kernel::transaction::{CommitBuilder, TableReference}; use deltalake::kernel::{Action, Add, DataType, StructField}; +use deltalake::logstore::ObjectStoreRef; use deltalake::operations::create::CreateBuilder; use deltalake::operations::write::writer::{DeltaWriter, WriterConfig}; use deltalake::protocol::{DeltaOperation, SaveMode}; -use feldera_types::program_schema::SqlIdentifier; use feldera_types::serde_with_context::serde_config::{ BinaryFormat, DecimalFormat, UuidFormat, VariantFormat, }; @@ -37,10 +36,8 @@ use serde_arrow::ArrayBuilder; use serde_arrow::schema::SerdeArrowSchema; use std::cmp::min; use std::sync::{Arc, Weak}; -use std::thread; -use tokio::sync::mpsc::{Receiver, Sender, channel}; use tokio::time::{Duration, sleep}; -use tracing::{info, trace, warn}; +use tracing::{Instrument, info, info_span, warn}; /// Arrow serde config for reading/writing Delta tables. pub const fn delta_arrow_serde_config() -> &'static SqlSerdeConfig { @@ -69,20 +66,15 @@ struct DeltaTableWriterInner { pub struct DeltaTableWriter { inner: Arc, - command_sender: Sender, - response_receiver: Receiver>, + object_store: ObjectStoreRef, + task: WriterTask, + threads: usize, + pending_actions: Vec, + num_rows: usize, } /// Limit on the number of records buffered in memory in the encoder. -static CHUNK_SIZE: usize = 100_000; - -/// Commands sent to the tokio runtime that performs the actual -/// delta table operations. -enum Command { - BatchStart, - Insert(RecordBatch), - BatchEnd, -} +const CHUNK_SIZE: usize = 100_000; impl DeltaTableWriter { pub fn new( @@ -93,6 +85,22 @@ impl DeltaTableWriter { value_schema: &Relation, controller: Weak, ) -> Result { + config.validate().map_err(|e| { + ControllerError::invalid_transport_configuration(endpoint_name, &e.to_string()) + })?; + + let threads = config.threads.unwrap_or(1); + + if threads > 1 && key_schema.is_none() { + return Err(ControllerError::invalid_transport_configuration( + endpoint_name, + "Parallel writes (threads > 1) require the view to have a unique key to \ + ensure correct ordering of inserts and deletes. Please specify the `index` \ + property in the connector configuration. For more details, see: \ + https://docs.feldera.com/connectors/unique_keys", + )); + } + register_storage_handlers(); // Create arrow schema @@ -134,126 +142,38 @@ impl DeltaTableWriter { value_schema: value_schema.clone(), controller, }); - let inner_clone = inner.clone(); - - let (command_sender, command_receiver) = channel::(1); - let (response_sender, mut response_receiver) = channel::>(1); - - // Start tokio runtime. - thread::Builder::new() - .name(format!("{endpoint_name}-delta-output-tokio-wrapper")) - .spawn(move || { - TOKIO.block_on(async { - let _ = Self::worker_task(inner_clone, command_receiver, response_sender).await; - }) - }) - .expect("failed to spawn output delta connector tokio wrapper thread"); - - response_receiver - .blocking_recv() - .ok_or_else(|| { + // Create or open the delta table. + // Panic safety: block_on() panics if called from a tokio async context. + // new() is called from sync controller code (connect_output), so this is fine. + let task = TOKIO + .block_on(WriterTask::create(inner.clone())) + .map_err(|e| { ControllerError::output_transport_error( endpoint_name, true, - anyhow!("worker thread terminated unexpectedly during initialization"), + anyhow!( + "error creating or opening delta table '{}': {e}", + &config.uri + ), ) - })? - .map_err(|(e, _)| ControllerError::output_transport_error(endpoint_name, true, e))?; - - let writer = Self { - inner, - command_sender, - response_receiver, - }; - - Ok(writer) - } - - fn view_name(&self) -> &SqlIdentifier { - &self.inner.value_schema.name - } - - fn command(&mut self, command: Command) -> Result<(), (AnyError, bool)> { - self.command_sender - .blocking_send(command) - .map_err(|_| (anyhow!("worker thread terminated unexpectedly"), true))?; - self.response_receiver - .blocking_recv() - .ok_or_else(|| (anyhow!("worker thread terminated unexpectedly"), true))? - } - - fn insert_record_batch(&mut self, builder: &mut ArrayBuilder) -> AnyResult<()> { - let batch = builder - .to_record_batch() - .map_err(|e| anyhow!("error generating arrow arrays: {e}"))?; - self.command(Command::Insert(batch)) - .map_err(|(e, _fatal)| e) - } + })?; - async fn worker_task( - inner: Arc, - mut command_receiver: Receiver, - response_sender: Sender>, - ) { - let mut task = match WriterTask::create(inner.clone()).await { - Ok(task) => { - let _ = response_sender.send(Ok(())).await; - task - } - Err(e) => { - let _ = response_sender - .send(Err(( - anyhow!( - "error creating or opening delta table '{}': {e}", - &inner.config.uri - ), - false, - ))) - .await; - return; - } - }; + let object_store = task.delta_table.object_store(); - loop { - match command_receiver.recv().await { - Some(Command::BatchStart) => { - task.batch_start().await; - // Ignore closed channel, we'll handle it at the next loop iteration. - let _ = response_sender.send(Ok(())).await; - } - Some(Command::BatchEnd) => match task.batch_end().await { - Ok(()) => { - let _ = response_sender.send(Ok(())).await; - } - Err(e) => { - let _ = response_sender.send(Err((e, false))).await; - } - }, - Some(Command::Insert(batch)) => match task.insert(batch).await { - Ok(()) => { - let _ = response_sender.send(Ok(())).await; - } - Err(e) => { - let _ = response_sender.send(Err((e, false))).await; - } - }, - None => { - trace!( - "delta_table {}: endpoint is shutting down", - &inner.endpoint_name - ); - return; - } - } - } + Ok(Self { + inner, + object_store, + task, + threads, + pending_actions: Vec::new(), + num_rows: 0, + }) } } struct WriterTask { inner: Arc, delta_table: DeltaTable, - writer: Option, - num_rows: usize, } /// Retry `op` with exponential backoff of up to 10 seconds until it succeeds or config.max_retries is reached. @@ -421,41 +341,7 @@ impl WriterTask { } ); - Ok(Self { - inner, - delta_table, - writer: None, - num_rows: 0, - }) - } - - async fn batch_start(&mut self) { - trace!( - "delta_table {}: starting a new output batch", - &self.inner.endpoint_name, - ); - - self.num_rows = 0; - - // TODO: make target_file_size configurable. - // TODO: configure WriterProperties, e.g., do we want to set WriterProperties::sorting_columns? - let writer_config = WriterConfig::new( - self.inner.arrow_schema.clone(), - vec![], - None, - None, - None, - DataSkippingNumIndexedCols::NumColumns(min( - 32, - self.inner.arrow_schema.fields.len() as u64, - )), - None, - ); - - self.writer = Some(DeltaWriter::new( - self.delta_table.object_store(), - writer_config, - )); + Ok(Self { inner, delta_table }) } async fn commit(&mut self, actions: &[Add]) -> AnyResult<()> { @@ -501,77 +387,246 @@ impl WriterTask { Ok(()) } - async fn batch_end(&mut self) -> AnyResult<()> { - trace!( - "delta_table {}: finished writing output records, committing (current table version: {})", - &self.inner.endpoint_name, - self.current_version() - ); - match self.writer.take() { - Some(writer) => { - // TODO: this is currently not retryable. - // See: https://github.com/delta-io/delta-rs/issues/4265 - // Another option is to retry the entire transaction, since it just happens that there's always exactly one - // insert between batch_start and batch_end, so we can hold on to it and retry the whole thing. - let actions = writer - .close() - .await - .map_err(|e| anyhow!("error flushing {} Parquet rows: {e:?}", self.num_rows))?; - - if actions.is_empty() { - return Ok(()); - } - - let num_bytes = actions.iter().map(|action| action.size as usize).sum(); + async fn commit_with_retry(&mut self, actions: &[Add]) -> AnyResult<()> { + retry!( + self, + "committing Delta table transaction", + self.commit(actions).await + ) + } +} - retry!( - self, - "committing Delta table transaction", - self.commit(&actions).await - )?; +/// Error classification for Delta table write operations. +/// +/// Separates deterministic failures (which will recur on every attempt) from +/// transient I/O failures (which may succeed on retry). +enum WriteError { + /// Data-dependent error that will recur identically on retry. + /// Examples: non-unique keys, schema mismatches, serialization failures. + Deterministic(anyhow::Error), + /// Transient I/O error that may resolve on retry. + /// Examples: object store timeouts, network failures. + Transient(anyhow::Error), +} - if let Some(controller) = self.inner.controller.upgrade() { - controller.status.output_buffer( - self.inner.endpoint_id, - num_bytes, - self.num_rows, - ) - }; - Ok(()) +/// Encode a key range and stream-write it to a `DeltaWriter`, retrying transient failures. +/// +/// On retry, a fresh cursor is rebuilt from `cursor_builder` and a new `DeltaWriter` +/// is created. Any Parquet files written by a failed attempt become orphans that +/// Delta `VACUUM` will clean up. +/// +/// Only transient I/O errors are retried; deterministic errors (e.g., non-unique keys, +/// serialization failures) are returned immediately. +async fn encode_and_write_range( + cursor_builder: SplitCursorBuilder, + inner: Arc, + object_store: ObjectStoreRef, + micros: i64, +) -> AnyResult<(Vec, usize)> { + // This function has its own retry loop instead of using the `retry!` macro because: + // Multiple ranges run in parallel; the `retry!` macro clears the connector + // health status on success, which would be incorrect here; a single range + // succeeding must not mask failures in other ranges. + let mut retry_count: u32 = 0; + let mut backoff = Duration::from_secs(1); + let max_backoff = Duration::from_secs(10); + + loop { + match stream_encode_and_write(&cursor_builder, &inner, object_store.clone(), micros).await { + Ok((ref actions, rows)) => { + if retry_count > 0 { + info!( + "delta_table {}: Delta table write succeeded after {retry_count} retries ({rows} rows, {} files)", + inner.endpoint_name, + actions.len(), + ); + } + return Ok((actions.clone(), rows)); } - _ => { - bail!( - "delta_table {}: received a BatchEnd without a matching BatchStart", - &self.inner.endpoint_name - ) + Err(WriteError::Deterministic(e)) => { + return Err(e); + } + Err(WriteError::Transient(e)) + if inner.config.max_retries.is_none() + || retry_count < inner.config.max_retries.unwrap() => + { + retry_count += 1; + let message = format!( + "Delta table write failed (attempt {retry_count}, retrying in {backoff:?}): {e:?}" + ); + if let Some(controller) = inner.controller.upgrade() { + controller.update_output_connector_health( + inner.endpoint_id, + ConnectorHealth::unhealthy(&message), + ); + } + warn!("delta_table {}: {message}", inner.endpoint_name); + sleep(backoff).await; + backoff = std::cmp::min(backoff * 2, max_backoff); + } + Err(WriteError::Transient(e)) => { + return Err(anyhow!( + "Delta table write failed after {retry_count} retries: {e}" + )); } } } +} - async fn insert(&mut self, batch: RecordBatch) -> AnyResult<()> { - let current_version = self.current_version(); +/// Build a `RecordBatch` from `builder` (deterministic), write it via `writer` (transient I/O), +/// and accumulate the row count in `total_rows`. +async fn flush_chunk( + builder: &mut ArrayBuilder, + writer: &mut DeltaWriter, + total_rows: &mut usize, +) -> Result<(), WriteError> { + let batch = builder + .to_record_batch() + .map_err(|e| WriteError::Deterministic(anyhow!("error generating arrow arrays: {e}")))?; + *total_rows += batch.num_rows(); + writer.write(&batch).await.map_err(|e| { + WriteError::Transient(anyhow!("error writing {} records: {e:?}", batch.num_rows())) + })?; + Ok(()) +} - if let Some(writer) = &mut self.writer { - self.num_rows += batch.num_rows(); - trace!( - "delta_table {}: writing {} records", - &self.inner.endpoint_name, self.num_rows, - ); +/// Single-attempt streaming encode + write for one key range. +/// +/// Encodes records from the cursor in chunks of `CHUNK_SIZE` and writes each chunk +/// to the `DeltaWriter` immediately, avoiding buffering all `RecordBatch`es in memory. +/// +/// Returns [`WriteError::Deterministic`] for data-dependent failures (serialization, +/// validation) and [`WriteError::Transient`] for I/O failures (object store writes). +async fn stream_encode_and_write( + cursor_builder: &SplitCursorBuilder, + inner: &DeltaTableWriterInner, + object_store: ObjectStoreRef, + micros: i64, +) -> Result<(Vec, usize), WriteError> { + let num_indexed_cols = min(32, inner.arrow_schema.fields.len() as u64); + let writer_config = WriterConfig::new( + inner.arrow_schema.clone(), + vec![], + None, + None, + None, + DataSkippingNumIndexedCols::NumColumns(num_indexed_cols), + None, + ); + let mut writer = DeltaWriter::new(object_store, writer_config); + let mut insert_builder = ArrayBuilder::new(inner.serde_arrow_schema.clone()) + .map_err(|e| WriteError::Deterministic(anyhow!("error creating array builder: {e}")))?; + let mut num_records = 0; + let mut total_rows = 0; + let index_name = inner.key_schema.as_ref().map(|s| &s.name); + + if let Some(index_name) = index_name { + let mut cursor = cursor_builder.build(); + + while cursor.key_valid() { + let op = indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor) + .map_err(WriteError::Deterministic)?; + + if let Some(op) = op { + cursor.rewind_vals(); + + match op { + IndexedOperationType::Insert => cursor + .serialize_val_to_arrow_with_metadata( + &Meta::new("i", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?, + IndexedOperationType::Delete => cursor + .serialize_val_to_arrow_with_metadata( + &Meta::new("d", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?, + IndexedOperationType::Upsert => { + assert!(cursor.val_valid()); - let description = format!( - "writing {} records to Delta table (current version: {current_version})", - batch.num_rows(), - ); - retry!(self, description, { writer.write(&batch).await })?; + if cursor.weight() < 0 { + cursor.step_val(); + } + assert!(cursor.val_valid()); - Ok(()) - } else { - bail!( - "delta_table {}: received Data without a matching BatchStart", - &self.inner.endpoint_name - ); + cursor + .serialize_val_to_arrow_with_metadata( + &Meta::new("u", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?; + } + }; + + num_records += 1; + + if num_records >= CHUNK_SIZE { + flush_chunk(&mut insert_builder, &mut writer, &mut total_rows).await?; + num_records = 0; + } + }; + + cursor.step_key(); + } + } else { + let cursor = cursor_builder.build(); + let mut cursor = CursorWithPolarity::new(Box::new(cursor)); + + while cursor.key_valid() { + if !cursor.val_valid() { + cursor.step_key(); + continue; + } + + let mut w = cursor.weight(); + if !(-MAX_DUPLICATES..=MAX_DUPLICATES).contains(&w) { + return Err(WriteError::Deterministic(anyhow!( + "Unable to output record with very large weight {w}. \ + Consider adjusting your SQL queries to avoid duplicate output records, \ + e.g., using 'SELECT DISTINCT'." + ))); + } + + while w != 0 { + if w > 0 { + cursor + .serialize_key_to_arrow_with_metadata( + &Meta::new("i", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?; + w -= 1; + } else { + cursor + .serialize_key_to_arrow_with_metadata( + &Meta::new("d", micros), + &mut insert_builder, + ) + .map_err(WriteError::Deterministic)?; + w += 1; + } + num_records += 1; + + if num_records >= CHUNK_SIZE { + flush_chunk(&mut insert_builder, &mut writer, &mut total_rows).await?; + num_records = 0; + } + } + cursor.step_key(); } } + + if num_records > 0 { + flush_chunk(&mut insert_builder, &mut writer, &mut total_rows).await?; + } + + let actions = writer + .close() + .await + .map_err(|e| WriteError::Transient(anyhow!("error closing writer: {e:?}")))?; + Ok((actions, total_rows)) } impl OutputConsumer for DeltaTableWriter { @@ -580,18 +635,8 @@ impl OutputConsumer for DeltaTableWriter { } fn batch_start(&mut self, _step: Step) { - self.command(Command::BatchStart) - .unwrap_or_else(|(e, fatal)| { - if let Some(controller) = self.inner.controller.upgrade() { - controller.output_transport_error( - self.inner.endpoint_id, - &self.inner.endpoint_name, - fatal, - e, - Some("delta_batch_start"), - ) - }; - }); + self.pending_actions.clear(); + self.num_rows = 0; } fn push_buffer(&mut self, _buffer: &[u8], _num_records: usize) { @@ -609,18 +654,44 @@ impl OutputConsumer for DeltaTableWriter { } fn batch_end(&mut self) { - self.command(Command::BatchEnd) - .unwrap_or_else(|(e, fatal)| { - if let Some(controller) = self.inner.controller.upgrade() { - controller.output_transport_error( - self.inner.endpoint_id, - &self.inner.endpoint_name, - fatal, - e, - Some("delta_batch_end"), - ) - }; - }); + if self.pending_actions.is_empty() { + return; + } + + let _span = info_span!( + "delta_output", + endpoint = &*self.inner.endpoint_name, + table = &*self.inner.config.uri, + ) + .entered(); + + let num_bytes: usize = self.pending_actions.iter().map(|a| a.size as usize).sum(); + let num_rows = self.num_rows; + let actions = std::mem::take(&mut self.pending_actions); + self.num_rows = 0; + + // Panic safety: block_on() panics if called from a tokio async context. + // batch_end() is called from the dedicated output thread (output_thread_func). + if let Err(e) = TOKIO.block_on(self.task.commit_with_retry(&actions)) { + if let Some(controller) = self.inner.controller.upgrade() { + controller.output_transport_error( + self.inner.endpoint_id, + &self.inner.endpoint_name, + false, + e, + Some("delta_batch_end"), + ) + }; + return; + } + + if let Some(controller) = self.inner.controller.upgrade() { + controller + .update_output_connector_health(self.inner.endpoint_id, ConnectorHealth::healthy()); + controller + .status + .output_buffer(self.inner.endpoint_id, num_bytes, num_rows); + } } } @@ -650,108 +721,89 @@ impl Encoder for DeltaTableWriter { } fn encode(&mut self, batch: Arc) -> AnyResult<()> { - let micros = Utc::now().timestamp_micros(); - let mut insert_builder = ArrayBuilder::new(self.inner.serde_arrow_schema.clone())?; - - let mut num_insert_records = 0; - - let index_name = &self.inner.key_schema.as_ref().map(|s| s.name.to_owned()); - - if let Some(index_name) = &index_name { - let mut cursor = - batch.cursor(RecordFormat::Parquet(delta_arrow_serde_config().clone()))?; - - while cursor.key_valid() { - if let Some(op) = - indexed_operation_type(self.view_name(), index_name, cursor.as_mut())? - { - cursor.rewind_vals(); - - match op { - IndexedOperationType::Insert => cursor - .serialize_val_to_arrow_with_metadata( - &Meta::new("i", micros), - &mut insert_builder, - )?, - IndexedOperationType::Delete => cursor - .serialize_val_to_arrow_with_metadata( - &Meta::new("d", micros), - &mut insert_builder, - )?, - IndexedOperationType::Upsert => { - assert!(cursor.val_valid()); - - if cursor.weight() < 0 { - cursor.step_val(); - } - assert!(cursor.val_valid()); - - cursor.serialize_val_to_arrow_with_metadata( - &Meta::new("u", micros), - &mut insert_builder, - )?; - } - }; + let threads = self.threads; + let mut bounds = batch.keys_factory().default_box(); + batch.partition_keys(threads, &mut *bounds); + + let mut cursor_builders = Vec::new(); + for i in 0..=bounds.len() { + let Some(cb) = SplitCursorBuilder::from_bounds( + batch.clone(), + &*bounds, + i, + RecordFormat::Parquet(delta_arrow_serde_config().clone()), + ) else { + continue; + }; + cursor_builders.push(cb); + } + if cursor_builders.is_empty() { + return Ok(()); + } - num_insert_records += 1; + let micros = Utc::now().timestamp_micros(); - // Split batch into chunks. This does not affect the number or size of generated - // parquet files, since that is controlled by the `DeltaWriter`, but it limits - // the amount of memory used by `builder`. - if num_insert_records >= CHUNK_SIZE { - self.insert_record_batch(&mut insert_builder)?; - num_insert_records = 0; - } - }; + let span = info_span!( + "delta_output", + endpoint = &*self.inner.endpoint_name, + table = &*self.inner.config.uri, + ); - cursor.step_key(); + // Panic safety: block_on() panics if called from a tokio async context. + // encode() is called from the dedicated output thread (output_thread_func). + let results = TOKIO.block_on(async { + let mut handles = Vec::with_capacity(cursor_builders.len()); + for cursor_builder in cursor_builders { + let inner = self.inner.clone(); + let object_store = self.object_store.clone(); + handles.push(tokio::spawn( + encode_and_write_range(cursor_builder, inner, object_store, micros) + .instrument(span.clone()), + )); } - } else { - let mut cursor = CursorWithPolarity::new( - batch.cursor(RecordFormat::Parquet(delta_arrow_serde_config().clone()))?, - ); - while cursor.key_valid() { - if !cursor.val_valid() { - cursor.step_key(); - continue; - } + let mut results = Vec::with_capacity(handles.len()); + for handle in handles { + results.push( + handle + .await + .unwrap_or_else(|e| Err(anyhow!("write task panicked: {e}"))), + ); + } + results + }); - let mut w = cursor.weight(); - if !(-MAX_DUPLICATES..=MAX_DUPLICATES).contains(&w) { - bail!( - "Unable to output record with very large weight {w}. Consider adjusting your SQL queries to avoid duplicate output records, e.g., using 'SELECT DISTINCT'." - ); + let mut errors = Vec::new(); + let mut succeeded_ranges = 0usize; + for result in results { + match result { + Ok((mut actions, rows)) => { + self.pending_actions.append(&mut actions); + self.num_rows += rows; + succeeded_ranges += 1; } - - while w != 0 { - if w > 0 { - cursor.serialize_key_to_arrow_with_metadata( - &Meta::new("i", micros), - &mut insert_builder, - )?; - w -= 1; - } else { - cursor.serialize_key_to_arrow_with_metadata( - &Meta::new("d", micros), - &mut insert_builder, - )?; - w += 1; - } - num_insert_records += 1; - // Split batch into chunks. This does not affect the number or size of generated - // parquet files, since that is controlled by the `DeltaWriter`, but it limits - // the amount of memory used by `builder`. - if num_insert_records >= CHUNK_SIZE { - self.insert_record_batch(&mut insert_builder)?; - num_insert_records = 0; - } - } - cursor.step_key(); + Err(e) => errors.push(e), } } - - if num_insert_records > 0 { - self.insert_record_batch(&mut insert_builder)?; + if !errors.is_empty() { + if succeeded_ranges > 0 { + warn!( + "delta_table {}: {} range(s) succeeded but {} failed; \ + dropping {} file action(s) from this commit (orphaned files will be cleaned up by VACUUM)", + self.inner.endpoint_name, + succeeded_ranges, + errors.len(), + self.pending_actions.len(), + ); + } + self.pending_actions.clear(); + self.num_rows = 0; + let msg = errors + .iter() + .map(|e| format!("{e:#}")) + .collect::>() + .join("; "); + + bail!("{} write task(s) failed: {msg}", errors.len()); } Ok(()) @@ -795,3 +847,593 @@ impl OutputEndpoint for DeltaTableWriter { false } } + +#[cfg(test)] +mod parallel { + use std::collections::BTreeMap; + use std::ffi::OsStr; + use std::os::unix::ffi::OsStrExt; + use std::os::unix::fs::PermissionsExt; + use std::path::Path; + use std::sync::{Arc, Weak}; + + use dbsp::utils::Tup2; + use dbsp::{OrdIndexedZSet, OrdZSet}; + use feldera_sqllib::{ + ByteArray, Date, F32, F64, SqlDecimal, SqlString, Timestamp, Uuid, Variant, + }; + use feldera_types::deserialize_table_record; + use feldera_types::program_schema::{ColumnType, Relation, SqlIdentifier}; + use feldera_types::transport::delta_table::{DeltaTableWriteMode, DeltaTableWriterConfig}; + use tempfile::TempDir; + + use crate::catalog::SerBatch; + use crate::controller::EndpointId; + use crate::format::Encoder; + use crate::format::parquet::test::load_parquet_file; + use crate::static_compile::seroutput::SerBatchImpl; + use crate::test::data::{DeltaTestKey, DeltaTestStruct, TestStruct}; + use crate::test::list_files_recursive; + + use super::DeltaTableWriter; + + // ── Output record type (DeltaTestStruct fields + metadata columns) ── + + #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone)] + struct OutputRecord { + bigint: i64, + binary: ByteArray, + boolean: bool, + date: Date, + decimal_10_3: SqlDecimal<10, 3>, + double: F64, + float: F32, + int: i32, + smallint: i16, + string: String, + unused: Option, + timestamp_ntz: Timestamp, + tinyint: i8, + string_array: Vec, + struct1: TestStruct, + struct_array: Vec, + string_string_map: BTreeMap, + string_struct_map: BTreeMap, + variant: Variant, + uuid: Uuid, + __feldera_op: String, + __feldera_ts: i64, + } + + deserialize_table_record!(OutputRecord["OutputRecord", Variant, 22] { + (bigint, "bigint", false, i64, |_| None), + (binary, "binary", false, ByteArray, |_| None), + (boolean, "boolean", false, bool, |_| None), + (date, "date", false, Date, |_| None), + (decimal_10_3, "decimal_10_3", false, SqlDecimal<10, 3>, |_| None), + (double, "double", false, F64, |_| None), + (float, "float", false, F32, |_| None), + (int, "int", false, i32, |_| None), + (smallint, "smallint", false, i16, |_| None), + (string, "string", false, String, |_| None), + (unused, "unused", false, Option, |_| Some(None)), + (timestamp_ntz, "timestamp_ntz", false, Timestamp, |_| None), + (tinyint, "tinyint", false, i8, |_| None), + (string_array, "string_array", false, Vec, |_| None), + (struct1, "struct1", false, TestStruct, |_| None), + (struct_array, "struct_array", false, Vec, |_| None), + (string_string_map, "string_string_map", false, BTreeMap, |_| None), + (string_struct_map, "string_struct_map", false, BTreeMap, |_| None), + (variant, "variant", false, Variant, |_| None), + (uuid, "uuid", false, Uuid, |_| None), + (__feldera_op, "__feldera_op", false, String, |_| None), + (__feldera_ts, "__feldera_ts", false, i64, |_| None) + }); + + impl OutputRecord { + fn to_data_record(&self) -> DeltaTestStruct { + DeltaTestStruct { + bigint: self.bigint, + binary: self.binary.clone(), + boolean: self.boolean, + date: self.date, + decimal_10_3: self.decimal_10_3, + double: self.double, + float: self.float, + int: self.int, + smallint: self.smallint, + string: self.string.clone(), + unused: self.unused.clone(), + timestamp_ntz: self.timestamp_ntz, + tinyint: self.tinyint, + string_array: self.string_array.clone(), + struct1: self.struct1.clone(), + struct_array: self.struct_array.clone(), + string_string_map: self.string_string_map.clone(), + string_struct_map: self.string_struct_map.clone(), + variant: self.variant.clone(), + uuid: self.uuid.clone(), + } + } + } + + // ── Helpers ──────────────────────────────────────────────────── + + fn key_relation() -> Relation { + Relation { + name: SqlIdentifier::new("test_idx", false), + fields: vec![feldera_types::program_schema::Field::new( + "bigint".into(), + ColumnType::bigint(false), + )], + materialized: false, + properties: BTreeMap::new(), + } + } + + fn value_relation() -> Relation { + let mut rel = DeltaTestStruct::relation_schema(); + rel.materialized = true; + rel + } + + fn make_endpoint(threads: usize, table_uri: &str, indexed: bool) -> DeltaTableWriter { + let key_schema = if indexed { Some(key_relation()) } else { None }; + DeltaTableWriter::new( + EndpointId::default(), + "test_endpoint", + &DeltaTableWriterConfig { + uri: table_uri.to_string(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(threads), + object_store_config: Default::default(), + }, + &key_schema, + &value_relation(), + Weak::new(), + ) + .expect("failed to create endpoint") + } + + fn build_insert_batch(records: &[DeltaTestStruct]) -> Arc { + let tuples: Vec<_> = records + .iter() + .map(|r| Tup2(Tup2(DeltaTestKey { bigint: r.bigint }, r.clone()), 1i64)) + .collect(); + let zset = OrdIndexedZSet::from_tuples((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestKey, DeltaTestStruct>::new(zset)) + } + + fn build_delete_batch(records: &[DeltaTestStruct]) -> Arc { + let tuples: Vec<_> = records + .iter() + .map(|r| Tup2(Tup2(DeltaTestKey { bigint: r.bigint }, r.clone()), -1i64)) + .collect(); + let zset = OrdIndexedZSet::from_tuples((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestKey, DeltaTestStruct>::new(zset)) + } + + fn build_upsert_batch(updates: &[(DeltaTestStruct, DeltaTestStruct)]) -> Arc { + let mut tuples = Vec::new(); + for (old, new) in updates { + assert_eq!(old.bigint, new.bigint); + tuples.push(Tup2( + Tup2(DeltaTestKey { bigint: old.bigint }, old.clone()), + -1i64, + )); + tuples.push(Tup2( + Tup2(DeltaTestKey { bigint: new.bigint }, new.clone()), + 1i64, + )); + } + let zset = OrdIndexedZSet::from_tuples((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestKey, DeltaTestStruct>::new(zset)) + } + + fn build_non_indexed_batch(records: &[DeltaTestStruct], weight: i64) -> Arc { + let tuples: Vec<_> = records.iter().map(|r| Tup2(r.clone(), weight)).collect(); + let zset = OrdZSet::from_keys((), tuples); + Arc::new(SerBatchImpl::<_, DeltaTestStruct, ()>::new(zset)) + } + + fn encode_batch(endpoint: &mut DeltaTableWriter, batch: &Arc) { + endpoint.consumer().batch_start(0); + endpoint + .encode(batch.clone().arc_as_batch_reader()) + .unwrap(); + endpoint.consumer().batch_end(); + } + + fn read_output(table_uri: &str) -> Vec { + let parquet_files = + list_files_recursive(Path::new(table_uri), OsStr::from_bytes(b"parquet")).unwrap(); + let mut records = Vec::new(); + for path in parquet_files { + let mut batch: Vec = load_parquet_file(&path); + records.append(&mut batch); + } + records + } + + fn make_record(i: usize) -> DeltaTestStruct { + DeltaTestStruct { + bigint: i as i64, + binary: ByteArray::from_vec(vec![i as u8, (i >> 8) as u8]), + boolean: i % 2 == 0, + date: Date::from_days(i as i32 % 100_000), + decimal_10_3: SqlDecimal::<10, 3>::new((i as i128 % 1_000_000) * 1000, 3).unwrap(), + double: F64::new((i as f64).trunc()), + float: F32::new((i as f32).trunc()), + int: i as i32, + smallint: (i % 32000) as i16, + string: format!("record_{i}"), + unused: if i % 3 == 0 { + None + } else { + Some(format!("unused_{i}")) + }, + timestamp_ntz: Timestamp::from_milliseconds(1704070800000 + i as i64 * 1000), + tinyint: (i % 120) as i8, + string_array: vec![format!("arr_{i}")], + struct1: TestStruct { + id: i as u32, + b: i % 2 == 0, + i: Some(i as i64), + s: format!("s_{i}"), + }, + struct_array: vec![TestStruct { + id: i as u32, + b: false, + i: None, + s: format!("sa_{i}"), + }], + string_string_map: BTreeMap::from([(format!("key_{i}"), format!("val_{i}"))]), + string_struct_map: BTreeMap::from([( + format!("sk_{i}"), + TestStruct { + id: i as u32, + b: true, + i: Some(i as i64 * 2), + s: format!("sm_{i}"), + }, + )]), + variant: Variant::Map( + std::iter::once(( + Variant::String(SqlString::from_ref("foo")), + Variant::String(SqlString::from(i.to_string())), + )) + .collect::>() + .into(), + ), + uuid: Uuid::from_bytes([i as u8; 16]), + } + } + + fn make_records(n: usize) -> Vec { + (0..n).map(make_record).collect() + } + + // ── Tests ────────────────────────────────────────────────────── + + fn insert_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(100); + let batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + encode_batch(&mut endpoint, &batch); + + let output = read_output(&table_uri); + assert_eq!(output.len(), 100); + for rec in &output { + assert_eq!(rec.__feldera_op, "i"); + } + // Verify data fields match + let mut output_data: Vec = + output.iter().map(|r| r.to_data_record()).collect(); + output_data.sort(); + let mut expected = records.clone(); + expected.sort(); + assert_eq!(output_data, expected); + } + + #[test] + fn test_insert_single_thread() { + insert_test(1); + } + + #[test] + fn test_insert_multi_thread() { + insert_test(4); + } + + fn upsert_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(50); + let insert_batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + encode_batch(&mut endpoint, &insert_batch); + + // Upsert: update records 0..10 + let updates: Vec<_> = (0..10) + .map(|i| { + let old = records[i].clone(); + let new = DeltaTestStruct { + boolean: !old.boolean, + int: old.int + 1000, + string: format!("updated_{}", old.bigint), + ..old.clone() + }; + (old, new) + }) + .collect(); + let upsert_batch = build_upsert_batch(&updates); + encode_batch(&mut endpoint, &upsert_batch); + + let output = read_output(&table_uri); + // First batch: 50 inserts, second batch: 10 upserts + let inserts: Vec<_> = output.iter().filter(|r| r.__feldera_op == "i").collect(); + let upserts: Vec<_> = output.iter().filter(|r| r.__feldera_op == "u").collect(); + assert_eq!(inserts.len(), 50); + assert_eq!(upserts.len(), 10); + } + + #[test] + fn test_upsert_single_thread() { + upsert_test(1); + } + + #[test] + fn test_upsert_multi_thread() { + upsert_test(4); + } + + fn delete_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(50); + let insert_batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + encode_batch(&mut endpoint, &insert_batch); + + // Delete records 0..10 + let to_delete: Vec<_> = records[0..10].to_vec(); + let delete_batch = build_delete_batch(&to_delete); + encode_batch(&mut endpoint, &delete_batch); + + let output = read_output(&table_uri); + let inserts: Vec<_> = output.iter().filter(|r| r.__feldera_op == "i").collect(); + let deletes: Vec<_> = output.iter().filter(|r| r.__feldera_op == "d").collect(); + assert_eq!(inserts.len(), 50); + assert_eq!(deletes.len(), 10); + } + + #[test] + fn test_delete_single_thread() { + delete_test(1); + } + + #[test] + fn test_delete_multi_thread() { + delete_test(4); + } + + fn non_indexed_insert_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(100); + let batch = build_non_indexed_batch(&records, 1); + let mut endpoint = make_endpoint(threads, &table_uri, false); + + encode_batch(&mut endpoint, &batch); + + let output = read_output(&table_uri); + assert_eq!(output.len(), 100); + for rec in &output { + assert_eq!(rec.__feldera_op, "i"); + } + let mut output_data: Vec = + output.iter().map(|r| r.to_data_record()).collect(); + output_data.sort(); + let mut expected = records; + expected.sort(); + assert_eq!(output_data, expected); + } + + #[test] + fn test_non_indexed_insert_single_thread() { + non_indexed_insert_test(1); + } + + #[test] + fn test_non_indexed_rejects_multi_thread() { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + let key_schema = None; + let result = DeltaTableWriter::new( + EndpointId::default(), + "test_endpoint", + &DeltaTableWriterConfig { + uri: table_uri, + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(4), + object_store_config: Default::default(), + }, + &key_schema, + &value_relation(), + Weak::new(), + ); + assert!( + result.is_err(), + "threads > 1 without key_schema should be rejected" + ); + } + + fn empty_batch_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let batch = build_insert_batch(&[]); + let mut endpoint = make_endpoint(threads, &table_uri, true); + + // Should not crash on empty batch. + encode_batch(&mut endpoint, &batch); + + let output = read_output(&table_uri); + assert_eq!(output.len(), 0); + } + + #[test] + fn test_empty_batch_single_thread() { + empty_batch_test(1); + } + + #[test] + fn test_empty_batch_multi_thread() { + empty_batch_test(4); + } + + fn multiple_batches_test(threads: usize) { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let mut endpoint = make_endpoint(threads, &table_uri, true); + + // Batch 1: insert 50 records + let records = make_records(50); + let insert_batch = build_insert_batch(&records); + encode_batch(&mut endpoint, &insert_batch); + + // Batch 2: insert 50 more records (ids 50..100) + let more_records: Vec = (50..100).map(make_record).collect(); + let insert_batch2 = build_insert_batch(&more_records); + encode_batch(&mut endpoint, &insert_batch2); + + // Batch 3: upsert records 0..5 + let updates: Vec<_> = (0..5) + .map(|i| { + let old = records[i].clone(); + let new = DeltaTestStruct { + boolean: !old.boolean, + int: old.int + 1000, + string: format!("updated_{}", old.bigint), + ..old.clone() + }; + (old, new) + }) + .collect(); + let upsert_batch = build_upsert_batch(&updates); + encode_batch(&mut endpoint, &upsert_batch); + + // Batch 4: delete records 90..100 + let to_delete: Vec<_> = more_records[40..50].to_vec(); + let delete_batch = build_delete_batch(&to_delete); + encode_batch(&mut endpoint, &delete_batch); + + let output = read_output(&table_uri); + let inserts = output.iter().filter(|r| r.__feldera_op == "i").count(); + let upserts = output.iter().filter(|r| r.__feldera_op == "u").count(); + let deletes = output.iter().filter(|r| r.__feldera_op == "d").count(); + + assert_eq!(inserts, 100); // 50 + 50 + assert_eq!(upserts, 5); + assert_eq!(deletes, 10); + } + + #[test] + fn test_multiple_batches_single_thread() { + multiple_batches_test(1); + } + + #[test] + fn test_multiple_batches_multi_thread() { + multiple_batches_test(4); + } + + // ── Failure scenario tests ──────────────────────────────────── + + /// Write to a read-only directory should fail with no retries. + #[test] + fn test_write_failure_readonly_dir() { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + // Create the endpoint first (needs writable dir to create the table). + let records = make_records(10); + let batch = build_insert_batch(&records); + let mut endpoint = make_endpoint(1, &table_uri, true); + + // Make directory read-only to trigger write failure. + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap(); + + endpoint.consumer().batch_start(0); + let result = endpoint.encode(batch.arc_as_batch_reader()); + + // Restore permissions before asserting (so TempDir cleanup succeeds). + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o755)).unwrap(); + + assert!(result.is_err(), "write to read-only dir should fail"); + } + + /// Exhausting max_retries should propagate the error. + #[test] + fn test_retry_exhaustion() { + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + let records = make_records(10); + let batch = build_insert_batch(&records); + + // Create endpoint with max_retries=1. + let key_schema = Some(key_relation()); + let mut endpoint = DeltaTableWriter::new( + EndpointId::default(), + "test_endpoint", + &DeltaTableWriterConfig { + uri: table_uri.clone(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(1), + threads: Some(1), + object_store_config: Default::default(), + }, + &key_schema, + &value_relation(), + Weak::new(), + ) + .expect("failed to create endpoint"); + + // Make directory read-only to trigger write failure. + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o555)).unwrap(); + + endpoint.consumer().batch_start(0); + let result = endpoint.encode(batch.arc_as_batch_reader()); + + // Restore permissions. + std::fs::set_permissions(table_dir.path(), std::fs::Permissions::from_mode(0o755)).unwrap(); + + assert!(result.is_err(), "should fail after exhausting retries"); + } + + /// Verify that threads=0 is rejected in config validation. + #[test] + fn test_threads_zero_rejected() { + let config = DeltaTableWriterConfig { + uri: "/tmp/test".to_string(), + mode: DeltaTableWriteMode::Truncate, + max_retries: Some(0), + threads: Some(0), + object_store_config: Default::default(), + }; + assert!(config.validate().is_err()); + } +} diff --git a/crates/adapters/src/integrated/delta_table/test.rs b/crates/adapters/src/integrated/delta_table/test.rs index 7f12d9baa88..ef9d5742740 100644 --- a/crates/adapters/src/integrated/delta_table/test.rs +++ b/crates/adapters/src/integrated/delta_table/test.rs @@ -546,6 +546,7 @@ fn delta_table_output_test( table_uri: &str, object_store_config: &HashMap, verify: bool, + threads: Option, ) { init_logging(); @@ -570,12 +571,15 @@ fn delta_table_output_test( input_file.write_all(b"\n").unwrap(); } - let mut storage_options = object_store_config.clone(); - storage_options.insert("uri".into(), table_uri.into()); - storage_options.insert("mode".into(), "truncate".into()); + let mut storage_options: serde_json::Value = serde_json::to_value(object_store_config).unwrap(); + storage_options["uri"] = json!(table_uri); + storage_options["mode"] = json!("truncate"); + if let Some(threads) = threads { + storage_options["threads"] = json!(threads); + } println!( - "delta_table_output_test: {} records, input file: {}, table uri: {table_uri}", + "delta_table_output_test: {} records, input file: {}, table uri: {table_uri}, threads: {threads:?}", data.len(), input_file.path().display(), ); @@ -616,9 +620,11 @@ fn delta_table_output_test( let controller = Controller::with_test_config( |workers| { - Ok(test_circuit::( + Ok(test_circuit_with_index::( workers, &DeltaTestStruct::schema(), + &[SqlIdentifier::from("bigint")], + |x: &DeltaTestStruct| DeltaTestKey { bigint: x.bigint }, &[None], )) }, @@ -1593,7 +1599,7 @@ proptest! { // Uncomment to inspect output parquet files produced by the test. forget(table_dir); - delta_table_output_test(data.clone(), &table_uri, &HashMap::new(), true); + delta_table_output_test(data.clone(), &table_uri, &HashMap::new(), true, None); // Read delta table unordered. @@ -1754,8 +1760,8 @@ proptest! { let table_uri = format!("s3://feldera-delta-table-test/{uuid}/"); // TODO: enable verification when it's supported for S3. - delta_table_output_test(data.clone(), &table_uri, &object_store_config, false); - //delta_table_output_test(data.clone(), &table_uri, &object_store_config, false); + delta_table_output_test(data.clone(), &table_uri, &object_store_config, false, None); + //delta_table_output_test(data.clone(), &table_uri, &object_store_config, false, None); let mut json_file = delta_table_snapshot_to_json::( &table_uri, diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 697da655911..5f8fad451a7 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -22,6 +22,7 @@ pub use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOption use feldera_types::transaction::CommitProgressSummary; use itertools::Either; use std::collections::BTreeMap; +use std::num::NonZeroUsize; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -494,6 +495,12 @@ impl From for CircuitConfig { } } +impl From for CircuitConfig { + fn from(n_workers: NonZeroUsize) -> Self { + Self::with_workers(n_workers.get()) + } +} + impl From for CircuitConfig { fn from(layout: Layout) -> Self { Self { diff --git a/crates/feldera-types/src/transport/delta_table.rs b/crates/feldera-types/src/transport/delta_table.rs index ae77d38cc2e..c75cf58be35 100644 --- a/crates/feldera-types/src/transport/delta_table.rs +++ b/crates/feldera-types/src/transport/delta_table.rs @@ -47,6 +47,16 @@ pub struct DeltaTableWriterConfig { #[serde(default, skip_serializing_if = "Option::is_none")] pub max_retries: Option, + /// Number of parallel threads used by the connector. + /// + /// Increasing this value can improve Delta Lake write throughput + /// by enabling concurrent writes. + /// + /// Default: 1. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[schema(minimum = 1)] + pub threads: Option, + /// Storage options for configuring backend object store. /// /// For specific options available for different storage backends, see: @@ -57,6 +67,15 @@ pub struct DeltaTableWriterConfig { pub object_store_config: HashMap, } +impl DeltaTableWriterConfig { + pub fn validate(&self) -> Result<(), String> { + if self.threads.is_some_and(|t| t == 0) { + return Err("threads must be greater than 0".to_string()); + } + Ok(()) + } +} + /// Delta table read mode. /// /// Three options are available: diff --git a/docs.feldera.com/docs/connectors/sinks/delta.md b/docs.feldera.com/docs/connectors/sinks/delta.md index 62b7de9c2c8..d5c19bc6438 100644 --- a/docs.feldera.com/docs/connectors/sinks/delta.md +++ b/docs.feldera.com/docs/connectors/sinks/delta.md @@ -77,6 +77,7 @@ MERGE INTO {target_table} AS target | | - `truncate`: Existing table at the specified location will be truncated. The connector achieves this by outputting delete actions for all files in the latest snapshot of the table. | | | - `error_if_exists`: If a table exists at the specified location, the operation will fail. | | `max_retries`|

Maximum number of retries for failed Delta Lake operations like writing Parquet files and committing transactions.

The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits, and overall operation retries. This setting controls the overall operation retries. When a write to the table fails, because of an S3 timeout or any other reason that was not resolved by lower-level retries, the connector will retry the entire operation.

When not specified, the connector performs infinite retries. When set to 0, the connector doesn't retry failed operations.

| +| `threads` | Number of parallel threads used by the connector. Increasing this value can improve Delta Lake write throughput by enabling concurrent writes. Default: `1`. | [*]: Required fields diff --git a/openapi.json b/openapi.json index 4c716d9fd28..acddda42481 100644 --- a/openapi.json +++ b/openapi.json @@ -10,7 +10,7 @@ "license": { "name": "MIT OR Apache-2.0" }, - "version": "0.277.0" + "version": "0.278.0" }, "paths": { "/config/authentication": { @@ -8007,6 +8007,12 @@ "mode": { "$ref": "#/components/schemas/DeltaTableWriteMode" }, + "threads": { + "type": "integer", + "description": "Number of parallel threads used by the connector.\n\nIncreasing this value can improve Delta Lake write throughput\nby enabling concurrent writes.\n\nDefault: 1.", + "nullable": true, + "minimum": 1 + }, "uri": { "type": "string", "description": "Table URI." diff --git a/python/pyproject.toml b/python/pyproject.toml index 7e0b5c5667d..c8808d7978b 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "feldera" readme = "README.md" description = "The feldera python client" -version = "0.277.0" +version = "0.278.0" license = "MIT" requires-python = ">=3.10" authors = [ diff --git a/python/uv.lock b/python/uv.lock index 8a04482e6e1..e37650e6027 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -169,7 +169,7 @@ wheels = [ [[package]] name = "feldera" -version = "0.277.0" +version = "0.278.0" source = { editable = "." } dependencies = [ { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },