
adapters: improvements to the postgres CDC input connector #6533

Merged
abhizer merged 1 commit into main from pg-cdc-improvements
Jul 1, 2026

@abhizer abhizer commented Jun 24, 2026

Contributor

Makes a series of improvements to stabilize the PostgreSQL CDC input connector.

Each improvement is its own commit, so I recommend reviewing per commit.

Tested against a local Postgres instance, typically the Docker image debezium/postgres, which ships with wal_level set to logical.

@abhizer abhizer changed the title from "Pg cdc improvements" to "adapters: improvements to the postgres CDC input connector" Jun 24, 2026

@mihaibudiu mihaibudiu left a comment

Contributor

You seem to call the right functions... based on their names.

fn request(&self, command: InputReaderCommand) {
    if matches!(command, InputReaderCommand::Replay { .. }) {
        panic!(
            "replay command is not supported by PostgresCdcInputReader; this is a bug, please report it to developers"

Contributor

Feldera developers?

/// `POSTGRES_SSL_URL`/`POSTGRES_SSL_CA_LOCATION` or `POSTGRES_SSL_CA_PEM`.
#[test]
#[serial]
#[ignore]

Contributor

adding an ignored test?

Contributor Author

The postgres instance isn't currently configured with wal level logical.

/// Requires: wal_level=logical, user with REPLICATION privilege.
#[test]
#[serial]
#[ignore]

Contributor

this I like

return Err(
"client-certificate TLS options (ssl_client_pem, ssl_client_location, \
ssl_client_key, ssl_client_key_location, ssl_certificate_chain_location) \
are not supported by the Postgres CDC connector"

Contributor

Suggest adding "currently" to this message.
Can you direct users to file an issue if we expect to support these options eventually?
Or you can file one yourself and mention it.

@mythical-fred mythical-fred left a comment

Draft, so high-level only.

Direction is solid. Four discrete commits, each one is independently legible, each one fixes a real concern. Reading top to bottom:

validate() on PostgresCdcReaderConfig. Right move — the etl crate silently ignores client-cert options today and disabling hostname verification isn't honored either; rejecting both at config time with explicit error messages is much better than the alternative of "we accepted your config, here's a confusingly insecure connection." Empty-after-trim checks on publication and source_table are cheap and catch a real footgun. One small thing: consider also asserting that at least one of the TLS fields makes sense when ssl_ca_pem/ssl_ca_location are absent but verify_hostname == Some(true) — currently "no CA, verify hostname" produces a runtime failure rather than a config-time one. Not blocking; just a thought.
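
A minimal sketch of the config-time checks described above, including the suggested "verify hostname without a CA" case. The `CdcConfig` and `TlsOptions` types are hypothetical, trimmed-down stand-ins, not the real `PostgresCdcReaderConfig`, and every error string except "source_table cannot be empty" is illustrative.

```rust
/// Hypothetical subset of the TLS options discussed in this review.
#[derive(Default)]
pub struct TlsOptions {
    pub ssl_ca_pem: Option<String>,
    pub ssl_ca_location: Option<String>,
    pub ssl_client_pem: Option<String>,
    pub ssl_client_key: Option<String>,
    pub verify_hostname: Option<bool>,
}

/// Hypothetical stand-in for the connector config.
pub struct CdcConfig {
    pub publication: String,
    pub source_table: String,
    pub tls: TlsOptions,
}

impl CdcConfig {
    /// Rejects configurations that would otherwise fail, or be silently
    /// ignored, at connection time.
    pub fn validate(&self) -> Result<(), String> {
        // Empty-after-trim checks catch values such as "   ".
        if self.publication.trim().is_empty() {
            return Err("publication cannot be empty".to_string());
        }
        if self.source_table.trim().is_empty() {
            return Err("source_table cannot be empty".to_string());
        }
        // Options the underlying crate is said to ignore are rejected up front.
        if self.tls.ssl_client_pem.is_some() || self.tls.ssl_client_key.is_some() {
            return Err("client-certificate TLS options are not supported".to_string());
        }
        if self.tls.verify_hostname == Some(false) {
            return Err("disabling hostname verification is not supported".to_string());
        }
        // Suggested extra check: hostname verification requested without a CA.
        let has_ca = self.tls.ssl_ca_pem.is_some() || self.tls.ssl_ca_location.is_some();
        if self.tls.verify_hostname == Some(true) && !has_ca {
            return Err("verify_hostname requires ssl_ca_pem or ssl_ca_location".to_string());
        }
        Ok(())
    }
}
```

With this shape, a config that asks for hostname verification but supplies no CA fails in `validate()` rather than at connection time.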

Replay panic. Agree with the intent — a connector that genuinely cannot replay should fail loudly rather than silently producing wrong output. But panic!() inside request() is the wrong shape: the controller traps panics, but the error path through consumer.error(true, …) is the documented way to surface a fatal connector misconfiguration. Look at how delta_table_input or kafka_input reject unsupported commands — they call consumer.error(true, anyhow!("...")) and return. That keeps the failure inside the connector-health and toast machinery rather than as a generic backtrace in the logs. Either way, this should also be visible in PostgresCdcReaderConfig::fault_tolerance() (or wherever the framework asks "does this connector support replay?") — if the answer is "no" then the controller shouldn't be sending Replay in the first place, and panic-on-receipt is defense in depth, not the main barrier.
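
A rough sketch of the "report and return" shape suggested above, as opposed to `panic!()`. The `Consumer` trait, `Command` enum, and `Reader` type are hypothetical stand-ins written for this example; the real Feldera traits and the `anyhow`-based error type are not reproduced here.

```rust
use std::sync::Mutex;

/// Hypothetical command set; only what the sketch needs.
pub enum Command {
    Extend,
    Pause,
    Replay { seq: u64 },
}

/// Hypothetical stand-in for the handle a connector reports errors to.
pub trait Consumer {
    fn error(&self, fatal: bool, message: String);
}

pub struct Reader<C: Consumer> {
    pub consumer: C,
}

impl<C: Consumer> Reader<C> {
    pub fn new(consumer: C) -> Self {
        Self { consumer }
    }

    pub fn request(&self, command: Command) {
        if let Command::Replay { .. } = command {
            // Surface a fatal, structured error and stop handling the
            // command, instead of unwinding with a panic.
            self.consumer.error(
                true,
                "replay command is not supported by this connector".to_string(),
            );
            return;
        }
        // Other commands would be forwarded to the worker here.
    }
}

/// Test double that records every reported error.
#[derive(Default)]
pub struct RecordingConsumer {
    pub errors: Mutex<Vec<(bool, String)>>,
}

impl Consumer for RecordingConsumer {
    fn error(&self, fatal: bool, message: String) {
        self.errors.lock().unwrap().push((fatal, message));
    }
}
```

The recording consumer makes the behavior easy to assert on in a unit test: a `Replay` request yields exactly one fatal error and no panic.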

TLS reshuffle and make_etl_tls_config. The resolve_ca_pem refactor is a clear win — one place for the precedence rule (inline PEM > file path), one place for the warning, and the file-read error now carries the actual path that failed. The new make_etl_tls_config is small and the unit tests pin both the disabled and enabled paths. The validate() change already rejects client-cert TLS up front, so the cdc connector never reaches a state where it would need to handle them — good layering. _endpoint_name is unused; either drop the parameter or wire it into a tracing::warn! for the precedence message so the user knows which connector logged it.
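
The precedence rule (inline PEM over file path) could look roughly like the sketch below. The signature is an assumption made for illustration and is not the PR's actual `resolve_ca_pem`; `eprintln!` stands in for `tracing::warn!` so the example needs only the standard library. It also shows one way to put the otherwise unused endpoint name to work.

```rust
use std::fs;

/// Resolves the CA bundle to use, preferring inline PEM over a file path.
/// Returns Ok(None) when neither option is set (TLS not requested).
pub fn resolve_ca_pem(
    ssl_ca_pem: Option<&str>,
    ssl_ca_location: Option<&str>,
    endpoint_name: &str,
) -> Result<Option<String>, String> {
    match (ssl_ca_pem, ssl_ca_location) {
        (Some(pem), location) => {
            if location.is_some() {
                // Name the connector so the warning can be traced to its source.
                eprintln!(
                    "[{endpoint_name}] both ssl_ca_pem and ssl_ca_location are set; using ssl_ca_pem"
                );
            }
            Ok(Some(pem.to_string()))
        }
        // Include the path in the error so the user sees which file failed.
        (None, Some(location)) => fs::read_to_string(location)
            .map(Some)
            .map_err(|e| format!("[{endpoint_name}] failed to read CA file '{location}': {e}")),
        (None, None) => Ok(None),
    }
}
```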

debezium/postgres:15 CI image plus un-#[ignore]ing test_cdc_basic_insert. Big quality-of-life improvement, and the right call — the upstream image's default wal_level=replica is exactly what makes the CDC tests pointless in CI. Two things to flag for full review:

  • Removing the #[ignore] from test_cdc_basic_insert means every PR now needs the debezium image. The header comment block above cdc_tests still says "they are gated behind #[ignore] so they don't run in normal CI" for the others — worth updating that comment to say "the basic insert test runs in CI; the rest still require manual setup," or just removing the obsolete sentence entirely. Inconsistency between code and comment is the kind of thing that bites in six months.
  • test_cdc_basic_insert_tls stays #[ignore] because it needs POSTGRES_SSL_URL / POSTGRES_SSL_CA_*. Reasonable. Worth a one-line note in the docstring on how to run it locally — the existing test_cdc_* tests do this and it's helped me before.

Architecturally — and the bit I'd want resolved before ready-for-review — the validation lives in feldera-types but the constraints it encodes ("etl crate doesn't support client certs," "etl crate doesn't honor verify_hostname=false") are properties of the etl crate, not the config schema. If etl gains support for those tomorrow, the validation in feldera-types will silently keep rejecting valid configs until someone remembers. Three options:

  1. Move validate() to the adapters crate, gated on #[cfg(feature = "with-postgres-cdc")]. The downside is the validation only fires when the connector actually instantiates, but make_etl_tls_config already validates the same shape — collapsing them removes the duplication.
  2. Express the etl crate's capabilities as an explicit PostgresEtlCapabilities const in the adapters crate and reference it from validate(). Verbose but the dependency is explicit.
  3. Leave it where it is, but add a // XXX(etl-tls): if etl crate gains client-cert support, drop this rejection. comment so the next maintainer knows why it's there.

Option 3 is fine if 1 is too invasive. Don't ship without one of them — silent overspecification is the kind of bug that lives forever.
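
For reference, option 2 might be sketched as follows. Only the name `PostgresEtlCapabilities` comes from the suggestion above; its fields, the `ETL_CAPABILITIES` constant, and `check_tls_request` are hypothetical.

```rust
/// What the underlying replication crate is assumed to support.
pub struct PostgresEtlCapabilities {
    pub client_certificates: bool,
    pub disable_hostname_verification: bool,
}

/// Single place to update if the crate gains these features.
pub const ETL_CAPABILITIES: PostgresEtlCapabilities = PostgresEtlCapabilities {
    client_certificates: false,
    disable_hostname_verification: false,
};

/// Rejects only what the given capabilities do not cover.
pub fn check_tls_request(
    caps: &PostgresEtlCapabilities,
    wants_client_cert: bool,
    verify_hostname: Option<bool>,
) -> Result<(), String> {
    if wants_client_cert && !caps.client_certificates {
        return Err("client-certificate TLS options are not supported".to_string());
    }
    if verify_hostname == Some(false) && !caps.disable_hostname_verification {
        return Err("disabling hostname verification is not supported".to_string());
    }
    Ok(())
}
```

Flipping a field in the constant then relaxes the matching rejection without touching the validation logic.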

Smaller things, save for full review:

  • validate() returns Result<(), String>. Most of the codebase uses anyhow::Error or a typed error here; matching the surrounding convention costs nothing.
  • controller_error::invalid_transport_configuration(endpoint_name, &e.to_string()) — the String -> String round-trip through .to_string() is a smell. If validate returned &str references this would be cleaner; alternatively make invalid_transport_configuration take impl Display.
  • The TLS test fixture postgres_cdc_ssl_config() mirrors postgres_ssl_config() minus the client-cert fields. A with_supported_cdc_options(self) -> Self builder on PostgresTlsConfig would let both call sites share the "drop unsupported fields" logic and make the relationship between them obvious.
  • cdc_simple_test_circuit_with_tls uses serde_json::to_value(tls).unwrap() plus .as_object_mut().unwrap().extend(...) to flatten TLS fields into the parent config. That's fine for tests but the unwraps assume PostgresTlsConfig serializes to an object; a future #[serde(untagged)] variant would silently panic. A typed TestConnectorConfig struct that derives Serialize and inlines the TLS fields directly would be safer.
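
To illustrate the `impl Display` suggestion from the second bullet: a sketch with a hypothetical `ConfigError` type standing in for the real controller error. The actual signature of `invalid_transport_configuration` is not shown in this PR excerpt, so the one below is an assumption.

```rust
use std::fmt::Display;

/// Hypothetical stand-in for the controller's error type.
#[derive(Debug, PartialEq)]
pub struct ConfigError {
    pub endpoint: String,
    pub message: String,
}

/// Accepts anything printable, so callers can pass a &str, a String,
/// or an error value without an explicit `.to_string()` at the call site.
pub fn invalid_transport_configuration(endpoint_name: &str, error: impl Display) -> ConfigError {
    ConfigError {
        endpoint: endpoint_name.to_string(),
        message: error.to_string(),
    }
}
```

The single `to_string()` call then lives inside the helper rather than at each call site.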

Not blocking anything since it's a draft. The replay-panic-vs-consumer-error question and the "where does validation live" question are the two I'd want answered before ready-for-review.

@abhizer abhizer force-pushed the pg-cdc-improvements branch from 74010e0 to f84381f June 30, 2026 09:00
@abhizer abhizer marked this pull request as ready for review June 30, 2026 09:01
@abhizer abhizer requested a review from swanandx June 30, 2026 09:02

@mythical-fred mythical-fred left a comment

Prior draft concerns addressed — bytea now uses ByteArray with a derived Deserialize, TLS is cleanly refactored through resolve_ca_pem, and schema handling uses ReplicatedTableSchema directly. New tests for pause/unpause, TLS, compatible schema changes, and PK-column-drop rejection are solid. Docs page is good. See inline nits.


fn request(&self, command: InputReaderCommand) {
    if matches!(command, InputReaderCommand::Replay { .. }) {
        panic!(

Nit: panic!() here means the controller gets a generic backtrace instead of a structured connector error. consumer.error(true, anyhow!("...")) followed by return keeps the failure inside the connector-health machinery and produces a user-visible toast — same pattern delta_table_input uses for unsupported commands. The panic is defense-in-depth (the framework shouldn't send Replay if fault_tolerance() doesn't claim support), so not blocking, but the error path is cleaner.

let (sender, _flag_rx) = make_tracked_sender_sync();
let mut waiting: Vec<(u64, DeferredSenders)> = vec![(10, vec![sender])];
fire_completed(&mut waiting, 10);
fn test_required_columns_present() {

12 unit tests for fire_completed and completion_watcher_task were removed, but both functions still exist unchanged (lines 1112 and 1156). Those tests covered edge cases like equal-frontier semantics, partial checkpoints in strict mode, and multi-batch ordering — scenarios that are hard to exercise end-to-end. Could you restore them? The RelationInfo / cache tests that were removed are fine since the struct is gone.

| `ssl_ca_pem` | string | | CA certificates in PEM format. Setting this enables TLS and takes precedence over `ssl_ca_location`. |
| `ssl_ca_location` | string | | Path to a PEM file containing CA certificates. Used when `ssl_ca_pem` is not set. |

[*]: Required fields

This markdown link-definition syntax ([*]: ...) won't render as visible text in most renderers — it defines a reference-style link target, so the "Required fields" note will silently disappear from the rendered page. Replace with a plain-text note, e.g.:

* Required fields

    return Err("source_table cannot be empty".to_string());
}

if self.tls.ssl_client_pem.is_some()

Nit (carry-over from draft): this validation encodes etl-specific constraints ("etl doesn't support client certs", "etl doesn't honor verify_hostname=false"). If etl gains support for those, this validation will silently keep rejecting valid configs. A // XXX(etl-tls): drop these rejections if etl gains client-cert support comment would make the dependency explicit.

#[cfg(feature = "with-postgres-cdc")]
pub(crate) fn make_etl_tls_config(
    tls: &PostgresTlsConfig,
    _endpoint_name: &str,

Nit: _endpoint_name is still unused. Either drop the parameter or wire it into the tracing::warn! log in resolve_ca_pem (which currently doesn't say which connector logged the precedence warning).

@swanandx swanandx left a comment

Member

LGTM. Please address Mihai's and mythical-fred's comments before merging, and clean up the commit history (if required).

#[cfg(feature = "with-postgres-cdc")]
pub(crate) fn make_etl_tls_config(
    tls: &PostgresTlsConfig,
    _endpoint_name: &str,

Member

why take endpoint name if we don't use it?

completion_task_tx: Option<mpsc::UnboundedSender<(u64, DeferredSenders)>>,
/// Receiver half, taken once by worker_task_inner to spawn the background task.
completion_task_rx: Mutex<Option<mpsc::UnboundedReceiver<(u64, DeferredSenders)>>>,
/// etl shutdown handle for the currently running pipeline.

Member

what does etl mean btw?

Contributor Author

etl is the underlying crate we use to get data from Postgres.

Makes a series of improvements to the postgres cdc connector to
stabilize it.

feat(adapters): support TLS for postgres-cdc

Adds TLS support for the postgres-cdc input connector and adds test for
it.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

types: validate postgres-cdc config

Adds a validation check to prevent empty source table or publication
names.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

adapters: postgres-cdc panic on Replay request

Adds an explicit panic on a Replay command on Postgres CDC input
connector.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

ci: use debezium/postgres

Switch to using debezium/postgres which comes with wal_level=logical, so
that it allows us to run Postgres CDC tests.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

test(pg-cdc): treat col_bytea as ByteArray instead of String

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

tests(adapters): pg-cdc: add database name to postgres url

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

sqllib: impl Deserialize for ByteArray

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

pg-cdc: fix test_cdc_ft_mode_holds_slot

This test used to fail because it tries to use `exactly_once` fault
tolerance, when it only supports `at_least_once` fault tolerance.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

postgres-cdc: upgrade etl and align connector behavior

Upgrade the etl dependency to b2aab62d and update the Postgres CDC
connector for the new API.

Also simplify connector-side schema handling, improve shutdown/pause
behavior, and add CDC coverage for pause/unpause and compatible vs
incompatible source schema changes.

Uses sqlx 0.9.0-alpha.1 until we bump MSRV to 1.94.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

docs: add docs for postgresql-cdc input connector

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

apply suggestions from code review

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

@abhizer abhizer force-pushed the pg-cdc-improvements branch from f84381f to f66187f July 1, 2026 05:08
@abhizer abhizer enabled auto-merge July 1, 2026 05:08
@abhizer abhizer added this pull request to the merge queue Jul 1, 2026
Merged via the queue into main with commit 9b4c369 Jul 1, 2026
1 check passed
@abhizer abhizer deleted the pg-cdc-improvements branch July 1, 2026 06:40

4 participants