Skip to content

Commit 2dbdf68

Browse files
Leonid Ryzhyk (ryzhyk)
authored and committed
[adapters] Fix delta connector stuck in the barrier state.
Make sure that the delta connector doesn't get stuck in the barrier state indefinitely, even if the transaction log of the table is empty.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent cb62755 commit 2dbdf68

2 files changed

Lines changed: 22 additions & 7 deletions

File tree

crates/adapters/src/integrated/delta_table/input.rs

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -845,6 +845,13 @@ impl DeltaTableInputEndpointInner {
845845
)
846846
})?;
847847

848+
// If we are about to follow the table, set resume state to the current table version, otherwise
849+
// the connector will remain in the barrier state until at least one transaction is added to the log.
850+
if !self.config.snapshot() {
851+
*self.last_resume_status.lock().unwrap() =
852+
Some(DeltaResumeInfo::new(delta_table.version()));
853+
}
854+
848855
// Register object store with datafusion, so it will recognize individual parquet
849856
// file URIs when processing transaction log. The `object_store_url` function
850857
// generates a unique URL, which only makes sense to datafusion. We must append
@@ -1002,14 +1009,14 @@ impl DeltaTableInputEndpointInner {
10021009
.table("snapshot")
10031010
.await
10041011
.map_err(|e| {
1005-
ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expession '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found"))
1012+
ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: table 'snapshot' not found"))
10061013
})?
10071014
.schema()
10081015
.clone();
10091016

10101017
let physical_expr = DefaultPhysicalPlanner::default()
10111018
.create_physical_expr(&filter_expr, &schema, &self.datafusion.state())
1012-
.map_err(|e| ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expession '{delete_filter}'; {REPORT_ERROR}: error generating physical plan: {e}")))?;
1019+
.map_err(|e| ControllerError::invalid_transport_configuration(&self.endpoint_name, &format!("internal error when compiling the 'cdc_delete_filter' expression '{delete_filter}'; {REPORT_ERROR}: error generating physical plan: {e}")))?;
10131020

10141021
Ok(Some(physical_expr))
10151022
}
@@ -1035,7 +1042,7 @@ impl DeltaTableInputEndpointInner {
10351042
.downcast_ref::<BooleanArray>()
10361043
.ok_or_else(|| {
10371044
anyhow!(
1038-
"internal error converting the result of the delete filter exptession to BooleanArray; {REPORT_ERROR}: expected Boolean, found {:?}", array.data_type()
1045+
"internal error converting the result of the delete filter expression to BooleanArray; {REPORT_ERROR}: expected Boolean, found {:?}", array.data_type()
10391046
)
10401047
})?;
10411048

crates/adapters/src/integrated/delta_table/test.rs

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1048,11 +1048,12 @@ async fn test_cdc(
10481048

10491049
let mut total_count = 0;
10501050

1051-
// Write data in 10 chunks, wait for it to show up in the output view.
1052-
for chunk in data.chunks(std::cmp::max(data.len() / 10, 1)) {
1053-
write_updates_as_json(input_file.as_file_mut(), chunk, true);
1054-
total_count += chunk.len();
1051+
let mut chunks = data.chunks(std::cmp::max(data.len() / 10, 1));
10551052

1053+
// Write data in 10 chunks, wait for it to show up in the output view.
1054+
loop {
1055+
// Perform the first suspend while the table is empty to test that the delta lake connector
1056+
// can successfully suspend when reading an empty table.
10561057
if suspend {
10571058
println!("start suspend");
10581059
let (sender, mut receiver) = mpsc::channel(1);
@@ -1103,6 +1104,13 @@ async fn test_cdc(
11031104
.collect::<Vec<_>>(),
11041105
)
11051106
.await;
1107+
1108+
let Some(chunk) = chunks.next() else {
1109+
break;
1110+
};
1111+
1112+
write_updates_as_json(input_file.as_file_mut(), chunk, true);
1113+
total_count += chunk.len();
11061114
}
11071115

11081116
// Modify all records by negating the `boolean` field.

0 commit comments

Comments
 (0)