Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
adapters: configurable error handling for failed batches during snaps…
…hot reading

Fixes issue #5750 by introducing configurable snapshot error handling.
Previously, failed batch errors were always non-fatal, causing the connector to just log error
and skip batches. Now users can choose snapshot_error_mode:
  - ignore (default): log warning and skip batch (previous behavior)
  - fail: treat error as fatal

Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
  • Loading branch information
swanandx committed Mar 7, 2026
commit 23cc39fa5cd92f4bdc408b3f96cab4d85ad634ac
8 changes: 6 additions & 2 deletions crates/adapters/src/integrated/delta_table/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use feldera_adapterlib::utils::datafusion::{
use feldera_storage::tokio::TOKIO_DEDICATED_IO;
use feldera_types::config::FtModel;
use feldera_types::program_schema::Relation;
use feldera_types::transport::delta_table::{DeltaTableReaderConfig, DeltaTableTransactionMode};
use feldera_types::transport::delta_table::{
DeltaTableReaderConfig, DeltaTableSnapshotErrorMode, DeltaTableTransactionMode,
};
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -1714,8 +1716,10 @@ impl DeltaTableInputEndpointInner {
let batch = match batch {
Ok(batch) => batch,
Err(e) => {
let is_fatal =
self.config.snapshot_error_mode == DeltaTableSnapshotErrorMode::Fail;
self.consumer.error(
false,
is_fatal,
anyhow!("error retrieving batch {num_batches} of {descr}: {e:?}"),
Some("delta-batch"),
);
Expand Down
68 changes: 67 additions & 1 deletion crates/feldera-types/src/transport/delta_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ fn default_num_parsers() -> u32 {
4
}

/// Controls how the Delta table input connector handles errors during snapshot loading.
///
/// * `ignore` - Log a non-fatal warning and skip the failed batch (default).
/// * `fail` - Treat the error as fatal and stop the pipeline immediately.
#[derive(Default, Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
#[serde(rename_all = "lowercase")]
pub enum DeltaTableSnapshotErrorMode {
/// Log a non-fatal warning and skip the failed batch.
#[default]
Ignore,

/// Treat the error as fatal and stop the pipeline immediately.
Fail,
}

/// Delta table transaction mode.
///
/// Determines how the connector breaks up its input into transactions.
Expand Down Expand Up @@ -307,6 +322,13 @@ pub struct DeltaTableReaderConfig {
#[serde(default)]
pub verbose: u32,

/// Controls how the connector handles errors during snapshot loading.
///
/// * `"ignore"` - Log a non-fatal warning and skip the failed batch (default).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// * `"ignore"` - Log a non-fatal warning and skip the failed batch (default).
/// * `"ignore"` - Log a non-fatal error and skip the failed batch (default).

/// * `"fail"` - Treat the error as fatal and stop the pipeline immediately.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// * `"fail"` - Treat the error as fatal and stop the pipeline immediately.
/// * `"fail"` - Treat the error as fatal and stop the connector immediately.

#[serde(default)]
pub snapshot_error_mode: DeltaTableSnapshotErrorMode,

/// Storage options for configuring backend object store.
///
/// For specific options available for different storage backends, see:
Expand Down Expand Up @@ -335,13 +357,57 @@ fn test_delta_reader_config_serde() {
let config = serde_json::from_str::<DeltaTableReaderConfig>(config_str).unwrap();

let serialized_config = serde_json::to_string(&config).unwrap();
let expected = r#"{"uri":"protocol:/path/to/somewhere","mode":"follow","transaction_mode":"none","timestamp_column":"ts","filter":null,"skip_unused_columns":false,"snapshot_filter":"ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'","version":null,"datetime":"2010-12-31 00:00:00Z","end_version":null,"cdc_delete_filter":null,"cdc_order_by":null,"num_parsers":4,"max_concurrent_readers":null,"customoption1":"val1","customoption2":"val2","verbose":0}"#;
let expected = r#"{"uri":"protocol:/path/to/somewhere","mode":"follow","transaction_mode":"none","timestamp_column":"ts","filter":null,"skip_unused_columns":false,"snapshot_filter":"ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'","version":null,"datetime":"2010-12-31 00:00:00Z","end_version":null,"cdc_delete_filter":null,"cdc_order_by":null,"num_parsers":4,"max_concurrent_readers":null,"snapshot_error_mode":"ignore","customoption1":"val1","customoption2":"val2","verbose":0}"#;
assert_eq!(
serde_json::from_str::<serde_json::Value>(&serialized_config).unwrap(),
serde_json::from_str::<serde_json::Value>(expected).unwrap()
);
}

#[cfg(test)]
#[test]
fn test_snapshot_error_mode_default() {
let config_str = r#"{
"uri": "s3://bucket/table",
"mode": "snapshot"
}"#;
let config = serde_json::from_str::<DeltaTableReaderConfig>(config_str).unwrap();
assert_eq!(
config.snapshot_error_mode,
DeltaTableSnapshotErrorMode::Ignore
);
}

#[cfg(test)]
#[test]
fn test_snapshot_error_mode_ignore() {
let config_str = r#"{
"uri": "s3://bucket/table",
"mode": "snapshot",
"snapshot_error_mode": "ignore"
}"#;
let config = serde_json::from_str::<DeltaTableReaderConfig>(config_str).unwrap();
assert_eq!(
config.snapshot_error_mode,
DeltaTableSnapshotErrorMode::Ignore
);
}

#[cfg(test)]
#[test]
fn test_snapshot_error_mode_fail() {
let config_str = r#"{
"uri": "s3://bucket/table",
"mode": "snapshot",
"snapshot_error_mode": "fail"
}"#;
let config = serde_json::from_str::<DeltaTableReaderConfig>(config_str).unwrap();
assert_eq!(
config.snapshot_error_mode,
DeltaTableSnapshotErrorMode::Fail
);
}

impl DeltaTableReaderConfig {
/// `true` if the configuration requires taking an initial snapshot of the table.
pub fn snapshot(&self) -> bool {
Expand Down
Loading