Skip to content

Commit d2699ce

Browse files
ryzhykgz
authored and committed
[adapters] Delta output: enable checkpoints.
The Delta output connector did not create periodic checkpoints. While this is in itself problematic, it also meant that the connector became slow over time, due to a delta-rs bug that causes the `update_incremental` function to scan the entire transaction log on every commit: delta-io/delta-kernel-rs#2103.

This commit:
- Introduces the `checkpoint_interval` option, which tells the connector to configure the checkpoint interval when creating the table.
- Creates a CommitBuilder that is actually set up to create checkpoints.

Without this fix the time to create a trivial delta commit increases from 1.5s to 6s after ~1000 commits. With the fix it remains constant at ~2s.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 49913a8 commit d2699ce

File tree

6 files changed

+54
-6
lines changed

6 files changed

+54
-6
lines changed

crates/adapters/benches/delta_encoder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ fn create_indexed_writer(threads: usize, table_uri: &str) -> DeltaTableWriter {
1919
max_retries: Some(0),
2020
threads: Some(threads),
2121
object_store_config: Default::default(),
22+
checkpoint_interval: None,
2223
};
2324
let key_schema = Some(BenchKeyStruct::relation_schema());
2425
let mut value_schema = BenchTestStruct::relation_schema();

crates/adapters/src/integrated/delta_table/output.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use dbsp::circuit::tokio::TOKIO;
1616
use delta_kernel::engine::arrow_conversion::TryFromArrow;
1717
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
1818
use deltalake::DeltaTable;
19-
use deltalake::kernel::transaction::{CommitBuilder, TableReference};
19+
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties, TableReference};
2020
use deltalake::kernel::{Action, Add, DataType, StructField};
2121
use deltalake::logstore::ObjectStoreRef;
2222
use deltalake::operations::create::CreateBuilder;
@@ -281,11 +281,20 @@ impl WriterTask {
281281
let mut operation_timeout: Duration = Duration::from_secs(60);
282282

283283
loop {
284+
let checkpoint_interval = match inner.config.checkpoint_interval {
285+
Some(0) => None,
286+
Some(interval) => Some(interval.to_string()),
287+
None => Some("10".to_string()),
288+
};
284289
let create_future = CreateBuilder::new()
285290
.with_location(inner.config.uri.clone())
286291
.with_save_mode(save_mode)
287292
.with_storage_options(storage_options.clone())
288-
.with_columns(inner.struct_fields.clone());
293+
.with_columns(inner.struct_fields.clone())
294+
.with_configuration_property(
295+
deltalake::TableProperty::CheckpointInterval,
296+
checkpoint_interval,
297+
);
289298

290299
match tokio::time::timeout(operation_timeout, create_future).await {
291300
Ok(Ok(table)) => break table,
@@ -357,7 +366,11 @@ impl WriterTask {
357366
))
358367
})?;
359368

360-
CommitBuilder::default()
369+
// `CommitBuilder::default()` leaves `post_commit_hook` unset, so delta-rs skips the
370+
// post-commit hook entirely and never writes `_last_checkpoint` / `*.checkpoint.parquet`,
371+
// regardless of `delta.checkpointInterval`. Use default commit properties so checkpoint
372+
// creation runs when `(version + 1) % checkpoint_interval == 0`.
373+
CommitBuilder::from(CommitProperties::default())
361374
.with_actions(
362375
actions
363376
.iter()
@@ -988,6 +1001,7 @@ mod parallel {
9881001
max_retries: Some(0),
9891002
threads: Some(threads),
9901003
object_store_config: Default::default(),
1004+
checkpoint_interval: None,
9911005
},
9921006
&key_schema,
9931007
&value_relation(),
@@ -1268,6 +1282,7 @@ mod parallel {
12681282
max_retries: Some(0),
12691283
threads: Some(4),
12701284
object_store_config: Default::default(),
1285+
checkpoint_interval: None,
12711286
},
12721287
&key_schema,
12731288
&value_relation(),
@@ -1405,6 +1420,7 @@ mod parallel {
14051420
max_retries: Some(1),
14061421
threads: Some(1),
14071422
object_store_config: Default::default(),
1423+
checkpoint_interval: None,
14081424
},
14091425
&key_schema,
14101426
&value_relation(),
@@ -1433,6 +1449,7 @@ mod parallel {
14331449
max_retries: Some(0),
14341450
threads: Some(0),
14351451
object_store_config: Default::default(),
1452+
checkpoint_interval: None,
14361453
};
14371454
assert!(config.validate().is_err());
14381455
}

crates/adapters/src/integrated/delta_table/test.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -646,8 +646,14 @@ fn delta_table_output_test(
646646

647647
let mut output_records = Vec::with_capacity(data.len());
648648
for parquet_file in parquet_files {
649-
let mut records: Vec<DeltaTestStruct> = load_parquet_file(&parquet_file);
650-
output_records.append(&mut records);
649+
if !parquet_file
650+
.display()
651+
.to_string()
652+
.ends_with(".checkpoint.parquet")
653+
{
654+
let mut records: Vec<DeltaTestStruct> = load_parquet_file(&parquet_file);
655+
output_records.append(&mut records);
656+
}
651657
}
652658

653659
output_records.sort();

crates/feldera-types/src/transport/delta_table.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use utoipa::ToSchema;
99
#[derive(Default, Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
1010
pub enum DeltaTableWriteMode {
1111
/// New updates will be appended to the existing table at the target location.
12+
///
13+
/// If the table doesn't exist, it is created.
1214
#[default]
1315
#[serde(rename = "append")]
1416
Append,
@@ -17,6 +19,8 @@ pub enum DeltaTableWriteMode {
1719
///
1820
/// The connector truncates the table by outputting delete actions for all
1921
/// files in the latest snapshot of the table.
22+
///
23+
/// If the table doesn't exist, it is created.
2024
#[serde(rename = "truncate")]
2125
Truncate,
2226

@@ -35,6 +39,18 @@ pub struct DeltaTableWriterConfig {
3539
#[serde(default)]
3640
pub mode: DeltaTableWriteMode,
3741

42+
/// Checkpoint interval (i.e., the number of commits after which a new checkpoint should be created) for newly created Delta tables.
43+
///
44+
/// The option is only available when creating the Delta table (`mode = append` and there
45+
/// is no existing table at the target location or `mode = truncate`). It configures the `checkpointInterval`
46+
/// table property, which determines the number of commits after which a new checkpoint should be created.
47+
///
48+
/// 0 means no checkpoints are created.
49+
///
50+
/// Default: 10.
51+
#[serde(default, skip_serializing_if = "Option::is_none")]
52+
pub checkpoint_interval: Option<u32>,
53+
3854
/// Maximum number of retries for failed operations.
3955
///
4056
/// The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits,

docs.feldera.com/docs/connectors/sinks/delta.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,10 @@ MERGE INTO {target_table} AS target
7373
|------------|------------|
7474
| `uri`* | Table URI, e.g., `"s3://feldera-fraud-detection-data/feature_train"`. |
7575
| `mode`* | Determines how the Delta table connector handles an existing table at the target location. Options: |
76-
| | - `append`: New updates will be appended to the existing table at the target location. |
76+
| | - `append`: New updates will be appended to the existing table at the target location. If the table doesn't exist, it will be created. |
7777
| | - `truncate`: Existing table at the specified location will be truncated. The connector achieves this by outputting delete actions for all files in the latest snapshot of the table. |
7878
| | - `error_if_exists`: If a table exists at the specified location, the operation will fail. |
79+
| `checkpoint_interval` | <p>Checkpoint interval (i.e., the number of commits after which a new checkpoint should be created) for newly created Delta tables.</p><p>The option is only available when creating the Delta table (`mode = append` and there is no existing table at the target location or `mode = truncate`). It configures the `checkpointInterval` table property, which determines the number of commits after which a new checkpoint should be created.</p><p>0 means no checkpoints are created.</p><p>Default: 10.</p>|
7980
| `max_retries`|<p>Maximum number of retries for failed Delta Lake operations like writing Parquet files and committing transactions.</p><p>The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits, and overall operation retries. This setting controls the overall operation retries. When a write to the table fails, because of an S3 timeout or any other reason that was not resolved by lower-level retries, the connector will retry the entire operation.</p><p>When not specified, the connector performs infinite retries. When set to 0, the connector doesn't retry failed operations.</p>|
8081
| `threads` | Number of parallel threads used by the connector. Increasing this value can improve Delta Lake write throughput by enabling concurrent writes. Default: `1`. |
8182

openapi.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7997,6 +7997,13 @@
79977997
"uri"
79987998
],
79997999
"properties": {
8000+
"checkpoint_interval": {
8001+
"type": "integer",
8002+
"format": "int32",
8003+
"description": "Checkpoint interval (i.e., the number of commits after which a new checkpoint should be created) for newly created Delta tables.\n\nThe option is only available when creating the Delta table (`mode = append` and there\nis no existing table at the target location or `mode = truncate`). It configures the `checkpointInterval`\ntable property, which determines the number of commits after which a new checkpoint should be created.\n\n0 means no checkpoints are created.\n\nDefault: 10.",
8004+
"nullable": true,
8005+
"minimum": 0
8006+
},
80008007
"max_retries": {
80018008
"type": "integer",
80028009
"format": "int32",

0 commit comments

Comments (0)