Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions crates/adapters/src/integrated/delta_table/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,22 @@ async fn stream_encode_and_write(
let mut cursor = cursor_builder.build();

while cursor.key_valid() {
let op = indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor)
.map_err(WriteError::Deterministic)?;
let op = match indexed_operation_type(&inner.value_schema.name, index_name, &mut cursor)
{
Ok(op) => op,
Err(e) => {
if let Some(controller) = inner.controller.upgrade() {
controller.output_transport_error(
inner.endpoint_id,
&inner.endpoint_name,
false,
e,
Some("delta_uniqueness_violation"),
);
}
continue;
}
};

if let Some(op) = op {
cursor.rewind_vals();
Expand Down
246 changes: 243 additions & 3 deletions crates/adapters/src/integrated/delta_table/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use crate::test::{
DeltaTestStruct, file_to_zset, list_files_recursive, test_circuit, test_circuit_with_index,
wait,
};
use crate::{Catalog, CircuitCatalog};
use arrow::datatypes::Schema as ArrowSchema;
use chrono::NaiveDate;
use dbsp::circuit::CircuitConfig;
#[cfg(feature = "delta-s3-test")]
use dbsp::typed_batch::DynBatchReader;
use dbsp::utils::Tup2;
use dbsp::{DBData, OrdZSet};
use dbsp::{DBData, DBSPHandle, OrdZSet, Runtime};
use delta_kernel::engine::arrow_conversion::TryFromArrow;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::kernel::{DataType, StructField};
Expand All @@ -23,7 +25,7 @@ use deltalake::{DeltaTable, DeltaTableBuilder, ensure_table_uri};
use feldera_sqllib::Variant;
use feldera_types::config::PipelineConfig;
use feldera_types::format::json::JsonFlavor;
use feldera_types::program_schema::{Field, SqlIdentifier};
use feldera_types::program_schema::{Field, Relation, SqlIdentifier};
use feldera_types::serde_with_context::serde_config::DecimalFormat;
use feldera_types::serde_with_context::serialize::SerializeWithContextWrapper;
use feldera_types::serde_with_context::{
Expand All @@ -39,7 +41,7 @@ use serde_json::{Value, json};
#[cfg(feature = "delta-s3-test")]
use serial_test::{parallel, serial};
use std::cmp::min;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::ffi::OsStr;
use std::fs::File;
use std::io::Write;
Expand Down Expand Up @@ -1056,6 +1058,244 @@ where
.unwrap()
}

/// Builds a test circuit that exposes a Z-set view together with an index over
/// it whose keys are not guaranteed to be unique.
///
/// The input `test_input1` is created with `add_input_zset`, i.e. as a plain
/// Z-set, and the very same stream is registered as the output view
/// `test_output1`. The index `idx1` is derived from that stream with
/// `map_index`, keying every record by its `bigint` field.
///
/// In contrast to [`test_circuit_with_index`], nothing here collapses records
/// that share a `bigint`, so two distinct records with the same `bigint` both
/// survive and `idx1` carries several values under one key. That is the
/// non-unique-key situation the Delta output connector is expected to tolerate
/// by skipping the offending keys rather than failing the whole batch.
///
/// Returns the circuit handle together with the boxed catalog describing the
/// registered input, output and index.
fn nonunique_index_circuit(config: CircuitConfig) -> (DBSPHandle, Box<dyn CircuitCatalog>) {
    let fields = DeltaTestStruct::schema();

    let (handle, catalog) = Runtime::init_circuit(config, move |circuit| {
        // The input and output relations are described identically except for
        // their name, so build the JSON schema through one local helper.
        let relation_json = |name: &str| {
            serde_json::to_string(&Relation::new(
                name.into(),
                fields.clone(),
                false,
                BTreeMap::new(),
            ))
            .unwrap()
        };
        let input_schema = relation_json("test_input1");
        let output_schema = relation_json("test_output1");

        let mut catalog = Catalog::new();
        let (records, records_handle) = circuit.add_input_zset::<DeltaTestStruct>();

        // The input stream doubles as the output view: no transformation is
        // applied between the two.
        catalog.register_materialized_input_zset(records.clone(), records_handle, &input_schema);
        catalog.register_materialized_output_zset::<_, DeltaTestStruct>(
            records.clone(),
            &output_schema,
        );

        // Key each record by `bigint` only; the full record is the value, so
        // records that differ in any other field become distinct values of
        // the same key.
        let by_bigint = records.map_index(|r| (DeltaTestKey { bigint: r.bigint }, r.clone()));
        catalog
            .register_index::<DeltaTestKey, DeltaTestKey, DeltaTestStruct, DeltaTestStruct>(
                by_bigint,
                &SqlIdentifier::from("idx1"),
                &SqlIdentifier::from("test_output1"),
                &["bigint".to_string()],
            )
            .expect("failed to register index");

        Ok(catalog)
    })
    .unwrap();

    (handle, Box::new(catalog))
}

/// Loads every record stored in the files that the Delta table at `table_uri`
/// currently references.
///
/// Only the files returned by the table's `get_file_uris` are read, so parquet
/// files that exist in the directory but are not listed by the table (e.g.
/// ones orphaned by earlier `truncate` commits) are not visited. Rows are
/// decoded as [`DeltaTestStruct`]; presumably this is what drops the
/// connector's `__feldera_op`/`__feldera_ts` metadata columns -- that depends
/// on `load_parquet_file`, which is defined elsewhere.
///
/// `table_uri` is interpreted as a local file-system path. If the table cannot
/// be opened, an empty vector is returned.
///
/// # Panics
///
/// Panics if `table_uri` cannot be converted to a `file://` URL or if listing
/// the table's files fails.
fn read_delta_snapshot(table_uri: &str) -> Vec<DeltaTestStruct> {
    use dbsp::circuit::tokio::TOKIO;
    use deltalake::open_table;

    let table_url = url::Url::from_file_path(table_uri).unwrap();
    let table = match TOKIO.block_on(async move { open_table(table_url).await }) {
        Ok(table) => table,
        // The connector creates the table lazily; treat "not yet created" as empty.
        Err(_) => return Vec::new(),
    };

    let table_root = Path::new(table_uri);
    // Bind the result before returning so the iterator borrowing `table` is
    // dropped before `table` itself goes out of scope.
    let snapshot: Vec<DeltaTestStruct> = table
        .get_file_uris()
        .unwrap()
        .into_iter()
        .flat_map(|file_uri| {
            let rows: Vec<DeltaTestStruct> = load_parquet_file(&table_root.join(&*file_uri));
            rows
        })
        .collect();
    snapshot
}

/// Checks that a uniqueness violation in an indexed Delta output affects only
/// the offending keys.
///
/// The output connector is attached to the index `idx1`, which is expected to
/// map each key to a single value. The test feeds one batch consisting of ten
/// records with unique keys plus two keys (`bigint` 100 and 200) that each
/// carry two distinct values, and then verifies that:
///
/// * (a) every record with a unique key reaches the Delta table, while no
///   record of a non-unique key does;
/// * (b) exactly two errors tagged `delta_uniqueness_violation` are reported,
///   one mentioning `100` and one mentioning `200`.
///
/// The output buffer is configured with a very large record limit and a
/// two-second timeout so that the whole input is intended to collapse into a
/// single output batch; both values of a conflicting key must be encoded
/// together for the conflict to be observable.
#[test]
fn delta_table_output_non_unique_keys_skipped_test() {
    init_logging();

    let table_dir = TempDir::new().unwrap();
    let table_uri = table_dir.path().display().to_string();

    // Ten well-formed keys, one value each.
    let unique: Vec<DeltaTestStruct> = (0..10).map(delta_test_record).collect();

    /// Produces two records sharing the key `bigint` but differing in `int`.
    ///
    /// `delta_test_record` is deterministic, so the second value is derived
    /// from the first by bumping a non-key field.
    fn conflicting_pair(bigint: i64) -> [DeltaTestStruct; 2] {
        let first = delta_test_record(bigint);
        let second = DeltaTestStruct {
            int: first.int.wrapping_add(1),
            ..first.clone()
        };
        assert_eq!(first.bigint, second.bigint);
        assert_ne!(first, second);
        [first, second]
    }
    let conflict_a = conflicting_pair(100);
    let conflict_b = conflicting_pair(200);

    // Write all records (as raw inserts) to the input file: the unique ones
    // first, followed by both conflicting pairs, one JSON document per line.
    let serde_config = SqlSerdeConfig::from(JsonFlavor::default());
    let mut input_file = NamedTempFile::new().unwrap();
    for record in unique
        .iter()
        .chain(conflict_a.iter())
        .chain(conflict_b.iter())
    {
        let mut serializer = serde_json::Serializer::new(Vec::<u8>::new());
        record
            .serialize_with_context(&mut serializer, &serde_config)
            .unwrap();
        let encoded = serializer.into_inner();
        input_file.as_file_mut().write_all(&encoded).unwrap();
        input_file.write_all(b"\n").unwrap();
    }

    let config: PipelineConfig = serde_json::from_value(json!({
        "name": "test",
        "workers": 4,
        "inputs": {
            "test_input1": {
                "stream": "test_input1",
                "transport": {
                    "name": "file_input",
                    "config": { "path": input_file.path() }
                },
                "format": {
                    "name": "json",
                    "config": { "update_format": "raw" }
                }
            }
        },
        "outputs": {
            "test_output1": {
                "stream": "test_output1",
                "index": "idx1",
                "enable_output_buffer": true,
                "max_output_buffer_size_records": 1_000_000,
                "max_output_buffer_time_millis": 2_000,
                "transport": {
                    "name": "delta_table_output",
                    "config": { "uri": table_uri.clone(), "mode": "truncate" }
                }
            }
        }
    }))
    .unwrap();

    // Record every error the controller reports (message plus optional tag)
    // instead of panicking, so the uniqueness violations can be inspected
    // once the pipeline has stopped.
    let errors: Arc<Mutex<Vec<(String, Option<String>)>>> = Arc::default();
    let error_sink = Arc::clone(&errors);

    let controller = Controller::with_test_config(
        move |workers| Ok(nonunique_index_circuit(workers)),
        &config,
        Box::new(move |e, tag| {
            error_sink.lock().unwrap().push((e.to_string(), tag));
        }),
    )
    .unwrap();

    controller.start();

    // The unique records become visible only after the (single) output batch
    // commits, which happens after the offending keys have been skipped and
    // their errors reported.
    let expected_count = unique.len();
    wait(
        || read_delta_snapshot(&table_uri).len() == expected_count,
        60_000,
    )
    .expect("timeout waiting for the well-formed records to reach the Delta table");

    controller.stop().unwrap();

    // (a) Exactly the well-formed records are written; the non-unique keys are
    // dropped entirely. Both sides are sorted so the comparison does not
    // depend on the order in which files or rows are read back.
    let mut expected = unique.clone();
    expected.sort();
    let mut written = read_delta_snapshot(&table_uri);
    written.sort();
    assert_eq!(
        written, expected,
        "non-unique keys must be skipped while every unique key is preserved"
    );
    assert!(
        !written.iter().any(|r| r.bigint == 100 || r.bigint == 200),
        "no record for a non-unique key should leak into the output"
    );

    // (b) One uniqueness-violation error is reported per non-unique key, and
    // each names its offending key.
    let errors = errors.lock().unwrap();
    let violations: Vec<&(String, Option<String>)> = errors
        .iter()
        .filter(|(_, tag)| matches!(tag, Some(tag) if tag.contains("delta_uniqueness_violation")))
        .collect();
    assert_eq!(
        violations.len(),
        2,
        "expected exactly one uniqueness-violation error per non-unique key, got: {errors:?}"
    );
    assert!(
        violations.iter().any(|(msg, _)| msg.contains("100")),
        "a uniqueness-violation error should name key 100: {violations:?}"
    );
    assert!(
        violations.iter().any(|(msg, _)| msg.contains("200")),
        "a uniqueness-violation error should name key 200: {violations:?}"
    );
}

/// Test function that works for both local FS and remote object stores.
///
/// * `verify` - verify the final contents of the delta table is equivalent to
Expand Down
Loading