Skip to content

Commit abdd71b

Browse files
committed
[adapters] Add verbose config setting for delta.
The new `verbose` flag can be used to log detailed progress at the INFO level for a specific connector. Currently we only log add/delete actions in follow and cdc modes. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 16c8def commit abdd71b

3 files changed

Lines changed: 43 additions & 1 deletion

File tree

crates/adapters/src/integrated/delta_table/input.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,6 +1625,31 @@ impl DeltaTableInputEndpointInner {
16251625
input_stream: &mut dyn ArrowStream,
16261626
receiver: &mut Receiver<PipelineState>,
16271627
) {
1628+
if self.config.verbose > 0 {
1629+
// Don't log actions we ignore to limit spurious logging. E.g., delta lake
1630+
// optimization passes can generate thousand of noop actions.
1631+
let data_change_actions = actions
1632+
.iter()
1633+
.filter(|action| match action {
1634+
Action::Add(add) if add.data_change => true,
1635+
Action::Remove(remove) if remove.data_change => true,
1636+
_ => false,
1637+
})
1638+
.collect::<Vec<_>>();
1639+
info!(
1640+
"delta_table {}: log entry for table version {new_version}: {data_change_actions:?}{}",
1641+
&self.endpoint_name,
1642+
if actions.len() > data_change_actions.len() {
1643+
format!(
1644+
" ({} other actions)",
1645+
actions.len() - data_change_actions.len()
1646+
)
1647+
} else {
1648+
"".to_string()
1649+
}
1650+
);
1651+
}
1652+
16281653
// Use the time when we _started_ reading transaction data as the ingestion timestamp.
16291654
let timestamp = Utc::now();
16301655

crates/feldera-types/src/transport/delta_table.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,17 @@ pub struct DeltaTableReaderConfig {
296296
/// The default value is 6.
297297
pub max_concurrent_readers: Option<u32>,
298298

299+
/// Enable verbose logging.
300+
///
301+
/// When enabled, the connector will log detailed information at INFO level.
302+
///
303+
/// Supported values:
304+
/// * 0 - no verbose logging
305+
/// * 1 - log all Delta log entries in follow and cdc modes.
306+
/// * >1 - reserved for future use
307+
#[serde(default)]
308+
pub verbose: u32,
309+
299310
/// Storage options for configuring backend object store.
300311
///
301312
/// For specific options available for different storage backends, see:
@@ -324,7 +335,7 @@ fn test_delta_reader_config_serde() {
324335
let config = serde_json::from_str::<DeltaTableReaderConfig>(config_str).unwrap();
325336

326337
let serialized_config = serde_json::to_string(&config).unwrap();
327-
let expected = r#"{"uri":"protocol:/path/to/somewhere","mode":"follow","transaction_mode":"none","timestamp_column":"ts","filter":null,"skip_unused_columns":false,"snapshot_filter":"ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'","version":null,"datetime":"2010-12-31 00:00:00Z","end_version":null,"cdc_delete_filter":null,"cdc_order_by":null,"num_parsers":4,"max_concurrent_readers":null,"customoption1":"val1","customoption2":"val2"}"#;
338+
let expected = r#"{"uri":"protocol:/path/to/somewhere","mode":"follow","transaction_mode":"none","timestamp_column":"ts","filter":null,"skip_unused_columns":false,"snapshot_filter":"ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'","version":null,"datetime":"2010-12-31 00:00:00Z","end_version":null,"cdc_delete_filter":null,"cdc_order_by":null,"num_parsers":4,"max_concurrent_readers":null,"customoption1":"val1","customoption2":"val2","verbose":0}"#;
328339
assert_eq!(
329340
serde_json::from_str::<serde_json::Value>(&serialized_config).unwrap(),
330341
serde_json::from_str::<serde_json::Value>(expected).unwrap()

openapi.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7431,6 +7431,12 @@
74317431
"type": "string",
74327432
"description": "Table URI.\n\nExample: \"s3://feldera-fraud-detection-data/demographics_train\""
74337433
},
7434+
"verbose": {
7435+
"type": "integer",
7436+
"format": "int32",
7437+
"description": "Enable verbose logging.\n\nWhen enabled, the connector will log detailed information at INFO level.\n\nSupported values:\n* 0 - no verbose logging\n* 1 - log all Delta log entries in follow and cdc modes.\n* >1 - reserved for future use",
7438+
"minimum": 0
7439+
},
74347440
"version": {
74357441
"type": "integer",
74367442
"format": "int64",

0 commit comments

Comments
 (0)