Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 152 additions & 95 deletions crates/adapters/src/integrated/delta_table/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::{PhysicalExpr, displayable};
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionConfig;
Expand Down Expand Up @@ -105,6 +104,90 @@ fn quote_sql_identifier<S: AsRef<str>>(ident: S) -> String {
format!("\"{}\"", ident.as_ref().replace("\"", "\"\""))
}

/// Construct the `DataFrame` that feeds one CDC transaction into the circuit.
///
/// SQL equivalent:
///
/// ```sql
/// SELECT * FROM (
///     SELECT * FROM cdc_adds [WHERE <filter>]
///     EXCEPT ALL
///     SELECT * FROM cdc_removes [WHERE <filter>]
/// ) ORDER BY <order_by>
/// ```
///
/// With `removes_df == None` the set difference is omitted and the plan
/// reduces to:
///
/// ```sql
/// SELECT * FROM cdc_adds [WHERE <filter>] ORDER BY <order_by>
/// ```
///
/// Why multiset (`EXCEPT ALL`) semantics: each Remove row must cancel
/// exactly one matching Add row; distinct `EXCEPT` would merge duplicate
/// Adds and under-count legitimately repeated inserts.
///
/// The optional `filter` is applied to both inputs before the set
/// difference: a Remove then only has to cancel rows that would have been
/// ingested anyway, and both inputs to `EXCEPT ALL` (which sorts both
/// relations) shrink.
///
/// Each side parses `filter` against its own schema so column references
/// bind to the correct table qualifier (`cdc_adds.col` vs
/// `cdc_removes.col`); a single shared parsed `Expr` would resolve against
/// the wrong relation after `except`.
///
/// Caveat: `EXCEPT ALL` goes through `arrow_row::RowConverter`, and the
/// currently pinned `arrow-row` cannot convert `Map` columns. Pure-append
/// transactions on tables with `Map` columns are unaffected (the
/// `EXCEPT ALL` branch is not taken when `removes_df` is `None`);
/// transactions that do carry Removes fail here with `NotImplemented`.
/// See:
/// - https://github.com/apache/datafusion/issues/15428
/// - https://github.com/apache/arrow-rs/issues/7879
///
/// Kept as a free function (rather than a connector method) so a unit test
/// can inspect the resulting logical plan without standing up the full
/// connector.
pub(super) fn build_cdc_dataframe(
    adds_df: DataFrame,
    removes_df: Option<DataFrame>,
    filter: Option<&str>,
    order_by: &str,
    description: &str,
) -> AnyResult<DataFrame> {
    // Push the optional user `filter` into one side of the set difference.
    // The expression is parsed against `df`'s own schema; `side` names the
    // relation in error messages.
    let filtered = |df: DataFrame, side: &'static str| -> AnyResult<DataFrame> {
        match filter {
            None => Ok(df),
            Some(filter) => {
                let predicate = df
                    .parse_sql_expr(filter)
                    .map_err(|e| anyhow!("invalid 'filter' expression '{filter}': {e}"))?;
                df.filter(predicate).map_err(|e| {
                    anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'filter' to '{side}': {e}")
                })
            }
        }
    };

    let mut combined = filtered(adds_df, "cdc_adds")?;

    // NOTE(review): assumes `DataFrame::except` has multiset (`EXCEPT ALL`)
    // semantics in the pinned DataFusion — confirm; some DataFusion releases
    // give `except` distinct semantics and expose `except_all` separately.
    if let Some(removes_df) = removes_df {
        let removes_df = filtered(removes_df, "cdc_removes")?;
        combined = combined.except(removes_df).map_err(|e| {
            anyhow!("failed to build the CDC set difference for {description}: {e}. This typically means the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support.")
        })?;
    }

    // Finally, order the result by the user-supplied `cdc_order_by`
    // expression, parsed against the combined relation's schema.
    let sort_expr = combined
        .parse_sql_expr(order_by)
        .map_err(|e| anyhow!("invalid 'cdc_order_by' expression '{order_by}': {e}"))?;
    combined.sort_by(vec![sort_expr]).map_err(|e| {
        anyhow!("internal error processing {description}; {REPORT_ERROR}; error applying 'cdc_order_by': {e}")
    })
}

/// Integrated input connector that reads from a delta table.
pub struct DeltaTableInputEndpoint {
endpoint_name: String,
Expand Down Expand Up @@ -1623,67 +1706,50 @@ impl DeltaTableInputEndpointInner {

/// Convert the delete filter SQL expression into a Datafusion PhysicalExpr.
///
/// To do this, we generate a query plan for the `select * from snapshot where <delete_filter>`
/// query and extract the logical expression from the plan and then convert it to
/// a physical expression.
//
// FIXME: I wonder if there's a simpler way that doesn't require registering the `snapshot`
// table just so we can compile this query.
/// Parses `cdc_delete_filter` directly against the snapshot schema and
/// lowers the resulting logical expression to a physical expression.
async fn extract_delete_filter_expr(
&self,
) -> Result<Option<Arc<dyn PhysicalExpr>>, ControllerError> {
let Some(delete_filter) = &self.config.cdc_delete_filter else {
return Ok(None);
};

let query = format!("SELECT * FROM snapshot WHERE {}", delete_filter);

let logical_plan = self
let schema = self
.datafusion
.sql(&query)
.table("snapshot")
.await
.map_err(|e| {
.map_err(|_e| {
ControllerError::invalid_transport_configuration(
&self.endpoint_name,
&format!("invalid delete filter '{delete_filter}': the 'cdc_delete_filter' expression must be a valid SQL expression that can be used in a 'SELECT * FROM snapshot WHERE <cdc_delete_filter>' query, but the following error was encountered when compiling '{query}': {e}"),
)
})?
.logical_plan()
.clone();

let filter_expr = if let LogicalPlan::Projection(filter_plan) = &logical_plan {
if let LogicalPlan::Filter(filter) = filter_plan.input.as_ref() {
filter.predicate.clone()
} else {
return Err(ControllerError::invalid_encoder_configuration(
&self.endpoint_name,
&format!(
"internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan}"
"internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found"
),
));
}
} else {
return Err(ControllerError::invalid_encoder_configuration(
&self.endpoint_name,
&format!(
"internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: unexpected logical plan {logical_plan}"
),
));
};

let schema = self
.datafusion
.table("snapshot")
.await
.map_err(|_e| {
ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found"))
)
})?
.schema()
.clone();

let filter_expr = self
.datafusion
.parse_sql_expr(delete_filter, &schema)
.map_err(|e| {
ControllerError::invalid_transport_configuration(
&self.endpoint_name,
&format!("invalid 'cdc_delete_filter' expression '{delete_filter}': {e}"),
)
})?;

let physical_expr = DefaultPhysicalPlanner::default()
.create_physical_expr(&filter_expr, &schema, &self.datafusion.state())
.map_err(|e| ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: error generating physical plan: {e}")))?;
.map_err(|e| {
ControllerError::invalid_transport_configuration(
&self.endpoint_name,
&format!(
"internal error compiling 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: {e}"
),
)
})?;

Ok(Some(physical_expr))
}
Expand Down Expand Up @@ -2127,7 +2193,7 @@ impl DeltaTableInputEndpointInner {
self.process_cdc_transaction(actions, table, cdc_delete_filter, input_stream, receiver)
.await?;
} else {
let column_names = self.used_column_list(table);
let used_columns = self.used_columns(table);

let start_transaction = self.allocate_follow_transaction_label();

Expand All @@ -2144,7 +2210,7 @@ impl DeltaTableInputEndpointInner {
self.process_action(
action,
table,
&column_names,
&used_columns,
input_stream,
receiver,
start_transaction.clone(),
Expand All @@ -2158,7 +2224,7 @@ impl DeltaTableInputEndpointInner {
self.process_action(
action,
table,
&column_names,
&used_columns,
input_stream,
receiver,
start_transaction.clone(),
Expand Down Expand Up @@ -2264,32 +2330,12 @@ impl DeltaTableInputEndpointInner {
anyhow!("internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_adds' table: {e}")
})?;

// `EXCEPT ALL` (multiset difference) cancels each Remove row against
// exactly one matching Add row; plain `EXCEPT` would collapse
// duplicates and under-count legitimately repeated inserts.
//
// The optional `filter` is pushed into both sides of the set
// difference so each Remove only has to cancel against rows that
// would have been ingested anyway. This also shrinks the inputs to
// `EXCEPT ALL`, which sorts both relations.
//
// Caveat: `EXCEPT ALL` relies on `arrow_row::RowConverter`,
// which in the currently pinned `arrow-row` does not support
// `Map` columns.
// Pure-append transactions on tables with `Map` columns are unaffected
// (the `EXCEPT ALL` branch isn't taken when `removes` is empty);
// transactions that do produce Removes fail here with `NotImplemented`.
// see issue:
// - https://github.com/apache/datafusion/issues/15428
// - https://github.com/apache/arrow-rs/issues/7879
let where_clause = if let Some(filter) = &self.config.filter {
format!("WHERE {filter}")
} else {
"".to_string()
};
let adds_df = self.datafusion.table("cdc_adds").await.map_err(|e| {
anyhow!("internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_adds' table: {e}")
})?;

let from_clause = if removes.is_empty() {
format!("cdc_adds {where_clause}")
let removes_df = if removes.is_empty() {
None
} else {
let removes_table = Arc::new(
self.create_parquet_table(table, removes, &description)
Expand All @@ -2298,22 +2344,21 @@ impl DeltaTableInputEndpointInner {
self.datafusion.register_table("cdc_removes", removes_table).map_err(|e| {
anyhow!("internal error processing {description}; {REPORT_ERROR}; error registering 'cdc_removes' table: {e}")
})?;
format!(
"(SELECT * FROM cdc_adds {where_clause} \
EXCEPT ALL \
SELECT * FROM cdc_removes {where_clause})"
)
Some(self.datafusion.table("cdc_removes").await.map_err(|e| {
anyhow!("internal error processing {description}; {REPORT_ERROR}; error reading 'cdc_removes' table: {e}")
})?)
};

// Order the resulting relation by the `cdc_order_by` expression.
// TODO: We don't use `used_column_list` here, as the resulting dataframe will have a different
// schema than the original table, and the `cdc_delete_filter` physical expression won't be valid for it.
// The `cdc_order_by` expression is mandatory in CDC mode (enforced
// by `validate_cdc_config`), so the unwrap is safe.
let order_by = self.config.cdc_order_by.as_ref().unwrap();
let query = format!("SELECT * FROM {from_clause} ORDER BY {order_by}");

let df = self.datafusion.sql(&query).await.map_err(|e| {
anyhow!("failed to compile the CDC query '{query}': {e}. This typically indicates one of: (1) `cdc_order_by` or `filter` is not a valid SQL expression for a query of the form `SELECT * FROM <table> WHERE <filter> ORDER BY <cdc_order_by>`; or (2) the Delta table contains a `Map` column, which the CDC deduplication step (`EXCEPT ALL`) does not yet support.")
})?;
let df = build_cdc_dataframe(
adds_df,
removes_df,
self.config.filter.as_deref(),
order_by,
&description,
)?;

let _record_count = self
.execute_df(
Expand Down Expand Up @@ -2362,7 +2407,7 @@ impl DeltaTableInputEndpointInner {
&self,
action: &Action,
table: &DeltaTable,
column_names: &str,
used_columns: &[String],
input_stream: &mut dyn ArrowStream,
receiver: &mut Receiver<PipelineState>,
start_transaction: Option<Option<String>>,
Expand All @@ -2373,7 +2418,7 @@ impl DeltaTableInputEndpointInner {
&add.path,
true,
table,
column_names,
used_columns,
input_stream,
receiver,
start_transaction,
Expand All @@ -2387,7 +2432,7 @@ impl DeltaTableInputEndpointInner {
&remove.path,
false,
table,
column_names,
used_columns,
input_stream,
receiver,
start_transaction,
Expand All @@ -2414,7 +2459,7 @@ impl DeltaTableInputEndpointInner {
path: &str,
polarity: bool,
table: &DeltaTable,
column_names: &str,
used_columns: &[String],
input_stream: &mut dyn ArrowStream,
receiver: &mut Receiver<PipelineState>,
start_transaction: Option<Option<String>>,
Expand All @@ -2434,16 +2479,28 @@ impl DeltaTableInputEndpointInner {
anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error registering Parquet table: {e}")
})?;

let columns: Vec<&str> = used_columns.iter().map(String::as_str).collect();
let df = self
.datafusion
.table("tmp_table")
.await
.map_err(|e| {
anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error reading 'tmp_table': {e}")
})?
.select_columns(&columns)
.map_err(|e| {
anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error selecting columns: {e}")
})?;

let df = if let Some(filter) = &self.config.filter {
let query = format!("SELECT {column_names} FROM tmp_table where {filter}");
self.datafusion.sql(&query).await.map_err(|e| {
anyhow!("invalid 'cdc_order_by' filter expression '{filter}': 'filter' must be a valid SQL expression that can be used in a 'SELECT * FROM <table> ORDER BY <cdc_order_by>' query, but the following error was encountered when compiling '{query}': {e}")
let expr = df
.parse_sql_expr(filter)
.map_err(|e| anyhow!("invalid 'filter' expression '{filter}': {e}"))?;
df.filter(expr).map_err(|e| {
anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error applying 'filter': {e}")
})?
} else {
let query = format!("SELECT {column_names} FROM tmp_table");
self.datafusion.sql(&query).await.map_err(|e| {
anyhow!("internal error processing file {full_path}'; {REPORT_ERROR}; error compiling query '{query}': {e}")
})?
df
};

let _record_count = self
Expand Down
Loading
Loading