Skip to content

Commit a6d3929

Browse files
Leonid Ryzhyk (ryzhyk)
authored and committed
[avro] Confluent JDBC sink connector Avro format.
Add support for the Avro output format compatible with Confluent JDBC sink connector. This format encodes primary key columns as Kafka message keys. Inserts and updates carry the complete value of the new record in the message payload. Deletes are represented as tombstone messages with null values. The connector supports all three standard subject name strategies: topic name, record name, and topic + record name. The connector automatically generates value schema where all non-key columns are nullable. This helps to make generated schemas predictable, as otherwise the user has little control over the nullability of SQL view columns. Here is a minimal connector config example, which should to the right thing in most cases: ``` create view test_view WITH ( 'connectors' = '[{ "format": { "name": "avro", "config": { "update_format": "confluent_jdbc", "registry_urls": ["http://localhost:18081"], "key_fields": ["id"] } }, "transport": { "name": "kafka_output", "config": { "bootstrap.servers": "localhost:19092", "topic": "avro_jdbc_test" } } } ]' ) as select * from test_table; ``` Docs and more tests are coming in a separate PR. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent f3327e2 commit a6d3929

File tree

22 files changed

+990
-230
lines changed

22 files changed

+990
-230
lines changed

crates/adapters/src/controller/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1459,7 +1459,12 @@ impl ControllerInner {
14591459
let format = <dyn OutputFormat>::get_format(&format_config.name).ok_or_else(|| {
14601460
ControllerError::unknown_output_format(endpoint_name, &format_config.name)
14611461
})?;
1462-
format.new_encoder(endpoint_name, &format_config.config, &handles.schema, probe)?
1462+
format.new_encoder(
1463+
endpoint_name,
1464+
&endpoint_config.connector_config,
1465+
&handles.schema,
1466+
probe,
1467+
)?
14631468
} else {
14641469
// `endpoint` is `None` - instantiate an integrated endpoint.
14651470
let endpoint = create_integrated_output_endpoint(
@@ -1940,8 +1945,8 @@ impl OutputConsumer for OutputProbe {
19401945
}
19411946
}
19421947

1943-
fn push_key(&mut self, key: &[u8], val: &[u8], num_records: usize) {
1944-
let num_bytes = key.len() + val.len();
1948+
fn push_key(&mut self, key: &[u8], val: Option<&[u8]>, num_records: usize) {
1949+
let num_bytes = key.len() + val.map(|v| v.len()).unwrap_or_default();
19451950

19461951
match self.endpoint.push_key(key, val) {
19471952
Ok(()) => {

crates/adapters/src/format/avro/input.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,16 @@ impl AvroParser {
140140
));
141141
}
142142

143+
match config.update_format {
144+
AvroUpdateFormat::ConfluentJdbc => {
145+
return Err(ControllerError::invalid_parser_configuration(
146+
endpoint_name,
147+
"'confluent_jdbc' data change event format is not yet supported by the Avro parser",
148+
));
149+
}
150+
AvroUpdateFormat::Debezium | AvroUpdateFormat::Raw => (),
151+
}
152+
143153
let mut parser = Self {
144154
endpoint_name: endpoint_name.to_string(),
145155
input_handle,
@@ -230,6 +240,9 @@ impl AvroParser {
230240
schema_json(schema)
231241
))),
232242
},
243+
AvroUpdateFormat::ConfluentJdbc => {
244+
unreachable!()
245+
}
233246
}
234247
}
235248

@@ -357,6 +370,9 @@ impl AvroParser {
357370
})?;
358371
}
359372
}
373+
AvroUpdateFormat::ConfluentJdbc => {
374+
unreachable!()
375+
}
360376
}
361377

362378
Ok(())

0 commit comments

Comments
 (0)