Skip to content

Commit e1231a8

Browse files
author
Mohammed Ali
committed
[adapters] Add Postgres CDC input connector with crash-safe replication
Add an integrated input connector that reads from Postgres via logical replication using the supabase/etl library. The connector handles both initial table snapshot and ongoing WAL streaming. Key features: - Snapshot + streaming via etl's Pipeline (table copy + CDC follow) - Crash-safe replication slot advancement: the connector defers ETL's async flush confirmation until Feldera completes the circuit step that processed the data, ensuring the Postgres replication slot only advances after data is durably processed - Completion tracking via a new InputConsumer::completion_watcher() method that exposes Feldera's step completion notifications to input adapters (backwards-compatible default returning None) Files added: - crates/adapters/src/integrated/postgres/cdc_input.rs Files modified: - crates/adapterlib/src/transport.rs (InputConsumer trait extension) - crates/adapters/src/controller.rs (InputProbe implementation) - crates/adapters/src/integrated.rs (connector registration) - crates/adapters/src/integrated/postgres.rs (module declaration) - crates/adapters/src/transport.rs (config variant) - crates/adapters/Cargo.toml (etl dependencies) - crates/feldera-types/src/config.rs (transport config) - crates/feldera-types/src/transport/postgres.rs (CDC config struct)
1 parent 6c71c44 commit e1231a8

File tree

11 files changed

+1496
-36
lines changed

11 files changed

+1496
-36
lines changed

Cargo.lock

Lines changed: 553 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ tikv-jemallocator = "0.6.0"
262262
time = { version = "0.3.47", features = ["serde", "serde-well-known"] }
263263
tokio = "1.44.2"
264264
tokio-postgres = "0.7"
265-
tokio-stream = "0.1.15"
265+
tokio-stream = "0.1.18"
266266
tokio-util = "0.7.11"
267267
tracing = "0.1.40"
268268
tracing-subscriber = "0.3.20"
@@ -280,6 +280,9 @@ zip = "6.0.0"
280280
zstd = "0.12.0"
281281
backtrace = "0.3.75"
282282
parking_lot = "0.12.4"
283+
etl = { git = "https://github.com/supabase/etl" }
284+
etl-config = { git = "https://github.com/supabase/etl" }
285+
etl-postgres = { git = "https://github.com/supabase/etl" }
283286

284287
[workspace.metadata.release]
285288
release = false

crates/adapterlib/src/transport.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use anyhow::{Error as AnyError, Result as AnyResult};
22
use chrono::{DateTime, Utc};
33
use dyn_clone::DynClone;
44
use feldera_types::config::FtModel;
5+
use feldera_types::coordination::Completion;
56
use feldera_types::program_schema::Relation;
67
use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
78
use serde::Deserialize;
@@ -763,6 +764,22 @@ pub trait InputConsumer: Send + Sync + DynClone {
763764
/// so connectors that have no custom metrics need not override it.
764765
fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}
765766

767+
/// Returns a watch receiver that tracks completion of pipeline steps.
768+
///
769+
/// The receiver yields [`Completion`] values whose `total_completed_steps`
770+
/// field indicates how many steps have been fully processed (circuit
771+
/// execution + all output connectors).
772+
///
773+
/// Input adapters that need to defer acknowledgment until data is durably
774+
/// processed (e.g., CDC adapters controlling a replication slot) can use
775+
/// this to detect when their data has been consumed.
776+
///
777+
/// Returns `None` if the consumer does not support completion tracking
778+
/// (the default).
779+
fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>> {
780+
None
781+
}
782+
766783
/// Endpoint failed.
767784
///
768785
/// Reports that the endpoint failed and that it will not queue any more

crates/adapters/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ iceberg-tests-glue = []
5252
iceberg-tests-rest = []
5353
fips = ["rustls/fips"]
5454
bench-mode = []
55+
with-postgres-cdc = ["etl", "etl-config", "etl-postgres"]
5556

5657
[dependencies]
5758
feldera-types = { workspace = true }
@@ -200,6 +201,9 @@ sentry = { workspace = true }
200201
zip = { workspace = true }
201202
smallvec = { workspace = true }
202203
delta_kernel = { workspace = true }
204+
etl = { workspace = true, optional = true }
205+
etl-config = { workspace = true, optional = true }
206+
etl-postgres = { workspace = true, optional = true }
203207

204208
[package.metadata.cargo-machete]
205209
ignored = ["num-traits"]

crates/adapters/src/controller.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6681,6 +6681,10 @@ impl InputConsumer for InputProbe {
66816681
self.transaction_in_progress.store(false, Ordering::Release);
66826682
}
66836683

6684+
fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>> {
6685+
Some(self.controller.status.completion_notifier.subscribe())
6686+
}
6687+
66846688
fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>) {
66856689
self.controller.input_transport_error(
66866690
self.endpoint_id,

crates/adapters/src/integrated.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ mod delta_table;
1010
mod postgres;
1111

1212
use crate::integrated::postgres::PostgresInputEndpoint;
13+
#[cfg(feature = "with-postgres-cdc")]
14+
use crate::integrated::postgres::PostgresCdcInputEndpoint;
1315
pub use crate::integrated::postgres::PostgresOutputEndpoint;
1416

1517
/// An integrated output connector implements both transport endpoint
@@ -100,6 +102,10 @@ pub fn create_integrated_input_endpoint(
100102
TransportConfig::PostgresInput(config) => {
101103
Box::new(PostgresInputEndpoint::new(endpoint_name, config, consumer))
102104
}
105+
#[cfg(feature = "with-postgres-cdc")]
106+
TransportConfig::PostgresCdcInput(config) => {
107+
Box::new(PostgresCdcInputEndpoint::new(endpoint_name, config, consumer))
108+
}
103109
transport => {
104110
return Err(ControllerError::unknown_input_transport(
105111
endpoint_name,

crates/adapters/src/integrated/postgres.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@ mod output_macros;
55
mod prepared_statements;
66
mod tls;
77

8+
#[cfg(feature = "with-postgres-cdc")]
9+
pub(crate) mod cdc_input;
10+
811
#[cfg(test)]
912
mod test;
1013

1114
pub use input::PostgresInputEndpoint;
1215
pub use output::PostgresOutputEndpoint;
16+
17+
#[cfg(feature = "with-postgres-cdc")]
18+
pub use cdc_input::PostgresCdcInputEndpoint;

0 commit comments

Comments
 (0)