From 994d139a11302f71076ff38040582813b349f430 Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Fri, 5 Jun 2026 23:10:32 -0700 Subject: [PATCH] [adapters] Delta: Only skip keys that violate uniqueness. The Delta connector throws an error on unique key violation. When this happens it should only skip the affected key, not the entire batch. Signed-off-by: Leonid Ryzhyk --- .../src/integrated/delta_table/output.rs | 18 +- .../src/integrated/delta_table/test.rs | 246 +++++++++++++++++- 2 files changed, 259 insertions(+), 5 deletions(-) diff --git a/crates/adapters/src/integrated/delta_table/output.rs b/crates/adapters/src/integrated/delta_table/output.rs index 258c18fd9b..78c5a3218b 100644 --- a/crates/adapters/src/integrated/delta_table/output.rs +++ b/crates/adapters/src/integrated/delta_table/output.rs @@ -696,8 +696,22 @@ async fn stream_encode_and_write( let mut cursor = cursor_builder.build(); while cursor.key_valid() { - let op = indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor) - .map_err(WriteError::Deterministic)?; + let op = match indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor) + { + Ok(op) => op, + Err(e) => { + if let Some(controller) = inner.controller.upgrade() { + controller.output_transport_error( + inner.endpoint_id, + &inner.endpoint_name, + false, + e, + Some("delta_uniqueness_violation"), + ); + } + continue; + } + }; if let Some(op) = op { cursor.rewind_vals(); diff --git a/crates/adapters/src/integrated/delta_table/test.rs b/crates/adapters/src/integrated/delta_table/test.rs index 9322939a85..d8bd6742bc 100644 --- a/crates/adapters/src/integrated/delta_table/test.rs +++ b/crates/adapters/src/integrated/delta_table/test.rs @@ -8,12 +8,14 @@ use crate::test::{ DeltaTestStruct, file_to_zset, list_files_recursive, test_circuit, test_circuit_with_index, wait, }; +use crate::{Catalog, CircuitCatalog}; use arrow::datatypes::Schema as ArrowSchema; use chrono::NaiveDate; +use dbsp::circuit::CircuitConfig; 
#[cfg(feature = "delta-s3-test")] use dbsp::typed_batch::DynBatchReader; use dbsp::utils::Tup2; -use dbsp::{DBData, OrdZSet}; +use dbsp::{DBData, DBSPHandle, OrdZSet, Runtime}; use delta_kernel::engine::arrow_conversion::TryFromArrow; use deltalake::datafusion::prelude::SessionContext; use deltalake::kernel::{DataType, StructField}; @@ -23,7 +25,7 @@ use deltalake::{DeltaTable, DeltaTableBuilder, ensure_table_uri}; use feldera_sqllib::Variant; use feldera_types::config::PipelineConfig; use feldera_types::format::json::JsonFlavor; -use feldera_types::program_schema::{Field, SqlIdentifier}; +use feldera_types::program_schema::{Field, Relation, SqlIdentifier}; use feldera_types::serde_with_context::serde_config::DecimalFormat; use feldera_types::serde_with_context::serialize::SerializeWithContextWrapper; use feldera_types::serde_with_context::{ @@ -39,7 +41,7 @@ use serde_json::{Value, json}; #[cfg(feature = "delta-s3-test")] use serial_test::{parallel, serial}; use std::cmp::min; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::ffi::OsStr; use std::fs::File; use std::io::Write; @@ -1056,6 +1058,244 @@ where .unwrap() } +/// Build a circuit whose output index `idx1` keys records by `bigint` while the +/// underlying view is a plain Z-set. +/// +/// Unlike [`test_circuit_with_index`], the input is an unkeyed Z-set, so two +/// records that share a `bigint` are *both* retained and the index `idx1` ends +/// up with multiple values for that key. This is exactly the non-unique-key +/// situation the Delta output connector must tolerate by skipping the offending +/// keys rather than failing the whole batch. 
+fn nonunique_index_circuit(config: CircuitConfig) -> (DBSPHandle, Box) { + let schema = DeltaTestStruct::schema(); + + let (circuit, catalog) = Runtime::init_circuit(config, move |circuit| { + let mut catalog = Catalog::new(); + + let (input, hinput) = circuit.add_input_zset::(); + + let input_schema = serde_json::to_string(&Relation::new( + "test_input1".into(), + schema.clone(), + false, + BTreeMap::new(), + )) + .unwrap(); + let output_schema = serde_json::to_string(&Relation::new( + "test_output1".into(), + schema.clone(), + false, + BTreeMap::new(), + )) + .unwrap(); + + catalog.register_materialized_input_zset(input.clone(), hinput, &input_schema); + catalog + .register_materialized_output_zset::<_, DeltaTestStruct>(input.clone(), &output_schema); + + let indexed = input.map_index(|r| (DeltaTestKey { bigint: r.bigint }, r.clone())); + catalog + .register_index::( + indexed, + &SqlIdentifier::from("idx1"), + &SqlIdentifier::from("test_output1"), + &["bigint".to_string()], + ) + .expect("failed to register index"); + + Ok(catalog) + }) + .unwrap(); + + (circuit, Box::new(catalog)) +} + +/// Read the records referenced by the *current* Delta table snapshot, ignoring +/// the `__feldera_op`/`__feldera_ts` metadata columns and any orphaned parquet +/// files left behind by previous `truncate` commits. +fn read_delta_snapshot(table_uri: &str) -> Vec { + use dbsp::circuit::tokio::TOKIO; + use deltalake::open_table; + + let url = url::Url::from_file_path(table_uri).unwrap(); + let Ok(table) = TOKIO.block_on(async move { open_table(url).await }) else { + // The connector creates the table lazily; treat "not yet created" as empty. 
+ return Vec::new(); + }; + let base = Path::new(table_uri); + let mut records = Vec::new(); + for uri in table.get_file_uris().unwrap() { + let mut batch: Vec = load_parquet_file(&base.join(&*uri)); + records.append(&mut batch); + } + records +} + +/// A unique-key constraint violation in an indexed Delta output must skip only +/// the offending key, not the rest of the batch, and must report one error per +/// non-unique key. +/// +/// The connector is configured with `index: idx1`, so every key is expected to +/// map to a single value. We feed a single batch in which most keys are unique +/// but two keys (`bigint` 100 and 200) carry two distinct values each. The +/// connector must (a) write every well-formed record and (b) report a +/// uniqueness-violation error for each of the two offending keys while leaving +/// their records out of the table. +/// +/// Output buffering is sized so the whole input collapses into one output +/// batch; this guarantees that both values of each non-unique key are encoded +/// together (otherwise the conflict would not be observable within a single +/// batch). +#[test] +fn delta_table_output_non_unique_keys_skipped_test() { + init_logging(); + + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + // Ten well-formed keys, one value each. + let unique: Vec = (0..10).map(delta_test_record).collect(); + + // Two non-unique keys, each with two distinct values inserted in the same + // batch. `delta_test_record` is deterministic, so we mutate a field to make + // the second value differ from the first. 
+ let make_conflict = |bigint: i64| { + let first = delta_test_record(bigint); + let second = DeltaTestStruct { + int: first.int.wrapping_add(1), + ..first.clone() + }; + assert_eq!(first.bigint, second.bigint); + assert_ne!(first, second); + [first, second] + }; + let conflict_a = make_conflict(100); + let conflict_b = make_conflict(200); + + let all_records: Vec = unique + .iter() + .cloned() + .chain(conflict_a.iter().cloned()) + .chain(conflict_b.iter().cloned()) + .collect(); + + // Write all records (as raw inserts) to the input file. + let mut input_file = NamedTempFile::new().unwrap(); + for record in &all_records { + let buffer: Vec = Vec::new(); + let mut serializer = serde_json::Serializer::new(buffer); + record + .serialize_with_context( + &mut serializer, + &SqlSerdeConfig::from(JsonFlavor::default()), + ) + .unwrap(); + input_file + .as_file_mut() + .write_all(&serializer.into_inner()) + .unwrap(); + input_file.write_all(b"\n").unwrap(); + } + + let config: PipelineConfig = serde_json::from_value(json!({ + "name": "test", + "workers": 4, + "inputs": { + "test_input1": { + "stream": "test_input1", + "transport": { + "name": "file_input", + "config": { "path": input_file.path() } + }, + "format": { + "name": "json", + "config": { "update_format": "raw" } + } + } + }, + "outputs": { + "test_output1": { + "stream": "test_output1", + "index": "idx1", + "enable_output_buffer": true, + "max_output_buffer_size_records": 1_000_000, + "max_output_buffer_time_millis": 2_000, + "transport": { + "name": "delta_table_output", + "config": { "uri": table_uri.clone(), "mode": "truncate" } + } + } + } + })) + .unwrap(); + + // Capture every error the controller reports instead of panicking, so we + // can assert on the uniqueness violations. 
+ let errors: Arc)>>> = Arc::new(Mutex::new(Vec::new())); + let errors_clone = errors.clone(); + + let controller = Controller::with_test_config( + move |workers| Ok(nonunique_index_circuit(workers)), + &config, + Box::new(move |e, tag| { + errors_clone.lock().unwrap().push((e.to_string(), tag)); + }), + ) + .unwrap(); + + controller.start(); + + // The unique records become visible only after the (single) output batch + // commits, which happens after the offending keys have been skipped and + // their errors reported. + wait( + || read_delta_snapshot(&table_uri).len() == unique.len(), + 60_000, + ) + .expect("timeout waiting for the well-formed records to reach the Delta table"); + + controller.stop().unwrap(); + + // (a) Exactly the well-formed records are written; the non-unique keys are + // dropped entirely. + let mut written = read_delta_snapshot(&table_uri); + written.sort(); + let mut expected = unique.clone(); + expected.sort(); + assert_eq!( + written, expected, + "non-unique keys must be skipped while every unique key is preserved" + ); + assert!( + written.iter().all(|r| r.bigint != 100 && r.bigint != 200), + "no record for a non-unique key should leak into the output" + ); + + // (b) One uniqueness-violation error is reported per non-unique key, and + // each names its offending key. 
+ let errors = errors.lock().unwrap(); + let violations: Vec<&(String, Option)> = errors + .iter() + .filter(|(_, tag)| { + tag.as_deref() + .is_some_and(|tag| tag.contains("delta_uniqueness_violation")) + }) + .collect(); + assert_eq!( + violations.len(), + 2, + "expected exactly one uniqueness-violation error per non-unique key, got: {errors:?}" + ); + assert!( + violations.iter().any(|(msg, _)| msg.contains("100")), + "a uniqueness-violation error should name key 100: {violations:?}" + ); + assert!( + violations.iter().any(|(msg, _)| msg.contains("200")), + "a uniqueness-violation error should name key 200: {violations:?}" + ); +} + /// Test function that works for both local FS and remote object stores. /// /// * `verify` - verify the final contents of the delta table is equivalent to