@@ -13,7 +13,6 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
1313use datafusion:: datasource:: listing:: {
1414 ListingOptions , ListingTable , ListingTableConfig , ListingTableUrl ,
1515} ;
16- use datafusion:: logical_expr:: LogicalPlan ;
1716use datafusion:: physical_plan:: { PhysicalExpr , displayable} ;
1817use datafusion:: physical_planner:: { DefaultPhysicalPlanner , PhysicalPlanner } ;
1918use datafusion:: prelude:: SessionConfig ;
@@ -105,6 +104,90 @@ fn quote_sql_identifier<S: AsRef<str>>(ident: S) -> String {
105104 format ! ( "\" {}\" " , ident. as_ref( ) . replace( "\" " , "\" \" " ) )
106105}
107106
107+ /// Build the `DataFrame` that streams a CDC transaction to the circuit.
108+ ///
109+ /// Equivalent (in SQL) to:
110+ ///
111+ /// ```sql
112+ /// SELECT * FROM (
113+ /// SELECT * FROM cdc_adds [WHERE <filter>]
114+ /// EXCEPT ALL
115+ /// SELECT * FROM cdc_removes [WHERE <filter>]
116+ /// ) ORDER BY <order_by>
117+ /// ```
118+ ///
119+ /// When `removes_df` is `None`, the set difference is skipped:
120+ ///
121+ /// ```sql
122+ /// SELECT * FROM cdc_adds [WHERE <filter>] ORDER BY <order_by>
123+ /// ```
124+ ///
125+ /// `EXCEPT ALL` (multiset difference) cancels each Remove row against
126+ /// exactly one matching Add row; plain `EXCEPT` would collapse
127+ /// duplicates and under-count legitimately repeated inserts.
128+ ///
129+ /// The optional `filter` is pushed into both sides of the set
130+ /// difference so each Remove only has to cancel against rows that
131+ /// would have been ingested anyway. This also shrinks the inputs to
132+ /// `EXCEPT ALL`, which sorts both relations.
133+ ///
134+ /// The filter is parsed against each `DataFrame`'s own schema, so
135+ /// column references resolve to the correct table qualifier
136+ /// (`cdc_adds.col` vs `cdc_removes.col`). The two sides cannot share
137+ /// a single parsed `Expr` because column references would otherwise
138+ /// point at the wrong relation after `except`.
139+ ///
140+ /// Caveat: `EXCEPT ALL` relies on `arrow_row::RowConverter`, which in
141+ /// the currently pinned `arrow-row` does not support `Map` columns.
142+ /// Pure-append transactions on tables with `Map` columns are
143+ /// unaffected (the `EXCEPT ALL` branch isn't taken when `removes_df`
144+ /// is `None`); transactions that do produce Removes fail here with
145+ /// `NotImplemented`. See issue:
146+ /// - https://github.com/apache/datafusion/issues/15428
147+ /// - https://github.com/apache/arrow-rs/issues/7879
148+ ///
149+ /// Free function (not a method on the connector) so a unit test can
150+ /// inspect the resulting logical plan without standing up the full
151+ /// connector.
152+ pub ( super ) fn build_cdc_dataframe (
153+ adds_df : DataFrame ,
154+ removes_df : Option < DataFrame > ,
155+ filter : Option < & str > ,
156+ order_by : & str ,
157+ description : & str ,
158+ ) -> AnyResult < DataFrame > {
159+ let apply_filter = |df : DataFrame , side : & ' static str | -> AnyResult < DataFrame > {
160+ let Some ( filter) = filter else {
161+ return Ok ( df) ;
162+ } ;
163+ let expr = df
164+ . parse_sql_expr ( filter)
165+ . map_err ( |e| anyhow ! ( "invalid 'filter' expression '{filter}': {e}" ) ) ?;
166+ df. filter ( expr) . map_err ( |e| {
167+ anyhow ! ( "internal error processing {description}; {REPORT_ERROR}; error applying 'filter' to '{side}': {e}" )
168+ } )
169+ } ;
170+
171+ let adds_df = apply_filter ( adds_df, "cdc_adds" ) ?;
172+
173+ let result_df = match removes_df {
174+ None => adds_df,
175+ Some ( removes_df) => {
176+ let removes_df = apply_filter ( removes_df, "cdc_removes" ) ?;
177+ adds_df. except ( removes_df) . map_err ( |e| {
178+ anyhow ! ( "failed to build the CDC set difference for {description}: {e}. This typically means the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support." )
179+ } ) ?
180+ }
181+ } ;
182+
183+ let order_by_expr = result_df
184+ . parse_sql_expr ( order_by)
185+ . map_err ( |e| anyhow ! ( "invalid 'cdc_order_by' expression '{order_by}': {e}" ) ) ?;
186+ result_df. sort_by ( vec ! [ order_by_expr] ) . map_err ( |e| {
187+ anyhow ! ( "internal error processing {description}; {REPORT_ERROR}; error applying 'cdc_order_by': {e}" )
188+ } )
189+ }
190+
108191/// Integrated input connector that reads from a delta table.
109192pub struct DeltaTableInputEndpoint {
110193 endpoint_name : String ,
@@ -1623,67 +1706,50 @@ impl DeltaTableInputEndpointInner {
16231706
16241707 /// Convert the delete filter SQL expression into a Datafusion PhysicalExpr.
16251708 ///
1626- /// To do this, we generate a query plan for the `select * from snapshot where <delete_filter>`
1627- /// query and extract the logical expression from the plan and then convert it to
1628- /// a physical expression.
1629- //
1630- // FIXME: I wonder if there's a simpler way that doesn't require registering the `snapshot`
1631- // table just so we can compile this query.
1709+ /// Parses `cdc_delete_filter` directly against the snapshot schema and
1710+ /// lowers the resulting logical expression to a physical expression.
16321711 async fn extract_delete_filter_expr (
16331712 & self ,
16341713 ) -> Result < Option < Arc < dyn PhysicalExpr > > , ControllerError > {
16351714 let Some ( delete_filter) = & self . config . cdc_delete_filter else {
16361715 return Ok ( None ) ;
16371716 } ;
16381717
1639- let query = format ! ( "SELECT * FROM snapshot WHERE {}" , delete_filter) ;
1640-
1641- let logical_plan = self
1718+ let schema = self
16421719 . datafusion
1643- . sql ( & query )
1720+ . table ( "snapshot" )
16441721 . await
1645- . map_err ( |e | {
1722+ . map_err ( |_e | {
16461723 ControllerError :: invalid_transport_configuration (
1647- & self . endpoint_name ,
1648- & format ! ( "invalid delete filter '{delete_filter}': the 'cdc_delete_filter' expression must be a valid SQL expression that can be used in a 'SELECT * FROM snapshot WHERE <cdc_delete_filter>' query, but the following error was encountered when compiling '{query}': {e}" ) ,
1649- )
1650- } ) ?
1651- . logical_plan ( )
1652- . clone ( ) ;
1653-
1654- let filter_expr = if let LogicalPlan :: Projection ( filter_plan) = & logical_plan {
1655- if let LogicalPlan :: Filter ( filter) = filter_plan. input . as_ref ( ) {
1656- filter. predicate . clone ( )
1657- } else {
1658- return Err ( ControllerError :: invalid_encoder_configuration (
16591724 & self . endpoint_name ,
16601725 & format ! (
1661- "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan} "
1726+ "internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found "
16621727 ) ,
1663- ) ) ;
1664- }
1665- } else {
1666- return Err ( ControllerError :: invalid_encoder_configuration (
1667- & self . endpoint_name ,
1668- & format ! (
1669- "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan}"
1670- ) ,
1671- ) ) ;
1672- } ;
1673-
1674- let schema = self
1675- . datafusion
1676- . table ( "snapshot" )
1677- . await
1678- . map_err ( |_e| {
1679- ControllerError :: invalid_transport_configuration ( & self . endpoint_name , & format ! ( "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found" ) )
1728+ )
16801729 } ) ?
16811730 . schema ( )
16821731 . clone ( ) ;
16831732
1733+ let filter_expr = self
1734+ . datafusion
1735+ . parse_sql_expr ( delete_filter, & schema)
1736+ . map_err ( |e| {
1737+ ControllerError :: invalid_transport_configuration (
1738+ & self . endpoint_name ,
1739+ & format ! ( "invalid 'cdc_delete_filter' expression '{delete_filter}': {e}" ) ,
1740+ )
1741+ } ) ?;
1742+
16841743 let physical_expr = DefaultPhysicalPlanner :: default ( )
16851744 . create_physical_expr ( & filter_expr, & schema, & self . datafusion . state ( ) )
1686- . map_err ( |e| ControllerError :: invalid_transport_configuration ( & self . endpoint_name , & format ! ( "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: error generating physical plan: {e}" ) ) ) ?;
1745+ . map_err ( |e| {
1746+ ControllerError :: invalid_transport_configuration (
1747+ & self . endpoint_name ,
1748+ & format ! (
1749+ "internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: {e}"
1750+ ) ,
1751+ )
1752+ } ) ?;
16871753
16881754 Ok ( Some ( physical_expr) )
16891755 }
@@ -2127,7 +2193,7 @@ impl DeltaTableInputEndpointInner {
21272193 self . process_cdc_transaction ( actions, table, cdc_delete_filter, input_stream, receiver)
21282194 . await ?;
21292195 } else {
2130- let column_names = self . used_column_list ( table) ;
2196+ let used_columns = self . used_columns ( table) ;
21312197
21322198 let start_transaction = self . allocate_follow_transaction_label ( ) ;
21332199
@@ -2144,7 +2210,7 @@ impl DeltaTableInputEndpointInner {
21442210 self . process_action (
21452211 action,
21462212 table,
2147- & column_names ,
2213+ & used_columns ,
21482214 input_stream,
21492215 receiver,
21502216 start_transaction. clone ( ) ,
@@ -2158,7 +2224,7 @@ impl DeltaTableInputEndpointInner {
21582224 self . process_action (
21592225 action,
21602226 table,
2161- & column_names ,
2227+ & used_columns ,
21622228 input_stream,
21632229 receiver,
21642230 start_transaction. clone ( ) ,
@@ -2264,32 +2330,12 @@ impl DeltaTableInputEndpointInner {
22642330 anyhow ! ( "internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_adds' table: {e}" )
22652331 } ) ?;
22662332
2267- // `EXCEPT ALL` (multiset difference) cancels each Remove row against
2268- // exactly one matching Add row; plain `EXCEPT` would collapse
2269- // duplicates and under-count legitimately repeated inserts.
2270- //
2271- // The optional `filter` is pushed into both sides of the set
2272- // difference so each Remove only has to cancel against rows that
2273- // would have been ingested anyway. This also shrinks the inputs to
2274- // `EXCEPT ALL`, which sorts both relations.
2275- //
2276- // Caveat: `EXCEPT ALL` relies on `arrow_row::RowConverter`,
2277- // which in the currently pinned `arrow-row` does not support
2278- // `Map` columns.
2279- // Pure-append transactions on tables with `Map` columns are unaffected
2280- // (the `EXCEPT ALL` branch isn't taken when `removes` is empty);
2281- // transactions that do produce Removes fail here with `NotImplemented`.
2282- // see issue:
2283- // - https://github.com/apache/datafusion/issues/15428
2284- // - https://github.com/apache/arrow-rs/issues/7879
2285- let where_clause = if let Some ( filter) = & self . config . filter {
2286- format ! ( "WHERE {filter}" )
2287- } else {
2288- "" . to_string ( )
2289- } ;
2333+ let adds_df = self . datafusion . table ( "cdc_adds" ) . await . map_err ( |e| {
2334+ anyhow ! ( "internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_adds' table: {e}" )
2335+ } ) ?;
22902336
2291- let from_clause = if removes. is_empty ( ) {
2292- format ! ( "cdc_adds {where_clause}" )
2337+ let removes_df = if removes. is_empty ( ) {
2338+ None
22932339 } else {
22942340 let removes_table = Arc :: new (
22952341 self . create_parquet_table ( table, removes, & description)
@@ -2298,22 +2344,21 @@ impl DeltaTableInputEndpointInner {
22982344 self . datafusion . register_table ( "cdc_removes" , removes_table) . map_err ( |e| {
22992345 anyhow ! ( "internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_removes' table: {e}" )
23002346 } ) ?;
2301- format ! (
2302- "(SELECT * FROM cdc_adds {where_clause} \
2303- EXCEPT ALL \
2304- SELECT * FROM cdc_removes {where_clause})"
2305- )
2347+ Some ( self . datafusion . table ( "cdc_removes" ) . await . map_err ( |e| {
2348+ anyhow ! ( "internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_removes' table: {e}" )
2349+ } ) ?)
23062350 } ;
23072351
2308- // Order the resulting relation by the `cdc_order_by` expression.
2309- // TODO: We don't use `used_column_list` here, as the resulting dataframe will have a different
2310- // schema than the original table, and the `cdc_delete_filter` physical expression won't be valid for it.
2352+ // The `cdc_order_by` expression is mandatory in CDC mode (enforced
2353+ // by `validate_cdc_config`), so the unwrap is safe.
23112354 let order_by = self . config . cdc_order_by . as_ref ( ) . unwrap ( ) ;
2312- let query = format ! ( "SELECT * FROM {from_clause} ORDER BY {order_by}" ) ;
2313-
2314- let df = self . datafusion . sql ( & query) . await . map_err ( |e| {
2315- anyhow ! ( "failed to compile the CDC query '{query}': {e}. This typically indicates one of: (1) `cdc_order_by` or `filter` is not a valid SQL expression for a query of the form `SELECT * FROM <table> WHERE <filter> ORDER BY <cdc_order_by>`; or (2) the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support." )
2316- } ) ?;
2355+ let df = build_cdc_dataframe (
2356+ adds_df,
2357+ removes_df,
2358+ self . config . filter . as_deref ( ) ,
2359+ order_by,
2360+ & description,
2361+ ) ?;
23172362
23182363 let _record_count = self
23192364 . execute_df (
@@ -2362,7 +2407,7 @@ impl DeltaTableInputEndpointInner {
23622407 & self ,
23632408 action : & Action ,
23642409 table : & DeltaTable ,
2365- column_names : & str ,
2410+ used_columns : & [ String ] ,
23662411 input_stream : & mut dyn ArrowStream ,
23672412 receiver : & mut Receiver < PipelineState > ,
23682413 start_transaction : Option < Option < String > > ,
@@ -2373,7 +2418,7 @@ impl DeltaTableInputEndpointInner {
23732418 & add. path ,
23742419 true ,
23752420 table,
2376- column_names ,
2421+ used_columns ,
23772422 input_stream,
23782423 receiver,
23792424 start_transaction,
@@ -2387,7 +2432,7 @@ impl DeltaTableInputEndpointInner {
23872432 & remove. path ,
23882433 false ,
23892434 table,
2390- column_names ,
2435+ used_columns ,
23912436 input_stream,
23922437 receiver,
23932438 start_transaction,
@@ -2414,7 +2459,7 @@ impl DeltaTableInputEndpointInner {
24142459 path : & str ,
24152460 polarity : bool ,
24162461 table : & DeltaTable ,
2417- column_names : & str ,
2462+ used_columns : & [ String ] ,
24182463 input_stream : & mut dyn ArrowStream ,
24192464 receiver : & mut Receiver < PipelineState > ,
24202465 start_transaction : Option < Option < String > > ,
@@ -2434,16 +2479,28 @@ impl DeltaTableInputEndpointInner {
24342479 anyhow ! ( "internal error processing file {full_path}; {REPORT_ERROR}; error registering Parquet table: {e}" )
24352480 } ) ?;
24362481
2482+ let columns: Vec < & str > = used_columns. iter ( ) . map ( String :: as_str) . collect ( ) ;
2483+ let df = self
2484+ . datafusion
2485+ . table ( "tmp_table" )
2486+ . await
2487+ . map_err ( |e| {
2488+ anyhow ! ( "internal error processing file {full_path}; {REPORT_ERROR}; error reading 'tmp_table': {e}" )
2489+ } ) ?
2490+ . select_columns ( & columns)
2491+ . map_err ( |e| {
2492+ anyhow ! ( "internal error processing file {full_path}; {REPORT_ERROR}; error selecting columns: {e}" )
2493+ } ) ?;
2494+
24372495 let df = if let Some ( filter) = & self . config . filter {
2438- let query = format ! ( "SELECT {column_names} FROM tmp_table where {filter}" ) ;
2439- self . datafusion . sql ( & query) . await . map_err ( |e| {
2440- anyhow ! ( "invalid 'cdc_order_by' filter expression '{filter}': 'filter' must be a valid SQL expression that can be used in a 'SELECT * FROM <table> ORDER BY <cdc_order_by>' query, but the following error was encountered when compiling '{query}': {e}" )
2496+ let expr = df
2497+ . parse_sql_expr ( filter)
2498+ . map_err ( |e| anyhow ! ( "invalid 'filter' expression '{filter}': {e}" ) ) ?;
2499+ df. filter ( expr) . map_err ( |e| {
2500+ anyhow ! ( "internal error processing file {full_path}; {REPORT_ERROR}; error applying 'filter': {e}" )
24412501 } ) ?
24422502 } else {
2443- let query = format ! ( "SELECT {column_names} FROM tmp_table" ) ;
2444- self . datafusion . sql ( & query) . await . map_err ( |e| {
2445- anyhow ! ( "internal error processing file {full_path}'; {REPORT_ERROR}; error compiling query '{query}': {e}" )
2446- } ) ?
2503+ df
24472504 } ;
24482505
24492506 let _record_count = self
0 commit comments