diff --git a/crates/adapters/src/integrated/delta_table/output.rs b/crates/adapters/src/integrated/delta_table/output.rs index 258c18fd9b..78c5a3218b 100644 --- a/crates/adapters/src/integrated/delta_table/output.rs +++ b/crates/adapters/src/integrated/delta_table/output.rs @@ -696,8 +696,25 @@ async fn stream_encode_and_write( let mut cursor = cursor_builder.build(); while cursor.key_valid() { - let op = indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor) - .map_err(WriteError::Deterministic)?; + let op = match indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor) + { + Ok(op) => op, + Err(e) => { + if let Some(controller) = inner.controller.upgrade() { + controller.output_transport_error( + inner.endpoint_id, + &inner.endpoint_name, + false, + e, + Some("delta_uniqueness_violation"), + ); + } + // Skip only this key: advance the cursor before `continue`, otherwise the + // loop re-enters on the same key and reports the same error forever. + cursor.step_key(); + continue; + } + }; if let Some(op) = op { cursor.rewind_vals(); diff --git a/crates/adapters/src/integrated/delta_table/test.rs b/crates/adapters/src/integrated/delta_table/test.rs index 9322939a85..d8bd6742bc 100644 --- a/crates/adapters/src/integrated/delta_table/test.rs +++ b/crates/adapters/src/integrated/delta_table/test.rs @@ -8,12 +8,14 @@ use crate::test::{ DeltaTestStruct, file_to_zset, list_files_recursive, test_circuit, test_circuit_with_index, wait, }; +use crate::{Catalog, CircuitCatalog}; use arrow::datatypes::Schema as ArrowSchema; use chrono::NaiveDate; +use dbsp::circuit::CircuitConfig; #[cfg(feature = "delta-s3-test")] use dbsp::typed_batch::DynBatchReader; use dbsp::utils::Tup2; -use dbsp::{DBData, OrdZSet}; +use dbsp::{DBData, DBSPHandle, OrdZSet, Runtime}; use delta_kernel::engine::arrow_conversion::TryFromArrow; use deltalake::datafusion::prelude::SessionContext; use deltalake::kernel::{DataType, StructField}; @@ -23,7 +25,7 @@ use deltalake::{DeltaTable, DeltaTableBuilder, ensure_table_uri}; use feldera_sqllib::Variant; use feldera_types::config::PipelineConfig; use feldera_types::format::json::JsonFlavor; 
-use feldera_types::program_schema::{Field, SqlIdentifier}; +use feldera_types::program_schema::{Field, Relation, SqlIdentifier}; use feldera_types::serde_with_context::serde_config::DecimalFormat; use feldera_types::serde_with_context::serialize::SerializeWithContextWrapper; use feldera_types::serde_with_context::{ @@ -39,7 +41,7 @@ use serde_json::{Value, json}; #[cfg(feature = "delta-s3-test")] use serial_test::{parallel, serial}; use std::cmp::min; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::ffi::OsStr; use std::fs::File; use std::io::Write; @@ -1056,6 +1058,244 @@ where .unwrap() } +/// Build a circuit whose output index `idx1` keys records by `bigint` while the +/// underlying view is a plain Z-set. +/// +/// Unlike [`test_circuit_with_index`], the input is an unkeyed Z-set, so two +/// records that share a `bigint` are *both* retained and the index `idx1` ends +/// up with multiple values for that key. This is exactly the non-unique-key +/// situation the Delta output connector must tolerate by skipping the offending +/// keys rather than failing the whole batch. Returns the circuit handle together with the boxed catalog in which `test_input1`, `test_output1` and `idx1` are registered. 
+fn nonunique_index_circuit(config: CircuitConfig) -> (DBSPHandle, Box) { + let schema = DeltaTestStruct::schema(); + + let (circuit, catalog) = Runtime::init_circuit(config, move |circuit| { + let mut catalog = Catalog::new(); + + let (input, hinput) = circuit.add_input_zset::(); + + let input_schema = serde_json::to_string(&Relation::new( + "test_input1".into(), + schema.clone(), + false, + BTreeMap::new(), + )) + .unwrap(); + let output_schema = serde_json::to_string(&Relation::new( + "test_output1".into(), + schema.clone(), + false, + BTreeMap::new(), + )) + .unwrap(); + + catalog.register_materialized_input_zset(input.clone(), hinput, &input_schema); + catalog + .register_materialized_output_zset::<_, DeltaTestStruct>(input.clone(), &output_schema); + + let indexed = input.map_index(|r| (DeltaTestKey { bigint: r.bigint }, r.clone())); + catalog + .register_index::( + indexed, + &SqlIdentifier::from("idx1"), + &SqlIdentifier::from("test_output1"), + &["bigint".to_string()], + ) + .expect("failed to register index"); + + Ok(catalog) + }) + .unwrap(); + + (circuit, Box::new(catalog)) +} + +/// Read the records referenced by the *current* Delta table snapshot, ignoring +/// the `__feldera_op`/`__feldera_ts` metadata columns and any orphaned parquet +/// files left behind by previous `truncate` commits. Returns an empty vector when the table cannot be opened; panics if `table_uri` cannot be turned into a file URL or the table's file list cannot be read. +fn read_delta_snapshot(table_uri: &str) -> Vec { + use dbsp::circuit::tokio::TOKIO; + use deltalake::open_table; + + let url = url::Url::from_file_path(table_uri).unwrap(); + let Ok(table) = TOKIO.block_on(async move { open_table(url).await }) else { + // The connector creates the table lazily; treat "not yet created" as empty. 
+ return Vec::new(); + }; + let base = Path::new(table_uri); + let mut records = Vec::new(); + for uri in table.get_file_uris().unwrap() { + let mut batch: Vec = load_parquet_file(&base.join(&*uri)); + records.append(&mut batch); + } + records +} + +/// A unique-key constraint violation in an indexed Delta output must skip only +/// the offending key, not the rest of the batch, and must report one error per +/// non-unique key. +/// +/// The connector is configured with `index: idx1`, so every key is expected to +/// map to a single value. We feed a single batch in which most keys are unique +/// but two keys (`bigint` 100 and 200) carry two distinct values each. The +/// connector must (a) write every well-formed record and (b) report a +/// uniqueness-violation error for each of the two offending keys while leaving +/// their records out of the table. +/// +/// Output buffering is sized so the whole input collapses into one output +/// batch; this guarantees that both values of each non-unique key are encoded +/// together (otherwise the conflict would not be observable within a single +/// batch). +#[test] +fn delta_table_output_non_unique_keys_skipped_test() { + init_logging(); + + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + + // Ten well-formed keys, one value each. + let unique: Vec = (0..10).map(delta_test_record).collect(); + + // Two non-unique keys, each with two distinct values inserted in the same + // batch. `delta_test_record` is deterministic, so we mutate a field to make + // the second value differ from the first. 
+ let make_conflict = |bigint: i64| { + let first = delta_test_record(bigint); + let second = DeltaTestStruct { + int: first.int.wrapping_add(1), + ..first.clone() + }; + assert_eq!(first.bigint, second.bigint); + assert_ne!(first, second); + [first, second] + }; + let conflict_a = make_conflict(100); + let conflict_b = make_conflict(200); + + let all_records: Vec = unique + .iter() + .cloned() + .chain(conflict_a.iter().cloned()) + .chain(conflict_b.iter().cloned()) + .collect(); + + // Write all records (as raw inserts) to the input file, one JSON document per line. + let mut input_file = NamedTempFile::new().unwrap(); + for record in &all_records { + let buffer: Vec = Vec::new(); + let mut serializer = serde_json::Serializer::new(buffer); + record + .serialize_with_context( + &mut serializer, + &SqlSerdeConfig::from(JsonFlavor::default()), + ) + .unwrap(); + input_file + .as_file_mut() + .write_all(&serializer.into_inner()) + .unwrap(); + input_file.write_all(b"\n").unwrap(); + } + + let config: PipelineConfig = serde_json::from_value(json!({ + "name": "test", + "workers": 4, + "inputs": { + "test_input1": { + "stream": "test_input1", + "transport": { + "name": "file_input", + "config": { "path": input_file.path() } + }, + "format": { + "name": "json", + "config": { "update_format": "raw" } + } + } + }, + "outputs": { + "test_output1": { + "stream": "test_output1", + "index": "idx1", + "enable_output_buffer": true, + "max_output_buffer_size_records": 1_000_000, + "max_output_buffer_time_millis": 2_000, + "transport": { + "name": "delta_table_output", + "config": { "uri": table_uri.clone(), "mode": "truncate" } + } + } + } + })) + .unwrap(); + + // Capture every error the controller reports instead of panicking, so we + // can assert on the uniqueness violations. Each entry pairs the error's string form with the tag passed to the callback. 
+ let errors: Arc)>>> = Arc::new(Mutex::new(Vec::new())); + let errors_clone = errors.clone(); + + let controller = Controller::with_test_config( + move |workers| Ok(nonunique_index_circuit(workers)), + &config, + Box::new(move |e, tag| { + errors_clone.lock().unwrap().push((e.to_string(), tag)); + }), + ) + .unwrap(); + + controller.start(); + + // The unique records become visible only after the (single) output batch + // commits, which happens after the offending keys have been skipped and + // their errors reported. NOTE(review): `60_000` is presumably a timeout in milliseconds; confirm against `wait`. + wait( + || read_delta_snapshot(&table_uri).len() == unique.len(), + 60_000, + ) + .expect("timeout waiting for the well-formed records to reach the Delta table"); + + controller.stop().unwrap(); + + // (a) Exactly the well-formed records are written; the non-unique keys are + // dropped entirely. Both sides are sorted first so the comparison does not depend on read order. + let mut written = read_delta_snapshot(&table_uri); + written.sort(); + let mut expected = unique.clone(); + expected.sort(); + assert_eq!( + written, expected, + "non-unique keys must be skipped while every unique key is preserved" + ); + assert!( + written.iter().all(|r| r.bigint != 100 && r.bigint != 200), + "no record for a non-unique key should leak into the output" + ); + + // (b) One uniqueness-violation error is reported per non-unique key, and + // each names its offending key. 
+ let errors = errors.lock().unwrap(); + let violations: Vec<&(String, Option)> = errors + .iter() + .filter(|(_, tag)| { + tag.as_deref() + .is_some_and(|tag| tag.contains("delta_uniqueness_violation")) + }) + .collect(); + assert_eq!( + violations.len(), + 2, + "expected exactly one uniqueness-violation error per non-unique key, got: {errors:?}" + ); + assert!( + violations.iter().any(|(msg, _)| msg.contains("100")), + "a uniqueness-violation error should name key 100: {violations:?}" + ); + assert!( + violations.iter().any(|(msg, _)| msg.contains("200")), + "a uniqueness-violation error should name key 200: {violations:?}" + ); +} + /// Test function that works for both local FS and remote object stores. /// /// * `verify` - verify the final contents of the delta table is equivalent to