Commit 42c1ccc

Introduce support for user-defined parser preprocessors.
Note that this API is still experimental and may change without notice.

Preprocessors are supported for connectors that receive raw data (i.e., byte arrays). A preprocessor runs between the transport and the parser and can transform the byte arrays before they are handed off to the parser.

Preprocessors are implemented by supplying several trait implementations in udf.rs; the two essential traits are `Preprocessor` and `PreprocessorFactory`. The factory is responsible for creating a preprocessor based on the configuration supplied in the connector.

As an example, consider the following fragment of a connector configuration for a table:

```
'connectors' = '[{
    "transport": { ... },
    "preprocessor": [{
        "name": "example",
        "message_oriented": true,
        "config": {}
    }],
```

This configuration expects the user to supply two structs in udf.rs: `ExamplePreprocessor`, which implements `Preprocessor`, and `ExamplePreprocessorFactory`, which implements `PreprocessorFactory`. The compiler will generate code to create an instance of `ExamplePreprocessor` at runtime for this connector.

There are two flavors of preprocessors: message-oriented and streaming. Currently only message-oriented preprocessors are supported with fault tolerance. Briefly, a message-oriented preprocessor splits the input stream into messages such that each message can be parsed independently by the parser that follows it.

Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
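To make the factory/preprocessor split concrete, here is a minimal sketch of what the two structs in udf.rs might look like. The trait names and struct names come from the commit message; the method names (`process`, `create`), their signatures, the string-map model of `"config"`, and the `header_len` option are all assumptions made for illustration, since this page does not show the real trait definitions.

```rust
use std::collections::HashMap;

// ASSUMPTION: the real `Preprocessor` trait is not shown in this commit view;
// `process` is a hypothetical method that transforms one raw message.
pub trait Preprocessor: Send {
    fn process(&mut self, data: &[u8]) -> Vec<u8>;
}

// ASSUMPTION: `create` and the string-map config are hypothetical; the real
// factory receives the connector's "config" object in some other form.
pub trait PreprocessorFactory: Send + Sync {
    fn create(&self, config: &HashMap<String, String>) -> Result<Box<dyn Preprocessor>, String>;
}

/// Example message-oriented preprocessor: drops a fixed-length header
/// from every message before the parser sees it.
pub struct ExamplePreprocessor {
    header_len: usize,
}

impl Preprocessor for ExamplePreprocessor {
    fn process(&mut self, data: &[u8]) -> Vec<u8> {
        // Messages shorter than the header yield an empty payload.
        data.get(self.header_len..).unwrap_or(&[]).to_vec()
    }
}

pub struct ExamplePreprocessorFactory;

impl PreprocessorFactory for ExamplePreprocessorFactory {
    fn create(&self, config: &HashMap<String, String>) -> Result<Box<dyn Preprocessor>, String> {
        // "header_len" is a hypothetical option; it defaults to 0 when absent.
        let header_len = match config.get("header_len") {
            Some(v) => v
                .parse::<usize>()
                .map_err(|e| format!("invalid header_len: {e}"))?,
            None => 0,
        };
        Ok(Box::new(ExamplePreprocessor { header_len }))
    }
}
```

In this sketch the factory validates the configuration once, at connector creation, so a bad option surfaces as an error before any data flows; the preprocessor itself then only does per-message work.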
1 parent 19a69ae commit 42c1ccc

48 files changed, with 2195 additions and 272 deletions.
crates/adapterlib/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ datafusion = { workspace = true }
 thiserror = { workspace = true }
 serde_json_path_to_error = { workspace = true }
 chrono = { workspace = true }
+tracing = { workspace = true }

 [package.metadata.cargo-machete]
 ignored = ["num-traits"]

crates/adapterlib/src/catalog.rs

Lines changed: 5 additions & 1 deletion
@@ -1,8 +1,8 @@
 use std::any::Any;
 use std::collections::HashSet;
 use std::fmt::{Debug, Formatter};
-use std::sync::Arc;
 use std::sync::atomic::AtomicUsize;
+use std::sync::{Arc, Mutex};

 use anyhow::Result as AnyResult;
 #[cfg(feature = "with-avro")]
@@ -27,6 +27,7 @@ use std::collections::HashMap;

 use crate::errors::controller::ControllerError;
 use crate::format::InputBuffer;
+use crate::preprocess::PreprocessorRegistry;

 /// Descriptor that specifies the format in which records are received
 /// or into which they should be encoded before sending.
@@ -888,6 +889,9 @@ pub trait CircuitCatalog: Send + Sync {
     fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;

     fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles>;
+
+    /// The registry used to insert new user-defined preprocessors
+    fn preprocessor_registry(&self) -> Arc<Mutex<PreprocessorRegistry>>;
 }

 #[doc(hidden)]
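
The new `preprocessor_registry` accessor hands out a shared, lockable registry. The sketch below illustrates that sharing pattern only. `SketchRegistry`, `SketchCatalog`, `Catalog`, `register`, `lookup`, and the `Transform` function-pointer type are hypothetical stand-ins; the real `PreprocessorRegistry` and its methods are not visible in this commit view.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// ASSUMPTION: a preprocessor is modeled as a plain function pointer here;
// the real registry presumably stores factories instead.
pub type Transform = fn(&[u8]) -> Vec<u8>;

// Hypothetical stand-in for `PreprocessorRegistry`.
#[derive(Default)]
pub struct SketchRegistry {
    transforms: HashMap<String, Transform>,
}

impl SketchRegistry {
    /// Register a transform under the name used in the connector config.
    pub fn register(&mut self, name: &str, t: Transform) {
        self.transforms.insert(name.to_string(), t);
    }

    /// Look up a transform by name; `None` if nothing was registered.
    pub fn lookup(&self, name: &str) -> Option<Transform> {
        self.transforms.get(name).copied()
    }
}

// Hypothetical reduction of `CircuitCatalog` to the one new method.
pub trait SketchCatalog {
    fn preprocessor_registry(&self) -> Arc<Mutex<SketchRegistry>>;
}

#[derive(Default)]
pub struct Catalog {
    registry: Arc<Mutex<SketchRegistry>>,
}

impl SketchCatalog for Catalog {
    fn preprocessor_registry(&self) -> Arc<Mutex<SketchRegistry>> {
        // Cloning the Arc shares the same registry rather than copying it.
        Arc::clone(&self.registry)
    }
}

/// Sample transform used to exercise the registry.
pub fn upper(data: &[u8]) -> Vec<u8> {
    data.to_ascii_uppercase()
}
```

Returning `Arc<Mutex<…>>` from a `&self` method lets generated code insert entries without needing `&mut` access to the catalog, at the cost of taking a lock on each registration or lookup.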

crates/adapterlib/src/errors/controller.rs

Lines changed: 17 additions & 0 deletions
@@ -679,6 +679,12 @@ pub enum ControllerError {
         endpoint_name: String,
     },

+    /// Error creating a user-defined preprocessor
+    PreprocessorCreateError {
+        endpoint_name: String,
+        error: String,
+    },
+
     /// Unknown output endpoint name.
     UnknownOutputEndpoint {
         endpoint_name: String,
@@ -937,6 +943,7 @@ impl DbspDetailedError for ControllerError {
     // TODO: attempts to cast `AnyError` to `DetailedError`.
     fn error_code(&self) -> Cow<'static, str> {
         match self {
+            Self::PreprocessorCreateError { .. } => Cow::from("PreprocessorCreateError"),
             Self::IoError { .. } => Cow::from("ControllerIoError"),
             Self::NotSupported { .. } => Cow::from("NotSupported"),
             Self::SchemaParseError { .. } => Cow::from("SchemaParseError"),
@@ -999,6 +1006,15 @@ impl StdError for ControllerError {}
 impl Display for ControllerError {
     fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
         match self {
+            Self::PreprocessorCreateError {
+                endpoint_name,
+                error,
+            } => {
+                write!(
+                    f,
+                    "Error creating preprocessor for endpoint {endpoint_name}: {error}"
+                )
+            }
             Self::IoError {
                 context, io_error, ..
             } => {
@@ -1567,6 +1583,7 @@ impl ControllerError {
             | Self::UnexpectedBootstrap { .. }
             | Self::InvalidStandby(_)
             | Self::InvalidStartupTransition { .. }
+            | Self::PreprocessorCreateError { .. }
             | Self::UnexpectedRuntimeVersion { .. } => ErrorKind::Other,
         }
     }
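
As a rough illustration of how the new variant surfaces to users, the following reduces the error type to that single variant. The variant name, its fields, the error code string, and the display format are taken from the diff above; `SketchControllerError` and the `wrap_factory_error` helper are hypothetical and exist only to keep the example self-contained.

```rust
use std::borrow::Cow;
use std::fmt::{Display, Formatter, Result as FmtResult};

// Hypothetical reduction of `ControllerError` to the variant added here.
#[derive(Debug)]
pub enum SketchControllerError {
    PreprocessorCreateError {
        endpoint_name: String,
        error: String,
    },
}

impl SketchControllerError {
    /// Stable error code, matching the string used in the diff.
    pub fn error_code(&self) -> Cow<'static, str> {
        match self {
            Self::PreprocessorCreateError { .. } => Cow::from("PreprocessorCreateError"),
        }
    }
}

impl Display for SketchControllerError {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        match self {
            Self::PreprocessorCreateError {
                endpoint_name,
                error,
            } => write!(
                f,
                "Error creating preprocessor for endpoint {endpoint_name}: {error}"
            ),
        }
    }
}

/// ASSUMPTION: hypothetical helper that attaches the endpoint name to
/// whatever error a user-defined factory reported.
pub fn wrap_factory_error(endpoint_name: &str, error: impl ToString) -> SketchControllerError {
    SketchControllerError::PreprocessorCreateError {
        endpoint_name: endpoint_name.to_string(),
        error: error.to_string(),
    }
}
```

Carrying the factory's error as a `String` keeps the variant independent of whatever error type user code in udf.rs chooses to return.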
