adapters: improvements to the postgres CDC input connector #6533
mihaibudiu left a comment
You seem to call the right functions... based on their names.
    fn request(&self, command: InputReaderCommand) {
        if matches!(command, InputReaderCommand::Replay { .. }) {
            panic!(
                "replay command is not supported by PostgresCdcInputReader; this is a bug, please report it to developers"
    /// `POSTGRES_SSL_URL`/`POSTGRES_SSL_CA_LOCATION` or `POSTGRES_SSL_CA_PEM`.
    #[test]
    #[serial]
    #[ignore]
adding an ignored test?
The postgres instance isn't currently configured with wal level logical.
    /// Requires: wal_level=logical, user with REPLICATION privilege.
    #[test]
    #[serial]
    #[ignore]
    return Err(
        "client-certificate TLS options (ssl_client_pem, ssl_client_location, \
         ssl_client_key, ssl_client_key_location, ssl_certificate_chain_location) \
         are not supported by the Postgres CDC connector"
currently
Can you direct users to file an issue if we expect to support them eventually?
Or you can file one yourself and mention it.
mythical-fred left a comment
Draft, so high-level only.
Direction is solid. Four discrete commits, each one is independently legible, each one fixes a real concern. Reading top to bottom:
validate() on PostgresCdcReaderConfig. Right move — the etl crate silently ignores client-cert options today and disabling hostname verification isn't honored either; rejecting both at config time with explicit error messages is much better than the alternative of "we accepted your config, here's a confusingly insecure connection." Empty-after-trim checks on publication and source_table are cheap and catch a real footgun. One small thing: consider also asserting that at least one of the TLS fields makes sense when ssl_ca_pem/ssl_ca_location are absent but verify_hostname == Some(true) — currently "no CA, verify hostname" produces a runtime failure rather than a config-time one. Not blocking; just a thought.
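A minimal sketch of the config-time checks described above, including the extra "verify hostname without a CA" case. `CdcConfig` and `TlsOptions` are hypothetical stand-ins for `PostgresCdcReaderConfig` and its TLS fields, reduced to what the checks need; the real field set and error wording may differ.

```rust
/// Hypothetical stand-in for the TLS part of the connector config.
#[derive(Default)]
pub struct TlsOptions {
    pub ssl_ca_pem: Option<String>,
    pub ssl_ca_location: Option<String>,
    pub ssl_client_pem: Option<String>,
    pub ssl_client_key: Option<String>,
    pub verify_hostname: Option<bool>,
}

/// Hypothetical stand-in for `PostgresCdcReaderConfig`.
#[derive(Default)]
pub struct CdcConfig {
    pub publication: String,
    pub source_table: String,
    pub tls: TlsOptions,
}

impl CdcConfig {
    /// Rejects settings the connector cannot honor, before any connection is made.
    pub fn validate(&self) -> Result<(), String> {
        if self.publication.trim().is_empty() {
            return Err("publication cannot be empty".to_string());
        }
        if self.source_table.trim().is_empty() {
            return Err("source_table cannot be empty".to_string());
        }
        if self.tls.ssl_client_pem.is_some() || self.tls.ssl_client_key.is_some() {
            return Err("client-certificate TLS options are not currently supported".to_string());
        }
        if self.tls.verify_hostname == Some(false) {
            return Err("disabling hostname verification is not currently supported".to_string());
        }
        // The additional check suggested in the review: hostname verification needs a CA.
        let has_ca = self.tls.ssl_ca_pem.is_some() || self.tls.ssl_ca_location.is_some();
        if self.tls.verify_hostname == Some(true) && !has_ca {
            return Err("verify_hostname requires ssl_ca_pem or ssl_ca_location".to_string());
        }
        Ok(())
    }
}
```

With this shape, a whitespace-only `source_table` or a "no CA, verify hostname" combination fails at config time instead of at connect time.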
Replay panic. Agree with the intent — a connector that genuinely cannot replay should fail loudly rather than silently producing wrong output. But panic!() inside request() is the wrong shape: the controller traps panics, but the error path through consumer.error(true, …) is the documented way to surface a fatal connector misconfiguration. Look at how delta_table_input or kafka_input reject unsupported commands — they call consumer.error(true, anyhow!("...")) and return. That keeps the failure inside the connector-health and toast machinery rather than as a generic backtrace in the logs. Either way, this should also be visible in PostgresCdcReaderConfig::fault_tolerance() (or wherever the framework asks "does this connector support replay?") — if the answer is "no" then the controller shouldn't be sending Replay in the first place, and panic-on-receipt is defense in depth, not the main barrier.
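To make the suggestion concrete, here is a rough sketch of the report-and-return shape. `Command`, `Consumer`, and `CdcReader` are hypothetical stand-ins, not the framework's real types, and the stand-in `error` takes a `String` where the real call takes an `anyhow` error.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the framework's reader command type.
pub enum Command {
    Extend,
    Replay,
}

/// Hypothetical stand-in for the consumer handle a reader reports errors to.
pub trait Consumer {
    fn error(&self, fatal: bool, message: String);
}

/// Hypothetical reader that owns its consumer handle.
pub struct CdcReader<C: Consumer> {
    pub consumer: C,
}

impl<C: Consumer> CdcReader<C> {
    /// Returns `true` if the command was accepted, `false` if it was rejected.
    pub fn request(&self, command: Command) -> bool {
        if matches!(command, Command::Replay) {
            // Report a fatal connector error and return instead of panicking.
            self.consumer.error(
                true,
                "replay command is not supported by the Postgres CDC connector".to_string(),
            );
            return false;
        }
        true
    }
}

/// Test double that records every reported error.
#[derive(Default)]
pub struct RecordingConsumer {
    pub errors: Mutex<Vec<(bool, String)>>,
}

impl Consumer for RecordingConsumer {
    fn error(&self, fatal: bool, message: String) {
        self.errors.lock().unwrap().push((fatal, message));
    }
}
```

The recording consumer also shows why this path is easier to test than a panic: the rejection becomes an observable value rather than an unwinding thread.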
TLS reshuffle and make_etl_tls_config. The resolve_ca_pem refactor is a clear win — one place for the precedence rule (inline PEM > file path), one place for the warning, and the file-read error now carries the actual path that failed. The new make_etl_tls_config is small and the unit tests pin both the disabled and enabled paths. The validate() change already rejects client-cert TLS up front, so the cdc connector never reaches a state where it would need to handle them — good layering. _endpoint_name is unused; either drop the parameter or wire it into a tracing::warn! for the precedence message so the user knows which connector logged it.
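For illustration, the precedence rule and the path-carrying error could look roughly like this. `resolve_ca_pem_sketch` is a hypothetical simplification, not the PR's `resolve_ca_pem`: its signature is assumed, and it returns the precedence warning to the caller (tagged with the endpoint name) rather than logging it.

```rust
use std::fs;

/// Resolves CA certificates; inline PEM takes precedence over a file path.
/// Returns the PEM text (if any) and an optional warning for the caller to log.
pub fn resolve_ca_pem_sketch(
    endpoint_name: &str,
    ssl_ca_pem: Option<&str>,
    ssl_ca_location: Option<&str>,
) -> Result<(Option<String>, Option<String>), String> {
    match (ssl_ca_pem, ssl_ca_location) {
        // Both set: use the inline PEM and say which connector ignored the path.
        (Some(pem), Some(path)) => Ok((
            Some(pem.to_string()),
            Some(format!(
                "{endpoint_name}: both ssl_ca_pem and ssl_ca_location are set; ignoring '{path}'"
            )),
        )),
        (Some(pem), None) => Ok((Some(pem.to_string()), None)),
        // Only a path: read it, and put the failing path into the error.
        (None, Some(path)) => fs::read_to_string(path)
            .map(|pem| (Some(pem), None))
            .map_err(|e| format!("{endpoint_name}: failed to read CA file '{path}': {e}")),
        // Neither set: TLS stays disabled.
        (None, None) => Ok((None, None)),
    }
}
```

Threading the endpoint name through like this is one way to give the currently unused parameter a purpose.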
debezium/postgres:15 CI image plus un-#[ignore]ing test_cdc_basic_insert. Big quality-of-life improvement, and the right call — the upstream image's default wal_level=replica is exactly what makes the CDC tests pointless in CI. Two things to flag for full review:
- Removing the `#[ignore]` from `test_cdc_basic_insert` means every PR now needs the debezium image. The header comment block above `cdc_tests` still says "they are gated behind `#[ignore]` so they don't run in normal CI" for the others; worth updating that comment to say "the basic insert test runs in CI; the rest still require manual setup," or just removing the obsolete sentence entirely. Inconsistency between code and comment is the kind of thing that bites in six months.
- `test_cdc_basic_insert_tls` stays `#[ignore]` because it needs `POSTGRES_SSL_URL`/`POSTGRES_SSL_CA_*`. Reasonable. Worth a one-line note in the docstring on how to run it locally; the existing `test_cdc_*` tests do this and it's helped me before.
Architecturally — and the bit I'd want resolved before ready-for-review — the validation lives in feldera-types but the constraints it encodes ("etl crate doesn't support client certs," "etl crate doesn't honor verify_hostname=false") are properties of the etl crate, not the config schema. If etl gains support for those tomorrow, the validation in feldera-types will silently keep rejecting valid configs until someone remembers. Three options:
1. Move `validate()` to the adapters crate, gated on `#[cfg(feature = "with-postgres-cdc")]`. The downside is the validation only fires when the connector actually instantiates, but `make_etl_tls_config` already validates the same shape; collapsing them removes the duplication.
2. Express the etl crate's capabilities as an explicit `PostgresEtlCapabilities` const in the adapters crate and reference it from `validate()`. Verbose but the dependency is explicit.
3. Leave it where it is, but add a `// XXX(etl-tls): if etl crate gains client-cert support, drop this rejection.` comment so the next maintainer knows why it's there.
Option 3 is fine if 1 is too invasive. Don't ship without one of them — silent overspecification is the kind of bug that lives forever.
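If option 2 is of interest, a rough sketch might look like the following. Only the name `PostgresEtlCapabilities` comes from the suggestion above; the fields, the `ETL_CAPABILITIES` const, and `validate_tls` are assumptions made up for illustration.

```rust
/// Hypothetical description of what the etl crate's TLS layer supports today.
pub struct PostgresEtlCapabilities {
    pub client_certificates: bool,
    pub disable_hostname_verification: bool,
}

/// Single place to update when the etl crate gains support.
pub const ETL_CAPABILITIES: PostgresEtlCapabilities = PostgresEtlCapabilities {
    client_certificates: false,
    disable_hostname_verification: false,
};

/// Rejects only the options that the given capabilities do not cover.
pub fn validate_tls(
    caps: &PostgresEtlCapabilities,
    has_client_cert: bool,
    verify_hostname: Option<bool>,
) -> Result<(), String> {
    if has_client_cert && !caps.client_certificates {
        return Err("client-certificate TLS options are not currently supported".to_string());
    }
    if verify_hostname == Some(false) && !caps.disable_hostname_verification {
        return Err("disabling hostname verification is not currently supported".to_string());
    }
    Ok(())
}
```

The rejections then follow from the const, so flipping a flag when etl changes re-enables the corresponding configs without touching the validation logic.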
Smaller things, save for full review:
- `validate()` returns `Result<(), String>`. Most of the codebase uses `anyhow::Error` or a typed error here; matching the surrounding convention costs nothing.
- `controller_error::invalid_transport_configuration(endpoint_name, &e.to_string())`: the `String -> String` round-trip through `.to_string()` is a smell. If `validate` returned `&str` references this would be cleaner; alternatively make `invalid_transport_configuration` take `impl Display`.
- The TLS test fixture `postgres_cdc_ssl_config()` mirrors `postgres_ssl_config()` minus the client-cert fields. A `with_supported_cdc_options(self) -> Self` builder on `PostgresTlsConfig` would let both call sites share the "drop unsupported fields" logic and make the relationship between them obvious.
- `cdc_simple_test_circuit_with_tls` uses `serde_json::to_value(tls).unwrap()` plus `.as_object_mut().unwrap().extend(...)` to flatten TLS fields into the parent config. That's fine for tests but the `unwrap`s assume `PostgresTlsConfig` serializes to an object; a future `#[serde(untagged)]` variant would silently panic. A typed `TestConnectorConfig` struct that derives `Serialize` and inlines the TLS fields directly would be safer.
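As a sketch of the builder idea from the third bullet: `TlsConfig` below is a hypothetical, reduced stand-in for `PostgresTlsConfig`, and the set of fields cleared is an assumption based on "minus the client-cert fields".

```rust
/// Hypothetical stand-in for `PostgresTlsConfig`, reduced to a few fields.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TlsConfig {
    pub ssl_ca_pem: Option<String>,
    pub ssl_ca_location: Option<String>,
    pub ssl_client_pem: Option<String>,
    pub ssl_client_key: Option<String>,
}

impl TlsConfig {
    /// Clears the client-certificate fields and keeps the CA settings as they are.
    pub fn with_supported_cdc_options(self) -> Self {
        Self {
            ssl_client_pem: None,
            ssl_client_key: None,
            ..self
        }
    }
}
```

The CDC fixture could then be derived from the full fixture in one call, which keeps the two from drifting apart.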
Not blocking anything since it's a draft. The replay-panic-vs-consumer-error question and the "where does validation live" question are the two I'd want answered before ready-for-review.
74010e0 to f84381f
mythical-fred left a comment
Prior draft concerns addressed — bytea now uses ByteArray with a derived Deserialize, TLS is cleanly refactored through resolve_ca_pem, and schema handling uses ReplicatedTableSchema directly. New tests for pause/unpause, TLS, compatible schema changes, and PK-column-drop rejection are solid. Docs page is good. See inline nits.
    fn request(&self, command: InputReaderCommand) {
        if matches!(command, InputReaderCommand::Replay { .. }) {
            panic!(
Nit: panic!() here means the controller gets a generic backtrace instead of a structured connector error. consumer.error(true, anyhow!("...")) followed by return keeps the failure inside the connector-health machinery and produces a user-visible toast — same pattern delta_table_input uses for unsupported commands. The panic is defense-in-depth (the framework shouldn't send Replay if fault_tolerance() doesn't claim support), so not blocking, but the error path is cleaner.
    let (sender, _flag_rx) = make_tracked_sender_sync();
    let mut waiting: Vec<(u64, DeferredSenders)> = vec![(10, vec![sender])];
    fire_completed(&mut waiting, 10);
    fn test_required_columns_present() {
12 unit tests for fire_completed and completion_watcher_task were removed, but both functions still exist unchanged (lines 1112 and 1156). Those tests covered edge cases like equal-frontier semantics, partial checkpoints in strict mode, and multi-batch ordering — scenarios that are hard to exercise end-to-end. Could you restore them? The RelationInfo / cache tests that were removed are fine since the struct is gone.
    | `ssl_ca_pem` | string | | CA certificates in PEM format. Setting this enables TLS and takes precedence over `ssl_ca_location`. |
    | `ssl_ca_location` | string | | Path to a PEM file containing CA certificates. Used when `ssl_ca_pem` is not set. |

    [*]: Required fields
This markdown link-definition syntax ([*]: ...) won't render as visible text in most renderers — it defines a reference-style link target, so the "Required fields" note will silently disappear from the rendered page. Replace with a plain-text note, e.g.:
* Required fields
        return Err("source_table cannot be empty".to_string());
    }

    if self.tls.ssl_client_pem.is_some()
Nit (carry-over from draft): this validation encodes etl-specific constraints ("etl doesn't support client certs", "etl doesn't honor verify_hostname=false"). If etl gains support for those, this validation will silently keep rejecting valid configs. A // XXX(etl-tls): drop these rejections if etl gains client-cert support comment would make the dependency explicit.
    #[cfg(feature = "with-postgres-cdc")]
    pub(crate) fn make_etl_tls_config(
        tls: &PostgresTlsConfig,
        _endpoint_name: &str,
Nit: _endpoint_name is still unused. Either drop the parameter or wire it into the tracing::warn! log in resolve_ca_pem (which currently doesn't say which connector logged the precedence warning).
swanandx left a comment
LGTM, please address Mihai's and mythical-fred's comments before merging, and clean up the commit history (if required).
    #[cfg(feature = "with-postgres-cdc")]
    pub(crate) fn make_etl_tls_config(
        tls: &PostgresTlsConfig,
        _endpoint_name: &str,
why take endpoint name if we don't use it?
    completion_task_tx: Option<mpsc::UnboundedSender<(u64, DeferredSenders)>>,
    /// Receiver half, taken once by worker_task_inner to spawn the background task.
    completion_task_rx: Mutex<Option<mpsc::UnboundedReceiver<(u64, DeferredSenders)>>>,
    /// etl shutdown handle for the currently running pipeline.
etl is the underlying crate we use to get data from Postgres.
Makes a series of improvements to the postgres cdc connector to stabilize it.

feat(adapters): support TLS for postgres-cdc

Adds TLS support for the postgres-cdc input connector and adds test for it.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

types: validate postgres-cdc config

Adds a validation check to prevent empty source table or publication names.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

adapters: postgres-cdc panic on Replay request

Adds an explicit panic on a Replay command on Postgres CDC input connector.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

ci: use debezium/postgres

Switch to using debezium/postgres which comes with wal_level=logical, so that it allows us to run Postgres CDC tests.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

test(pg-cdc): treat col_bytea as ByteArray instead of String

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

tests(adapters): pg-cdc: add database name to postgres url

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

sqllib: impl Deserialize for ByteArray

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

pg-cdc: fix test_cdc_ft_mode_holds_slot

This test used to fail because it tries to use `exactly_once` fault tolerance, when it only supports `at_least_once` fault tolerance.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

postgres-cdc: upgrade etl and align connector behavior

Upgrade the etl dependency to b2aab62d and update the Postgres CDC connector for the new API. Also simplify connector-side schema handling, improve shutdown/pause behavior, and add CDC coverage for pause/unpause and compatible vs incompatible source schema changes. Uses sqlx 0.9.0-alpha.1 until we bump MSRV to 1.94.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

docs: add docs for postgresql-cdc input connector

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

apply suggestions from code review

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
f84381f to f66187f
Makes a series of improvements to stabilize the PostgreSQL CDC input connector.
Each improvement is its own commit, so I recommend reviewing per commit.
Tested using a local postgres instance, typically the docker image debezium/postgres, which comes with the WAL level set to logical.