Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/adapters/benches/delta_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ fn create_indexed_writer(threads: usize, table_uri: &str) -> DeltaTableWriter {
threads: Some(threads),
object_store_config: Default::default(),
checkpoint_interval: None,
log_retention_duration: None,
enable_expired_log_cleanup: None,
};
let key_schema = Some(BenchKeyStruct::relation_schema());
let mut value_schema = BenchTestStruct::relation_schema();
Expand Down
84 changes: 84 additions & 0 deletions crates/adapters/src/integrated/delta_table/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,17 @@ impl WriterTask {
.with_configuration_property(
deltalake::TableProperty::CheckpointInterval,
checkpoint_interval,
)
.with_configuration_property(
deltalake::TableProperty::LogRetentionDuration,
inner.config.log_retention_duration.clone(),
)
.with_configuration_property(
deltalake::TableProperty::EnableExpiredLogCleanup,
inner
.config
.enable_expired_log_cleanup
.map(|b| b.to_string()),
);

match tokio::time::timeout(operation_timeout, create_future).await {
Expand Down Expand Up @@ -1103,6 +1114,8 @@ mod parallel {
threads: Some(threads),
object_store_config: Default::default(),
checkpoint_interval: None,
log_retention_duration: None,
enable_expired_log_cleanup: None,
},
&key_schema,
&value_relation(),
Expand Down Expand Up @@ -1402,6 +1415,8 @@ mod parallel {
threads: Some(4),
object_store_config: Default::default(),
checkpoint_interval: None,
log_retention_duration: None,
enable_expired_log_cleanup: None,
},
&key_schema,
&value_relation(),
Expand Down Expand Up @@ -1541,6 +1556,8 @@ mod parallel {
threads: Some(1),
object_store_config: Default::default(),
checkpoint_interval: None,
log_retention_duration: None,
enable_expired_log_cleanup: None,
},
&key_schema,
&value_relation(),
Expand All @@ -1561,6 +1578,69 @@ mod parallel {
assert!(result.is_err(), "should fail after exhausting retries");
}

/// `log_retention_duration` and `enable_expired_log_cleanup` should land on the created
/// Delta table's metadata when set, and be absent when not set.
#[test]
fn test_log_retention_table_properties() {
    use dbsp::circuit::tokio::TOKIO;
    use deltalake::open_table;
    use std::time::Duration;

    // Opens the Delta table that a writer left behind at `uri` so its
    // configuration can be inspected.
    let load_table = |uri: &str| {
        let url = url::Url::from_file_path(uri).unwrap();
        TOKIO.block_on(async move { open_table(url).await.unwrap() })
    };

    // Case 1: neither option set — neither property should appear in the table metadata.
    let unset_dir = TempDir::new().unwrap();
    let unset_uri = unset_dir.path().display().to_string();
    let _endpoint = make_endpoint(1, &unset_uri, true);

    let table = load_table(&unset_uri);
    let config = table.snapshot().unwrap().table_config();
    assert!(
        config.log_retention_duration.is_none(),
        "logRetentionDuration should not be set when option is unset"
    );
    assert!(
        config.enable_expired_log_cleanup.is_none(),
        "enableExpiredLogCleanup should not be set when option is unset"
    );

    // Case 2: both options set — both properties should be reflected in the table metadata.
    let set_dir = TempDir::new().unwrap();
    let set_uri = set_dir.path().display().to_string();
    let _endpoint = DeltaTableWriter::new(
        EndpointId::default(),
        "test_endpoint",
        &DeltaTableWriterConfig {
            uri: set_uri.clone(),
            mode: DeltaTableWriteMode::Truncate,
            max_retries: Some(0),
            threads: Some(1),
            object_store_config: Default::default(),
            checkpoint_interval: None,
            log_retention_duration: Some("interval 7 days".to_string()),
            enable_expired_log_cleanup: Some(false),
        },
        &Some(key_relation()),
        &value_relation(),
        Weak::new(),
        false,
    )
    .expect("failed to create endpoint");

    let table = load_table(&set_uri);
    let config = table.snapshot().unwrap().table_config();
    assert_eq!(
        config.log_retention_duration,
        Some(Duration::from_secs(7 * 24 * 60 * 60)),
        "logRetentionDuration should match the configured interval",
    );
    assert_eq!(
        config.enable_expired_log_cleanup,
        Some(false),
        "enableExpiredLogCleanup should be set to false",
    );
}

/// Verify that threads=0 is rejected in config validation.
#[test]
fn test_threads_zero_rejected() {
Expand All @@ -1571,6 +1651,8 @@ mod parallel {
threads: Some(0),
object_store_config: Default::default(),
checkpoint_interval: None,
log_retention_duration: None,
enable_expired_log_cleanup: None,
};
assert!(config.validate().is_err());
}
Expand Down Expand Up @@ -1729,6 +1811,8 @@ mod parallel {
threads: Some(1),
object_store_config: Default::default(),
checkpoint_interval: None,
log_retention_duration: None,
enable_expired_log_cleanup: None,
},
&key_schema,
&value_relation(),
Expand Down
186 changes: 186 additions & 0 deletions crates/feldera-types/src/transport/delta_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,36 @@ pub struct DeltaTableWriterConfig {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint_interval: Option<u32>,

/// Log retention duration for newly created Delta tables.
///
/// Configures the `delta.logRetentionDuration` table property, which controls how long the
/// transaction log history of the table is kept. Each time a checkpoint is written, Delta Lake
/// automatically cleans up log entries older than this interval (subject to
/// `enable_expired_log_cleanup`).
///
/// The option is only available when creating the Delta table (`mode = append` and there is
/// no existing table at the target location, or `mode = truncate`).
///
/// The value follows the Delta Lake interval syntax: `"interval <N> <unit>"`, where `<unit>`
/// is one of `nanosecond[s]`, `microsecond[s]`, `millisecond[s]`, `second[s]`, `minute[s]`,
/// `hour[s]`, `day[s]`, or `week[s]`. Examples: `"interval 30 days"`, `"interval 6 hours"`.
///
/// Default: `"interval 30 days"` (Delta Lake default).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub log_retention_duration: Option<String>,

/// Whether to clean up expired log entries when a checkpoint is written.
///
/// Configures the `delta.enableExpiredLogCleanup` table property. When set to `false`,
/// transaction log entries are retained indefinitely regardless of `log_retention_duration`.
///
/// The option is only available when creating the Delta table (`mode = append` and there is
/// no existing table at the target location, or `mode = truncate`).
///
/// Default: `true` (Delta Lake default).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub enable_expired_log_cleanup: Option<bool>,

/// Maximum number of retries for failed operations.
///
/// The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits,
Expand Down Expand Up @@ -88,10 +118,65 @@ impl DeltaTableWriterConfig {
if self.threads.is_some_and(|t| t == 0) {
return Err("threads must be greater than 0".to_string());
}
if let Some(duration) = &self.log_retention_duration {
parse_delta_interval(duration)
.map_err(|e| format!("invalid 'log_retention_duration' value '{duration}': {e}"))?;
}
Ok(())
}
}

/// Validate a Delta Lake interval string (e.g. `"interval 30 days"`).
///
/// Returns the duration, rounded down to whole seconds, on success. This mirrors the grammar
/// accepted by `delta-rs` (`crates/core/src/table/config.rs::parse_interval`); we replicate it
/// here to fail fast at config-load time rather than only when the table is created.
///
/// # Errors
///
/// Returns a human-readable message when the value does not start with the literal
/// `interval`, the count is missing / non-numeric / negative, the unit is unknown, or
/// trailing tokens follow the unit.
fn parse_delta_interval(value: &str) -> Result<u64, String> {
    const SECONDS_PER_MINUTE: u64 = 60;
    const SECONDS_PER_HOUR: u64 = 60 * SECONDS_PER_MINUTE;
    const SECONDS_PER_DAY: u64 = 24 * SECONDS_PER_HOUR;
    const SECONDS_PER_WEEK: u64 = 7 * SECONDS_PER_DAY;

    // Grammar: "interval" <number> <unit>, whitespace-separated, nothing after the unit.
    let mut tokens = value.split_whitespace();
    if tokens.next() != Some("interval") {
        return Err("expected the value to start with \"interval\"".to_string());
    }
    let number_token = tokens
        .next()
        .ok_or_else(|| "expected a number after \"interval\"".to_string())?;
    let unit = tokens
        .next()
        .ok_or_else(|| "expected a unit (e.g. \"days\") after the number".to_string())?;
    if tokens.next().is_some() {
        return Err("unexpected trailing tokens".to_string());
    }
    // Parse as i64 first so that a leading '-' yields the dedicated "negative" error
    // rather than a generic u64 parse failure.
    let number: i64 = number_token
        .parse()
        .map_err(|e| format!("cannot parse '{number_token}' as integer: {e}"))?;
    if number < 0 {
        return Err("interval cannot be negative".to_string());
    }
    let number = number as u64;

    // Sub-second units are truncated toward zero seconds; multiplications saturate so that
    // pathologically large counts cannot overflow. Tests assert on the returned value for the
    // whole-second units.
    let seconds = match unit {
        "nanosecond" | "nanoseconds" => number / 1_000_000_000,
        "microsecond" | "microseconds" => number / 1_000_000,
        "millisecond" | "milliseconds" => number / 1_000,
        "second" | "seconds" => number,
        "minute" | "minutes" => number.saturating_mul(SECONDS_PER_MINUTE),
        "hour" | "hours" => number.saturating_mul(SECONDS_PER_HOUR),
        "day" | "days" => number.saturating_mul(SECONDS_PER_DAY),
        "week" | "weeks" => number.saturating_mul(SECONDS_PER_WEEK),
        other => {
            return Err(format!("unknown unit '{other}'"));
        }
    };
    Ok(seconds)
}

/// Delta table read mode.
///
/// Three options are available:
Expand Down Expand Up @@ -421,6 +506,107 @@ fn test_delta_table_ingest_mode_display() {
assert_eq!(DeltaTableIngestMode::Cdc.to_string(), "cdc");
}

#[cfg(test)]
mod log_retention_tests {
    use super::*;

    /// Builds a minimal writer config in which only `log_retention_duration` varies.
    fn make_config(log_retention: Option<&str>) -> DeltaTableWriterConfig {
        DeltaTableWriterConfig {
            uri: "memory://".to_string(),
            mode: DeltaTableWriteMode::default(),
            checkpoint_interval: None,
            log_retention_duration: log_retention.map(str::to_string),
            enable_expired_log_cleanup: None,
            max_retries: None,
            threads: None,
            object_store_config: HashMap::new(),
        }
    }

    #[test]
    fn parse_delta_interval_accepts_valid_units() {
        // Every unit is accepted in both singular and plural form.
        let bases = [
            "nanosecond",
            "microsecond",
            "millisecond",
            "second",
            "minute",
            "hour",
            "day",
            "week",
        ];
        for base in bases {
            for s in [format!("interval 5 {base}"), format!("interval 5 {base}s")] {
                assert!(parse_delta_interval(&s).is_ok(), "failed for {s}");
            }
        }
    }

    #[test]
    fn parse_delta_interval_accepts_zero() {
        // Zero is a valid duration; users who want to disable cleanup do so via
        // `enable_expired_log_cleanup = false`.
        for s in ["interval 0 seconds", "interval 0 days"] {
            assert!(parse_delta_interval(s).is_ok());
        }
    }

    #[test]
    fn parse_delta_interval_known_durations() {
        let cases: [(&str, u64); 3] = [
            ("interval 30 days", 30 * 86_400),
            ("interval 2 weeks", 2 * 7 * 86_400),
            ("interval 90 seconds", 90),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_delta_interval(input).unwrap(), expected);
        }
    }

    #[test]
    fn parse_delta_interval_rejects_malformed() {
        let bad_inputs = [
            "30 days",
            "interval days",
            "interval 30",
            "interval 30 fortnights",
            "interval -5 days",
            "",
            "interval 1 hours extra",
        ];
        for bad in bad_inputs {
            assert!(
                parse_delta_interval(bad).is_err(),
                "expected error for {bad:?}"
            );
        }
    }

    #[test]
    fn validate_propagates_interval_errors() {
        let err = make_config(Some("not an interval")).validate().unwrap_err();
        assert!(err.contains("log_retention_duration"), "unexpected: {err}");
    }

    #[test]
    fn validate_accepts_well_formed_interval() {
        assert!(make_config(Some("interval 30 days")).validate().is_ok());
    }

    #[test]
    fn validate_accepts_unset_interval() {
        assert!(make_config(None).validate().is_ok());
    }
}

impl DeltaTableReaderConfig {
/// `true` if the configuration requires taking an initial snapshot of the table.
pub fn snapshot(&self) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions docs.feldera.com/docs/connectors/sinks/delta.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ MERGE INTO {target_table} AS target
| | - `truncate`: Existing table at the specified location will be truncated on the first pipeline start. When the pipeline resumes from a checkpoint the table is kept as-is so that data written before the restart is preserved. |
| | - `error_if_exists`: If a table exists at the specified location, the operation will fail. When the pipeline resumes from a checkpoint the existing table is opened without error. |
| `checkpoint_interval` | <p>Checkpoint interval (i.e., the number of commits after which a new checkpoint should be created) for newly created Delta tables.</p><p>The option is only available when creating the Delta table (`mode = append` and there is no existing table at the target location or `mode = truncate`). It configures the `checkpointInterval` table property, which determines the number of commits after which a new checkpoint should be created.</p><p>0 means no checkpoints are created.</p><p>Default: 10.</p>|
| `log_retention_duration` | <p>Log retention duration for newly created Delta tables.</p><p>Configures the `delta.logRetentionDuration` table property, which controls how long the table's transaction-log history is kept. Each time a checkpoint is written, Delta Lake automatically cleans up log entries older than this interval (subject to `enable_expired_log_cleanup`).</p><p>The option is only available when creating the Delta table (`mode = append` and there is no existing table at the target location, or `mode = truncate`).</p><p>The value follows the Delta Lake interval syntax `"interval <N> <unit>"`, where `<unit>` is one of `nanosecond[s]`, `microsecond[s]`, `millisecond[s]`, `second[s]`, `minute[s]`, `hour[s]`, `day[s]`, or `week[s]`. Examples: `"interval 30 days"`, `"interval 6 hours"`.</p><p>Default: `"interval 30 days"` (Delta Lake default).</p>|
| `enable_expired_log_cleanup` | <p>Whether to clean up expired log entries when a checkpoint is written.</p><p>Configures the `delta.enableExpiredLogCleanup` table property. When set to `false`, transaction-log entries are retained indefinitely regardless of `log_retention_duration`.</p><p>The option is only available when creating the Delta table (`mode = append` and there is no existing table at the target location, or `mode = truncate`).</p><p>Default: `true` (Delta Lake default).</p>|
| `max_retries`|<p>Maximum number of retries for failed Delta Lake operations like writing Parquet files and committing transactions.</p><p>The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits, and overall operation retries. This setting controls the overall operation retries. When a write to the table fails, because of an S3 timeout or any other reason that was not resolved by lower-level retries, the connector will retry the entire operation.</p><p>When not specified, the connector performs infinite retries. When set to 0, the connector doesn't retry failed operations.</p>|
| `threads` | Number of parallel threads used by the connector. Increasing this value can improve Delta Lake write throughput by enabling concurrent writes. Default: `1`. |

Expand Down
10 changes: 10 additions & 0 deletions openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -8402,6 +8402,16 @@
"nullable": true,
"minimum": 0
},
"enable_expired_log_cleanup": {
"type": "boolean",
"description": "Whether to clean up expired log entries when a checkpoint is written.\n\nConfigures the `delta.enableExpiredLogCleanup` table property. When set to `false`,\ntransaction log entries are retained indefinitely regardless of `log_retention_duration`.\n\nThe option is only available when creating the Delta table (`mode = append` and there is\nno existing table at the target location, or `mode = truncate`).\n\nDefault: `true` (Delta Lake default).",
"nullable": true
},
"log_retention_duration": {
"type": "string",
"description": "Log retention duration for newly created Delta tables.\n\nConfigures the `delta.logRetentionDuration` table property, which controls how long the\ntransaction log history of the table is kept. Each time a checkpoint is written, Delta Lake\nautomatically cleans up log entries older than this interval (subject to\n`enable_expired_log_cleanup`).\n\nThe option is only available when creating the Delta table (`mode = append` and there is\nno existing table at the target location, or `mode = truncate`).\n\nThe value follows the Delta Lake interval syntax: `\"interval <N> <unit>\"`, where `<unit>`\nis one of `nanosecond[s]`, `microsecond[s]`, `millisecond[s]`, `second[s]`, `minute[s]`,\n`hour[s]`, `day[s]`, or `week[s]`. Examples: `\"interval 30 days\"`, `\"interval 6 hours\"`.\n\nDefault: `\"interval 30 days\"` (Delta Lake default).",
"nullable": true
},
"max_retries": {
"type": "integer",
"format": "int32",
Expand Down
Loading