
[adapters] Add Postgres CDC input connector with crash-safe replication#5988

Open
flak153 wants to merge 1 commit into feldera:main from flak153:postgres-cdc-input

Conversation


@flak153 flak153 commented Apr 3, 2026

Summary

Adds an integrated input connector that reads from Postgres via logical replication using supabase/etl. Handles both initial table snapshot and ongoing WAL streaming with crash-safe replication slot advancement.

Closes #5208

Key features

  • Snapshot + streaming via etl's Pipeline (table copy + CDC follow)
  • Crash-safe replication: defers ETL's async flush confirmation until Feldera completes the circuit step that processed the data, ensuring the Postgres replication slot only advances after data is durably processed
  • InputConsumer::completion_watcher(): new trait method (backwards-compatible default None) that exposes Feldera's step completion notifications to input adapters

How crash safety works

  1. write_events pushes data to InputQueue and stores the ETL async result sender in a side channel
  2. When the controller calls Queue, data flushes to the circuit and the sender is routed to a background task with the current total_completed_steps value
  3. The background task watches completion_notifier and fires the sender when total_completed_steps advances past the value at flush time
  4. ETL receives the confirmation and advances the replication slot

On crash, unconfirmed data replays from the slot position — consistent with at-least-once delivery.

Upstream work

What's left

  • Integration tests (need Postgres with wal_level=logical)
  • Error handling refinement
  • Additional configuration options (slot name, TLS, batch config)
  • Documentation

@flak153 flak153 force-pushed the postgres-cdc-input branch from e1231a8 to b2f3351 Compare April 3, 2026 07:31
@lalithsuresh lalithsuresh requested review from abhizer and blp April 3, 2026 12:07
@lalithsuresh
Contributor

@flak153 this is fantastic! Thank you for your contribution. We'll get this some reviews.

Contributor

@abhizer abhizer left a comment


Thank you!

It would also be really nice to have tests for this connector, covering all the different data types, as in: https://docs.feldera.com/connectors/sources/postgresql/#an-example-for-every-type


let input_stream = input_handle
    .handle
    .configure_deserializer(RecordFormat::Json(JsonFlavor::Datagen))?;
Contributor


Does the Datagen format correctly represent all incoming JSON records? Especially for date- and time-related records.

Author


Investigated — found two mismatches and fixed both. Timestamps now use RFC 3339 format (T separator) and bytes use byte arrays instead of hex strings. Added unit tests covering all Cell/ArrayCell variants.

Member

@blp blp left a comment


Thank you very much!

I read the code and I have some comments. I didn't try it or study the way it works in much detail.

It would be helpful to add some documentation under docs.feldera.com/docs/connectors/sources/.

I'll enable the workflows so we can see how an initial CI run goes.

impl InputEndpoint for PostgresCdcInputEndpoint {
    fn fault_tolerance(&self) -> Option<FtModel> {
        None
    }
Member


No fault tolerance will be a problem for a lot of users.

Comment on lines +98 to +107
thread::Builder::new()
    .name("postgres-cdc-input-tokio-wrapper".to_string())
    .spawn(move || {
        TOKIO.block_on(async {
            let _ = endpoint_clone
                .worker_task(input_stream, receiver, init_status_sender)
                .await;
        })
    })
    .expect("failed to create Postgres CDC input connector thread");
Member


I probably would have used TOKIO.spawn rather than creating a thread and then running TOKIO.block_on in it. Maybe you made this as a considered choice though; I don't know whether there is some advantage to it.

Author


Keeping the thread because open() is sync and needs blocking_recv for init status. The actual CDC work runs on TOKIO via block_on inside the thread.

Comment on lines +410 to +411
let indices: Vec<String> =
    (0..cells.len()).map(|i| format!("col_{i}")).collect();
Member


This will format and allocate a whole array of strings for every row. It would be better to unconditionally construct column_names above so that it can be reused instead of reconstructed for every row.

Author


Good catch — will fix to pre-construct column_names outside the loop.

Comment on lines +643 to +646
Cell::Numeric(n) => {
    // Preserve precision by encoding as string.
    json!(n.to_string())
}
Member


I don't know whether this is necessary, since we enable the serde_json arbitrary_precision feature.

Member


Ditto for ArrayCell::Numeric.

Author


Checked — Feldera does enable arbitrary_precision. However, etl's Cell::Numeric uses pg_numeric which implements Serialize, so json!(n) would serialize it as a number with full precision. The string conversion is a safety choice but you're right it may not be needed. Will investigate further.


@mythical-fred mythical-fred left a comment


Two blockers not already raised by blp/abhizer.

Author

flak153 commented Apr 5, 2026

Thanks everyone for the thorough reviews. Pushed fixes for all comments:

@abhizer — Good catch on the Datagen format. Found two mismatches and fixed both:

  • Cell::Timestamp now uses RFC 3339 format (T separator) instead of space-separated
  • Cell::Bytes now encodes as byte arrays instead of hex strings to match BinaryFormat::Array
  • Same fixes for the ArrayCell variants
  • Added 47 unit tests covering all cell_to_json/array_cell_to_json variants plus URI parsing and table matching

@blp — All comments addressed:

  • completion_watcher default impl removed, added explicit None to all 3 implementations
  • with-postgres-cdc added to default features
  • mpsc → oneshot for the init status channel
  • mem::take instead of drain().collect()
  • Kept thread::Builder because open() is sync and needs blocking_recv for init status — happy to switch if there's a better pattern
  • On FT: the path is switching MemoryStore → PostgresStore so etl persists table phases across restarts, then implementing Resume::Seek. Planning that as a follow-up since it's a significant addition.

@mythical-fred — Both blockers addressed:

  • etl deps pinned to rev = "05cb11ae"
  • Added 3 integration tests (test_cdc_basic_insert, test_cdc_all_data_types, test_cdc_update_delete) marked #[ignore] since they require wal_level=logical


@mythical-fred mythical-fred left a comment


Both of my previous blockers are addressed — etl deps pinned, integration tests added. Thank you for the thorough response.

One remaining issue before this can merge: the second commit must be squashed into the first.

Commit 72b20821 has the message "Address review comments on Postgres CDC connector" — that's not a commit message that belongs in Feldera's linear history. Please use git rebase -i to squash it into the initial commit and produce a single, clean commit with a complete message (or two well-named commits if there's a logical split).

@flak153 flak153 force-pushed the postgres-cdc-input branch from 72b2082 to 194f366 Compare April 5, 2026 13:40
Author

flak153 commented Apr 5, 2026

Squashed into a single commit.


@mythical-fred mythical-fred left a comment


LGTM

Author

flak153 commented Apr 6, 2026

Question — beyond the current tests, there are several edge cases we could cover. Would you like any of these before merge, or are they better as follow-ups?

High priority (most likely to surface real bugs):

  • Crash-resume / failover: start pipeline, insert data, kill without confirming, restart, verify data replays
  • Schema changes during streaming: column added/dropped while CDC is running
  • Large transactions: single transaction with thousands of rows (tests 2MB buffer splitting)
  • Multiple tables in publication: verify source_table filtering ignores other tables

Medium priority:

  • TRUNCATE events (currently ignored — verify it doesn't crash)
  • Empty table snapshot (zero rows, then streaming starts)
  • REPLICA IDENTITY behavior without FULL (no old row on updates/deletes)
  • Rapid insert/update/delete cycles on same row

Lower priority (edge cases):

  • Connection loss / Postgres restart mid-stream
  • Slot invalidation (slot dropped externally)
  • Long-running transactions
  • Special float values end-to-end (NaN, Infinity)

Happy to add whichever subset you think is important for this PR vs follow-ups.

@flak153 flak153 force-pushed the postgres-cdc-input branch from 194f366 to 95bf3a6 Compare April 8, 2026 01:22
Author

flak153 commented Apr 8, 2026

Pushed updates:

  • Fault tolerance implemented: switched MemoryStore → PostgresStore so etl persists table replication phases across restarts. The connector now returns Some(FtModel::AtLeastOnce). On restart, etl reads the stored phases from Postgres and resumes from the replication slot position instead of re-snapshotting.
  • Column names pre-constructed outside the per-row loop in write_table_rows (blp's comment)
  • Numeric: keeping .to_string(), since PgNumeric is a custom enum (NaN/Infinity/Value) without Serialize, so json!(n) wouldn't work even with arbitrary_precision
  • Pipeline ID: Uses a deterministic hash of (URI, publication, source_table) so the same replication slot and stored state are reused across restarts

@flak153 flak153 force-pushed the postgres-cdc-input branch from 265f888 to 78a712a Compare April 8, 2026 03:17
Author

flak153 commented Apr 8, 2026

Question on fault tolerance implementation — looking for guidance on the right pattern.

The connector now uses PostgresStore so etl persists table replication phases in Postgres. On restart, etl checks the stored phases and resumes from the replication slot position (no re-snapshot). This works regardless of what fault_tolerance() returns.

To properly return Some(FtModel::AtLeastOnce), we need to provide Resume::Seek metadata via consumer.extended(). The issue is that InputQueue::queue() hardcodes resume: None in its internal extended() call, and we use queue() to avoid reimplementing its batching/transaction logic.

Would it be valid to call extended() a second time after queue() with empty data + resume metadata?

self.inner.queue.queue();  // flushes data, calls extended(total, None, watermarks)
self.inner.consumer.extended(
    BufferSize::empty(),
    Some(Resume::Seek { seek: json!({"pipeline_id": pipeline_id}) }),
    vec![],
);

Or is there a better pattern for integrated endpoints that need resume metadata but want to use queue() for the data path?

For now I've set fault_tolerance() to AtLeastOnce with the PostgresStore backing — the crash recovery works because etl manages its own state. But want to make sure the Feldera checkpoint integration is correct too.


@mythical-fred mythical-fred left a comment


Two blockers, see inline.

// Use a deterministic pipeline_id so etl reuses the same replication slot
// and stored state across restarts.
let pipeline_id = {
    let mut hasher = DefaultHasher::new();


DefaultHasher is not stable across Rust versions/builds, so pipeline_id can change on upgrade and the replication slot/state will not be reused. Please switch to a stable hash (e.g., xxhash/sha256) and consider hashing a normalized connection identity (exclude password/other volatile fields).

Author


Fixed — switched to xxh3 (stable across Rust versions) with a normalized identity string: host:port/db + publication + source_table (excludes password and other volatile fields).

    invalidated_slot_behavior: InvalidatedSlotBehavior::default(),
};

// Use PostgresStore to persist table replication phases across restarts.


Switching to PostgresStore + AtLeastOnce changes restart semantics (resume from slot instead of re-snapshot). Per test rule, this behavior change needs an integration test that restarts the pipeline and verifies it resumes correctly.

Author


Added test_cdc_restart_resumes_from_slot — starts pipeline, inserts data, stops, restarts with a new output file, inserts more data, then asserts the original snapshot row (id=1) does NOT reappear in the second run's output (proving PostgresStore state was preserved and no re-snapshot occurred).

Add an integrated input connector that reads from Postgres via logical
replication using the supabase/etl library. The connector handles both
initial table snapshot and ongoing WAL streaming.

Key features:
- Snapshot + streaming via etl's Pipeline (table copy + CDC follow)
- Crash-safe replication slot advancement: the connector defers ETL's
  async flush confirmation until Feldera completes the circuit step
  that processed the data, ensuring the Postgres replication slot only
  advances after data is durably processed
- Completion tracking via InputConsumer::completion_watcher() that
  exposes Feldera's step completion notifications to input adapters
- RFC 3339 timestamp format and byte array encoding for correct
  Datagen parser compatibility

Files added:
- crates/adapters/src/integrated/postgres/cdc_input.rs

Tests:
- 47 unit tests (cell_to_json, array_cell_to_json, row_to_json,
  parse_pg_uri, FelderaDestination helpers)
- 3 integration tests (basic insert, all data types, update/delete)
@flak153 flak153 force-pushed the postgres-cdc-input branch from 78a712a to 2740fb1 Compare April 8, 2026 04:45
Member

blp commented Apr 8, 2026

Question on fault tolerance implementation — looking for guidance on the right pattern.

The connector now uses PostgresStore so etl persists table replication phases in Postgres. On restart, etl checks the stored phases and resumes from the replication slot position (no re-snapshot). This works regardless of what fault_tolerance() returns.

To properly return Some(FtModel::AtLeastOnce), we need to provide Resume::Seek metadata via consumer.extended(). The issue is that InputQueue::queue() hardcodes resume: None in its internal extended() call, and we use queue() to avoid reimplementing its batching/transaction logic.

Right, InputQueue::queue can't be used for fault tolerance.

One way to achieve fault tolerance along with InputQueue is to add auxiliary data (the A generic parameter to InputQueue) for the resume information to each batch in the queue, and then use flush_with_aux instead of queue and subsequently call extended directly. The nats connector does this.

Would it be valid to call extended() a second time after queue() with empty data + resume metadata?

No, it needs to be called exactly once.



Development

Successfully merging this pull request may close these issues.

Connector for postgres snapshot-and-follow

5 participants