Skip to content
Prev Previous commit
Next Next commit
support insert_delete for s2 connector
  • Loading branch information
Mrhs121 committed Mar 23, 2026
commit 3956c0d1a8580606cd0921f68368d3bbd2c2b66c
105 changes: 103 additions & 2 deletions crates/adapters/src/format/json/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use feldera_adapterlib::format::Splitter;
use feldera_sqllib::Variant;
use feldera_types::format::json::{JsonLines, JsonParserConfig, JsonUpdateFormat};
use serde::Deserialize;
use serde_json::json;
use serde_json::value::RawValue;
use serde_json::{Value as JsonValue, json};
use serde_urlencoded::Deserializer as UrlDeserializer;
use std::borrow::Cow;

Expand Down Expand Up @@ -255,6 +255,51 @@ fn validate_parser_config(
Ok(())
}

/// Returns `true` when `key` is exactly one of the action names
/// (`insert`, `delete`, `update`) checked for in an insert/delete envelope.
///
/// The comparison is case-sensitive and does not trim whitespace.
fn is_insert_delete_action_key(key: &str) -> bool {
    const ACTION_KEYS: [&str; 3] = ["insert", "delete", "update"];
    ACTION_KEYS.contains(&key)
}

/// Heuristically decides whether `value` is a single insert/delete envelope.
///
/// A value qualifies when all of the following hold:
/// * it is a JSON object;
/// * ignoring an optional `table` key, it has exactly one remaining key;
/// * that key is an action key (see `is_insert_delete_action_key`);
/// * the value stored under that key is itself a JSON object or array.
///
/// Anything else (non-objects, extra keys, several action keys, scalar
/// payloads) yields `false`.
fn looks_like_insert_delete_envelope_object(value: &JsonValue) -> bool {
    let Some(fields) = value.as_object() else {
        return false;
    };

    // Only the entries other than `table` decide the outcome.
    let mut non_table_fields = fields
        .iter()
        .filter(|(name, _)| name.as_str() != "table");

    // Exactly one such entry must exist: zero means there is no action key,
    // two or more means either a foreign key or a second action key.
    match (non_table_fields.next(), non_table_fields.next()) {
        (Some((name, payload)), None) => {
            is_insert_delete_action_key(name)
                && matches!(payload, JsonValue::Object(_) | JsonValue::Array(_))
        }
        _ => false,
    }
}

fn looks_like_insert_delete_envelope(update: &RawValue) -> bool {
let Ok(value) = serde_json::from_str::<JsonValue>(update.get()) else {
return false;
};

looks_like_insert_delete_envelope_object(&value)
|| value.as_array().is_some_and(|array| {
!array.is_empty() && array.iter().all(looks_like_insert_delete_envelope_object)
})
}

struct JsonParser {
/// Input handle to push parsed data to.
input_stream: Box<dyn DeCollectionStream>,
Expand All @@ -263,6 +308,18 @@ struct JsonParser {
}

impl JsonParser {
/// Builds the error reported when `update` looks like an
/// insert/delete/update envelope.
///
/// Returns `None` when `update` does not look like such an envelope.
/// Otherwise returns a `ParseError` that carries the offending text and a
/// suggestion to switch `format.config.update_format` to `insert_delete`.
fn raw_format_insert_delete_mismatch_error(update: &RawValue) -> Option<ParseError> {
    if !looks_like_insert_delete_envelope(update) {
        return None;
    }

    let description =
        "raw JSON update format expects plain rows, but received an insert/delete/update envelope"
            .to_string();
    let suggestion = Cow::from(
        "Set `format.config.update_format` to `insert_delete` for payloads like {\"insert\": {...}} or {\"delete\": {...}}.",
    );

    Some(ParseError::text_envelope_error(
        description,
        update.get(),
        Some(suggestion),
    ))
}

fn new(input_stream: Box<dyn DeCollectionStream>, config: JsonParserConfig) -> Self {
Self {
input_stream,
Expand Down Expand Up @@ -404,7 +461,12 @@ impl Parser for JsonParser {
self.apply_update::<WeightedUpdate<_>>(update, &metadata, &mut errors)
}
JsonUpdateFormat::Raw => {
self.apply_update::<&RawValue>(update, &metadata, &mut errors)
if let Some(error) = Self::raw_format_insert_delete_mismatch_error(update) {
Comment thread
Mrhs121 marked this conversation as resolved.
Outdated
errors.push(error);
self.last_event_number += 1;
} else {
self.apply_update::<&RawValue>(update, &metadata, &mut errors);
}
}
JsonUpdateFormat::Redis | JsonUpdateFormat::Snowflake => {
panic!("Unexpected update format: {:?}", &self.config.update_format)
Expand Down Expand Up @@ -851,6 +913,45 @@ mod test {
, (r#"[[false, 100, "foo"]]"#.to_string(), Vec::new())],
vec![MockUpdate::with_polarity(TestStruct::new(true, 0, Some("e")), true), MockUpdate::with_polarity(TestStruct::new(false, 100, Some("foo")), true)],
),
// raw: insert/delete envelope hint.
TestCase::new(
JsonParserConfig {
update_format: JsonUpdateFormat::Raw,
json_flavor: JsonFlavor::Default,
array: false,
lines: JsonLines::Single,
},
vec![(
r#"{"insert": {"b": true, "i": 0}}"#.to_string(),
vec![ParseError::text_envelope_error(
"raw JSON update format expects plain rows, but received an insert/delete/update envelope".to_string(),
"{\"insert\": {\"b\": true, \"i\": 0}}",
Some(Cow::from(
"Set `format.config.update_format` to `insert_delete` for payloads like {\"insert\": {...}} or {\"delete\": {...}}.",
)),
)],
)],
Vec::new(),
),
TestCase::new(
JsonParserConfig {
update_format: JsonUpdateFormat::Raw,
json_flavor: JsonFlavor::Default,
array: true,
lines: JsonLines::Single,
},
vec![(
r#"[{"insert": {"b": true, "i": 0}}]"#.to_string(),
vec![ParseError::text_envelope_error(
"raw JSON update format expects plain rows, but received an insert/delete/update envelope".to_string(),
"[{\"insert\": {\"b\": true, \"i\": 0}}]",
Some(Cow::from(
"Set `format.config.update_format` to `insert_delete` for payloads like {\"insert\": {...}} or {\"delete\": {...}}.",
)),
)],
)],
Vec::new(),
),
// raw: invalid json.
TestCase::new(
JsonParserConfig {
Expand Down
3 changes: 2 additions & 1 deletion crates/adapters/src/transport/s2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ mod input;
#[cfg(test)]
mod test;

pub use input::S2InputEndpoint;
#[cfg(test)]
pub(crate) use input::Metadata as S2Metadata;
pub use input::S2InputEndpoint;
52 changes: 46 additions & 6 deletions crates/adapters/src/transport/s2/test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#[cfg(test)]
mod tests {
use std::borrow::Cow;

use feldera_types::config::{
ConnectorConfig, FormatConfig, OutputBufferConfig, TransportConfig,
default_max_queued_records,
};
use feldera_types::transport::s2::{S2InputConfig, S2StartFrom};
use serde_json;
use serde_json::{self, json};

#[test]
fn config_serialization_roundtrip() {
Expand Down Expand Up @@ -33,9 +39,8 @@ mod tests {
(r#""Beginning""#, S2StartFrom::Beginning),
(r#""Tail""#, S2StartFrom::Tail),
] {
let json = format!(
r#"{{"basin":"b","stream":"s","auth_token":"t","start_from":{variant}}}"#
);
let json =
format!(r#"{{"basin":"b","stream":"s","auth_token":"t","start_from":{variant}}}"#);
let config: S2InputConfig = serde_json::from_str(&json).unwrap();
assert_eq!(config.start_from, expected);
}
Expand All @@ -46,13 +51,17 @@ mod tests {
use crate::transport::s2::S2Metadata as Metadata;

// Empty range (no messages processed)
let meta = Metadata { seq_num_range: 0..0 };
let meta = Metadata {
seq_num_range: 0..0,
};
let json = serde_json::to_value(&meta).unwrap();
let restored = Metadata::from_resume_info(Some(json)).unwrap();
assert_eq!(restored.seq_num_range, 0..0);

// Non-empty range
let meta = Metadata { seq_num_range: 6..10 };
let meta = Metadata {
seq_num_range: 6..10,
};
let json = serde_json::to_value(&meta).unwrap();
let restored = Metadata::from_resume_info(Some(json)).unwrap();
assert_eq!(restored.seq_num_range, 6..10);
Expand Down Expand Up @@ -117,4 +126,35 @@ mod tests {
let json = serde_json::to_string(&config).unwrap();
assert!(!json.contains("endpoint"));
}

/// A connector config combining the S2 input transport with the JSON
/// format in `insert_delete` mode must survive a serialize/deserialize
/// round trip through `serde_json::Value` unchanged.
#[test]
fn connector_config_insert_delete_format_roundtrip() {
    // S2 input transport with every field populated, including the optional endpoint.
    let transport = TransportConfig::S2Input(S2InputConfig {
        basin: String::from("test-basin"),
        stream: String::from("test-stream"),
        auth_token: String::from("tok_test123"),
        endpoint: Some(String::from("http://localhost:8080")),
        start_from: S2StartFrom::Beginning,
    });

    // JSON format configured for insert/delete envelopes.
    let format = FormatConfig {
        name: Cow::from("json"),
        config: json!({
            "update_format": "insert_delete"
        }),
    };

    let original = ConnectorConfig {
        transport,
        format: Some(format),
        index: None,
        output_buffer_config: OutputBufferConfig::default(),
        max_batch_size: None,
        max_worker_batch_size: None,
        max_queued_records: default_max_queued_records(),
        paused: false,
        labels: Vec::new(),
        start_after: None,
    };

    let as_json = serde_json::to_value(&original).unwrap();
    let roundtripped: ConnectorConfig = serde_json::from_value(as_json).unwrap();
    assert_eq!(original, roundtripped);
}
}