[adapters] Add Postgres CDC input connector with crash-safe replication #5988
flak153 wants to merge 1 commit into feldera:main
Conversation
force-pushed from e1231a8 to b2f3351
@flak153 this is fantastic! Thank you for your contribution. We'll get some reviews on this.
abhizer left a comment
Thank you!
It would also be really nice to have tests for this connector, with all the different data types as such: https://docs.feldera.com/connectors/sources/postgresql/#an-example-for-every-type
```rust
let input_stream = input_handle
    .handle
    .configure_deserializer(RecordFormat::Json(JsonFlavor::Datagen))?;
```
Does the Datagen format correctly represent all incoming JSON records? Especially date- and time-related records.
Investigated — found two mismatches and fixed both. Timestamps now use RFC 3339 format (T separator) and bytes use byte arrays instead of hex strings. Added unit tests covering all Cell/ArrayCell variants.
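A minimal sketch of the timestamp part of this fix: the Datagen JSON flavor expects RFC 3339 timestamps (with a `T` separator), while Postgres text output uses a space separator. The helper name `to_rfc3339` is illustrative, not the connector's actual code.

```rust
// Convert a Postgres-style "date time" string into the RFC 3339 shape
// (T separator) that the Datagen JSON parser accepts.
// Hypothetical helper for illustration only.
fn to_rfc3339(date: &str, time: &str) -> String {
    // "2024-01-15" + "10:30:00" -> "2024-01-15T10:30:00"
    format!("{date}T{time}")
}

fn main() {
    let pg_style = "2024-01-15 10:30:00"; // space-separated form
    let (date, time) = pg_style.split_once(' ').expect("expected 'date time'");
    let rfc3339 = to_rfc3339(date, time);
    assert_eq!(rfc3339, "2024-01-15T10:30:00");
    println!("{rfc3339}");
}
```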
blp left a comment
Thank you very much!
I read the code and I have some comments. I didn't try it or study the way it works in much detail.
It would be helpful to add some documentation under docs.feldera.com/docs/connectors/sources/.
I'll enable the workflows so we can see how an initial CI run goes.
```rust
impl InputEndpoint for PostgresCdcInputEndpoint {
    fn fault_tolerance(&self) -> Option<FtModel> {
        None
    }
}
```
No fault tolerance will be a problem for a lot of users.
```rust
thread::Builder::new()
    .name("postgres-cdc-input-tokio-wrapper".to_string())
    .spawn(move || {
        TOKIO.block_on(async {
            let _ = endpoint_clone
                .worker_task(input_stream, receiver, init_status_sender)
                .await;
        })
    })
    .expect("failed to create Postgres CDC input connector thread");
```
I probably would have used TOKIO.spawn rather than creating a thread and then running TOKIO.block_on in it. Maybe you made this as a considered choice though; I don't know whether there is some advantage to it.
Keeping the thread because open() is sync and needs blocking_recv for init status. The actual CDC work runs on TOKIO via block_on inside the thread.
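The pattern described above can be sketched with the standard library alone (no tokio): a synchronous `open()` spawns the worker thread, then blocks on a channel until the worker reports its init status, while the worker keeps running afterwards. All names here are illustrative, not the connector's real API.

```rust
use std::sync::mpsc;
use std::thread;

// Sketch: sync open() that blocks until the spawned worker reports
// its init status, mirroring the blocking_recv() described above.
fn open() -> Result<thread::JoinHandle<()>, String> {
    let (init_tx, init_rx) = mpsc::channel::<Result<(), String>>();
    let handle = thread::Builder::new()
        .name("postgres-cdc-input-tokio-wrapper".to_string())
        .spawn(move || {
            // In the real connector this is TOKIO.block_on(worker_task(..)),
            // which sends the init status once the replication stream is up.
            init_tx.send(Ok(())).ok();
            // ... long-running CDC work would continue here ...
        })
        .map_err(|e| e.to_string())?;
    // Blocking receive inside the synchronous open().
    init_rx.recv().map_err(|e| e.to_string())??;
    Ok(handle)
}

fn main() {
    let handle = open().expect("init failed");
    handle.join().unwrap();
}
```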
```rust
let indices: Vec<String> =
    (0..cells.len()).map(|i| format!("col_{i}")).collect();
```
This will format and allocate a whole array of strings for every row. It would be better to unconditionally construct column_names above so that it can be reused instead of reconstructed for every row.
Good catch — will fix to pre-construct column_names outside the loop.
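The suggested fix can be sketched as follows: build the fallback column names once, outside the per-row loop, instead of allocating a fresh `Vec<String>` for every row. `cells_len` stands in for `cells.len()`; the loop body is a placeholder for the real JSON construction.

```rust
// Construct the fallback column names once so every row can reuse them.
fn column_names_for(cells_len: usize) -> Vec<String> {
    (0..cells_len).map(|i| format!("col_{i}")).collect()
}

fn main() {
    let rows: Vec<Vec<i32>> = vec![vec![1, 2, 3]; 1000];
    // Built once, reused for every row in the loop below.
    let column_names = column_names_for(rows[0].len());
    for row in &rows {
        for (name, value) in column_names.iter().zip(row) {
            let _ = (name, value); // real code builds a JSON object here
        }
    }
    assert_eq!(column_names, vec!["col_0", "col_1", "col_2"]);
}
```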
```rust
Cell::Numeric(n) => {
    // Preserve precision by encoding as string.
    json!(n.to_string())
}
```
I don't know whether this is necessary, since we enable the serde_json arbitrary_precision feature.
Checked — Feldera does enable arbitrary_precision. However, etl's Cell::Numeric uses pg_numeric which implements Serialize, so json!(n) would serialize it as a number with full precision. The string conversion is a safety choice but you're right it may not be needed. Will investigate further.
mythical-fred left a comment
Two blockers not already raised by blp/abhizer.
Thanks everyone for the thorough reviews. Pushed fixes for all comments:

@abhizer — Good catch on the Datagen format. Found two mismatches and fixed both:
@blp — All comments addressed:
@mythical-fred — Both blockers addressed:
mythical-fred left a comment
Both of my previous blockers are addressed — etl deps pinned, integration tests added. Thank you for the thorough response.
One remaining issue before this can merge: the second commit must be squashed into the first.
Commit 72b20821 has the message "Address review comments on Postgres CDC connector" — that's not a commit message that belongs in Feldera's linear history. Please use `git rebase -i` to squash it into the initial commit and produce a single, clean commit with a complete message (or two well-named commits if there's a logical split). Resources:
force-pushed from 72b2082 to 194f366
Squashed into a single commit.
Question — beyond the current tests, there are several edge cases we could cover. Would you like any of these before merge, or are they better as follow-ups?

High priority (most likely to surface real bugs):
Medium priority:
Lower priority (edge cases):
Happy to add whichever subset you think is important for this PR vs follow-ups.
force-pushed from 194f366 to 95bf3a6
Pushed updates:
force-pushed from 265f888 to 78a712a
Question on fault tolerance implementation — looking for guidance on the right pattern. The connector now uses […]. To properly return […], would it be valid to call:

```rust
self.inner.queue.queue(); // flushes data, calls extended(total, None, watermarks)
self.inner.consumer.extended(
    BufferSize::empty(),
    Some(Resume::Seek { seek: json!({"pipeline_id": pipeline_id}) }),
    vec![],
);
```

Or is there a better pattern for integrated endpoints that need resume metadata but want to use […]? For now I've set […].
```rust
// Use a deterministic pipeline_id so etl reuses the same replication slot
// and stored state across restarts.
let pipeline_id = {
    let mut hasher = DefaultHasher::new();
```
DefaultHasher is not stable across Rust versions/builds, so pipeline_id can change on upgrade and the replication slot/state will not be reused. Please switch to a stable hash (e.g., xxhash/sha256) and consider hashing a normalized connection identity (exclude password/other volatile fields).
Fixed — switched to xxh3 (stable across Rust versions) with a normalized identity string: host:port/db + publication + source_table (excludes password and other volatile fields).
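The fix above can be sketched std-only: hash a normalized connection identity (host:port/db plus publication and table, excluding the password) with a hash function whose output is fixed across Rust versions. The connector uses xxh3; FNV-1a below is a stand-in with the same stability property, implemented inline, and all names are illustrative.

```rust
// FNV-1a: a simple hash with a fixed algorithm, unlike std's DefaultHasher,
// whose output may change between Rust releases. Stand-in for xxh3 here.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

// Derive a deterministic pipeline_id from a normalized identity string
// that deliberately excludes the password and other volatile fields.
fn pipeline_id(host: &str, port: u16, db: &str, publication: &str, table: &str) -> u64 {
    let identity = format!("{host}:{port}/{db}|{publication}|{table}");
    fnv1a_64(identity.as_bytes())
}

fn main() {
    let a = pipeline_id("db.example.com", 5432, "app", "feldera_pub", "orders");
    let b = pipeline_id("db.example.com", 5432, "app", "feldera_pub", "orders");
    assert_eq!(a, b); // same identity -> same slot reused across restarts
    let c = pipeline_id("db.example.com", 5432, "app", "feldera_pub", "users");
    assert_ne!(a, c); // different table -> different slot
}
```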
```rust
    invalidated_slot_behavior: InvalidatedSlotBehavior::default(),
};

// Use PostgresStore to persist table replication phases across restarts.
```
Switching to PostgresStore + AtLeastOnce changes restart semantics (resume from slot instead of re-snapshot). Per test rule, this behavior change needs an integration test that restarts the pipeline and verifies it resumes correctly.
Added test_cdc_restart_resumes_from_slot — starts pipeline, inserts data, stops, restarts with a new output file, inserts more data, then asserts the original snapshot row (id=1) does NOT reappear in the second run's output (proving PostgresStore state was preserved and no re-snapshot occurred).
Add an integrated input connector that reads from Postgres via logical replication using the supabase/etl library. The connector handles both initial table snapshot and ongoing WAL streaming.

Key features:
- Snapshot + streaming via etl's Pipeline (table copy + CDC follow)
- Crash-safe replication slot advancement: the connector defers ETL's async flush confirmation until Feldera completes the circuit step that processed the data, ensuring the Postgres replication slot only advances after data is durably processed
- Completion tracking via InputConsumer::completion_watcher() that exposes Feldera's step completion notifications to input adapters
- RFC 3339 timestamp format and byte array encoding for correct Datagen parser compatibility

Files added:
- crates/adapters/src/integrated/postgres/cdc_input.rs

Tests:
- 47 unit tests (cell_to_json, array_cell_to_json, row_to_json, parse_pg_uri, FelderaDestination helpers)
- 3 integration tests (basic insert, all data types, update/delete)
force-pushed from 78a712a to 2740fb1
Right, […]. One way to achieve fault tolerance along with […]:

No, it needs to be called exactly once.
Summary
Adds an integrated input connector that reads from Postgres via logical replication using supabase/etl. Handles both initial table snapshot and ongoing WAL streaming with crash-safe replication slot advancement.
Closes #5208
Key features
- `InputConsumer::completion_watcher()`: new trait method (backwards-compatible default `None`) that exposes Feldera's step completion notifications to input adapters

How crash safety works
- `write_events` pushes data to `InputQueue` and stores the ETL async result sender in a side channel
- `Queue`: data flushes to the circuit and the sender is routed to a background task with the current `total_completed_steps` value
- The background task watches `completion_notifier` and fires the sender when `total_completed_steps` advances past the value at flush time

On crash, unconfirmed data replays from the slot position — consistent with at-least-once delivery.
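The deferred-confirmation scheme above can be sketched std-only: each flushed batch parks its ETL confirmation sender together with the step count observed at flush time, and the sender fires only once the circuit's completed-step counter moves past that value. Type and function names here are illustrative, not the connector's real code.

```rust
use std::sync::mpsc;

// One parked confirmation per flushed batch: the ETL sender plus the
// completed-step count observed when the batch was flushed.
struct PendingAck {
    steps_at_flush: u64,
    confirm: mpsc::Sender<()>,
}

// Fire every confirmation whose flush-time step has since been completed;
// keep the rest parked. Firing is what lets the replication slot advance.
fn fire_completed(pending: &mut Vec<PendingAck>, total_completed_steps: u64) {
    pending.retain(|ack| {
        if total_completed_steps > ack.steps_at_flush {
            ack.confirm.send(()).ok(); // confirm flush -> slot may advance
            false
        } else {
            true // step not past the flush point yet: hold the ack
        }
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut pending = vec![PendingAck { steps_at_flush: 7, confirm: tx }];
    fire_completed(&mut pending, 7); // not past flush point: ack held
    assert!(rx.try_recv().is_err());
    fire_completed(&mut pending, 8); // step completed: ack fires
    assert!(rx.try_recv().is_ok());
    assert!(pending.is_empty());
}
```

On crash, any sender still parked was never fired, so the slot never advanced past that data and it replays, matching the at-least-once semantics described above.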
Upstream work
What's left
- Requires `wal_level=logical` on the source Postgres