Skip to content

Commit 6aedf79

Browse files
committed
[adapters] Include value being deleted in Debezium-flavored JSON.
Debezium specifies that its JSON format for changes should include the value being deleted as the "before" value with an "op" of "d", but our format did not do that. This fixes the problem. It improves Feldera compliance with the Debezium specifications. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 6ba722b commit 6aedf79

1 file changed

Lines changed: 25 additions & 26 deletions

File tree

crates/adapters/src/format/json/output.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -397,25 +397,20 @@ impl Encoder for JsonEncoder {
397397
key_buffer.extend_from_slice(br#"}"#);
398398

399399
// Encode value.
400-
if w > 0 {
401-
if let Some(schema_str) = &self.value_schema_str {
402-
buffer.extend_from_slice(br#"{"schema":"#);
403-
buffer.extend_from_slice(schema_str.as_bytes());
404-
write!(buffer, r#","payload":{{"op":"c","after":"#)?;
405-
} else {
406-
write!(buffer, r#"{{"payload":{{"op":"c","after":"#)?;
407-
}
408-
409-
cursor.serialize_key(&mut buffer)?;
410-
buffer.extend_from_slice(br#"}}"#);
411-
} else if let Some(schema_str) = &self.value_schema_str {
412-
write!(
413-
buffer,
414-
r#"{{"schema":{schema_str},"payload":{{"op":"d"}}}}"#
415-
)?;
400+
let (op, when) = if w > 0 {
401+
("c", "after")
402+
} else {
403+
("d", "before")
404+
};
405+
if let Some(schema_str) = &self.value_schema_str {
406+
buffer.extend_from_slice(br#"{"schema":"#);
407+
buffer.extend_from_slice(schema_str.as_bytes());
408+
write!(buffer, r#","payload":{{"op":"{op}","{when}":"#)?;
416409
} else {
417-
write!(buffer, r#"{{"payload":{{"op":"d"}}}}"#)?;
410+
write!(buffer, r#"{{"payload":{{"op":"{op}","{when}":"#)?;
418411
}
412+
cursor.serialize_key(&mut buffer)?;
413+
buffer.extend_from_slice(br#"}}"#);
419414
}
420415
_ => {
421416
// Should never happen. Unsupported formats are rejected during
@@ -619,14 +614,18 @@ mod test {
619614

620615
fn update(insert: bool, value: Self::Val, _stream_id: u64, _sequence_num: u64) -> Self {
621616
DebeziumUpdate {
622-
payload: DebeziumPayload {
623-
op: if insert {
624-
DebeziumOp::Create
625-
} else {
626-
DebeziumOp::Delete
627-
},
628-
before: None,
629-
after: if insert { Some(value) } else { None },
617+
payload: if insert {
618+
DebeziumPayload {
619+
op: DebeziumOp::Create,
620+
before: None,
621+
after: Some(value),
622+
}
623+
} else {
624+
DebeziumPayload {
625+
op: DebeziumOp::Delete,
626+
before: Some(value),
627+
after: None,
628+
}
630629
},
631630
}
632631
}
@@ -917,7 +916,7 @@ mod test {
917916
})),
918917
json!({
919918
"schema":{"type":"struct","fields":[{"field":"after","type":"struct","fields":[{"field":"id","type":"int64","optional":false},{"field":"b","type":"boolean","optional":false},{"field":"i","type":"int64","optional":true},{"field":"s","type":"string","optional":false}],"optional":true},{"field":"op","type":"string","optional":false}],"name":"Envelope"},
920-
"payload":{"op":"d"}
919+
"payload":{"op":"d", "before": {"b": false, "i": 10, "id": 1, "s": "bar"}}
921920
}),
922921
),
923922
(

0 commit comments

Comments
 (0)