Skip to content

Commit 194f366

Browse files
author
Mohammed Ali
committed
[adapters] Add Postgres CDC input connector with crash-safe replication
Add an integrated input connector that reads from Postgres via logical replication using the supabase/etl library. The connector handles both the initial table snapshot and ongoing WAL streaming.

Key features:
- Snapshot + streaming via etl's Pipeline (table copy + CDC follow)
- Crash-safe replication slot advancement: the connector defers ETL's async flush confirmation until Feldera completes the circuit step that processed the data, ensuring the Postgres replication slot only advances after the data is durably processed
- Completion tracking via InputConsumer::completion_watcher(), which exposes Feldera's step completion notifications to input adapters
- RFC 3339 timestamp format and byte array encoding for correct Datagen parser compatibility

Files added:
- crates/adapters/src/integrated/postgres/cdc_input.rs

Tests:
- 47 unit tests (cell_to_json, array_cell_to_json, row_to_json, parse_pg_uri, FelderaDestination helpers)
- 3 integration tests (basic insert, all data types, update/delete)
1 parent 3d7046f commit 194f366

File tree

14 files changed

+2841
-42
lines changed

14 files changed

+2841
-42
lines changed

Cargo.lock

Lines changed: 559 additions & 39 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ tikv-jemallocator = "0.6.0"
265265
time = { version = "0.3.47", features = ["serde", "serde-well-known"] }
266266
tokio = "1.50.0"
267267
tokio-postgres = "0.7"
268-
tokio-stream = "0.1.15"
268+
tokio-stream = "0.1.18"
269269
tokio-util = "0.7.11"
270270
tracing = "0.1.40"
271271
tracing-subscriber = "0.3.20"
@@ -283,6 +283,9 @@ zip = "6.0.0"
283283
zstd = "0.12.0"
284284
backtrace = "0.3.75"
285285
parking_lot = "0.12.4"
286+
etl = { git = "https://github.com/supabase/etl", rev = "05cb11ae" }
287+
etl-config = { git = "https://github.com/supabase/etl", rev = "05cb11ae" }
288+
etl-postgres = { git = "https://github.com/supabase/etl", rev = "05cb11ae" }
286289

287290
[workspace.metadata.release]
288291
release = false

crates/adapterlib/src/transport.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use anyhow::{Error as AnyError, Result as AnyResult};
22
use chrono::{DateTime, Utc};
33
use dyn_clone::DynClone;
44
use feldera_types::config::FtModel;
5+
use feldera_types::coordination::Completion;
56
use feldera_types::program_schema::Relation;
67
use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
78
use serde::Deserialize;
@@ -765,6 +766,19 @@ pub trait InputConsumer: Send + Sync + DynClone {
765766
/// so connectors that have no custom metrics need not override it.
766767
fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}
767768

769+
/// Returns a watch receiver that tracks completion of pipeline steps.
770+
///
771+
/// The receiver yields [`Completion`] values whose `total_completed_steps`
772+
/// field indicates how many steps have been fully processed (circuit
773+
/// execution + all output connectors).
774+
///
775+
/// Input adapters that need to defer acknowledgment until data is durably
776+
/// processed (e.g., CDC adapters controlling a replication slot) can use
777+
/// this to detect when their data has been consumed.
778+
///
779+
/// Implementations return `None` if the consumer does not support completion tracking.
780+
fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>>;
781+
768782
/// Endpoint failed.
769783
///
770784
/// Reports that the endpoint failed and that it will not queue any more

crates/adapters/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ default = [
2424
"with-pubsub",
2525
"with-redis",
2626
"with-nats",
27+
"with-postgres-cdc",
2728
]
2829
with-kafka = ["rdkafka"]
2930
with-deltalake = ["deltalake", "deltalake-catalog-unity"]
@@ -52,6 +53,7 @@ iceberg-tests-glue = []
5253
iceberg-tests-rest = []
5354
fips = ["rustls/fips"]
5455
bench-mode = []
56+
with-postgres-cdc = ["etl", "etl-config", "etl-postgres"]
5557

5658
[dependencies]
5759
feldera-types = { workspace = true }
@@ -200,6 +202,9 @@ zip = { workspace = true }
200202
smallvec = { workspace = true }
201203
delta_kernel = { workspace = true }
202204
flate2 = { workspace = true }
205+
etl = { workspace = true, optional = true }
206+
etl-config = { workspace = true, optional = true }
207+
etl-postgres = { workspace = true, optional = true }
203208

204209
[package.metadata.cargo-machete]
205210
ignored = ["num-traits"]

crates/adapters/src/controller.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7080,6 +7080,10 @@ impl InputConsumer for InputProbe {
70807080
self.transaction_in_progress.store(false, Ordering::Release);
70817081
}
70827082

7083+
fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>> {
7084+
Some(self.controller.status.completion_notifier.subscribe())
7085+
}
7086+
70837087
fn error(&self, fatal: bool, error: AnyError, tag: Option<&str>) {
70847088
self.controller.input_transport_error(
70857089
self.endpoint_id,

crates/adapters/src/integrated.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub mod delta_table;
1010
mod postgres;
1111

1212
use crate::integrated::postgres::PostgresInputEndpoint;
13+
#[cfg(feature = "with-postgres-cdc")]
14+
use crate::integrated::postgres::PostgresCdcInputEndpoint;
1315
pub use crate::integrated::postgres::PostgresOutputEndpoint;
1416

1517
/// An integrated output connector implements both transport endpoint
@@ -100,6 +102,10 @@ pub fn create_integrated_input_endpoint(
100102
TransportConfig::PostgresInput(config) => {
101103
Box::new(PostgresInputEndpoint::new(endpoint_name, config, consumer))
102104
}
105+
#[cfg(feature = "with-postgres-cdc")]
106+
TransportConfig::PostgresCdcInput(config) => {
107+
Box::new(PostgresCdcInputEndpoint::new(endpoint_name, config, consumer))
108+
}
103109
transport => {
104110
return Err(ControllerError::unknown_input_transport(
105111
endpoint_name,

crates/adapters/src/integrated/postgres.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@ mod output_macros;
55
mod prepared_statements;
66
mod tls;
77

8+
#[cfg(feature = "with-postgres-cdc")]
9+
pub(crate) mod cdc_input;
10+
811
#[cfg(test)]
912
mod test;
1013

1114
pub use input::PostgresInputEndpoint;
1215
pub use output::PostgresOutputEndpoint;
16+
17+
#[cfg(feature = "with-postgres-cdc")]
18+
pub use cdc_input::PostgresCdcInputEndpoint;

0 commit comments

Comments
 (0)