diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7165ec1a008..466724291ca 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,11 +56,6 @@ jobs: invoke-build-docker: name: Build Docker needs: [invoke-build-rust, invoke-build-java, invoke-tests-web-console-unit] - if: | - always() && - (needs.invoke-build-rust.result == 'success' || needs.invoke-build-rust.result == 'skipped') && - (needs.invoke-build-java.result == 'success' || needs.invoke-build-java.result == 'skipped') && - (needs.invoke-tests-web-console-unit.result == 'success' || needs.invoke-tests-web-console-unit.result == 'skipped') uses: ./.github/workflows/build-docker.yml secrets: inherit diff --git a/Cargo.lock b/Cargo.lock index 4a43b6b1e66..b4ca54aa7f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3750,7 +3750,7 @@ dependencies = [ [[package]] name = "dbsp" -version = "0.286.0" +version = "0.287.0" dependencies = [ "anyhow", "arc-swap", @@ -3838,7 +3838,7 @@ dependencies = [ [[package]] name = "dbsp_adapters" -version = "0.286.0" +version = "0.287.0" dependencies = [ "actix", "actix-codec", @@ -3975,7 +3975,7 @@ dependencies = [ [[package]] name = "dbsp_nexmark" -version = "0.286.0" +version = "0.287.0" dependencies = [ "anyhow", "ascii_table", @@ -4850,7 +4850,7 @@ dependencies = [ [[package]] name = "fda" -version = "0.286.0" +version = "0.287.0" dependencies = [ "anyhow", "arrow", @@ -4902,7 +4902,7 @@ dependencies = [ [[package]] name = "feldera-adapterlib" -version = "0.286.0" +version = "0.287.0" dependencies = [ "actix-web", "anyhow", @@ -4933,7 +4933,7 @@ dependencies = [ [[package]] name = "feldera-buffer-cache" -version = "0.286.0" +version = "0.287.0" dependencies = [ "crossbeam-utils", "enum-map", @@ -4961,7 +4961,7 @@ dependencies = [ [[package]] name = "feldera-datagen" -version = "0.286.0" +version = "0.287.0" dependencies = [ "anyhow", "async-channel 2.5.0", @@ -4987,7 +4987,7 @@ dependencies = [ [[package]] name = "feldera-fxp" -version = "0.286.0" +version = "0.287.0" dependencies = [ "bytecheck", "dbsp", @@ -5007,7 +5007,7 @@ dependencies = [ [[package]] name = "feldera-iceberg" -version = "0.286.0" +version = "0.287.0" dependencies = [ "anyhow", "chrono", @@ -5027,7 +5027,7 @@ dependencies = [ [[package]] name = "feldera-ir" -version = "0.286.0" +version = "0.287.0" dependencies = [ "proptest", "proptest-derive", @@ -5039,7 +5039,7 @@ dependencies = [ [[package]] name = "feldera-macros" -version = "0.286.0" +version = "0.287.0" dependencies = [ "prettyplease", "proc-macro2", @@ -5049,7 +5049,7 @@ dependencies = [ [[package]] name = "feldera-observability" -version = "0.286.0" +version = "0.287.0" dependencies = [ "actix-http", "awc", @@ -5064,7 +5064,7 @@ dependencies = [ [[package]] name = "feldera-rest-api" -version = "0.286.0" +version = "0.287.0" dependencies = [ "chrono", "feldera-observability", @@ -5098,7 +5098,7 @@ dependencies = [ [[package]] name = "feldera-sqllib" -version = "0.286.0" +version = "0.287.0" dependencies = [ "arcstr", "base58", @@ -5141,7 +5141,7 @@ dependencies = [ [[package]] name = "feldera-storage" -version = "0.286.0" +version = "0.287.0" dependencies = [ "anyhow", "crossbeam", @@ -5164,7 +5164,7 @@ dependencies = [ [[package]] name = "feldera-types" -version = "0.286.0" +version = "0.287.0" dependencies = [ "actix-web", "anyhow", @@ -8074,7 +8074,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeline-manager" -version = "0.286.0" +version = "0.287.0" dependencies = [ 
"actix-cors", "actix-files", @@ -9168,7 +9168,7 @@ dependencies = [ [[package]] name = "readers" -version = "0.286.0" +version = "0.287.0" dependencies = [ "async-std", "csv", @@ -10744,7 +10744,7 @@ dependencies = [ [[package]] name = "sltsqlvalue" -version = "0.286.0" +version = "0.287.0" dependencies = [ "dbsp", "feldera-sqllib", @@ -11047,7 +11047,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "storage-test-compat" -version = "0.286.0" +version = "0.287.0" dependencies = [ "dbsp", "derive_more 1.0.0", diff --git a/Cargo.toml b/Cargo.toml index 233ee490bf4..6ba7adb024a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace.package] authors = ["Feldera Team "] -version = "0.286.0" +version = "0.287.0" license = "MIT OR Apache-2.0" homepage = "https://github.com/feldera/feldera" repository = "https://github.com/feldera/feldera" @@ -102,7 +102,7 @@ csv = "1.2.2" csv-core = "0.1.10" dashmap = "6.1.0" datafusion = "51.0" -dbsp = { path = "crates/dbsp", version = "0.286.0" } +dbsp = { path = "crates/dbsp", version = "0.287.0" } dbsp_nexmark = { path = "crates/nexmark" } deadpool-postgres = "0.14.1" #deltalake = "0.30.2" @@ -122,19 +122,19 @@ erased-serde = "0.3.31" fake = "2.10" fastbloom = "0.14.0" fdlimit = "0.3.0" -feldera-buffer-cache = { version = "0.286.0", path = "crates/buffer-cache" } +feldera-buffer-cache = { version = "0.287.0", path = "crates/buffer-cache" } feldera-cloud1-client = "0.1.2" feldera-datagen = { path = "crates/datagen" } -feldera-fxp = { version = "0.286.0", path = "crates/fxp", features = ["dbsp"] } +feldera-fxp = { version = "0.287.0", path = "crates/fxp", features = ["dbsp"] } feldera-iceberg = { path = "crates/iceberg" } -feldera-observability = { version = "0.286.0", path = "crates/feldera-observability" } -feldera-macros = { version = "0.286.0", path = "crates/feldera-macros" } -feldera-sqllib = { version = "0.286.0", path = "crates/sqllib" } -feldera-storage = { version = "0.286.0", path = "crates/storage" } -feldera-types = { version = "0.286.0", path = "crates/feldera-types" } -feldera-rest-api = { version = "0.286.0", path = "crates/rest-api" } -feldera-ir = { version = "0.286.0", path = "crates/ir" } -feldera-adapterlib = { version = "0.286.0", path = "crates/adapterlib" } +feldera-observability = { version = "0.287.0", path = "crates/feldera-observability" } +feldera-macros = { version = "0.287.0", path = "crates/feldera-macros" } +feldera-sqllib = { version = "0.287.0", path = "crates/sqllib" } +feldera-storage = { version = "0.287.0", path = "crates/storage" } +feldera-types = { version = "0.287.0", path = "crates/feldera-types" } +feldera-rest-api = { version = "0.287.0", path = "crates/rest-api" } +feldera-ir = { version = "0.287.0", path = "crates/ir" } +feldera-adapterlib = { version = "0.287.0", path = "crates/adapterlib" } flate2 = "1.1.0" form_urlencoded = "1.2.0" futures = "0.3.30" diff --git a/crates/adapterlib/src/transport.rs b/crates/adapterlib/src/transport.rs index d2cc203a96a..1c357dd6ad5 100644 --- a/crates/adapterlib/src/transport.rs +++ b/crates/adapterlib/src/transport.rs @@ -1,6 +1,7 @@ use anyhow::{Error as AnyError, Result as AnyResult}; use chrono::{DateTime, Utc}; use dyn_clone::DynClone; +use feldera_types::adapter_stats::ConnectorHealth; use feldera_types::config::FtModel; use feldera_types::program_schema::Relation; use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError}; @@ -773,6 +774,9 @@ pub trait InputConsumer: Send + Sync + DynClone { /// Optional 
tag that can be used for additional context
    /// e.g. for rate limiting
    fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>);
+
+    /// Updates the health status of the connector.
+    fn update_connector_health(&self, health: ConnectorHealth);
 }
 
 /// Information needed to restart after or replay input.
diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs
index 33575fec75b..fc6e4399357 100644
--- a/crates/adapters/src/controller.rs
+++ b/crates/adapters/src/controller.rs
@@ -7096,6 +7096,11 @@ impl InputConsumer for InputProbe {
             .status
             .set_custom_metrics(self.endpoint_id, metrics);
     }
+
+    fn update_connector_health(&self, health: ConnectorHealth) {
+        self.controller
+            .update_input_connector_health(self.endpoint_id, health);
+    }
 }
 
 /// An output probe inserted between the encoder and the output transport
diff --git a/crates/adapters/src/integrated/delta_table/input.rs b/crates/adapters/src/integrated/delta_table/input.rs
index 0d276f6fe24..d5132ed7c24 100644
--- a/crates/adapters/src/integrated/delta_table/input.rs
+++ b/crates/adapters/src/integrated/delta_table/input.rs
@@ -33,14 +33,18 @@ use feldera_adapterlib::utils::datafusion::{
     validate_sql_expression, validate_timestamp_column,
 };
 use feldera_storage::tokio::TOKIO_DEDICATED_IO;
+use feldera_types::adapter_stats::ConnectorHealth;
 use feldera_types::config::FtModel;
 use feldera_types::program_schema::Relation;
 use feldera_types::transport::delta_table::{DeltaTableReaderConfig, DeltaTableTransactionMode};
 use futures_util::StreamExt;
+use rand::Rng;
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
 use std::cmp::min;
 use std::collections::{BTreeSet, HashMap};
+use std::fmt::Display;
+use std::future::Future;
 use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::thread;
@@ -56,12 +60,18 @@ use url::Url;
 
 const POLL_INTERVAL: Duration = Duration::from_millis(1000);
 
 /// Calculate exponential backoff delay for retrying delta log reads.
-/// Starts at 0.5s, doubles each retry, caps at 32s.
+/// Starts at 0.5s, doubles each retry, caps at 32s, plus uniform jitter up to 25% of that delay
+/// (capped at `max_delay_ms`) to reduce synchronized retries.
 fn calculate_backoff_delay(retry_count: u32) -> Duration {
-    let base_delay_ms = 500; // 0.5 seconds
-    let max_delay_ms = 32_000; // 32 seconds
-    let delay_ms = std::cmp::min(base_delay_ms << retry_count, max_delay_ms);
-    Duration::from_millis(delay_ms)
+    let base_delay_ms: u64 = 500; // 0.5 seconds
+    let max_delay_ms: u64 = 32_000; // 32 seconds
+    // Clamp the shift amount itself: `checked_shl` only rejects shifts >= 64, so a large
+    // `retry_count` could silently shift every bit out and yield a zero delay. Capping the
+    // shift at 6 saturates the nominal delay at exactly 32s (500 << 6 == 32_000).
+    let delay_ms = min(base_delay_ms << retry_count.min(6), max_delay_ms);
+    let jitter_span = (delay_ms / 4).max(1);
+    let jitter_ms = rand::thread_rng().gen_range(0..jitter_span);
+    Duration::from_millis(min(delay_ms + jitter_ms, max_delay_ms))
 }
 
 /// Default object store timeout. When not explicitly set by the user,
@@ -83,7 +93,7 @@ static DELTA_READER_SEMAPHORE: std::sync::LazyLock<tokio::sync::Semaphore> =
 
 /// Used to detect conflicting values of `max_concurrent_readers`.
 static MAX_CONCURRENT_READERS_SET: AtomicBool = AtomicBool::new(false);
 
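The jittered backoff above is easy to sanity-check in isolation. Below is a minimal, self-contained sketch of the intended behavior: the function mirrors `calculate_backoff_delay` from this diff, while the `main` harness and its assertions are illustrative only and assume the `rand` crate.

```rust
use rand::Rng;
use std::time::Duration;

/// Mirrors the connector's backoff: 0.5s base, doubling per retry, capped at
/// 32s, plus up to 25% uniform jitter (the sum is capped at 32s as well).
fn backoff(retry_count: u32) -> Duration {
    let base_ms: u64 = 500;
    let max_ms: u64 = 32_000;
    // 500 << 6 == 32_000, so clamping the shift saturates at the cap.
    let delay_ms = (base_ms << retry_count.min(6)).min(max_ms);
    let jitter_ms = rand::thread_rng().gen_range(0..(delay_ms / 4).max(1));
    Duration::from_millis((delay_ms + jitter_ms).min(max_ms))
}

fn main() {
    for retry in 0..10 {
        let d = backoff(retry);
        // Never below the nominal step, never above the cap.
        assert!(d >= Duration::from_millis((500u64 << retry.min(6)).min(32_000)));
        assert!(d <= Duration::from_millis(32_000));
        println!("retry {retry}: {d:?}");
    }
}
```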
-/// Takes a column name from a DeltaLake schema and returns a qouted string
+/// Takes a column name from a DeltaLake schema and returns a quoted string
 /// that can be used in datafusion queries like `select "foo""bar" from my_table`.
 fn quote_sql_identifier<S: AsRef<str>>(ident: S) -> String {
     format!("\"{}\"", ident.as_ref().replace("\"", "\"\""))
 }
@@ -274,9 +284,11 @@ impl DeltaTableInputReader {
         // status for the frontier will be set to the current time instead of whenever the table version in resume_info
         // was actually processed. The right solution is to checkpoint the frontier with the connector.
         if resume_info.is_some() {
-            endpoint
-                .queue
-                .push_with_aux((None, Vec::new()), Utc::now(), resume_info);
+            endpoint.queue.push_with_aux(
+                (None, Vec::new()),
+                Utc::now(),
+                QueueEntry::ResumeInfo(resume_info),
+            );
         }
 
         if eoi {
@@ -347,18 +359,34 @@ impl InputReader for DeltaTableInputReader {
                 checkpoint_requested,
             } => {
                 // When initiating a checkpoint, try to stop at a delta table transaction boundary.
-                let stop_at: &dyn Fn(&Option<DeltaResumeInfo>) -> bool = if checkpoint_requested {
-                    &|resume_info: &Option<DeltaResumeInfo>| resume_info.is_some()
+                let stop_at: &dyn Fn(&QueueEntry) -> bool = if checkpoint_requested {
+                    &|entry: &QueueEntry| {
+                        matches!(
+                            entry,
+                            QueueEntry::ResumeInfo(Some(_)) | QueueEntry::Rollback
+                        )
+                    }
                 } else {
-                    &|_: &Option<DeltaResumeInfo>| false
+                    &|_: &QueueEntry| false
                 };
                 let (total, _, resume_info) = self.inner.queue.flush_with_aux_until(stop_at);
 
-                let resume_status = resume_info
-                    .last()
-                    .map(|(_ts, resume_info)| resume_info.clone())
-                    .unwrap_or_else(|| self.inner.last_resume_status.lock().unwrap().clone());
+                let resume_status = match resume_info.last() {
+                    None => self.inner.last_resume_status.lock().unwrap().clone(),
+                    Some((_ts, QueueEntry::ResumeInfo(resume_info))) => resume_info.clone(),
+                    Some((_ts, QueueEntry::Rollback)) => Some(
+                        self.inner
+                            .last_checkpointable_status
+                            .lock()
+                            .unwrap()
+                            .clone(),
+                    ),
+                };
+
+                *self.inner.last_resume_status.lock().unwrap() = resume_status.clone();
+                if let Some(resume_status) = &resume_status {
+                    *self.inner.last_checkpointable_status.lock().unwrap() = resume_status.clone();
+                }
 
                 let resume = match resume_status {
                     None => Resume::Barrier,
@@ -371,11 +399,12 @@ impl InputReader for DeltaTableInputReader {
                     Some(resume),
                     resume_info
                         .into_iter()
-                        .map(|(timestamp, metadata)| {
-                            Watermark::new(
+                        .filter_map(|(timestamp, metadata)| match metadata {
+                            QueueEntry::ResumeInfo(resume_info) => Some(Watermark::new(
                                 timestamp,
-                                metadata.map(|m| serde_json::to_value(m).unwrap()),
-                            )
+                                resume_info.map(|m| serde_json::to_value(m).unwrap()),
+                            )),
+                            QueueEntry::Rollback => None,
                         })
                         .collect(),
                 );
@@ -550,10 +579,28 @@ struct DeltaTableInputEndpointInner {
     /// * Updated to `Some(new_version)` after advancing to the next table version in the transaction log
     /// in follow mode or after ingesting the initial snapshot.
     last_resume_status: Mutex<Option<DeltaResumeInfo>>,
-    queue: Arc<InputQueue<Option<DeltaResumeInfo>, StagedInputBuffer>>,
+
+    /// The latest checkpointable status of this endpoint.
+    last_checkpointable_status: Mutex<DeltaResumeInfo>,
+
+    queue: Arc<InputQueue<QueueEntry, StagedInputBuffer>>,
     metrics: Arc<DeltaTableMetrics>,
 }
 
+#[derive(Debug, Clone)]
+enum QueueEntry {
+    /// Resume info for the connector after processing this queue entry.
+    ResumeInfo(Option<DeltaResumeInfo>),
+
+    /// Sent after failing to read a delta log entry, before retrying or
+    /// declaring a fatal error. Makes sure that the connector can be checkpointed
+    /// between retries.
+    ///
+    /// Note: this is not the actual transaction rollback: the current transaction,
+    /// if any, will be committed.
+ Rollback, +} + impl DeltaTableInputEndpointInner { fn new( endpoint_name: &str, @@ -573,6 +620,8 @@ impl DeltaTableInputEndpointInner { let metrics = Arc::new(DeltaTableMetrics::new()); consumer.set_custom_metrics(Arc::clone(&metrics) as Arc); + let resume_status = resume_info.unwrap_or_else(DeltaResumeInfo::initial); + Self { endpoint_name: endpoint_name.to_string(), schema, @@ -582,9 +631,9 @@ impl DeltaTableInputEndpointInner { transaction_index: AtomicUsize::new(0), // Set version to None by default so that the connector is checkpointable in the initial state. - last_resume_status: Mutex::new(Some( - resume_info.unwrap_or_else(DeltaResumeInfo::initial), - )), + last_resume_status: Mutex::new(Some(resume_status.clone())), + + last_checkpointable_status: Mutex::new(resume_status), queue, metrics, } @@ -737,12 +786,16 @@ impl DeltaTableInputEndpointInner { /// Load the entire table snapshot as a single "select * where " query. /// Returns the total number of records processed. + /// + /// Fails with an error if the function fails to read the snapshot. This function + /// doesn't retry (the idea being that the snapshot can be large, and it's better to + /// fail fast and give the user a chance to restart the pipeline). async fn read_unordered_snapshot( &self, table: &DeltaTable, input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, - ) -> usize { + ) -> AnyResult { let column_names = self.used_column_list(table); let mut snapshot_query = format!("select {column_names} from snapshot"); @@ -759,8 +812,14 @@ impl DeltaTableInputEndpointInner { let timestamp = Utc::now(); let record_count = self - .execute_snapshot_query(&snapshot_query, "initial snapshot", input_stream, receiver) - .await; + .execute_snapshot_query( + &snapshot_query, + "initial snapshot", + input_stream, + receiver, + self.config.max_retries(), + ) + .await?; self.metrics .snapshot_records_total .fetch_add(record_count as u64, Ordering::Relaxed); @@ -769,11 +828,11 @@ impl DeltaTableInputEndpointInner { self.queue.push_entry( InputQueueEntry::new_with_aux( timestamp, - Some(DeltaResumeInfo::follow_mode( + QueueEntry::ResumeInfo(Some(DeltaResumeInfo::follow_mode( // We verified that the table version is not None in the open_table method. table.version().unwrap(), !self.config.follow(), - )), + ))), ) // If we started a transaction while processing the snapshot, commit it now. .with_commit_transaction(true), @@ -787,38 +846,37 @@ impl DeltaTableInputEndpointInner { record_count, table.version().unwrap() ); - record_count + Ok(record_count) } /// Load the initial snapshot by issuing a sequence of queries for monotonically /// increasing timestamp ranges. /// Returns the total number of records processed. + /// + /// Fails with an error if the function fails to complete one of the range queries + /// after retrying `self.config.max_retries` times. async fn read_ordered_snapshot( &self, table: &DeltaTable, input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, - ) -> usize { + ) -> AnyResult { // Use the time when we started reading the snapshot as the ingestion timestamp for the snapshot. let timestamp = Utc::now(); let total_records = self .read_ordered_snapshot_inner(table, input_stream, receiver) - .await - .unwrap_or_else(|e| { - self.consumer.error(true, e, None); - 0 - }); + .await?; // Empty buffer to indicate checkpointable state. 
self.queue.push_entry( InputQueueEntry::new_with_aux( timestamp, - Some(DeltaResumeInfo::follow_mode( + QueueEntry::ResumeInfo(Some(DeltaResumeInfo::follow_mode( // We verified that the table version is not None in the open_table method. table.version().unwrap(), !self.config.follow(), - )), + ))), ) // If we started a transaction while processing the snapshot, commit it now. .with_commit_transaction(true), @@ -831,7 +889,7 @@ impl DeltaTableInputEndpointInner { total_records, table.version().unwrap() ); - total_records + Ok(total_records) } async fn read_ordered_snapshot_inner( @@ -940,8 +998,14 @@ impl DeltaTableInputEndpointInner { } let range_record_count = self - .execute_snapshot_query(&range_query, "range", input_stream, receiver) - .await; + .execute_snapshot_query( + &range_query, + "range", + input_stream, + receiver, + self.config.max_retries(), + ) + .await?; self.metrics .snapshot_records_total .fetch_add(range_record_count as u64, Ordering::Relaxed); @@ -962,11 +1026,11 @@ impl DeltaTableInputEndpointInner { self.queue.push_entry( InputQueueEntry::new_with_aux( Utc::now(), - Some(DeltaResumeInfo::snapshot_mode( + QueueEntry::ResumeInfo(Some(DeltaResumeInfo::snapshot_mode( // We verified that the table version is not None in the open_table method. table.version().unwrap(), &start, - )), + ))), ) // If we started a transaction while processing the range query, commit it now. .with_commit_transaction(true), @@ -977,6 +1041,55 @@ impl DeltaTableInputEndpointInner { Ok(total_records) } + /// Runs `operation` until it succeeds or [`DeltaTableReaderConfig::max_retries`] is exhausted. + /// + /// On failure before the limit: sets connector health to unhealthy, logs a warning with + /// `description` as the message prefix, sleeps using [`calculate_backoff_delay`], then retries. + /// On final failure: updates health, invokes [`InputConsumer::error`], and returns `Err`. + async fn retry( + &self, + description: &str, + error_tag: Option<&'static str>, + mut operation: F, + ) -> Result + where + F: FnMut() -> Fut, + Fut: Future>, + E: Display, + { + let max_retries = self.config.max_retries(); + let mut retry_count = 0u32; + loop { + match operation().await { + Ok(value) => { + self.consumer + .update_connector_health(ConnectorHealth::healthy()); + return Ok(value); + } + Err(e) => { + retry_count += 1; + if retry_count - 1 == max_retries { + let message = format!("{description} after {retry_count} attempts: {e}"); + self.consumer + .update_connector_health(ConnectorHealth::unhealthy(&message)); + self.consumer + .error(true, anyhow!(message.clone()), error_tag); + return Err(anyhow!(message)); + } + let backoff_delay = calculate_backoff_delay(retry_count - 1); + let message = format!( + "{description} after {retry_count} attempts: {e}; retrying in {:?}", + backoff_delay + ); + self.consumer + .update_connector_health(ConnectorHealth::unhealthy(&message)); + warn!("delta_table {}: {message}", &self.endpoint_name); + sleep(backoff_delay).await; + } + } + } + } + async fn worker_task_inner( self: Arc, mut input_stream: Box, @@ -1032,19 +1145,28 @@ impl DeltaTableInputEndpointInner { }) ); - let mut snapshot_record_count = 0usize; - - if snapshot_incomplete && self.config.snapshot() && self.config.timestamp_column.is_none() { + let snapshot_record_count = if snapshot_incomplete + && self.config.snapshot() + && self.config.timestamp_column.is_none() + { // Read snapshot chunk-by-chunk. 
- snapshot_record_count = self - .read_unordered_snapshot(&table, input_stream.as_mut(), &mut receiver) - .await; + self.read_unordered_snapshot(&table, input_stream.as_mut(), &mut receiver) + .await } else if snapshot_incomplete && self.config.snapshot() { // Read the entire snapshot in one query. - snapshot_record_count = self - .read_ordered_snapshot(&table, input_stream.as_mut(), &mut receiver) - .await; - } + self.read_ordered_snapshot(&table, input_stream.as_mut(), &mut receiver) + .await + } else { + Ok(0) + }; + + let snapshot_record_count = match snapshot_record_count { + Ok(snapshot_record_count) => snapshot_record_count, + Err(e) => { + self.consumer.error(true, e, None); + return; + } + }; // Start following the table if required by the configuration. if self.config.follow() { @@ -1074,8 +1196,6 @@ impl DeltaTableInputEndpointInner { // Note: If self.config.snapshot() && !snapshot_incomplete, we're resuming from a checkpoint // where the snapshot was already completed, so no special log needed - let mut retry_count = 0; - // If we haven't previously read a snapshot of the table, report initial frontier. // This makes sure that even if the current version of the table is the final version, // we will report the frontier. @@ -1083,7 +1203,7 @@ impl DeltaTableInputEndpointInner { self.queue.push_with_aux( (None, Vec::new()), Utc::now(), - Some(DeltaResumeInfo::follow_mode(version, false)), + QueueEntry::ResumeInfo(Some(DeltaResumeInfo::follow_mode(version, false))), ); } @@ -1091,36 +1211,59 @@ impl DeltaTableInputEndpointInner { wait_running(&mut receiver).await; let new_version = version + 1; - match table.log_store().read_commit_entry(new_version).await { - Ok(None) => sleep(POLL_INTERVAL).await, - Ok(Some(bytes)) + let table_for_retry = Arc::clone(&table); + let entry = match self + .retry( + &format!( + "error reading the next log entry (current table version: {version})" + ), + Some("delta-next-log"), + move || { + let table = Arc::clone(&table_for_retry); + async move { table.log_store().read_commit_entry(new_version).await } + }, + ) + .await + { + Ok(entry) => entry, + Err(_) => break, + }; + + match entry { + None => sleep(POLL_INTERVAL).await, + Some(bytes) if self.config.end_version.is_none() || self.config.end_version.unwrap() >= new_version => { - retry_count = 0; - let actions = match logstore::get_actions(new_version, &bytes) { Ok(actions) => actions, Err(e) => { self.consumer.error( true, - anyhow!("error parsing log entry for table version {new_version}: {e}"), - Some("delta-parse-log") + anyhow!( + "error parsing log entry for table version {new_version}: {e}" + ), + None, ); break; } }; version = new_version; - self.process_log_entry( - new_version, - &actions, - &table, - cdc_delete_filter.clone(), - input_stream.as_mut(), - &mut receiver, - ) - .await; + if let Err(e) = self + .process_log_entry( + new_version, + &actions, + &table, + cdc_delete_filter.clone(), + input_stream.as_mut(), + &mut receiver, + ) + .await + { + self.consumer.error(true, e, None); + break; + }; if let Some(end_version) = self.config.end_version && end_version <= new_version @@ -1136,7 +1279,7 @@ impl DeltaTableInputEndpointInner { break; } } - Ok(Some(_bytes)) => { + Some(_bytes) => { info!( "delta_table {}: reached table version {new_version}, which is greater than the 'end_version' {} specified in connector config: stopping the connector", &self.endpoint_name, @@ -1151,31 +1294,11 @@ impl DeltaTableInputEndpointInner { self.queue.push_with_aux( (None, Vec::new()), Utc::now(), - 
Some(DeltaResumeInfo::eoi()), + QueueEntry::ResumeInfo(Some(DeltaResumeInfo::eoi())), ); break; } - Err(e) => { - // Transient timeouts are common when reading the next log entry from S3. - retry_count += 1; - - if retry_count == 20 { - self.consumer.error( - true, - anyhow!("error reading the next log entry after {retry_count} attempts (current table version: {version}): {e}"), - Some("delta-next-log") - ); - break; - } else { - let backoff_delay = calculate_backoff_delay(retry_count - 1); - warn!( - "delta_table {}: error reading the next log entry after {retry_count} attempts (current table version: {version}): {e}; retrying in {:?}", - &self.endpoint_name, backoff_delay - ); - sleep(backoff_delay).await; - } - } } } } else { @@ -1209,6 +1332,7 @@ impl DeltaTableInputEndpointInner { let delta_table: DeltaTable = { let mut retry_count = 0; + // We don't use config.max_retries here. Do we want unlimited retries opening the table? const MAX_RETRIES: u32 = 10; // We've seen the table builder get stuck forever in S3 authentication for some configurations @@ -1338,6 +1462,8 @@ impl DeltaTableInputEndpointInner { if !self.config.snapshot() { *self.last_resume_status.lock().unwrap() = Some(DeltaResumeInfo::follow_mode(version, false)); + *self.last_checkpointable_status.lock().unwrap() = + DeltaResumeInfo::follow_mode(version, false); } // Register object store with datafusion, so it will recognize individual parquet @@ -1582,13 +1708,17 @@ impl DeltaTableInputEndpointInner { /// Execute a SQL query to load a complete or partial snapshot of the DeltaTable. /// Returns the total number of records processed. + /// + /// Fails with an error if the function fails to read the snapshot after retrying + /// `num_retries` times. async fn execute_snapshot_query( &self, query: &str, descr: &str, input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, - ) -> usize { + num_retries: u32, + ) -> Result { let descr = format!("{descr} query '{query}'"); debug!( "delta_table {}: retrieving data from the Delta table snapshot using {descr}", @@ -1602,9 +1732,7 @@ impl DeltaTableInputEndpointInner { let df = match self.datafusion.sql_with_options(query, options).await { Ok(df) => df, Err(e) => { - self.consumer - .error(true, anyhow!("error compiling query '{query}': {e}"), None); - return 0; + return Err(anyhow!("error compiling query '{query}': {e}")); } }; @@ -1616,6 +1744,8 @@ impl DeltaTableInputEndpointInner { input_stream, receiver, self.allocate_snapshot_transaction_label(), + num_retries, + None, ) .await } @@ -1633,7 +1763,19 @@ impl DeltaTableInputEndpointInner { /// /// * `transaction` - execute the dataframe as part of a transaction with the given label (is `Some`). /// + /// * `max_retries` - the maximum number of retries to attempt if the function fails to read the log entry. + /// /// Returns the total number of records processed. + /// + /// Returns an error if the function fails to read the log entry after performing the configured + /// number of retries. Note that errors parsing table records are not reported here; they are + /// reported by calling `consumer.error`. + /// + /// On error, the function commits the current transaction if any. It is possible that some of the + /// records have been processed and pushed to the circuit before the error. + /// + /// If `max_retries` is >0, the function can push duplicate inputs to the circuit as part of the + /// retry loop. 
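That duplicate-input caveat is the at-least-once trade-off of the retry loops this PR adds. Distilled to its shape, the pattern used by `execute_df` below and by the `retry` helper earlier in this file looks like the following sketch; `HealthSink`, `run_with_retries`, and the demo `main` are illustrative stand-ins rather than Feldera APIs, and the example assumes `tokio`.

```rust
use std::cell::Cell;
use std::fmt::Display;
use std::future::Future;
use std::time::Duration;

/// Stand-in for the consumer-side health reporting in this PR (illustrative).
trait HealthSink {
    fn healthy(&self);
    fn unhealthy(&self, message: &str);
}

/// Generic shape of the retry loops added here: attempt the operation, report
/// health on every outcome, back off between attempts, and give up with an
/// error once `max_retries` extra attempts are exhausted.
async fn run_with_retries<T, E, F, Fut>(
    sink: &dyn HealthSink,
    max_retries: u32,
    backoff: impl Fn(u32) -> Duration,
    mut operation: F,
) -> Result<T, String>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: Display,
{
    let mut attempts = 0u32;
    loop {
        match operation().await {
            Ok(value) => {
                sink.healthy();
                return Ok(value);
            }
            Err(e) => {
                attempts += 1;
                let message = format!("failed after {attempts} attempts: {e}");
                sink.unhealthy(&message);
                if attempts > max_retries {
                    return Err(message);
                }
                tokio::time::sleep(backoff(attempts - 1)).await;
            }
        }
    }
}

struct PrintSink;
impl HealthSink for PrintSink {
    fn healthy(&self) {
        println!("healthy");
    }
    fn unhealthy(&self, message: &str) {
        println!("unhealthy: {message}");
    }
}

#[tokio::main]
async fn main() {
    // Fails twice, then succeeds: the connector ends up healthy again.
    let failures_left = Cell::new(2u32);
    let result = run_with_retries(
        &PrintSink,
        5,
        |attempt| Duration::from_millis(100 << attempt),
        || {
            let failures_left = &failures_left;
            async move {
                if failures_left.get() > 0 {
                    failures_left.set(failures_left.get() - 1);
                    Err("simulated transient storage error")
                } else {
                    Ok(42)
                }
            }
        },
    )
    .await;
    assert_eq!(result, Ok(42));
}
```

Giving up when `attempts > max_retries` matches the diff's `retry_count - 1 == max_retries` check: `max_retries == 0` means exactly one attempt.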
#[allow(clippy::too_many_arguments)] async fn execute_df( &self, @@ -1644,21 +1786,89 @@ impl DeltaTableInputEndpointInner { input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, transaction: Option>, - ) -> usize { + max_retries: u32, + current_table_version: Option, + ) -> Result { + let mut retry_count = 0; + loop { + match self + .execute_df_inner( + dataframe.clone(), + polarity, + cdc_delete_filter.clone(), + input_stream, + receiver, + &transaction, + ) + .await + { + Ok(total_records) => { + self.consumer + .update_connector_health(ConnectorHealth::healthy()); + return Ok(total_records); + } + Err(e) => { + retry_count += 1; + if retry_count - 1 == max_retries { + let message = format!( + "error retrieving {descr} after {retry_count} attempts{}: {e}", + if let Some(version) = current_table_version { + format!(" (current table version: {version})") + } else { + String::new() + } + ); + self.consumer + .update_connector_health(ConnectorHealth::unhealthy(&message)); + return Err(anyhow!(message)); + } + let backoff_delay = calculate_backoff_delay(retry_count - 1); + + let message = format!( + "error retrieving {descr} after {retry_count} attempts{}: {e}; retrying in {backoff_delay:?}", + if let Some(version) = current_table_version { + format!(" (current table version: {version})") + } else { + String::new() + } + ); + self.consumer + .update_connector_health(ConnectorHealth::unhealthy(&message)); + warn!("delta_table {}: {message}", &self.endpoint_name); + sleep(backoff_delay).await; + } + } + } + } + + // A single attempt of the `execute_df` retry loop. + #[allow(clippy::too_many_arguments)] + async fn execute_df_inner( + &self, + dataframe: DataFrame, + polarity: bool, + cdc_delete_filter: Option>, + input_stream: &mut dyn ArrowStream, + receiver: &mut Receiver, + transaction: &Option>, + ) -> Result { wait_running(receiver).await; + let transaction = transaction.clone(); // Limit the number of connectors simultaneously reading from Delta Lake. let _token = DELTA_READER_SEMAPHORE.acquire().await.unwrap(); let mut stream = match dataframe.execute_stream().await { Err(e) => { - self.consumer - .error(true, anyhow!("error retrieving {descr}: {e:?}"), None); - return 0; + return Err(format!("{e:?}")); } Ok(stream) => stream, }; + // We declare the connector healthy at this point. + self.consumer + .update_connector_health(ConnectorHealth::healthy()); + let mut num_batches = 0; let mut total_records = 0usize; @@ -1701,7 +1911,7 @@ impl DeltaTableInputEndpointInner { }, move |(buffer, errors, timestamp)| { queue.push_entry( - InputQueueEntry::new_with_aux(timestamp, None) + InputQueueEntry::new_with_aux(timestamp, QueueEntry::ResumeInfo(None)) .with_buffer(buffer) .with_start_transaction(transaction.clone()), errors, @@ -1717,13 +1927,18 @@ impl DeltaTableInputEndpointInner { let batch = match batch { Ok(batch) => batch, Err(e) => { - self.consumer.error( - false, - anyhow!("error retrieving batch {num_batches} of {descr}: {e:?}"), - Some("delta-batch"), + drop(job_queue); + // We don't have a way to rollback the transaction at this point. The best + // we can do is commit the transaction so it doesn't block the pipeline. + // This means that the connector will generate duplicate inputs on a retry. + self.queue.push_entry( + InputQueueEntry::new_with_aux(timestamp, QueueEntry::Rollback) + // If we started a transaction while processing the log entry, commit it now. 
+ .with_commit_transaction(true), + Vec::new(), ); - continue; + return Err(format!("error retrieving batch {num_batches}: {e:?}")); } }; // info!("schema: {}", batch.schema()); @@ -1736,7 +1951,7 @@ impl DeltaTableInputEndpointInner { } job_queue.flush().await; - total_records + Ok(total_records) } async fn parse_record_batch( @@ -1786,6 +2001,16 @@ impl DeltaTableInputEndpointInner { /// Apply actions from a transaction log entry. /// /// Only `Add` and `Remove` actions are picked up. + /// + /// Returns an error if the connector failed to read the log entry after performing the `self.config.max_retries` + /// number of retries. Note that errors parsing table records are not reported here; they are + /// reported in the `execute_df` method by calling `consumer.error`. + /// + /// On error, the function commits the current transaction if any. It is possible that some of the + /// records have been processed and pushed to the circuit before the error. + /// + /// If `self.config.max_retries` is >0, the function can push duplicate inputs to the circuit as part of the + /// retry loop. async fn process_log_entry( &self, new_version: i64, @@ -1794,7 +2019,7 @@ impl DeltaTableInputEndpointInner { cdc_delete_filter: Option>, input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, - ) { + ) -> AnyResult<()> { if self.config.verbose > 0 { // Don't log actions we ignore to limit spurious logging. E.g., delta lake // optimization passes can generate thousand of noop actions. @@ -1825,7 +2050,7 @@ impl DeltaTableInputEndpointInner { if self.config.is_cdc() { self.process_cdc_transaction(actions, table, cdc_delete_filter, input_stream, receiver) - .await; + .await?; } else { let column_names = self.used_column_list(table); @@ -1849,7 +2074,7 @@ impl DeltaTableInputEndpointInner { receiver, start_transaction.clone(), ) - .await; + .await?; } } @@ -1863,7 +2088,7 @@ impl DeltaTableInputEndpointInner { receiver, start_transaction.clone(), ) - .await; + .await?; } } } @@ -1872,15 +2097,17 @@ impl DeltaTableInputEndpointInner { self.queue.push_entry( InputQueueEntry::new_with_aux( timestamp, - Some(DeltaResumeInfo::follow_mode( + QueueEntry::ResumeInfo(Some(DeltaResumeInfo::follow_mode( new_version, self.config.end_version == Some(new_version), - )), + ))), ) // If we started a transaction while processing the log entry, commit it now. .with_commit_transaction(true), Vec::new(), ); + + Ok(()) } /// Process a DeltaLake transaction in CDC mode: @@ -1897,7 +2124,7 @@ impl DeltaTableInputEndpointInner { cdc_delete_filter: Option>, input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, - ) { + ) -> AnyResult<()> { let result = self .do_process_cdc_transaction(actions, table, cdc_delete_filter, input_stream, receiver) .await; @@ -1906,9 +2133,7 @@ impl DeltaTableInputEndpointInner { // If the table does not exist, there's no harm. let _ = self.datafusion.deregister_table("tmp_table"); - if let Err(e) = result { - self.consumer.error(false, e, Some("delta-cdc")); - } + result } async fn do_process_cdc_transaction( @@ -1939,12 +2164,12 @@ impl DeltaTableInputEndpointInner { ); // Create a datafusion table backed by these files. 
- let table = Arc::new( + let parquet_table = Arc::new( self.create_parquet_table(table, files, &description) .await?, ); - self.datafusion.register_table("tmp_table", table).map_err(|e| { + self.datafusion.register_table("tmp_table", parquet_table).map_err(|e| { anyhow!("internal error processing {description}; {REPORT_ERROR}; error registering Parquet table: {e}") })?; @@ -1973,8 +2198,10 @@ impl DeltaTableInputEndpointInner { input_stream, receiver, self.allocate_follow_transaction_label(), + self.config.max_retries(), + table.version(), ) - .await; + .await?; Ok(()) } @@ -2013,7 +2240,7 @@ impl DeltaTableInputEndpointInner { input_stream: &mut dyn ArrowStream, receiver: &mut Receiver, start_transaction: Option>, - ) { + ) -> AnyResult<()> { let result = match action { Action::Add(add) if add.data_change => { self.add_with_polarity( @@ -2041,16 +2268,14 @@ impl DeltaTableInputEndpointInner { ) .await } - _ => return, + _ => return Ok(()), }; // Deregister the table registered by `add_with_polarity`. // If the table does not exist, there's no harm. let _ = self.datafusion.deregister_table("tmp_table"); - if let Err(e) = result { - self.consumer.error(false, e, Some("delta-action")); - } + result } // TODO: here, as well as in `process_cdc_transaction`, we can get some potential speedup by only reading a subset @@ -2074,12 +2299,12 @@ impl DeltaTableInputEndpointInner { let full_path = format!("{}{}", table.log_store().object_store_url().as_str(), path); // Create a datafusion table backed by these files. - let table = Arc::new( + let parquet_table = Arc::new( self.create_parquet_table(table, vec![full_path.clone()], &description) .await?, ); - self.datafusion.register_table("tmp_table", table).map_err(|e| { + self.datafusion.register_table("tmp_table", parquet_table).map_err(|e| { anyhow!("internal error processing file {full_path}; {REPORT_ERROR}; error registering Parquet table: {e}") })?; @@ -2104,8 +2329,10 @@ impl DeltaTableInputEndpointInner { input_stream, receiver, start_transaction, + self.config.max_retries(), + table.version(), ) - .await; + .await?; Ok(()) } diff --git a/crates/adapters/src/integrated/delta_table/test.rs b/crates/adapters/src/integrated/delta_table/test.rs index b221b7def1a..ef23136429a 100644 --- a/crates/adapters/src/integrated/delta_table/test.rs +++ b/crates/adapters/src/integrated/delta_table/test.rs @@ -45,8 +45,8 @@ use std::fs::File; use std::io::Write; use std::mem::forget; use std::os::unix::ffi::OsStrExt; -use std::path::Path; -use std::sync::Arc; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tempfile::{NamedTempFile, TempDir}; use tokio::sync::mpsc; @@ -108,6 +108,7 @@ async fn wait_for_output_records( expected_output: &[T], datafusion: &SessionContext, timeout_ms: u64, + dedup: bool, ) where T: for<'a> DeserializeWithContext<'a, SqlSerdeConfig, Variant> + DBData, { @@ -144,8 +145,12 @@ async fn wait_for_output_records( result.len() ); + result.sort(); + if dedup { + result.dedup(); + } + if result.len() == expected_output.len() { - result.sort(); let mut expected_output = expected_output.to_vec(); expected_output.sort(); assert_eq!(result, expected_output); @@ -713,6 +718,8 @@ async fn test_follow( test_end_version: bool, buffer_size: u64, buffer_timeout_ms: u64, + inject_failure: Option>, + clear_failure: Option>, ) { async fn suspend_pipeline(pipeline: Controller) { println!("start suspend"); @@ -897,6 +904,7 @@ async fn test_follow( &expected_output, &datafusion, 20_000, + false, ) 
.await; @@ -948,6 +956,58 @@ async fn test_follow( .collect::>(); }; + // Run after the write so the test process can still update the table; the pipeline + // then fails to read the new snapshot until permissions are restored. + if let Some(inject_failure) = &inject_failure { + inject_failure(); + } + + if inject_failure.is_some() { + wait( + || { + pipeline + .input_endpoint_status("test_input1") + .ok() + .and_then(|s| s.health) + .is_some_and(|h| { + let unhealthy = matches!( + h.status, + feldera_types::adapter_stats::ConnectorHealthStatus::Unhealthy + ); + if unhealthy { + println!("unhealthy: {:?}", h); + } + unhealthy + }) + }, + 20_000, + ) + .expect("timeout waiting for input connector health to become unhealthy"); + } + + if let Some(clear_failure) = &clear_failure { + clear_failure(); + } + + if clear_failure.is_some() { + wait( + || { + pipeline + .input_endpoint_status("test_input1") + .ok() + .and_then(|s| s.health) + .is_some_and(|h| { + matches!( + h.status, + feldera_types::adapter_stats::ConnectorHealthStatus::Healthy + ) + }) + }, + 20_000, + ) + .expect("timeout waiting for input connector health to become healthy"); + } + if suspend { suspend_pipeline(pipeline).await; @@ -995,6 +1055,7 @@ async fn test_follow( &expected_output, &datafusion, if suspend { 200_000 } else { 0 }, + inject_failure.is_some(), ) .await; } @@ -1319,6 +1380,48 @@ fn delta_data(max_records: usize) -> impl Strategy> }) } +/// Remove owner read and execute on the delta table **root directory only**, and push that path +/// and its original mode onto `saved` for [`restore_delta_input_table_read_permission`]. +/// +/// Without `r` and `x` on the root, the process cannot traverse into `_delta_log` or data paths +/// even if inner files still have permissive modes. +#[cfg(unix)] +fn strip_delta_input_table_read_permission( + table_root: &Path, + saved: &mut Vec<(PathBuf, u32)>, +) -> std::io::Result<()> { + use std::fs; + use std::os::unix::fs::PermissionsExt; + + let meta = fs::metadata(table_root)?; + if !meta.is_dir() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "delta input table path must be a directory", + )); + } + + let mode = meta.permissions().mode(); + saved.push((table_root.to_path_buf(), mode)); + + let new_mode = mode & !0o500; + let mut perms = meta.permissions(); + perms.set_mode(new_mode); + fs::set_permissions(table_root, perms)?; + Ok(()) +} + +#[cfg(unix)] +fn restore_delta_input_table_read_permission(saved: Vec<(PathBuf, u32)>) -> std::io::Result<()> { + use std::fs; + use std::os::unix::fs::PermissionsExt; + + for (path, mode) in saved.into_iter().rev() { + fs::set_permissions(&path, fs::Permissions::from_mode(mode))?; + } + Ok(()) +} + async fn delta_table_follow_file_test_common( snapshot: bool, transaction_mode: DeltaTableTransactionMode, @@ -1338,6 +1441,45 @@ async fn delta_table_follow_file_test_common( let output_table_dir: TempDir = TempDir::new().unwrap(); let output_table_uri = output_table_dir.path().display().to_string(); + // With `end_version`, the connector stops tailing the log before new versions appear, so + // stripping read permission would not drive the connector unhealthy (wait would time out). 
+ #[cfg(unix)] + let (inject_failure, clear_failure): (Option>, Option>) = + if end_version { + (None, None) + } else { + let saved_modes: Arc>> = Arc::new(Mutex::new(Vec::new())); + let input_root = input_table_dir.path().to_path_buf(); + + let inject_failure: Box = { + let saved_modes = Arc::clone(&saved_modes); + let input_root = input_root.clone(); + Box::new(move || { + let mut guard = saved_modes.lock().unwrap(); + guard.clear(); + strip_delta_input_table_read_permission(&input_root, &mut *guard) + .unwrap_or_else(|e| { + panic!("inject_failure (strip read permission on input table): {e}") + }); + }) + }; + + let clear_failure: Box = { + let saved_modes = Arc::clone(&saved_modes); + Box::new(move || { + let entries = std::mem::take(&mut *saved_modes.lock().unwrap()); + restore_delta_input_table_read_permission(entries).unwrap_or_else(|e| { + panic!("clear_failure (restore read permission on input table): {e}") + }); + }) + }; + + (Some(inject_failure), Some(clear_failure)) + }; + + #[cfg(not(unix))] + let (inject_failure, clear_failure) = (None, None); + test_follow( &relation_schema, &input_table_uri, @@ -1350,6 +1492,8 @@ async fn delta_table_follow_file_test_common( end_version, 1000, 100, + inject_failure, + clear_failure, ) .await; } @@ -1565,6 +1709,8 @@ async fn delta_table_follow_s3_test_common(snapshot: bool, suspend: bool) { false, 1000, 100, + None, + None, ) .await; } diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index 58d886153c8..5c16cd35717 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -1226,6 +1226,7 @@ where .service(start_input_endpoint) .service(input_endpoint_status) .service(output_endpoint_status) + .service(reset_output_endpoint) .service(rebalance) .service(coordination_activate_handler) .service(coordination_status) @@ -2290,6 +2291,13 @@ async fn output_endpoint_status( Ok(HttpResponse::Ok().json(state.controller()?.output_endpoint_status(&path)?)) } +#[post("/output_endpoints/{endpoint_name}/reset")] +async fn reset_output_endpoint(path: web::Path) -> Result { + Err(PipelineError::from(ControllerError::not_supported( + &format!("output endpoint '{}' does not support reset", path.as_str()), + ))) +} + /// This service journals the paused state, but it does not wait for the journal /// record to commit before it returns success, so there is a small race. 
#[get("/input_endpoints/{endpoint_name}/start")] diff --git a/crates/adapters/src/test/mock_input_consumer.rs b/crates/adapters/src/test/mock_input_consumer.rs index 3bf228bcc20..ea0061314f9 100644 --- a/crates/adapters/src/test/mock_input_consumer.rs +++ b/crates/adapters/src/test/mock_input_consumer.rs @@ -6,6 +6,7 @@ use dbsp::operator::StagedBuffers; use feldera_adapterlib::ConnectorMetadata; use feldera_adapterlib::format::BufferSize; use feldera_adapterlib::transport::{Resume, Watermark}; +use feldera_types::adapter_stats::ConnectorHealth; use feldera_types::config::FtModel; use std::sync::{Arc, Mutex, MutexGuard}; @@ -129,6 +130,8 @@ impl InputConsumer for MockInputConsumer { fn commit_transaction(&self) { self.state().transaction_in_progress = false; } + + fn update_connector_health(&self, _health: ConnectorHealth) {} } pub struct MockInputParserState { diff --git a/crates/adapters/src/transport/kafka/ft/test.rs b/crates/adapters/src/transport/kafka/ft/test.rs index 5ff6bf58e06..79ea0c38559 100644 --- a/crates/adapters/src/transport/kafka/ft/test.rs +++ b/crates/adapters/src/transport/kafka/ft/test.rs @@ -25,6 +25,7 @@ use feldera_adapterlib::format::{BufferSize, flatten_nested}; use feldera_adapterlib::transport::{Resume, Watermark}; use feldera_macros::IsNone; use feldera_sqllib::{ByteArray, SqlString, Variant}; +use feldera_types::adapter_stats::ConnectorHealth; use feldera_types::config::{ ConnectorConfig, FormatConfig, FtModel, InputEndpointConfig, OutputBufferConfig, TransportConfig, default_max_queued_records, @@ -763,6 +764,8 @@ impl InputConsumer for DummyInputConsumer { fn start_transaction(&self, _label: Option<&str>) {} fn commit_transaction(&self) {} + + fn update_connector_health(&self, _health: ConnectorHealth) {} } #[test] diff --git a/crates/feldera-types/Cargo.toml b/crates/feldera-types/Cargo.toml index 54fea8c7763..b5c291dea00 100644 --- a/crates/feldera-types/Cargo.toml +++ b/crates/feldera-types/Cargo.toml @@ -31,7 +31,7 @@ erased-serde = { workspace = true } uuid = { workspace = true, features = ["v7"] } thiserror = { workspace = true } regex = { workspace = true } -chrono = { workspace = true, features = ["serde"] } +chrono = { workspace = true, features = ["serde", "alloc"] } feldera-ir = { workspace = true } time = { workspace = true } bytemuck = { workspace = true } diff --git a/crates/feldera-types/src/config/dev_tweaks.rs b/crates/feldera-types/src/config/dev_tweaks.rs index 057daa4c3cb..2f5e18732d8 100644 --- a/crates/feldera-types/src/config/dev_tweaks.rs +++ b/crates/feldera-types/src/config/dev_tweaks.rs @@ -107,6 +107,7 @@ pub struct DevTweaks { /// /// The default value is 1.2. #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")] pub balancer_min_relative_improvement_threshold: Option, /// The minimum absolute improvement threshold for the balancer. @@ -138,6 +139,7 @@ pub struct DevTweaks { /// /// The default value is 1.1. #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")] pub balancer_balance_tax: Option, /// The balancer threshold for checking for an improved partitioning policy for a stream. @@ -151,6 +153,7 @@ pub struct DevTweaks { /// /// The default value is 0.1. 
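The `crate::serde_via_value::deserialize` helper these attributes reference is not part of this hunk. A plausible implementation, hedged accordingly (the real module may differ), is to round-trip through `serde_json::Value`, which sidesteps the flatten-buffer number representation that the regression test below pins down.

```rust
// Plausible sketch of the referenced helper; the actual module lives outside
// this diff. Deserializing via serde_json::Value avoids serde's internal
// `Content` buffering of flattened structs, which mangles f64 fields when
// serde_json's `arbitrary_precision` feature is enabled.
use serde::de::{DeserializeOwned, Error};
use serde::{Deserialize, Deserializer};

pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
    T: DeserializeOwned,
    D: Deserializer<'de>,
{
    let value = serde_json::Value::deserialize(deserializer)?;
    serde_json::from_value(value).map_err(D::Error::custom)
}
```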
#[serde(skip_serializing_if = "Option::is_none")] + #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")] pub balancer_key_distribution_refresh_threshold: Option, /// False-positive rate for Bloom filters on batches on storage, as a @@ -168,6 +171,7 @@ pub struct DevTweaks { /// /// Values outside the valid range, such as 0.0, disable Bloom filters. #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default, deserialize_with = "crate::serde_via_value::deserialize")] pub bloom_false_positive_rate: Option, /// Maximum batch size in records for level 0 merges. @@ -284,3 +288,65 @@ pub enum MergerType { #[default] ListMerger, } + +#[cfg(test)] +mod tests { + use crate::config::{PipelineConfig, RuntimeConfig}; + + use super::*; + + /// Regression test: `Option` fields inside `DevTweaks` must + /// survive a JSON-string round-trip through `PipelineConfig`, which + /// uses `#[serde(flatten)]` on `RuntimeConfig`. With `serde_json`'s + /// `arbitrary_precision` feature enabled, the serde `Content` buffer + /// represents numbers as maps, which breaks plain `f64` + /// deserialization (serde-rs/json#1157). The `serde_via_value` + /// workaround on each `Option` field fixes this. + #[test] + fn dev_tweaks_f64_roundtrip_through_pipeline_config() { + let mut rc = RuntimeConfig::default(); + rc.dev_tweaks = DevTweaks { + bloom_false_positive_rate: Some(0.0), + balancer_balance_tax: Some(1.1), + balancer_min_relative_improvement_threshold: Some(1.2), + balancer_key_distribution_refresh_threshold: Some(0.1), + ..Default::default() + }; + let pc = PipelineConfig { + global: rc, + multihost: None, + name: Some("test-pipeline".into()), + given_name: None, + storage_config: None, + secrets_dir: None, + inputs: Default::default(), + outputs: Default::default(), + program_ir: None, + }; + + // JSON string round-trip (the path the pipeline process takes). + let json = serde_json::to_string_pretty(&pc).unwrap(); + let pc2: PipelineConfig = serde_json::from_str(&json) + .expect("JSON string round-trip of PipelineConfig with f64 dev_tweaks must succeed"); + assert_eq!(pc2.global.dev_tweaks.bloom_false_positive_rate, Some(0.0)); + assert_eq!(pc2.global.dev_tweaks.balancer_balance_tax, Some(1.1)); + assert_eq!( + pc2.global + .dev_tweaks + .balancer_min_relative_improvement_threshold, + Some(1.2) + ); + assert_eq!( + pc2.global + .dev_tweaks + .balancer_key_distribution_refresh_threshold, + Some(0.1) + ); + + // serde_json::Value round-trip (the path the pipeline manager takes). + let value = serde_json::to_value(&pc).unwrap(); + let pc3: PipelineConfig = serde_json::from_value(value) + .expect("Value round-trip of PipelineConfig with f64 dev_tweaks must succeed"); + assert_eq!(pc3.global.dev_tweaks.bloom_false_positive_rate, Some(0.0)); + } +} diff --git a/crates/feldera-types/src/secret_ref.rs b/crates/feldera-types/src/secret_ref.rs index 6c9bcbaf12f..d7c082abb37 100644 --- a/crates/feldera-types/src/secret_ref.rs +++ b/crates/feldera-types/src/secret_ref.rs @@ -6,6 +6,33 @@ use thiserror::Error as ThisError; /// RFC 1123 specification for a DNS label, which is also used by Kubernetes. pub const PATTERN_RFC_1123_DNS_LABEL: &str = r"^[a-z0-9]+(-[a-z0-9]+)*$"; +/// POSIX pattern for an environment variable name. 
+pub const PATTERN_ENV_VAR_NAME: &str = r"^[a-zA-Z_][a-zA-Z0-9_]*$";
+
+#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
+pub enum EnvVarNameParseError {
+    #[error("cannot be empty")]
+    Empty,
+    #[error(
+        "must only contain alphanumeric characters and underscores (_), and start with a letter or underscore"
+    )]
+    InvalidFormat,
+}
+
+/// Validates that `name` is a valid POSIX environment variable name.
+pub fn validate_env_var_name(name: &str) -> Result<(), EnvVarNameParseError> {
+    if name.is_empty() {
+        Err(EnvVarNameParseError::Empty)
+    } else {
+        let re = Regex::new(PATTERN_ENV_VAR_NAME).expect("valid regular expression");
+        if re.is_match(name) {
+            Ok(())
+        } else {
+            Err(EnvVarNameParseError::InvalidFormat)
+        }
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, ThisError)]
 pub enum KubernetesSecretNameParseError {
     #[error("cannot be empty")]
@@ -90,6 +117,11 @@ pub enum SecretRef {
         /// Key inside the `data:` section of the `Secret` object.
         data_key: String,
     },
+    /// Reference to a process environment variable.
+    EnvVar {
+        /// Name of the environment variable.
+        name: String,
+    },
 }
 
 impl Display for SecretRef {
@@ -98,6 +130,9 @@
             SecretRef::Kubernetes { name, data_key } => {
                 write!(f, "${{secret:kubernetes:{name}/{data_key}}}")
             }
+            SecretRef::EnvVar { name } => {
+                write!(f, "${{env:{name}}}")
+            }
         }
     }
 }
@@ -134,27 +169,45 @@ pub enum MaybeSecretRefParseError {
         data_key: String,
         e: KubernetesSecretDataKeyParseError,
     },
+    #[error(
+        "environment variable reference '{env_ref_str}' has name '{name}' which is not valid: {e}"
+    )]
+    InvalidEnvVarName {
+        env_ref_str: String,
+        name: String,
+        e: EnvVarNameParseError,
+    },
+    #[error("environment variable reference '{env_ref_str}' is not valid: name cannot be empty")]
+    EmptyEnvVarName { env_ref_str: String },
 }
 
 impl MaybeSecretRef {
-    /// Determines whether a string is just a plain string or a reference to a secret.
+    /// Determines whether a string is just a plain string, a reference to a secret,
+    /// or a reference to a process environment variable.
     ///
     /// - Secret reference: any string which starts with `${secret:` and ends with `}`
     ///   is regarded as an attempt to declare a secret reference
+    /// - Environment variable reference: any string which starts with `${env:` and ends with `}`
+    ///   is regarded as an attempt to declare an environment variable reference
     /// - Plain string: any other string
     ///
     /// A secret reference must follow the following pattern:
    /// `${secret:<provider>:<identifier>}`
     ///
-    /// An error is returned if a string is regarded as a secret reference (see above), but:
-    /// - Specifies a `<provider>` which does not exist
-    /// - Specifies an `<identifier>` which does not meet the provider-specific requirements
+    /// An environment variable reference must follow the following pattern:
+    /// `${env:<name>}`
+    ///
+    /// An error is returned if a string is regarded as a secret or env var reference (see above), but:
+    /// - Specifies a `<provider>` which does not exist (for secret refs)
+    /// - Specifies an `<identifier>` or `<name>` which does not meet the requirements
     ///
     /// Supported providers and their identifier expectations:
     /// - `${secret:kubernetes:<name>/<data key>}`
+    /// - `${env:<name>}` where `<name>` follows POSIX env var naming rules
     ///
-    /// Note that here is not checked whether the secret reference can actually be resolved.
+    /// Note that it is not checked here whether the reference can actually be resolved.
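End to end, the new reference form parses and resolves as follows. This is a minimal sketch using only the APIs added in this hunk; it assumes `MaybeSecretRef` and `SecretRef` are importable from `feldera_types::secret_ref`, the variable name is arbitrary, and the `unsafe` around `set_var` matches the Rust 2024 convention used by the tests below.

```rust
// Illustrative usage of the env var reference form added in this hunk.
use feldera_types::secret_ref::{MaybeSecretRef, SecretRef};
use std::env;

fn main() {
    unsafe {
        env::set_var("EXAMPLE_API_KEY", "s3cr3t");
    }

    // `${env:...}` parses into SecretRef::EnvVar and resolves from the process
    // environment; validation only checks the POSIX name rules, not existence.
    match MaybeSecretRef::new("${env:EXAMPLE_API_KEY}".to_string()) {
        Ok(MaybeSecretRef::SecretRef(SecretRef::EnvVar { name })) => {
            assert_eq!(name, "EXAMPLE_API_KEY");
            assert_eq!(env::var(&name).unwrap(), "s3cr3t");
        }
        other => panic!("expected an env var reference, got {other:?}"),
    }

    // Strings that don't match `${env:...}` or `${secret:...}` stay plain.
    assert!(matches!(
        MaybeSecretRef::new("plain-value".to_string()),
        Ok(MaybeSecretRef::String(_))
    ));
}
```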
pub fn new(value: String) -> Result { + let env_prefix = "${env:"; if value.starts_with("${secret:") && value.ends_with('}') { // Because the pattern only has ASCII characters, they are encoded as single bytes. // The secret reference is extracted by slicing away the first 9 bytes and the last byte. @@ -191,6 +244,24 @@ impl MaybeSecretRef { secret_ref_str: value, }) } + } else if value.starts_with(env_prefix) && value.ends_with('}') { + // Environment variable reference: `${env:}` + // The content is extracted by slicing away the first 6 bytes ("${env:") and the last byte ("}"). + let name = value + .trim_start_matches(env_prefix) + .trim_end_matches("}") + .to_string(); + if name.is_empty() { + Err(MaybeSecretRefParseError::EmptyEnvVarName { env_ref_str: value }) + } else if let Err(e) = validate_env_var_name(&name) { + Err(MaybeSecretRefParseError::InvalidEnvVarName { + env_ref_str: value, + name, + e, + }) + } else { + Ok(MaybeSecretRef::SecretRef(SecretRef::EnvVar { name })) + } } else { Ok(MaybeSecretRef::String(value)) } @@ -213,8 +284,9 @@ impl Display for MaybeSecretRef { #[cfg(test)] mod tests { use super::{ - KubernetesSecretDataKeyParseError, KubernetesSecretNameParseError, MaybeSecretRef, - validate_kubernetes_secret_data_key, validate_kubernetes_secret_name, + EnvVarNameParseError, KubernetesSecretDataKeyParseError, KubernetesSecretNameParseError, + MaybeSecretRef, validate_env_var_name, validate_kubernetes_secret_data_key, + validate_kubernetes_secret_name, }; use super::{MaybeSecretRefParseError, SecretRef}; @@ -228,6 +300,12 @@ mod tests { }), "${secret:kubernetes:example/value}" ); + assert_eq!( + format!("{}", SecretRef::EnvVar { + name: "MY_VAR".to_string(), + }), + "${env:MY_VAR}" + ); } #[test] @@ -453,4 +531,58 @@ mod tests { assert_eq!(validate_kubernetes_secret_data_key(value), expectation); } } + + #[test] + #[rustfmt::skip] // Skip formatting to keep it short + fn env_var_name_validation() { + for (value, expectation) in vec![ + ("A", Ok(())), + ("a", Ok(())), + ("_", Ok(())), + ("A1", Ok(())), + ("MY_VAR", Ok(())), + ("_MY_VAR", Ok(())), + ("MY_VAR_123", Ok(())), + ("", Err(EnvVarNameParseError::Empty)), + ("1A", Err(EnvVarNameParseError::InvalidFormat)), + ("MY-VAR", Err(EnvVarNameParseError::InvalidFormat)), + ("MY VAR", Err(EnvVarNameParseError::InvalidFormat)), + ("MY.VAR", Err(EnvVarNameParseError::InvalidFormat)), + ] { + assert_eq!(validate_env_var_name(value), expectation); + } + } + + #[test] + #[rustfmt::skip] // Skip formatting to keep it short + fn maybe_secret_ref_parse_env_var() { + let values_and_expectations = vec![ + // Valid env var references + ("${env:A}", Ok(MaybeSecretRef::SecretRef(SecretRef::EnvVar { name: "A".to_string() }))), + ("${env:MY_VAR}", Ok(MaybeSecretRef::SecretRef(SecretRef::EnvVar { name: "MY_VAR".to_string() }))), + ("${env:_MY_VAR}", Ok(MaybeSecretRef::SecretRef(SecretRef::EnvVar { name: "_MY_VAR".to_string() }))), + ("${env:MY_VAR_123}", Ok(MaybeSecretRef::SecretRef(SecretRef::EnvVar { name: "MY_VAR_123".to_string() }))), + // Empty name + ("${env:}", Err(MaybeSecretRefParseError::EmptyEnvVarName { + env_ref_str: "${env:}".to_string() + })), + // Invalid name: starts with digit + ("${env:1VAR}", Err(MaybeSecretRefParseError::InvalidEnvVarName { + env_ref_str: "${env:1VAR}".to_string(), + name: "1VAR".to_string(), + e: EnvVarNameParseError::InvalidFormat + })), + // Invalid name: contains hyphen + ("${env:MY-VAR}", Err(MaybeSecretRefParseError::InvalidEnvVarName { + env_ref_str: "${env:MY-VAR}".to_string(), + name: 
"MY-VAR".to_string(), + e: EnvVarNameParseError::InvalidFormat + })), + // Not an env var reference (no closing brace match for opening pattern) + ("${env:", Ok(MaybeSecretRef::String("${env:".to_string()))), + // Plain strings that look similar but are not env var references + ("$env:MY_VAR}", Ok(MaybeSecretRef::String("$env:MY_VAR}".to_string()))), + ]; + test_values_and_expectations(values_and_expectations); + } } diff --git a/crates/feldera-types/src/secret_resolver.rs b/crates/feldera-types/src/secret_resolver.rs index cda301c194b..66cf07d9870 100644 --- a/crates/feldera-types/src/secret_resolver.rs +++ b/crates/feldera-types/src/secret_resolver.rs @@ -4,6 +4,7 @@ use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::{Map, Value}; use std::collections::BTreeSet; +use std::env; use std::fmt::Debug; use std::fs; use std::io::ErrorKind; @@ -102,6 +103,10 @@ pub enum SecretRefResolutionError { path: String, error_kind: ErrorKind, }, + #[error( + "environment variable reference '{env_ref}' resolution failed: environment variable '{name}' is not set" + )] + EnvVarNotSet { env_ref: SecretRef, name: String }, #[error("secret resolution led to a duplicate key in the mapping, which should not happen")] DuplicateKeyInMapping, #[error("unable to serialize connector configuration: {error}")] @@ -171,7 +176,7 @@ fn resolve_secret_references_in_json( }) } -/// Resolves a string which can potentially be a secret reference. +/// Resolves a string which can potentially be a secret reference or an environment variable reference. fn resolve_potential_secret_reference_string( secrets_dir: &Path, s: String, @@ -179,8 +184,11 @@ fn resolve_potential_secret_reference_string( match MaybeSecretRef::new(s) { Ok(maybe_secret_ref) => match maybe_secret_ref { MaybeSecretRef::String(plain_str) => Ok(plain_str), - MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref { - SecretRef::Kubernetes { name, data_key } => { + MaybeSecretRef::SecretRef(secret_ref) => match secret_ref { + SecretRef::Kubernetes { + ref name, + ref data_key, + } => { // Secret reference: `${secret:kubernetes:/}` // File location: `/kubernetes//` let path = Path::new(secrets_dir) @@ -224,6 +232,20 @@ fn resolve_potential_secret_reference_string( } } } + SecretRef::EnvVar { ref name } => { + // Environment variable reference: `${env:}` + // Resolved by reading the named environment variable from the process. 
+ let name = name.clone(); + match env::var(&name) { + Ok(value) => Ok(value), + Err(env::VarError::NotPresent) | Err(env::VarError::NotUnicode(_)) => { + Err(SecretRefResolutionError::EnvVarNotSet { + env_ref: secret_ref, + name, + }) + } + } + } }, }, Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }), @@ -565,4 +587,107 @@ mod tests { Some("${secret:kubernetes:e/f}".to_string()) ); } + + #[test] + fn resolve_env_var_success() { + // Set the environment variable + unsafe { + std::env::set_var("FELDERA_TEST_ENV_VAR_ABC123", "my_value"); + } + + let dir = tempfile::tempdir().unwrap(); + assert_eq!( + resolve_potential_secret_reference_string( + dir.path(), + "${env:FELDERA_TEST_ENV_VAR_ABC123}".to_string() + ) + .unwrap(), + "my_value" + ); + + unsafe { + std::env::remove_var("FELDERA_TEST_ENV_VAR_ABC123"); + } + } + + #[test] + fn resolve_env_var_not_set() { + let dir = tempfile::tempdir().unwrap(); + let env_ref_str = "${env:FELDERA_TEST_ENV_VAR_NOT_SET_XYZ}"; + unsafe { + std::env::remove_var("FELDERA_TEST_ENV_VAR_NOT_SET_XYZ"); + } + + let MaybeSecretRef::SecretRef(expected_ref) = + crate::secret_ref::MaybeSecretRef::new(env_ref_str.to_string()).unwrap() + else { + unreachable!(); + }; + + assert_eq!( + resolve_potential_secret_reference_string(dir.path(), env_ref_str.to_string()) + .unwrap_err(), + SecretRefResolutionError::EnvVarNotSet { + env_ref: expected_ref, + name: "FELDERA_TEST_ENV_VAR_NOT_SET_XYZ".to_string(), + } + ); + } + + #[test] + fn resolve_env_var_in_connector_config() { + unsafe { + std::env::set_var("FELDERA_TEST_CONN_VAR_A", "resolved_value_a"); + std::env::set_var("FELDERA_TEST_CONN_VAR_B", "resolved_value_b"); + } + + let connector_config_json = json!({ + "transport": { + "name": "datagen", + "config": { + "plan": [{ + "limit": 2, + "fields": { + "col1": { "values": [1, 2] }, + "col2": { "values": ["${env:FELDERA_TEST_CONN_VAR_A}", "${env:FELDERA_TEST_CONN_VAR_B}"] } + } + }] + } + }, + "format": { + "name": "json", + "config": { + "example": "${env:FELDERA_TEST_CONN_VAR_A}" + } + } + }); + + let connector_config: ConnectorConfig = + serde_json::from_value(connector_config_json).unwrap(); + + let dir = tempfile::tempdir().unwrap(); + let resolved = + resolve_secret_references_in_connector_config(dir.path(), &connector_config).unwrap(); + + let TransportConfig::Datagen(datagen_input_config) = resolved.transport else { + unreachable!(); + }; + assert_eq!( + datagen_input_config.plan[0].fields["col2"] + .values + .as_ref() + .unwrap(), + &vec![json!("resolved_value_a"), json!("resolved_value_b")] + ); + + let Some(format_config) = resolved.format else { + unreachable!(); + }; + assert_eq!(format_config.config, json!({"example": "resolved_value_a"})); + + unsafe { + std::env::remove_var("FELDERA_TEST_CONN_VAR_A"); + std::env::remove_var("FELDERA_TEST_CONN_VAR_B"); + } + } } diff --git a/crates/feldera-types/src/transport/delta_table.rs b/crates/feldera-types/src/transport/delta_table.rs index 51abdc7aee7..d827e858c48 100644 --- a/crates/feldera-types/src/transport/delta_table.rs +++ b/crates/feldera-types/src/transport/delta_table.rs @@ -354,6 +354,20 @@ pub struct DeltaTableReaderConfig { #[serde(default)] pub verbose: u32, + /// Maximum number of retries for failed object store operations. + /// + /// Controls how many times the connector retries high-level storage operations, + /// such as reading a Delta log entry or a Parquet file. 
+    ///
+    /// This is in addition to lower-level retries (e.g., individual S3 operation retries governed
+    /// by storage options like `retry_timeout`). If those retries are exhausted
+    /// or the failure is otherwise unrecoverable at the storage layer, the
+    /// connector retries the entire operation.
+    ///
+    /// Defaults to unlimited retries. Set to 0 to disable retries.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub max_retries: Option<u32>,
+
     /// Storage options for configuring backend object store.
     ///
     /// For specific options available for different storage backends, see:
@@ -364,6 +378,12 @@
     pub object_store_config: HashMap<String, String>,
 }

+impl DeltaTableReaderConfig {
+    pub fn max_retries(&self) -> u32 {
+        self.max_retries.unwrap_or(u32::MAX)
+    }
+}
+
 #[cfg(test)]
 #[test]
 fn test_delta_reader_config_serde() {
diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs
index c4bd3ef4fef..5ebb8840b35 100644
--- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs
+++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs
@@ -460,6 +460,85 @@ pub(crate) async fn get_pipeline_output_connector_status(
     Ok(response)
 }

+/// Reset Output Connector
+///
+/// Reset an output connector configured in `snapshot_and_follow` mode.
+///
+/// This clears buffered output, asks the sink to reset itself, and then replays
+/// a full snapshot before resuming incremental updates.
+#[utoipa::path(
+    context_path = "/v0",
+    security(("JSON web token (JWT) or API key" = [])),
+    params(
+        ("pipeline_name" = String, Path, description = "Unique pipeline name"),
+        ("view_name" = String, Path, description = "SQL view name"),
+        ("connector_name" = String, Path, description = "Output connector name"),
+    ),
+    responses(
+        (status = OK
+        , description = "Output connector reset request has been processed"),
+        (status = NOT_FOUND
+        , body = ErrorResponse
+        , description = "Pipeline, view and/or output connector with that name does not exist"
+        , examples(
+            ("Pipeline with that name does not exist" = (value = json!(examples::error_unknown_pipeline_name()))),
+        )
+        ),
+        (status = BAD_REQUEST
+        , body = ErrorResponse
+        , description = "The output connector does not support reset"),
+        (status = SERVICE_UNAVAILABLE
+        , body = ErrorResponse
+        , examples(
+            ("Pipeline is not deployed" = (value = json!(examples::error_pipeline_interaction_not_deployed()))),
+            ("Pipeline is currently unavailable" = (value = json!(examples::error_pipeline_interaction_currently_unavailable()))),
+            ("Disconnected during response" = (value = json!(examples::error_pipeline_interaction_disconnected()))),
+            ("Response timeout" = (value = json!(examples::error_pipeline_interaction_timeout())))
+        )
+        ),
+        (status = INTERNAL_SERVER_ERROR, body = ErrorResponse),
+    ),
+    tag = "Output Connectors"
+)]
+#[post("/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/reset")]
+pub(crate) async fn post_pipeline_output_connector_reset(
+    state: WebData<ServerState>,
+    client: WebData<awc::Client>,
+    tenant_id: ReqData<TenantId>,
+    path: web::Path<(String, String, String)>,
+) -> Result<HttpResponse, ManagerError> {
+    let (pipeline_name, view_name, connector_name) = path.into_inner();
+
+    let actual_view_name = SqlIdentifier::from(&view_name).name();
+    let endpoint_name = format!("{actual_view_name}.{connector_name}");
+    let encoded_endpoint_name = urlencoding::encode(&endpoint_name).to_string();
+
+    let response = state
+        .runner
+        .forward_http_request_to_pipeline_by_name(
+            client.as_ref(),
+            *tenant_id,
+            &pipeline_name,
+            Method::POST,
+            &format!("output_endpoints/{encoded_endpoint_name}/reset"),
+            "",
+            None,
+            None,
+        )
+        .await?;
+
+    if response.status() == StatusCode::OK {
+        info!(
+            pipeline = %pipeline_name,
+            pipeline_id = "N/A",
+            tenant = %tenant_id.0,
+            "Connector action: reset on view '{view_name}' on connector '{connector_name}'"
+        );
+    }
+
+    Ok(response)
+}
+
 /// Get Pipeline Stats
 ///
 /// Retrieve statistics (e.g., performance counters) of a running or paused pipeline.
diff --git a/crates/pipeline-manager/src/api/main.rs b/crates/pipeline-manager/src/api/main.rs
index a70b6e0151a..95c2e9b440c 100644
--- a/crates/pipeline-manager/src/api/main.rs
+++ b/crates/pipeline-manager/src/api/main.rs
@@ -204,6 +204,7 @@ It contains the following fields:
         endpoints::pipeline_interaction::post_pipeline_input_connector_action,
         endpoints::pipeline_interaction::get_pipeline_input_connector_status,
         endpoints::pipeline_interaction::get_pipeline_output_connector_status,
+        endpoints::pipeline_interaction::post_pipeline_output_connector_reset,
         endpoints::pipeline_interaction::get_pipeline_stats,
         endpoints::pipeline_interaction::get_pipeline_metrics,
         endpoints::pipeline_interaction::get_pipeline_circuit_profile,
@@ -571,6 +572,7 @@ fn api_scope() -> Scope {
         .service(endpoints::pipeline_interaction::post_pipeline_input_connector_action)
         .service(endpoints::pipeline_interaction::get_pipeline_input_connector_status)
         .service(endpoints::pipeline_interaction::get_pipeline_output_connector_status)
+        .service(endpoints::pipeline_interaction::post_pipeline_output_connector_reset)
         .service(endpoints::pipeline_interaction::get_pipeline_stats)
         .service(endpoints::pipeline_interaction::get_pipeline_metrics)
         .service(endpoints::pipeline_interaction::get_pipeline_time_series)
diff --git a/docs.feldera.com/docs/changelog.md b/docs.feldera.com/docs/changelog.md
index 047667c6266..3b034515cfe 100644
--- a/docs.feldera.com/docs/changelog.md
+++ b/docs.feldera.com/docs/changelog.md
@@ -12,7 +12,20 @@ import TabItem from '@theme/TabItem';

-## Unreleased
+## v0.288.0
+
+Delta Lake input connector error handling behavior change:
+
+In the past, if the connector wasn't able to read a table version, it
+signaled an error and moved on to the next version. This could cause data loss.
+With this change, the connector will either retry forever or fail and stop
+producing input after exhausting its retry attempts.
+
+The second behavioral change is that the connector can now produce
+duplicate inputs even without a pipeline restart, as the connector retries
+processing Delta log entries.
+
 ## v0.281.0

 Starting a pipeline while storage is still clearing (`storage_status=Clearing`) now returns
 `CannotStartWhileClearingStorage` instead of succeeding. Clearing storage while a start
diff --git a/docs.feldera.com/docs/connectors/secret-references.md b/docs.feldera.com/docs/connectors/secret-references.md
index 420efa38a64..d73d06ea365 100644
--- a/docs.feldera.com/docs/connectors/secret-references.md
+++ b/docs.feldera.com/docs/connectors/secret-references.md
@@ -4,19 +4,28 @@ Rather than directly supplying a secret (e.g., passwords, PEM, etc.) in the conn
 configuration as a string, it is possible to refer to (externalize) them. This mechanism
 in Feldera is called a **secret reference**.
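The `${env:<name>}` parsing rule implemented in `secret_ref.rs` above can be read in isolation through a small sketch. This is a hypothetical simplification for illustration only, not the crate's API; `parse_env_ref` mirrors the rule that the entire string must match and the name must be a POSIX-style identifier:

```rust
/// Hypothetical standalone illustration of the `${env:<name>}` rule:
/// the entire string must match, and the name must be a POSIX-style
/// identifier (letters, digits, underscores; no leading digit).
fn parse_env_ref(value: &str) -> Option<&str> {
    let name = value.strip_prefix("${env:")?.strip_suffix('}')?;
    let mut chars = name.chars();
    let first = chars.next()?; // rejects the empty name `${env:}`
    if !(first.is_ascii_alphabetic() || first == '_') {
        return None; // must not start with a digit
    }
    if chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
        Some(name)
    } else {
        None // e.g., a hyphen or space in the name
    }
}

fn main() {
    assert_eq!(parse_env_ref("${env:MY_VAR}"), Some("MY_VAR"));
    assert_eq!(parse_env_ref("${env:1VAR}"), None); // leading digit rejected
    assert_eq!(parse_env_ref("${env:"), None); // no closing brace: plain string
}
```

Anything lacking either the prefix or the closing brace falls through as a plain string, which matches the `("${env:", ...)` and `("$env:MY_VAR}", ...)` test cases in the diff above.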
-A secret reference is a string in the connector configuration JSON which takes a specific format:
+Feldera supports two types of references in connector configuration strings:

-```
-${secret:<provider>:<identifier>}
-```
+- **Secret references** — resolved from an external secret provider (e.g., Kubernetes):
+  ```
+  ${secret:<provider>:<identifier>}
+  ```

 It refers to an identifiable secret supplied by a provider. Feldera's control plane
 mounts the secret into the pipeline. When the pipeline initializes, it will replace
 the secret references in the configuration with their values. We currently only
 support a single secret provider, Kubernetes.

-Feldera resolves secrets when a pipeline starts, as well as each time
-it resumes. Feldera does not write resolved values of secrets to
-checkpoints or journals.
+- **Environment variable references** — resolved from the pipeline process environment:
+  ```
+  ${env:<name>}
+  ```
+
+When the pipeline initializes, it replaces all references in the connector configuration
+with their resolved values. Feldera resolves references when a pipeline starts, as well as
+each time it resumes. Feldera does not write resolved values to checkpoints or journals.
+
+Use environment variables for non-sensitive deployment configuration only.
+Storing secrets in environment variables is generally discouraged; use a dedicated secret manager or secret store instead.

 ## Kubernetes

@@ -80,7 +89,47 @@ We can then specify a connector configuration that refers to it using `${secret:
 }
 ```

-## Restrictions
+## Environment variables
+
+### Usage
+
+```
+${env:<name>}
+```
+
+Here, `<name>` is the name of an environment variable following POSIX naming
+rules (letters, digits, and underscores; it must start with a letter or underscore).
+
+The reference is resolved at pipeline startup by reading the named variable from the pipeline
+process environment. This is useful for injecting configuration values (e.g., hostnames,
+credentials) via environment variables set in the `env` field of
+[`RuntimeConfig`](/api/patch-pipeline#body-runtime_config) or through the deployment environment.
+
+### Example
+
+```json
+{
+  "transport": {
+    "name": "kafka_input",
+    "config": {
+      "bootstrap.servers": "${env:KAFKA_BOOTSTRAP_SERVERS}",
+      "sasl.password": "${env:KAFKA_SASL_PASSWORD}"
+    }
+  },
+  "format": ...
+}
+```
+
+### Restrictions
+
+- The environment variable name must follow POSIX rules: only letters (`a`–`z`, `A`–`Z`),
+  digits (`0`–`9`), and underscores (`_`), and it must start with a letter or underscore
+- If the referenced environment variable is not set when the pipeline starts, the pipeline
+  will fail to initialize with an error
+- It is not possible to have string values starting with `${env:` and ending with `}`
+  without them being identified as an environment variable reference
+
+## Restrictions (secret references)

 - The secret name may only contain lowercase alphanumeric characters or hyphens, must start
   and end with a lowercase alphanumeric character and can be at most 63 characters long
@@ -89,51 +138,51 @@ We can then specify a connector configuration that refers to it using `${secret:
 - It is not possible to have any plain string value which starts with `${secret:` and ends with `}`
   without it being identified to be a secret reference.
 - Only string values in the connector configuration JSON under `transport.config` and `format.config`
-  can be identified to be secret references (this excludes keys), for example (secret named `a` at
-  data key `b` has value `value`):
-  ```
-  {
-    "transport": {
-      "name": "some_transport",
-      "config": {
-        "${secret:kubernetes:a/b}": "${secret:kubernetes:a/b}",
-        "v1": "${secret:kubernetes:a/b}",
-        "v2": [ "${secret:kubernetes:a/b}" ]
-      }
-    },
-    "format": {
-      "name": "some_format",
-      "config": {
-        "v3": "${secret:kubernetes:a/b}"
-      }
-    },
-    "index": "${secret:kubernetes:a/b}"
-  }
-  ```
-  ... will be resolved to:
-  ```
-  {
-    "transport": {
-      "name": "some_transport",
-      "config": {
-        "${secret:kubernetes:a/b}": "value",
-        "v1": "value",
-        "v2": [ "value" ]
-      }
-    },
-    "format": {
-      "name": "some_format",
-      "config": {
-        "v3": "value"
-      }
-    },
-    "index": "${secret:kubernetes:a/b}"
-  }
-  ```
+  can be identified as secret or environment variable references (this excludes keys), for example
+  (secret named `a` at data key `b` has value `value`):
+  ```
+  {
+    "transport": {
+      "name": "some_transport",
+      "config": {
+        "${secret:kubernetes:a/b}": "${secret:kubernetes:a/b}",
+        "v1": "${secret:kubernetes:a/b}",
+        "v2": [ "${secret:kubernetes:a/b}" ]
+      }
+    },
+    "format": {
+      "name": "some_format",
+      "config": {
+        "v3": "${secret:kubernetes:a/b}"
+      }
+    },
+    "index": "${secret:kubernetes:a/b}"
+  }
+  ```
+  ... will be resolved to:
+  ```
+  {
+    "transport": {
+      "name": "some_transport",
+      "config": {
+        "${secret:kubernetes:a/b}": "value",
+        "v1": "value",
+        "v2": [ "value" ]
+      }
+    },
+    "format": {
+      "name": "some_format",
+      "config": {
+        "v3": "value"
+      }
+    },
+    "index": "${secret:kubernetes:a/b}"
+  }
+  ```
 - Because connector configuration is validated during SQL compilation without secret resolution,
   string values that require a certain format for the connector configuration to be valid will not
   allow secret references (enumerations in particular, such as for the datagen connector `strategy` field)
-- It is not possible to specify a secret value type other than string
-- It is not possible to specify a secret as a substring, for example
-  `abc${secret:kubernetes:a/b}def` does not work
+- It is not possible to specify a reference value type other than string
+- It is not possible to specify a reference as a substring, for example
+  `abc${secret:kubernetes:a/b}def` and `abc${env:MY_VAR}def` do not work
diff --git a/docs.feldera.com/docs/connectors/sources/delta.md b/docs.feldera.com/docs/connectors/sources/delta.md
index 77f8dd5100b..31d179f2e46 100644
--- a/docs.feldera.com/docs/connectors/sources/delta.md
+++ b/docs.feldera.com/docs/connectors/sources/delta.md
@@ -36,6 +36,7 @@ exactly once fault tolerance.
| `cdc_delete_filter` | string | |

A predicate that determines whether the record represents a deletion.

This setting is only valid in the `cdc` mode. It specifies a predicate applied to each row in the Delta table to determine whether the row represents a deletion event. Its value must be a valid Boolean SQL expression that can be used in a query of the form `SELECT * FROM <table> WHERE <predicate>`.

| | `cdc_order_by` | string | |

An expression that determines the ordering of updates in the Delta table.

This setting is only valid in the `cdc` mode. It specifies an expression applied to each row in the Delta table to determine the order in which updates in the table should be applied. Its value must be a valid SQL expression that can be used in a query of the form `SELECT * FROM <table> ORDER BY <expression>`.

| | `num_parsers` | string | | The number of parallel parsing tasks the connector uses to process data read from the table. Increasing this value can enhance performance by allowing more concurrent processing. Recommended range: 1–10. The default is 4.| +| `max_retries` | integer| unlimited retries|

Maximum number of retries for failed object store operations.

Controls how many times the connector retries high-level storage operations, such as reading a Delta log entry or a Parquet file.

This is in addition to lower-level retries (e.g., individual S3 operation retries governed by storage options like `retry_timeout`). If those retries are exhausted or the failure is otherwise unrecoverable at the storage layer, the connector retries the entire operation.

Defaults to unlimited retries. Set to 0 to disable retries.

See [retries and at-least-once delivery](#retries-and-at-least-once-delivery).

| | `skip_unused_columns` (DEPRECATED) | bool | false |

This property is deprecated. Use the [table-level `skip_unused_columns` property](/sql/grammar#ignoring-unused-columns) instead.

Don't read unused columns from the Delta table. When set to `true`, this option instructs the connector to avoid reading columns from the Delta table that are not used in any view definitions. To be skipped, the columns must be either nullable or have default values. This can improve ingestion performance, especially for wide tables.

Note: The simplest way to exclude unused columns is to omit them from the Feldera SQL table declaration. The connector never reads columns that aren't declared in the SQL schema. Additionally, the SQL compiler emits warnings for declared but unused columns—use these as a guide to optimize your schema.

| | `max_concurrent_readers` | integer| 6 |

Maximum number of concurrent object store reads performed by all Delta Lake connectors.

This setting is used to limit the number of concurrent reads of the object store in a pipeline with a large number of Delta Lake connectors. When multiple connectors are simultaneously reading from the object store, this can lead to transport timeouts.

When enabled, this setting limits the number of concurrent reads across all connectors. This is a global setting that affects all Delta Lake connectors, and not just the connector where it is specified. It should therefore be used at most once in a pipeline. If multiple connectors specify this setting, they must all use the same value.

The default value is 6.

|
@@ -167,6 +168,8 @@ Additional configuration options to configure HTTP client for remote object stor
 | `proxy_excludes` | List of hosts that bypass proxy. |
 | `randomize_addresses` | Randomize the order of the addresses that DNS resolution yields. This will spread the connections across more servers. |
 | `timeout` | Request timeout. The timeout is applied from when the request starts connecting until the response body has finished. Format: `<number><unit>`, e.g., `30s`, `1.5m`.|
+| `retry_timeout` | The maximum length of time from the initial request after which no further retries will be attempted. This not only bounds the length of time before a server error will be surfaced to the application, but also bounds the length of time a request’s credentials must remain valid. As requests are retried without renewing credentials or regenerating request payloads, this number should be kept below 5 minutes to avoid errors due to expired credentials and/or request payloads.|
+| `connect_timeout` | Set a timeout for only the connect phase of a client. This is the time allowed for the client to establish a connection, and if the connection is not established within this time, the client returns a timeout error.|
 | `user_agent` | User-Agent header to be used by this client. |

 ## Data type mapping
@@ -303,6 +306,39 @@ CREATE TABLE transaction(
 ]');
 ```

+## Retries and at-least-once delivery
+
+When interacting with an object store such as Amazon S3, the Delta Lake connector must handle
+transient failures, including timeouts and expired authentication tokens.
+
+These errors are first handled at the level of individual object store operations, which are
+automatically retried when possible. This behavior is controlled by the
+[HTTP client configuration](#http-client-configuration) settings: `connect_timeout`,
+`timeout`, and `retry_timeout`.
+
+If these lower-level retries are exhausted—or if the error cannot be recovered at the storage
+layer—the connector retries the entire operation (for example, re-reading a Delta log entry).
+This behavior is controlled by the `max_retries` setting:
+
+* By default, the connector performs unbounded retries.
+* Set `max_retries = N` to limit the number of attempts.
+* Set `max_retries = 0` to disable retries entirely.
+
+If the connector cannot recover after `N` attempts, it fails with a fatal error and stops
+ingesting inputs.
+
+Because retries may occur after partial progress (e.g., after partially processing a Delta log entry),
+the same data may be ingested more than once. This is consistent with the connector’s **at-least-once delivery**
+guarantee.
+
+To ensure idempotent ingestion, we recommend defining [primary keys](/connectors/unique_keys).
+
+Retry activity is reflected in the connector’s [health status](https://docs.feldera.com/api/get-input-status/):
+it is marked **UNHEALTHY** while retrying failed operations.
+
+If the pipeline is stopped and restarted during a retry, the connector resumes from the last successfully
+ingested table version. This guarantees that no data loss occurs due to object store read errors.
+
 ## Additional examples

 ### Example: Setting `timestamp_column`
diff --git a/docs.feldera.com/docs/sql/binary.md b/docs.feldera.com/docs/sql/binary.md
index 5677e88dfd7..c5696cca759 100644
--- a/docs.feldera.com/docs/sql/binary.md
+++ b/docs.feldera.com/docs/sql/binary.md
@@ -111,8 +111,8 @@ aggregation functions `BIT_AND`, `BIT_OR`, and `BIT_XOR`.
- - + + diff --git a/js-packages/profiler-layout/src/lib/components/ProfilerLayout.svelte b/js-packages/profiler-layout/src/lib/components/ProfilerLayout.svelte index bd63756cc39..af78ae21492 100644 --- a/js-packages/profiler-layout/src/lib/components/ProfilerLayout.svelte +++ b/js-packages/profiler-layout/src/lib/components/ProfilerLayout.svelte @@ -231,8 +231,8 @@ e.key === 'Enter' && handleSearch()} class="input w-32 text-sm" /> diff --git a/js-packages/profiler-lib/src/cytograph.ts b/js-packages/profiler-lib/src/cytograph.ts index 63446905763..5f8e06176d1 100644 --- a/js-packages/profiler-lib/src/cytograph.ts +++ b/js-packages/profiler-lib/src/cytograph.ts @@ -543,12 +543,15 @@ export class CytographRendering { return this.metadataSelection.metric; } - search(value: string) { + /** Search a node by ID, return 'true' if found. */ + search(value: string): boolean { let el = this.cy.getElementById(value); - if (el === null) { - return; + // el may be an empty collection + if (!el.nonempty()) { + return false; } this.center(Option.some(value)); + return true; } // Layout to use for the first graph rendering diff --git a/js-packages/profiler-lib/src/profiler.ts b/js-packages/profiler-lib/src/profiler.ts index 65b8401c558..39938fa256b 100644 --- a/js-packages/profiler-lib/src/profiler.ts +++ b/js-packages/profiler-lib/src/profiler.ts @@ -281,10 +281,24 @@ export class Visualizer { } /** - * Search for a node by ID + * Search for a node by ID or a substring of the persistent ID. */ search(query: string): void { - this.rendering?.search(query); + // First search by ID + let success = this.rendering?.search(query); + if (success) { + return; + } + if (!this.profile) { + return; + } + + // Find ID of node with given persistent ID + for (const [pid, node] of this.profile.byPersistentId) { + if (pid.includes(query)) { + this.rendering?.search(node.id); + } + } } /** diff --git a/js-packages/profiler-lib/src/util.ts b/js-packages/profiler-lib/src/util.ts index 1394bf90c2d..c318abd5831 100644 --- a/js-packages/profiler-lib/src/util.ts +++ b/js-packages/profiler-lib/src/util.ts @@ -130,6 +130,10 @@ export class OMap { clear(): void { this.map.clear(); } + + [Symbol.iterator](): IterableIterator<[K, V]> { + return this.map[Symbol.iterator](); + } } /** A sublist which includes only specific elements from a list, identified by their indexes. 
*/ diff --git a/openapi.json b/openapi.json index b571ec3be9f..ab9ee540746 100644 --- a/openapi.json +++ b/openapi.json @@ -10,7 +10,7 @@ "license": { "name": "MIT OR Apache-2.0" }, - "version": "0.286.0" + "version": "0.287.0" }, "paths": { "/config/authentication": { @@ -6312,6 +6312,152 @@ ] } }, + "/v0/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/reset": { + "post": { + "tags": [ + "Output Connectors" + ], + "summary": "Reset Output Connector", + "description": "Reset an output connector configured in `snapshot_and_follow` mode.\n\nThis clears buffered output, asks the sink to reset itself, and then replays\na full snapshot before resuming incremental updates.", + "operationId": "post_pipeline_output_connector_reset", + "parameters": [ + { + "name": "pipeline_name", + "in": "path", + "description": "Unique pipeline name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "view_name", + "in": "path", + "description": "SQL view name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "connector_name", + "in": "path", + "description": "Output connector name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Output connector reset request has been processed" + }, + "400": { + "description": "The output connector does not support reset", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "404": { + "description": "Pipeline, view and/or output connector with that name does not exist", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + }, + "examples": { + "Pipeline with that name does not exist": { + "value": { + "message": "Unknown pipeline name 'non-existent-pipeline'", + "error_code": "UnknownPipelineName", + "details": { + "pipeline_name": "non-existent-pipeline" + } + } + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "503": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + }, + "examples": { + "Disconnected during response": { + "value": { + "message": "Error sending HTTP request to pipeline: the pipeline disconnected while it was processing this HTTP request. This could be because the pipeline either (a) encountered a fatal error or panic, (b) was stopped, or (c) experienced network issues -- retrying might help in the last case. Alternatively, check the pipeline logs. Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "the pipeline disconnected while it was processing this HTTP request. This could be because the pipeline either (a) encountered a fatal error or panic, (b) was stopped, or (c) experienced network issues -- retrying might help in the last case. Alternatively, check the pipeline logs." 
+ } + } + }, + "Pipeline is currently unavailable": { + "value": { + "message": "Error sending HTTP request to pipeline: deployment status is currently 'unavailable' -- wait for it to become 'running' or 'paused' again Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "deployment status is currently 'unavailable' -- wait for it to become 'running' or 'paused' again" + } + } + }, + "Pipeline is not deployed": { + "value": { + "message": "Unable to interact with pipeline because the deployment status (stopped) indicates it is not (yet) fully provisioned pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionNotDeployed", + "details": { + "pipeline_name": "my_pipeline", + "status": "Stopped", + "desired_status": "Provisioned" + } + } + }, + "Response timeout": { + "value": { + "message": "Error sending HTTP request to pipeline: timeout (10s) was reached: this means the pipeline took too long to respond -- this can simply be because the request was too difficult to process in time, or other reasons (e.g., deadlock): the pipeline logs might contain additional information (original send request error: Timeout while waiting for response) Failed request: /pause pipeline-id=N/A pipeline-name=\"my_pipeline\"", + "error_code": "PipelineInteractionUnreachable", + "details": { + "pipeline_name": "my_pipeline", + "request": "/pause", + "error": "timeout (10s) was reached: this means the pipeline took too long to respond -- this can simply be because the request was too difficult to process in time, or other reasons (e.g., deadlock): the pipeline logs might contain additional information (original send request error: Timeout while waiting for response)" + } + } + } + } + } + } + } + }, + "security": [ + { + "JSON web token (JWT) or API key": [] + } + ] + } + }, "/v0/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/stats": { "get": { "tags": [ @@ -7924,6 +8070,13 @@ "nullable": true, "minimum": 0 }, + "max_retries": { + "type": "integer", + "format": "int32", + "description": "Maximum number of retries for failed object store operations.\n\nControls how many times the connector retries high-level storage operations,\nsuch as reading a Delta log entry or a Parquet file.\n\nThis is in addition to lower-level retries (e.g., individual S3 operation retries governed\nby storage options like `retry_timeout`). If those retries are exhausted\nor the failure is otherwise unrecoverable at the storage layer, the\nconnector retries the entire operation.\n\nDefaults to unlimited retries. 
Set to 0 to disable retries.", + "nullable": true, + "minimum": 0 + }, "mode": { "$ref": "#/components/schemas/DeltaTableIngestMode" }, diff --git a/python/pyproject.toml b/python/pyproject.toml index 09a9b7f06ab..d36c73d36e7 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "feldera" readme = "README.md" description = "The feldera python client" -version = "0.286.0" +version = "0.287.0" license = "MIT" requires-python = ">=3.10" authors = [ diff --git a/python/uv.lock b/python/uv.lock index 4ddba81204f..cb5547411af 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -12,7 +12,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-04-01T21:26:07.053428Z" +exclude-newer = "2026-04-02T08:07:18.404331674Z" exclude-newer-span = "P1W" [[package]] @@ -221,7 +221,7 @@ wheels = [ [[package]] name = "feldera" -version = "0.286.0" +version = "0.287.0" source = { editable = "." } dependencies = [ { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CalciteFunctions.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CalciteFunctions.java index 67cea5a32fd..dcccee05ad6 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CalciteFunctions.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CalciteFunctions.java @@ -370,7 +370,8 @@ record Func(SqlOperator function, String functionName, SqlLibrary library, new Func(SqlLibraryOperators.LENGTH, "LENGTH", SqlLibrary.POSTGRESQL, "string#char_length", FunctionDocumentation.NO_FILE, false), new Func(SqlLibraryOperators.SUBSTR_BIG_QUERY, "SUBSTR", SqlLibrary.BIG_QUERY, "string#substr", FunctionDocumentation.NO_FILE, false), new Func(SqlLibraryOperators.SPLIT, "SPLIT", SqlLibrary.BIG_QUERY, "string#split", FunctionDocumentation.NO_FILE, false), - new Func(SqlLibraryOperators.SPLIT_PART, "SPLIT_PART", SqlLibrary.POSTGRESQL, "string#split_part", FunctionDocumentation.NO_FILE, false), + // https://issues.apache.org/jira/browse/CALCITE-7468 + // new Func(SqlLibraryOperators.SPLIT_PART, "SPLIT_PART", SqlLibrary.POSTGRESQL, "string#split_part", FunctionDocumentation.NO_FILE, false), new Func(SqlLibraryOperators.GREATEST, "GREATEST", SqlLibrary.BIG_QUERY, "comparisons#greatest", FunctionDocumentation.NO_FILE, false), new Func(SqlLibraryOperators.LEAST, "LEAST", SqlLibrary.BIG_QUERY, "comparisons#least", FunctionDocumentation.NO_FILE, false), new Func(SqlLibraryOperators.SAFE_CAST, "SAFE_CAST", SqlLibrary.BIG_QUERY, "casts#safe-casts", FunctionDocumentation.NO_FILE, false), diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CustomFunctions.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CustomFunctions.java index 84fb9ebc512..5490000aa23 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CustomFunctions.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/frontend/calciteCompiler/CustomFunctions.java @@ -66,8 +66,10 @@ public CustomFunctions() { this.functions.add(new ArrayTransformFunction()); 
this.functions.add(new ArrayUnion()); this.functions.add(new ArraysOverlapFunction()); + this.functions.add(new Bin2Utf8Function()); this.functions.add(new BlackboxFunction()); this.functions.add(new BroundFunction()); + this.functions.add(new ConnectorMetadataFunction()); this.functions.add(new ConvertTimezoneFunction()); this.functions.add(new FormatDateFunction()); this.functions.add(new FormatTimestampFunction()); @@ -81,12 +83,11 @@ public CustomFunctions() { this.functions.add(new ParseJsonFunction()); this.functions.add(new ParseTimeFunction()); this.functions.add(new ParseTimestampFunction()); - this.functions.add(new Bin2Utf8Function()); this.functions.add(new RlikeFunction()); this.functions.add(new SequenceFunction()); + this.functions.add(new SplitPartFunction()); this.functions.add(new ToIntFunction()); this.functions.add(new ToJsonFunction()); - this.functions.add(new ConnectorMetadataFunction()); this.functions.add(new WriteLogFunction()); this.udf = new HashMap<>(); this.aggregates = new HashMap<>(); @@ -194,6 +195,12 @@ private FormatTimestampFunction() { } } + static class SplitPartFunction extends CalciteFunctionClone { + private SplitPartFunction() { + super(SqlLibraryOperators.SPLIT_PART, "string#split_part", FunctionDocumentation.NO_FILE); + } + } + static class FormatTimeFunction extends CalciteFunctionClone { private FormatTimeFunction() { super(SqlLibraryOperators.FORMAT_TIME, "datetime#format_time", FunctionDocumentation.NO_FILE); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java index 60f51a0bbea..33680492eaa 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java @@ -1280,7 +1280,7 @@ public void postorder(DBSPJoinFilterMapOperator join) { List monotoneFields = new ArrayList<>(); for (int varIndex = 0, field = 0; field < leftValueSize; field++) { int firstOutputField = iomap.firstOutputField(1, field); - Utilities.enforce(firstOutputField >= 0); + if (firstOutputField < 0) continue; // Field not used in the output IMaybeMonotoneType compareField = filterTuple.getField(firstOutputField); value.add(compareField); if (compareField.mayBeMonotone()) { @@ -1305,7 +1305,6 @@ public void postorder(DBSPJoinFilterMapOperator join) { } } - // Exact same procedure on the right hand side OutputPort rightLimiter = null; DBSPSimpleOperator rightMonotone = null; if (expansion.rightMap != null) { @@ -1330,7 +1329,7 @@ public void postorder(DBSPJoinFilterMapOperator join) { int varIndex = 0; for (int field = 0; field < leftValueSize; field++) { int firstOutputField = iomap.firstOutputField(1, field); - Utilities.enforce(firstOutputField >= 0); + if (firstOutputField < 0) continue; // field not used in the output IMaybeMonotoneType compareField = filterTuple.getField(firstOutputField); if (compareField.mayBeMonotone()) { varIndex++; @@ -1339,7 +1338,7 @@ public void postorder(DBSPJoinFilterMapOperator join) { for (int field = 0; field < rightValueSize; field++) { int firstOutputField = iomap.firstOutputField(2, field); - Utilities.enforce(firstOutputField >= 0); + if (firstOutputField < 0) continue; // field not used in the output IMaybeMonotoneType 
compareField = filterTuple.getField(firstOutputField); value.add(compareField); if (compareField.mayBeMonotone()) { diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/postgres/PostgresStringTests.java b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/postgres/PostgresStringTests.java index 650882dd671..1051c576d19 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/postgres/PostgresStringTests.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/postgres/PostgresStringTests.java @@ -1117,7 +1117,6 @@ public void testSplitPart() { } - // TODO: to_hex // TODO: sha, encode, decode @Test diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegression2Tests.java b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegression2Tests.java index 045f495cac5..9e50d0c07f9 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegression2Tests.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegression2Tests.java @@ -17,6 +17,7 @@ public CompilerOptions testOptions() { options.languageOptions.incrementalize = true; options.languageOptions.optimizationLevel = 2; options.languageOptions.ignoreOrderBy = true; + options.ioOptions.quiet = false; return options; } @@ -563,4 +564,52 @@ public void endVisit() { } }); } + + @Test + public void failInLateness() { + this.getCC(""" + CREATE TABLE ID_0 ( + ID_1 VARCHAR(90) NOT NULL PRIMARY KEY, + ID_2 INTEGER NOT NULL, + ID_3 VARCHAR(60) NOT NULL, + ID_6 TIMESTAMP NOT NULL, + ID_7 VARCHAR(60) NOT NULL + ); + CREATE TABLE ID_20 ( + ID_1 VARCHAR(110) NOT NULL PRIMARY KEY, + ID_2 SMALLINT NOT NULL, + ID_21 VARCHAR(64), + ID_22 DECIMAL(38, 0), + ID_23 INTEGER, + ID_24 BIGINT NOT NULL PRIMARY KEY LATENESS 216000 :: BIGINT, + ID_25 TIMESTAMP NOT NULL + ); + + CREATE LOCAL VIEW ID_26 AS + SELECT ID_2, ID_3 AS ID_21, CAST(ID_7 AS BIGINT UNSIGNED) AS ID_27, ID_6 AS ID_28 + FROM ID_0; + + CREATE LOCAL VIEW ID_36 AS + SELECT + ID_30.ID_2, + ID_30.ID_21, + SUM(CASE WHEN ID_30.ID_23 = 1 THEN ID_30.ID_22 ELSE 0 END) AS ID_37, + SUM(CASE WHEN ID_30.ID_23 = 2 THEN ID_30.ID_22 ELSE 0 END) AS ID_38, + MAX(ID_30.ID_24) AS ID_33, MAX(ID_30.ID_25) AS ID_34 + FROM ID_20 AS ID_30 + INNER JOIN ID_26 AS ID_35 ON ID_30.ID_2 = ID_35.ID_2 AND ID_30.ID_21 = ID_35.ID_21 + WHERE ID_30.ID_25 > ID_35.ID_28 + GROUP BY ID_30.ID_2, ID_30.ID_21; + + CREATE VIEW ID_39 AS + SELECT + ID_35.ID_2, + ID_35.ID_21, + COALESCE(ID_41.ID_37, 0) AS ID_31, + COALESCE(ID_41.ID_38, 0) AS ID_32, + COALESCE(ID_41.ID_33, 0) AS ID_33, + ID_35.ID_27 + COALESCE(ID_41.ID_37, 0) - COALESCE(ID_41.ID_38, 0) AS ID_40 + FROM ID_26 AS ID_35 + LEFT JOIN ID_36 AS ID_41 ON ID_35.ID_2 = ID_41.ID_2 AND ID_35.ID_21 = ID_41.ID_21;"""); + } } diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java index c35ffb16445..fc8df9e2e0a 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/Regression2Tests.java @@ -903,4 
+903,29 @@ h DECIMAL(38, 10), l TIMESTAMP ) WITH ('materialized' = 'true');"""); } + + @Test + public void issue5981() { + this.qs(""" + SELECT TO_HEX(x'48656c6c6f'); + r + --- + 48656c6c6f + (1 row) + + SELECT TO_HEX(x'0ABC'); + r + --- + 0abc + (1 row)"""); + } + + @Test + public void issue5982() { + this.q(""" + SELECT SPLIT_PART('11.12.13', '.', 2); + r + ---- + 12"""); + } }
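The `max_retries` semantics introduced earlier in this patch (a missing value means unlimited via `unwrap_or(u32::MAX)`, and `0` disables retries) can be summarized with a schematic loop. This is a hypothetical illustration of the documented policy, not the connector's actual retry code; `op` stands in for a high-level storage operation such as reading a Delta log entry:

```rust
/// Schematic retry policy matching the documented `max_retries` semantics:
/// `None` retries (effectively) forever, `Some(0)` disables retries.
fn with_retries<T, E>(
    max_retries: Option<u32>,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let limit = max_retries.unwrap_or(u32::MAX);
    let mut attempt = 0u32;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            // Retries exhausted: a fatal error, the connector stops ingesting.
            Err(e) if attempt >= limit => return Err(e),
            // Transient failure: retry the whole high-level operation.
            // Retrying after partial progress is why delivery is at-least-once.
            Err(_) => attempt += 1,
        }
    }
}
```

Note that a retried operation may have made partial progress before failing, which is exactly the duplicate-input caveat called out in the changelog entry and the delta.md retries section above.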
 TO_HEX(binary)
-Generate a `VARCHAR` string describing the value in hexadecimal
-TO_HEX(x'0abc') => '0ABC'
+Generate a `VARCHAR` string describing the value in hexadecimal (lowercase)
+TO_HEX(x'0abc') => '0abc'
 TO_INT(binary)
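The lowercase behavior pinned down by the binary.md change and the `issue5981` regression test can be sketched in a few lines. The `to_hex` helper below is a hypothetical illustration only, not the `feldera-sqllib` implementation:

```rust
/// Render a byte string as lowercase hex, matching the expected
/// TO_HEX output in issue5981 (x'0ABC' => "0abc").
fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

fn main() {
    assert_eq!(to_hex(b"Hello"), "48656c6c6f"); // SELECT TO_HEX(x'48656c6c6f')
    assert_eq!(to_hex(&[0x0a, 0xbc]), "0abc"); // SELECT TO_HEX(x'0ABC')
}
```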