diff --git a/crates/adapters/src/integrated/delta_table/input.rs b/crates/adapters/src/integrated/delta_table/input.rs index 31eecfdd6e..cbf7e77c0d 100644 --- a/crates/adapters/src/integrated/delta_table/input.rs +++ b/crates/adapters/src/integrated/delta_table/input.rs @@ -13,7 +13,6 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use datafusion::logical_expr::LogicalPlan; use datafusion::physical_plan::{PhysicalExpr, displayable}; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use datafusion::prelude::SessionConfig; @@ -105,6 +104,90 @@ fn quote_sql_identifier>(ident: S) -> String { format!("\"{}\"", ident.as_ref().replace("\"", "\"\"")) } +/// Build the `DataFrame` that streams a CDC transaction to the circuit. +/// +/// Equivalent (in SQL) to: +/// +/// ```sql +/// SELECT * FROM ( +/// SELECT * FROM cdc_adds [WHERE ] +/// EXCEPT ALL +/// SELECT * FROM cdc_removes [WHERE ] +/// ) ORDER BY +/// ``` +/// +/// When `removes_df` is `None`, the set difference is skipped: +/// +/// ```sql +/// SELECT * FROM cdc_adds [WHERE ] ORDER BY +/// ``` +/// +/// `EXCEPT ALL` (multiset difference) cancels each Remove row against +/// exactly one matching Add row; plain `EXCEPT` would collapse +/// duplicates and under-count legitimately repeated inserts. +/// +/// The optional `filter` is pushed into both sides of the set +/// difference so each Remove only has to cancel against rows that +/// would have been ingested anyway. This also shrinks the inputs to +/// `EXCEPT ALL`, which sorts both relations. +/// +/// The filter is parsed against each `DataFrame`'s own schema, so +/// column references resolve to the correct table qualifier +/// (`cdc_adds.col` vs `cdc_removes.col`). The two sides cannot share +/// a single parsed `Expr` because column references would otherwise +/// point at the wrong relation after `except`. +/// +/// Caveat: `EXCEPT ALL` relies on `arrow_row::RowConverter`, which in +/// the currently pinned `arrow-row` does not support `Map` columns. +/// Pure-append transactions on tables with `Map` columns are +/// unaffected (the `EXCEPT ALL` branch isn't taken when `removes_df` +/// is `None`); transactions that do produce Removes fail here with +/// `NotImplemented`. See issue: +/// - https://github.com/apache/datafusion/issues/15428 +/// - https://github.com/apache/arrow-rs/issues/7879 +/// +/// Free function (not a method on the connector) so a unit test can +/// inspect the resulting logical plan without standing up the full +/// connector. +pub(super) fn build_cdc_dataframe( + adds_df: DataFrame, + removes_df: Option, + filter: Option<&str>, + order_by: &str, + description: &str, +) -> AnyResult { + let apply_filter = |df: DataFrame, side: &'static str| -> AnyResult { + let Some(filter) = filter else { + return Ok(df); + }; + let expr = df + .parse_sql_expr(filter) + .map_err(|e| anyhow!("invalid 'filter' expression '{filter}': {e}"))?; + df.filter(expr).map_err(|e| { + anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'filter' to '{side}': {e}") + }) + }; + + let adds_df = apply_filter(adds_df, "cdc_adds")?; + + let result_df = match removes_df { + None => adds_df, + Some(removes_df) => { + let removes_df = apply_filter(removes_df, "cdc_removes")?; + adds_df.except(removes_df).map_err(|e| { + anyhow!("failed to build the CDC set difference for {description}: {e}. This typically means the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support.") + })? + } + }; + + let order_by_expr = result_df + .parse_sql_expr(order_by) + .map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?; + result_df.sort_by(vec![order_by_expr]).map_err(|e| { + anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'cdc_order_by': {e}") + }) +} + /// Integrated input connector that reads from a delta table. pub struct DeltaTableInputEndpoint { endpoint_name: String, @@ -1623,12 +1706,8 @@ impl DeltaTableInputEndpointInner { /// Convert the delete filter SQL expression into a Datafusion PhysicalExpr. /// - /// To do this, we generate a query plan for the `select * from snapshot where ` - /// query and extract the logical expression from the plan and then convert it to - /// a physical expression. - // - // FIXME: I wonder if there's a simpler way that doesn't require registering the `snapshot` - // table just so we can compile this query. + /// Parses `cdc_delete_filter` directly against the snapshot schema and + /// lowers the resulting logical expression to a physical expression. async fn extract_delete_filter_expr( &self, ) -> Result>, ControllerError> { @@ -1636,54 +1715,41 @@ impl DeltaTableInputEndpointInner { return Ok(None); }; - let query = format!("SELECT * FROM snapshot WHERE {}", delete_filter); - - let logical_plan = self + let schema = self .datafusion - .sql(&query) + .table("snapshot") .await - .map_err(|e| { + .map_err(|_e| { ControllerError::invalid_transport_configuration( - &self.endpoint_name, - &format!("invalid delete filter '{delete_filter}': the 'cdc_delete_filter' expression must be a valid SQL expression that can be used in a 'SELECT * FROM snapshot WHERE ' query, but the following error was encountered when compiling '{query}': {e}"), - ) - })? - .logical_plan() - .clone(); - - let filter_expr = if let LogicalPlan::Projection(filter_plan) = &logical_plan { - if let LogicalPlan::Filter(filter) = filter_plan.input.as_ref() { - filter.predicate.clone() - } else { - return Err(ControllerError::invalid_encoder_configuration( &self.endpoint_name, &format!( - "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan}" + "internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found" ), - )); - } - } else { - return Err(ControllerError::invalid_encoder_configuration( - &self.endpoint_name, - &format!( - "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan}" - ), - )); - }; - - let schema = self - .datafusion - .table("snapshot") - .await - .map_err(|_e| { - ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found")) + ) })? .schema() .clone(); + let filter_expr = self + .datafusion + .parse_sql_expr(delete_filter, &schema) + .map_err(|e| { + ControllerError::invalid_transport_configuration( + &self.endpoint_name, + &format!("invalid 'cdc_delete_filter' expression '{delete_filter}': {e}"), + ) + })?; + let physical_expr = DefaultPhysicalPlanner::default() .create_physical_expr(&filter_expr, &schema, &self.datafusion.state()) - .map_err(|e| ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: error generating physical plan: {e}")))?; + .map_err(|e| { + ControllerError::invalid_transport_configuration( + &self.endpoint_name, + &format!( + "internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: {e}" + ), + ) + })?; Ok(Some(physical_expr)) } @@ -2127,7 +2193,7 @@ impl DeltaTableInputEndpointInner { self.process_cdc_transaction(actions, table, cdc_delete_filter, input_stream, receiver) .await?; } else { - let column_names = self.used_column_list(table); + let used_columns = self.used_columns(table); let start_transaction = self.allocate_follow_transaction_label(); @@ -2144,7 +2210,7 @@ impl DeltaTableInputEndpointInner { self.process_action( action, table, - &column_names, + &used_columns, input_stream, receiver, start_transaction.clone(), @@ -2158,7 +2224,7 @@ impl DeltaTableInputEndpointInner { self.process_action( action, table, - &column_names, + &used_columns, input_stream, receiver, start_transaction.clone(), @@ -2264,32 +2330,12 @@ impl DeltaTableInputEndpointInner { anyhow!("internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_adds' table: {e}") })?; - // `EXCEPT ALL` (multiset difference) cancels each Remove row against - // exactly one matching Add row; plain `EXCEPT` would collapse - // duplicates and under-count legitimately repeated inserts. - // - // The optional `filter` is pushed into both sides of the set - // difference so each Remove only has to cancel against rows that - // would have been ingested anyway. This also shrinks the inputs to - // `EXCEPT ALL`, which sorts both relations. - // - // Caveat: `EXCEPT ALL` relies on `arrow_row::RowConverter`, - // which in the currently pinned `arrow-row` does not support - // `Map` columns. - // Pure-append transactions on tables with `Map` columns are unaffected - // (the `EXCEPT ALL` branch isn't taken when `removes` is empty); - // transactions that do produce Removes fail here with `NotImplemented`. - // see issue: - // - https://github.com/apache/datafusion/issues/15428 - // - https://github.com/apache/arrow-rs/issues/7879 - let where_clause = if let Some(filter) = &self.config.filter { - format!("WHERE {filter}") - } else { - "".to_string() - }; + let adds_df = self.datafusion.table("cdc_adds").await.map_err(|e| { + anyhow!("internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_adds' table: {e}") + })?; - let from_clause = if removes.is_empty() { - format!("cdc_adds {where_clause}") + let removes_df = if removes.is_empty() { + None } else { let removes_table = Arc::new( self.create_parquet_table(table, removes, &description) @@ -2298,22 +2344,21 @@ impl DeltaTableInputEndpointInner { self.datafusion.register_table("cdc_removes", removes_table).map_err(|e| { anyhow!("internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_removes' table: {e}") })?; - format!( - "(SELECT * FROM cdc_adds {where_clause} \ - EXCEPT ALL \ - SELECT * FROM cdc_removes {where_clause})" - ) + Some(self.datafusion.table("cdc_removes").await.map_err(|e| { + anyhow!("internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_removes' table: {e}") + })?) }; - // Order the resulting relation by the `cdc_order_by` expression. - // TODO: We don't use `used_column_list` here, as the resulting dataframe will have a different - // schema than the original table, and the `cdc_delete_filter` physical expression won't be valid for it. + // The `cdc_order_by` expression is mandatory in CDC mode (enforced + // by `validate_cdc_config`), so the unwrap is safe. let order_by = self.config.cdc_order_by.as_ref().unwrap(); - let query = format!("SELECT * FROM {from_clause} ORDER BY {order_by}"); - - let df = self.datafusion.sql(&query).await.map_err(|e| { - anyhow!("failed to compile the CDC query '{query}': {e}. This typically indicates one of: (1) `cdc_order_by` or `filter` is not a valid SQL expression for a query of the form `SELECT * FROM WHERE ORDER BY `; or (2) the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support.") - })?; + let df = build_cdc_dataframe( + adds_df, + removes_df, + self.config.filter.as_deref(), + order_by, + &description, + )?; let _record_count = self .execute_df( @@ -2362,7 +2407,7 @@ impl DeltaTableInputEndpointInner { &self, action: &Action, table: &DeltaTable, - column_names: &str, + used_columns: &[String], input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, start_transaction: Option>, @@ -2373,7 +2418,7 @@ impl DeltaTableInputEndpointInner { &add.path, true, table, - column_names, + used_columns, input_stream, receiver, start_transaction, @@ -2387,7 +2432,7 @@ impl DeltaTableInputEndpointInner { &remove.path, false, table, - column_names, + used_columns, input_stream, receiver, start_transaction, @@ -2414,7 +2459,7 @@ impl DeltaTableInputEndpointInner { path: &str, polarity: bool, table: &DeltaTable, - column_names: &str, + used_columns: &[String], input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, start_transaction: Option>, @@ -2434,16 +2479,28 @@ impl DeltaTableInputEndpointInner { anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error registering Parquet table: {e}") })?; + let columns: Vec<&str> = used_columns.iter().map(String::as_str).collect(); + let df = self + .datafusion + .table("tmp_table") + .await + .map_err(|e| { + anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error reading 'tmp_table': {e}") + })? + .select_columns(&columns) + .map_err(|e| { + anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error selecting columns: {e}") + })?; + let df = if let Some(filter) = &self.config.filter { - let query = format!("SELECT {column_names} FROM tmp_table where {filter}"); - self.datafusion.sql(&query).await.map_err(|e| { - anyhow!("invalid 'cdc_order_by' filter expression '{filter}': 'filter' must be a valid SQL expression that can be used in a 'SELECT * FROM
ORDER BY ' query, but the following error was encountered when compiling '{query}': {e}") + let expr = df + .parse_sql_expr(filter) + .map_err(|e| anyhow!("invalid 'filter' expression '{filter}': {e}"))?; + df.filter(expr).map_err(|e| { + anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error applying 'filter': {e}") })? } else { - let query = format!("SELECT {column_names} FROM tmp_table"); - self.datafusion.sql(&query).await.map_err(|e| { - anyhow!("internal error processing file {full_path}'; {REPORT_ERROR}; error compiling query '{query}': {e}") - })? + df }; let _record_count = self diff --git a/crates/adapters/src/integrated/delta_table/test.rs b/crates/adapters/src/integrated/delta_table/test.rs index 175c561eca..33417abb8a 100644 --- a/crates/adapters/src/integrated/delta_table/test.rs +++ b/crates/adapters/src/integrated/delta_table/test.rs @@ -1794,6 +1794,231 @@ async fn delta_table_cdc_rewrite_test() { read_pipeline.stop().unwrap(); } +/// `cdc_delete_filter` is parsed against the snapshot schema at +/// connector startup, so an expression that references a column that +/// does not exist on the Delta table must fail before any row flows +/// through the pipeline, with an error that names the missing column +/// and the `cdc_delete_filter` setting. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn delta_table_cdc_invalid_delete_filter_test() { + use crate::test::TestStruct; + use deltalake::kernel::{DataType as KernelDataType, PrimitiveType, StructField}; + + init_logging(); + + let struct_fields = vec![ + StructField::new("id", KernelDataType::Primitive(PrimitiveType::Long), false), + StructField::new( + "b", + KernelDataType::Primitive(PrimitiveType::Boolean), + false, + ), + StructField::new("s", KernelDataType::Primitive(PrimitiveType::String), false), + StructField::new( + "__feldera_op", + KernelDataType::Primitive(PrimitiveType::String), + false, + ), + StructField::new( + "__feldera_ts", + KernelDataType::Primitive(PrimitiveType::TimestampNtz), + false, + ), + ]; + + let table_dir = TempDir::new().unwrap(); + let table_uri = table_dir.path().display().to_string(); + let _ = create_table(&table_uri, &HashMap::new(), &struct_fields).await; + + let storage_dir = TempDir::new().unwrap(); + let result = { + let table_uri = table_uri.clone(); + let storage_dir = storage_dir.path().to_path_buf(); + tokio::task::spawn_blocking(move || { + let pipeline_config: PipelineConfig = serde_json::from_value(json!({ + "name": "test", + "workers": 4, + "storage_config": { "path": storage_dir }, + "inputs": { + "test_input1": { + "stream": "test_input1", + "transport": { + "name": "delta_table_input", + "config": { + "uri": table_uri, + "mode": "cdc", + "cdc_delete_filter": "no_such_column = 'd'", + "cdc_order_by": "__feldera_ts", + } + } + } + } + })) + .unwrap(); + Controller::with_test_config( + move |workers| { + Ok(test_circuit::( + workers, + &TestStruct::schema(), + &[Some("output")], + )) + }, + &pipeline_config, + Box::new(move |_, _| {}), + ) + }) + .await + .unwrap() + }; + + let err = match result { + Ok(_) => panic!("controller should fail to start with an invalid cdc_delete_filter"), + Err(e) => e, + }; + let msg = err.to_string(); + assert!( + msg.contains("no_such_column"), + "expected error to name the missing column 'no_such_column', got: {msg}", + ); + assert!( + msg.contains("cdc_delete_filter"), + "expected error to mention 'cdc_delete_filter', got: {msg}", + ); +} + +/// Assert the logical-plan shape of the CDC `DataFrame`: filter on +/// both sides of an `EXCEPT ALL`, then a sort by `cdc_order_by`. Also +/// checks that an `order_by` referencing an unknown column fails at +/// build time. +#[tokio::test] +async fn build_cdc_dataframe_plan_structure() { + use crate::integrated::delta_table::input::build_cdc_dataframe; + use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, TimeUnit}; + use datafusion::datasource::MemTable; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int64, false), + ArrowField::new("b", ArrowDataType::Boolean, false), + ArrowField::new("__feldera_op", ArrowDataType::Utf8, false), + ArrowField::new( + "__feldera_ts", + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + false, + ), + ])); + + // Two independently-registered tables so the filter expression has + // to resolve against each side of the set difference. + let ctx = SessionContext::new(); + let empty = || Arc::new(MemTable::try_new(arrow_schema.clone(), vec![vec![]]).unwrap()); + ctx.register_table("cdc_adds", empty()).unwrap(); + ctx.register_table("cdc_removes", empty()).unwrap(); + + // Full path: filter + removes + order_by. + let df = build_cdc_dataframe( + ctx.table("cdc_adds").await.unwrap(), + Some(ctx.table("cdc_removes").await.unwrap()), + Some("id % 2 = 0"), + "__feldera_ts", + "unit-test", + ) + .unwrap(); + let plan = format!("{}", df.logical_plan().display_indent()); + + assert!( + plan.contains("Sort: "), + "expected a Sort node; plan was:\n{plan}" + ); + assert!( + plan.contains("__feldera_ts"), + "expected sort key '__feldera_ts'; plan was:\n{plan}" + ); + assert!( + plan.contains("LeftAnti"), + "expected a LeftAnti join (DataFusion encoding of EXCEPT); plan was:\n{plan}" + ); + // EXCEPT ALL uses a LeftAnti join without a Distinct wrapper; + // EXCEPT DISTINCT inserts a `Distinct:` node above the join. + assert!( + !plan.contains("Distinct:"), + "expected EXCEPT ALL (no Distinct node); plan was:\n{plan}" + ); + let filter_lines = plan + .lines() + .filter(|line| line.trim_start().starts_with("Filter:")) + .count(); + assert_eq!( + filter_lines, 2, + "expected Filter applied to both adds and removes; plan was:\n{plan}" + ); + + // No removes: no set-difference in the plan. + let df = build_cdc_dataframe( + ctx.table("cdc_adds").await.unwrap(), + None, + Some("id % 2 = 0"), + "__feldera_ts", + "unit-test", + ) + .unwrap(); + let plan = format!("{}", df.logical_plan().display_indent()); + assert!( + plan.contains("Sort: "), + "expected Sort even without removes; plan was:\n{plan}" + ); + assert!( + !plan.contains("LeftAnti"), + "no removes -> no LeftAnti join; plan was:\n{plan}" + ); + let filter_lines = plan + .lines() + .filter(|line| line.trim_start().starts_with("Filter:")) + .count(); + assert_eq!( + filter_lines, 1, + "expected one Filter (adds only); plan was:\n{plan}" + ); + + // No filter: set difference and sort, no Filter node. + let df = build_cdc_dataframe( + ctx.table("cdc_adds").await.unwrap(), + Some(ctx.table("cdc_removes").await.unwrap()), + None, + "__feldera_ts", + "unit-test", + ) + .unwrap(); + let plan = format!("{}", df.logical_plan().display_indent()); + assert!(plan.contains("Sort: "), "expected Sort; plan was:\n{plan}"); + assert!( + plan.contains("LeftAnti"), + "expected LeftAnti join; plan was:\n{plan}" + ); + assert!( + !plan.contains("Filter:"), + "no filter configured -> no Filter node; plan was:\n{plan}" + ); + + // Invalid `order_by`: error surfaces at build time. + let err = build_cdc_dataframe( + ctx.table("cdc_adds").await.unwrap(), + None, + None, + "no_such_column", + "unit-test", + ) + .unwrap_err() + .to_string(); + assert!( + err.contains("no_such_column"), + "expected error to name the missing column; got: {err}" + ); + assert!( + err.contains("cdc_order_by"), + "expected error to mention 'cdc_order_by'; got: {err}" + ); +} + /// Pin the exact set of arrow types that the `EXCEPT ALL` cancellation /// step in `do_process_cdc_transaction` cannot handle. ///