
Commit da2149b

adapters: use dataframes instead of sql strings
This was prompted by an erroneous SQL injection report. Those reports aren't real security vulnerabilities here, because there is no point where an untrusted source can inject anything, but while looking at the code it still seemed more ergonomic to turn these SQL strings into DataFrame API calls. I think it also improves error reporting in some cases. Adds a few more tests.

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 8682f0a commit da2149b
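
The pattern being replaced is "format a configuration-supplied expression into a SQL string and hand it to `SessionContext::sql`"; the replacement opens the table as a `DataFrame`, parses the expression against that frame's schema, and applies it with `filter`. A minimal sketch of the two shapes, not taken from the commit (the table name `events`, the free functions, and the lack of error wrapping are illustrative; the DataFusion calls themselves are the ones the diff below uses):

```rust
use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};

/// Old shape: interpolate the expression into a SQL string.
async fn filtered_old(ctx: &SessionContext, filter: &str) -> Result<DataFrame> {
    // Any problem surfaces as a parse/plan error for the whole synthetic query.
    ctx.sql(&format!("SELECT * FROM events WHERE {filter}")).await
}

/// New shape: stay in the DataFrame API and parse only the expression.
async fn filtered_new(ctx: &SessionContext, filter: &str) -> Result<DataFrame> {
    let df = ctx.table("events").await?;
    // `parse_sql_expr` resolves the expression against the frame's own schema,
    // so a bad column name is reported against the expression itself.
    let expr = df.parse_sql_expr(filter)?;
    df.filter(expr)
}
```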

2 files changed

Lines changed: 377 additions & 95 deletions

File tree

crates/adapters/src/integrated/delta_table/input.rs

Lines changed: 152 additions & 95 deletions
```diff
@@ -13,7 +13,6 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
-use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::{PhysicalExpr, displayable};
 use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use datafusion::prelude::SessionConfig;
```
```diff
@@ -105,6 +104,90 @@ fn quote_sql_identifier<S: AsRef<str>>(ident: S) -> String {
     format!("\"{}\"", ident.as_ref().replace("\"", "\"\""))
 }
 
+/// Build the `DataFrame` that streams a CDC transaction to the circuit.
+///
+/// Equivalent (in SQL) to:
+///
+/// ```sql
+/// SELECT * FROM (
+///   SELECT * FROM cdc_adds [WHERE <filter>]
+///   EXCEPT ALL
+///   SELECT * FROM cdc_removes [WHERE <filter>]
+/// ) ORDER BY <order_by>
+/// ```
+///
+/// When `removes_df` is `None`, the set difference is skipped:
+///
+/// ```sql
+/// SELECT * FROM cdc_adds [WHERE <filter>] ORDER BY <order_by>
+/// ```
+///
+/// `EXCEPT ALL` (multiset difference) cancels each Remove row against
+/// exactly one matching Add row; plain `EXCEPT` would collapse
+/// duplicates and under-count legitimately repeated inserts.
+///
+/// The optional `filter` is pushed into both sides of the set
+/// difference so each Remove only has to cancel against rows that
+/// would have been ingested anyway. This also shrinks the inputs to
+/// `EXCEPT ALL`, which sorts both relations.
+///
+/// The filter is parsed against each `DataFrame`'s own schema, so
+/// column references resolve to the correct table qualifier
+/// (`cdc_adds.col` vs `cdc_removes.col`). The two sides cannot share
+/// a single parsed `Expr` because column references would otherwise
+/// point at the wrong relation after `except`.
+///
+/// Caveat: `EXCEPT ALL` relies on `arrow_row::RowConverter`, which in
+/// the currently pinned `arrow-row` does not support `Map` columns.
+/// Pure-append transactions on tables with `Map` columns are
+/// unaffected (the `EXCEPT ALL` branch isn't taken when `removes_df`
+/// is `None`); transactions that do produce Removes fail here with
+/// `NotImplemented`. See issue:
+/// - https://github.com/apache/datafusion/issues/15428
+/// - https://github.com/apache/arrow-rs/issues/7879
+///
+/// Free function (not a method on the connector) so a unit test can
+/// inspect the resulting logical plan without standing up the full
+/// connector.
+pub(super) fn build_cdc_dataframe(
+    adds_df: DataFrame,
+    removes_df: Option<DataFrame>,
+    filter: Option<&str>,
+    order_by: &str,
+    description: &str,
+) -> AnyResult<DataFrame> {
+    let apply_filter = |df: DataFrame, side: &'static str| -> AnyResult<DataFrame> {
+        let Some(filter) = filter else {
+            return Ok(df);
+        };
+        let expr = df
+            .parse_sql_expr(filter)
+            .map_err(|e| anyhow!("invalid 'filter' expression '{filter}': {e}"))?;
+        df.filter(expr).map_err(|e| {
+            anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'filter' to '{side}': {e}")
+        })
+    };
+
+    let adds_df = apply_filter(adds_df, "cdc_adds")?;
+
+    let result_df = match removes_df {
+        None => adds_df,
+        Some(removes_df) => {
+            let removes_df = apply_filter(removes_df, "cdc_removes")?;
+            adds_df.except(removes_df).map_err(|e| {
+                anyhow!("failed to build the CDC set difference for {description}: {e}. This typically means the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support.")
+            })?
+        }
+    };
+
+    let order_by_expr = result_df
+        .parse_sql_expr(order_by)
+        .map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?;
+    result_df.sort_by(vec![order_by_expr]).map_err(|e| {
+        anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'cdc_order_by': {e}")
+    })
+}
+
 /// Integrated input connector that reads from a delta table.
 pub struct DeltaTableInputEndpoint {
     endpoint_name: String,
```
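
The last paragraph of the doc comment is the reason `build_cdc_dataframe` is a free function: a test can feed it small in-memory `DataFrame`s and inspect or run the resulting plan. A hedged sketch of such a test, not taken from this commit (the schema, the batch contents, and the row-count assertion assume the `EXCEPT ALL` semantics described above; a `tokio` test runtime is also assumed):

```rust
#[cfg(test)]
mod cdc_dataframe_sketch {
    use std::sync::Arc;

    use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::prelude::SessionContext;

    use super::build_cdc_dataframe;

    /// A two-column (`id`, `val`) batch shared by the adds and removes sides.
    fn batch(ids: &[i64], vals: &[&str]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("val", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(ids.to_vec())) as ArrayRef,
                Arc::new(StringArray::from(vals.to_vec())) as ArrayRef,
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn removes_cancel_adds() {
        let ctx = SessionContext::new();
        // Two identical Adds for id=1 plus one Add for id=2 ...
        let adds = ctx.read_batch(batch(&[1, 1, 2], &["a", "a", "b"])).unwrap();
        // ... and a single Remove that should cancel exactly one of the duplicates.
        let removes = ctx.read_batch(batch(&[1], &["a"])).unwrap();

        let df = build_cdc_dataframe(adds, Some(removes), None, "id", "test transaction")
            .unwrap();

        // The logical plan can be inspected without executing it.
        println!("{}", df.logical_plan().display_indent());

        // Executing it should leave one (1, "a") row and the (2, "b") row.
        let rows: usize = df.collect().await.unwrap().iter().map(|b| b.num_rows()).sum();
        assert_eq!(rows, 2);
    }
}
```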
```diff
@@ -1623,67 +1706,50 @@ impl DeltaTableInputEndpointInner {
 
     /// Convert the delete filter SQL expression into a Datafusion PhysicalExpr.
     ///
-    /// To do this, we generate a query plan for the `select * from snapshot where <delete_filter>`
-    /// query and extract the logical expression from the plan and then convert it to
-    /// a physical expression.
-    //
-    // FIXME: I wonder if there's a simpler way that doesn't require registering the `snapshot`
-    // table just so we can compile this query.
+    /// Parses `cdc_delete_filter` directly against the snapshot schema and
+    /// lowers the resulting logical expression to a physical expression.
    async fn extract_delete_filter_expr(
        &self,
    ) -> Result<Option<Arc<dyn PhysicalExpr>>, ControllerError> {
        let Some(delete_filter) = &self.config.cdc_delete_filter else {
            return Ok(None);
        };
 
-        let query = format!("SELECT * FROM snapshot WHERE {}", delete_filter);
-
-        let logical_plan = self
+        let schema = self
            .datafusion
-            .sql(&query)
+            .table("snapshot")
            .await
-            .map_err(|e| {
+            .map_err(|_e| {
                ControllerError::invalid_transport_configuration(
-                    &self.endpoint_name,
-                    &format!("invalid delete filter '{delete_filter}': the 'cdc_delete_filter' expression must be a valid SQL expression that can be used in a 'SELECT * FROM snapshot WHERE <cdc_delete_filter>' query, but the following error was encountered when compiling '{query}': {e}"),
-                )
-            })?
-            .logical_plan()
-            .clone();
-
-        let filter_expr = if let LogicalPlan::Projection(filter_plan) = &logical_plan {
-            if let LogicalPlan::Filter(filter) = filter_plan.input.as_ref() {
-                filter.predicate.clone()
-            } else {
-                return Err(ControllerError::invalid_encoder_configuration(
                    &self.endpoint_name,
                    &format!(
-                        "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan}"
+                        "internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found"
                    ),
-                ));
-            }
-        } else {
-            return Err(ControllerError::invalid_encoder_configuration(
-                &self.endpoint_name,
-                &format!(
-                    "internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan}"
-                ),
-            ));
-        };
-
-        let schema = self
-            .datafusion
-            .table("snapshot")
-            .await
-            .map_err(|_e| {
-                ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found"))
+                )
            })?
            .schema()
            .clone();
 
+        let filter_expr = self
+            .datafusion
+            .parse_sql_expr(delete_filter, &schema)
+            .map_err(|e| {
+                ControllerError::invalid_transport_configuration(
+                    &self.endpoint_name,
+                    &format!("invalid 'cdc_delete_filter' expression '{delete_filter}': {e}"),
+                )
+            })?;
+
        let physical_expr = DefaultPhysicalPlanner::default()
            .create_physical_expr(&filter_expr, &schema, &self.datafusion.state())
-            .map_err(|e| ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: error generating physical plan: {e}")))?;
+            .map_err(|e| {
+                ControllerError::invalid_transport_configuration(
+                    &self.endpoint_name,
+                    &format!(
+                        "internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: {e}"
+                    ),
+                )
+            })?;
 
        Ok(Some(physical_expr))
    }
```
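
Stripped of the connector plumbing, the new method body does three things: fetch the registered table's `DFSchema`, parse the predicate against it with `SessionContext::parse_sql_expr`, and lower the result with `DefaultPhysicalPlanner::create_physical_expr`. A standalone hedged sketch of that pipeline (the two-column schema and the `__deleted` predicate are illustrative assumptions):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;

/// Turn a SQL predicate such as `"__deleted = 'true'"` into a `PhysicalExpr`
/// without building a `SELECT ... WHERE ...` string.
fn compile_predicate(ctx: &SessionContext, predicate: &str) -> Result<Arc<dyn PhysicalExpr>> {
    // Illustrative schema; the connector uses the registered snapshot table's schema.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("__deleted", DataType::Utf8, true),
    ]);
    let df_schema = DFSchema::try_from(schema)?;

    // Unknown columns or syntax errors fail here, attributed to the predicate itself.
    let expr = ctx.parse_sql_expr(predicate, &df_schema)?;

    // Lower the logical expression to a physical one.
    DefaultPhysicalPlanner::default().create_physical_expr(&expr, &df_schema, &ctx.state())
}
```

Parsing against the schema is what turns a typo in the predicate into an "invalid 'cdc_delete_filter' expression" error instead of a failure to compile a synthetic `SELECT` statement.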
```diff
@@ -2127,7 +2193,7 @@ impl DeltaTableInputEndpointInner {
            self.process_cdc_transaction(actions, table, cdc_delete_filter, input_stream, receiver)
                .await?;
        } else {
-            let column_names = self.used_column_list(table);
+            let used_columns = self.used_columns(table);
 
            let start_transaction = self.allocate_follow_transaction_label();
 
```
```diff
@@ -2144,7 +2210,7 @@
                    self.process_action(
                        action,
                        table,
-                        &column_names,
+                        &used_columns,
                        input_stream,
                        receiver,
                        start_transaction.clone(),
```
```diff
@@ -2158,7 +2224,7 @@
                    self.process_action(
                        action,
                        table,
-                        &column_names,
+                        &used_columns,
                        input_stream,
                        receiver,
                        start_transaction.clone(),
```
```diff
@@ -2264,32 +2330,12 @@
            anyhow!("internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_adds' table: {e}")
        })?;
 
-        // `EXCEPT ALL` (multiset difference) cancels each Remove row against
-        // exactly one matching Add row; plain `EXCEPT` would collapse
-        // duplicates and under-count legitimately repeated inserts.
-        //
-        // The optional `filter` is pushed into both sides of the set
-        // difference so each Remove only has to cancel against rows that
-        // would have been ingested anyway. This also shrinks the inputs to
-        // `EXCEPT ALL`, which sorts both relations.
-        //
-        // Caveat: `EXCEPT ALL` relies on `arrow_row::RowConverter`,
-        // which in the currently pinned `arrow-row` does not support
-        // `Map` columns.
-        // Pure-append transactions on tables with `Map` columns are unaffected
-        // (the `EXCEPT ALL` branch isn't taken when `removes` is empty);
-        // transactions that do produce Removes fail here with `NotImplemented`.
-        // see issue:
-        // - https://github.com/apache/datafusion/issues/15428
-        // - https://github.com/apache/arrow-rs/issues/7879
-        let where_clause = if let Some(filter) = &self.config.filter {
-            format!("WHERE {filter}")
-        } else {
-            "".to_string()
-        };
+        let adds_df = self.datafusion.table("cdc_adds").await.map_err(|e| {
+            anyhow!("internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_adds' table: {e}")
+        })?;
 
-        let from_clause = if removes.is_empty() {
-            format!("cdc_adds {where_clause}")
+        let removes_df = if removes.is_empty() {
+            None
        } else {
            let removes_table = Arc::new(
                self.create_parquet_table(table, removes, &description)
```
```diff
@@ -2298,22 +2344,21 @@
            self.datafusion.register_table("cdc_removes", removes_table).map_err(|e| {
                anyhow!("internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_removes' table: {e}")
            })?;
-            format!(
-                "(SELECT * FROM cdc_adds {where_clause} \
-                EXCEPT ALL \
-                SELECT * FROM cdc_removes {where_clause})"
-            )
+            Some(self.datafusion.table("cdc_removes").await.map_err(|e| {
+                anyhow!("internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_removes' table: {e}")
+            })?)
        };
 
-        // Order the resulting relation by the `cdc_order_by` expression.
-        // TODO: We don't use `used_column_list` here, as the resulting dataframe will have a different
-        // schema than the original table, and the `cdc_delete_filter` physical expression won't be valid for it.
+        // The `cdc_order_by` expression is mandatory in CDC mode (enforced
+        // by `validate_cdc_config`), so the unwrap is safe.
        let order_by = self.config.cdc_order_by.as_ref().unwrap();
-        let query = format!("SELECT * FROM {from_clause} ORDER BY {order_by}");
-
-        let df = self.datafusion.sql(&query).await.map_err(|e| {
-            anyhow!("failed to compile the CDC query '{query}': {e}. This typically indicates one of: (1) `cdc_order_by` or `filter` is not a valid SQL expression for a query of the form `SELECT * FROM <table> WHERE <filter> ORDER BY <cdc_order_by>`; or (2) the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support.")
-        })?;
+        let df = build_cdc_dataframe(
+            adds_df,
+            removes_df,
+            self.config.filter.as_deref(),
+            order_by,
+            &description,
+        )?;
 
        let _record_count = self
            .execute_df(
```
```diff
@@ -2362,7 +2407,7 @@
        &self,
        action: &Action,
        table: &DeltaTable,
-        column_names: &str,
+        used_columns: &[String],
        input_stream: &mut dyn ArrowStream,
        receiver: &mut Receiver<PipelineState>,
        start_transaction: Option<Option<String>>,
```
```diff
@@ -2373,7 +2418,7 @@
                &add.path,
                true,
                table,
-                column_names,
+                used_columns,
                input_stream,
                receiver,
                start_transaction,
```
```diff
@@ -2387,7 +2432,7 @@
                &remove.path,
                false,
                table,
-                column_names,
+                used_columns,
                input_stream,
                receiver,
                start_transaction,
```
```diff
@@ -2414,7 +2459,7 @@
        path: &str,
        polarity: bool,
        table: &DeltaTable,
-        column_names: &str,
+        used_columns: &[String],
        input_stream: &mut dyn ArrowStream,
        receiver: &mut Receiver<PipelineState>,
        start_transaction: Option<Option<String>>,
```
```diff
@@ -2434,16 +2479,28 @@
            anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error registering Parquet table: {e}")
        })?;
 
+        let columns: Vec<&str> = used_columns.iter().map(String::as_str).collect();
+        let df = self
+            .datafusion
+            .table("tmp_table")
+            .await
+            .map_err(|e| {
+                anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error reading 'tmp_table': {e}")
+            })?
+            .select_columns(&columns)
+            .map_err(|e| {
+                anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error selecting columns: {e}")
+            })?;
+
        let df = if let Some(filter) = &self.config.filter {
-            let query = format!("SELECT {column_names} FROM tmp_table where {filter}");
-            self.datafusion.sql(&query).await.map_err(|e| {
-                anyhow!("invalid 'cdc_order_by' filter expression '{filter}': 'filter' must be a valid SQL expression that can be used in a 'SELECT * FROM <table> ORDER BY <cdc_order_by>' query, but the following error was encountered when compiling '{query}': {e}")
+            let expr = df
+                .parse_sql_expr(filter)
+                .map_err(|e| anyhow!("invalid 'filter' expression '{filter}': {e}"))?;
+            df.filter(expr).map_err(|e| {
+                anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error applying 'filter': {e}")
            })?
        } else {
-            let query = format!("SELECT {column_names} FROM tmp_table");
-            self.datafusion.sql(&query).await.map_err(|e| {
-                anyhow!("internal error processing file {full_path}'; {REPORT_ERROR}; error compiling query '{query}': {e}")
-            })?
+            df
        };
 
        let _record_count = self
```
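
For reference, the projection-plus-filter chain that replaces the two `SELECT {column_names} FROM tmp_table ...` strings, reduced to a standalone hedged sketch (the free function and the plain `Result` error handling are illustrative; `select_columns`, `parse_sql_expr`, and `filter` are the calls the hunk above relies on):

```rust
use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};

/// Project a registered table down to the columns the pipeline uses,
/// then optionally apply a row filter, entirely through the DataFrame API.
async fn project_and_filter(
    ctx: &SessionContext,
    used_columns: &[String],
    filter: Option<&str>,
) -> Result<DataFrame> {
    let columns: Vec<&str> = used_columns.iter().map(String::as_str).collect();
    let df = ctx.table("tmp_table").await?.select_columns(&columns)?;
    match filter {
        None => Ok(df),
        Some(filter) => {
            // The filter is parsed against the already-projected schema.
            let expr = df.parse_sql_expr(filter)?;
            df.filter(expr)
        }
    }
}
```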
