Skip to content

Commit ccefb56

Browse files
committed
pipeline-manager: use JSON as storage format and store JSON directly
- Replace YAML with JSON as the storage format for struct fields within the
  database. A SQL migration adds the boolean `uses_json` column to keep track
  of which tuples are converted. A programmatic migration performs the
  conversion by reading the fields as YAML and writing them back as JSON,
  along with setting the `uses_json` column to `TRUE`.

- The struct fields `runtime_config`, `program_config`, `program_info` and
  `deployment_config` are now stored as the JSON that was directly provided
  for them, either by the user (for `runtime_config` and `program_config`) or
  the system (for `program_info` and `deployment_config`).

  To illustrate the difference, suppose a user provides for `program_config`
  the following:
  ```
  { "profile": "unoptimized" }
  ```
  Before, it would first get deserialized as `ProgramConfig` and then
  serialized as JSON, ending up with the following stored in the database:
  ```
  { "profile": "unoptimized", "cache": true }
  ```
  Whereas now, with this commit, it will store only what was provided, in
  other words:
  ```
  { "profile": "unoptimized" }
  ```
  Most notably, this means that defaults are no longer set at creation time,
  but instead during use, where they are deserialized as their actual type.

- Backward-incompatible changes to the struct fields `runtime_config`,
  `program_config`, `program_info` or `deployment_config` no longer cause the
  API to be unable to retrieve them. This is achieved by making their struct
  field types JSON. As a consequence, the type returned by the endpoint and
  what is stated by the OpenAPI specification differ, but should be
  compatible as long as no backward-incompatible changes were made.

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent 3a9d3fd commit ccefb56

38 files changed

+2394
-667
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/feldera-types/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ testing = ["proptest", "proptest-derive"]
1414

1515
[dependencies]
1616
serde = { version = "1.0", features = ["derive", "rc"] }
17-
serde_yaml = "0.9.14"
17+
serde_yaml = "0.9.34"
1818
serde_json = { version = "1.0.127", features = ["raw_value"] }
1919
anyhow = { version = "1.0.57", features = ["backtrace"] }
2020
libc = "0.2.153"

crates/feldera-types/src/config.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
//!
33
//! This module defines the controller configuration structure. The leaves of
44
//! this structure are individual transport-specific and data-format-specific
5-
//! endpoint configs. We represent these configs as opaque yaml values, so
6-
//! that the entire configuration tree can be deserialized from a yaml file.
5+
//! endpoint configs. We represent these configs as opaque JSON values, so
6+
//! that the entire configuration tree can be deserialized from a JSON file.
77
88
use crate::transport::adhoc::AdHocInputConfig;
99
use crate::transport::datagen::DatagenInputConfig;
@@ -68,16 +68,6 @@ pub struct PipelineConfig {
6868
pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
6969
}
7070

71-
impl PipelineConfig {
72-
pub fn from_yaml(s: &str) -> Self {
73-
serde_yaml::from_str(s).unwrap()
74-
}
75-
76-
pub fn to_yaml(&self) -> String {
77-
serde_yaml::to_string(self).unwrap()
78-
}
79-
}
80-
8171
/// Configuration for persistent storage in a [`PipelineConfig`].
8272
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
8373
pub struct StorageConfig {
@@ -288,16 +278,6 @@ impl Default for RuntimeConfig {
288278
}
289279
}
290280

291-
impl RuntimeConfig {
292-
pub fn from_yaml(s: &str) -> Self {
293-
serde_yaml::from_str(s).unwrap()
294-
}
295-
296-
pub fn to_yaml(&self) -> String {
297-
serde_yaml::to_string(self).unwrap()
298-
}
299-
}
300-
301281
/// Fault-tolerance configuration for runtime startup.
302282
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
303283
#[serde(default)]

crates/feldera-types/src/error.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,4 @@ impl ErrorResponse {
110110
details,
111111
}
112112
}
113-
114-
pub fn from_yaml(s: &str) -> Self {
115-
serde_yaml::from_str(s).unwrap()
116-
}
117-
118-
pub fn to_yaml(&self) -> String {
119-
serde_yaml::to_string(self).unwrap()
120-
}
121113
}

crates/feldera-types/src/program_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ impl Display for SqlIdentifier {
153153
/// A struct containing the tables (inputs) and views for a program.
154154
///
155155
/// Parse from the JSON data-type of the DDL generated by the SQL compiler.
156-
#[derive(Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone, Default)]
156+
#[derive(Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone)]
157157
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
158158
pub struct ProgramSchema {
159159
#[cfg_attr(

crates/pipeline-manager/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pipeline-manager"
3-
version = "0.34.1"
3+
version = "0.34.1-b1"
44
edition = "2021"
55
license = "MIT OR Apache-2.0"
66
description = "Data pipeline manager for the Feldera continuous analytics platform."
@@ -30,7 +30,7 @@ log = "0.4.20"
3030
env_logger = "0.10.0"
3131
serde = { version = "1.0", features = ["derive"] }
3232
serde_json = "1.0.127"
33-
serde_yaml = "0.9.14"
33+
serde_yaml = "0.9.34"
3434
clap = { version = "4.0.32", features = ["derive", "env"] }
3535
utoipa = { version = "4.2", features = ["actix_extras", "chrono", "uuid"] }
3636
utoipa-swagger-ui = { version = "7.1", features = ["actix-web"] }
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Adds the `uses_json` field to the `pipeline` table.
2+
3+
ALTER TABLE pipeline
4+
ADD COLUMN uses_json BOOLEAN NOT NULL DEFAULT FALSE;

crates/pipeline-manager/src/api/examples.rs

Lines changed: 117 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
// Example errors for use in OpenAPI docs.
22
use crate::api::error::ApiError;
3-
use crate::api::pipeline::{PatchPipeline, PipelineInfo, PipelineSelectedInfo, PostPutPipeline};
3+
use crate::api::pipeline::{
4+
PatchPipeline, PipelineInfo, PipelineInfoInternal, PipelineSelectedInfo,
5+
PipelineSelectedInfoInternal, PostPutPipeline,
6+
};
47
use crate::db::error::DBError;
5-
use crate::db::types::common::Version;
68
use crate::db::types::pipeline::{
79
ExtendedPipelineDescr, PipelineDesiredStatus, PipelineId, PipelineStatus,
810
};
911
use crate::db::types::program::{CompilationProfile, ProgramConfig, ProgramStatus};
12+
use crate::db::types::utils::{
13+
validate_program_config, validate_program_info, validate_runtime_config,
14+
};
15+
use crate::db::types::version::Version;
1016
use crate::error::ManagerError;
1117
use crate::runner::error::RunnerError;
1218
use feldera_types::config::ResourceConfig;
@@ -46,25 +52,7 @@ pub(crate) fn error_stream_terminated() -> ErrorResponse {
4652
})
4753
}
4854

49-
pub(crate) fn pipeline_post_put() -> PostPutPipeline {
50-
PostPutPipeline {
51-
name: "example1".to_string(),
52-
description: Some("Description of the pipeline example1".to_string()),
53-
runtime_config: Some(RuntimeConfig {
54-
workers: 16,
55-
tracing_endpoint_jaeger: "".to_string(),
56-
..RuntimeConfig::default()
57-
}),
58-
program_code: "CREATE TABLE table1 ( col1 INT );".to_string(),
59-
udf_rust: None,
60-
udf_toml: None,
61-
program_config: Some(ProgramConfig {
62-
profile: Some(CompilationProfile::Optimized),
63-
cache: true,
64-
}),
65-
}
66-
}
67-
55+
/// First example [`ExtendedPipelineDescr`] the database could return.
6856
fn extended_pipeline_1() -> ExtendedPipelineDescr {
6957
ExtendedPipelineDescr {
7058
id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8")),
@@ -73,18 +61,20 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr {
7361
created_at: Default::default(),
7462
version: Version(4),
7563
platform_version: "v0".to_string(),
76-
runtime_config: RuntimeConfig {
64+
runtime_config: serde_json::to_value(RuntimeConfig {
7765
workers: 16,
7866
tracing_endpoint_jaeger: "".to_string(),
7967
..RuntimeConfig::default()
80-
},
68+
})
69+
.unwrap(),
8170
program_code: "CREATE TABLE table1 ( col1 INT );".to_string(),
8271
udf_rust: "".to_string(),
8372
udf_toml: "".to_string(),
84-
program_config: ProgramConfig {
73+
program_config: serde_json::to_value(ProgramConfig {
8574
profile: Some(CompilationProfile::Optimized),
8675
cache: true,
87-
},
76+
})
77+
.unwrap(),
8878
program_version: Version(2),
8979
program_info: None,
9080
program_status: ProgramStatus::Pending,
@@ -101,6 +91,7 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr {
10191
}
10292
}
10393

94+
/// Second example [`ExtendedPipelineDescr`] the database could return.
10495
fn extended_pipeline_2() -> ExtendedPipelineDescr {
10596
ExtendedPipelineDescr {
10697
id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c9")),
@@ -109,7 +100,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
109100
created_at: Default::default(),
110101
version: Version(1),
111102
platform_version: "v0".to_string(),
112-
runtime_config: RuntimeConfig {
103+
runtime_config: serde_json::to_value(RuntimeConfig {
113104
workers: 10,
114105
storage: true,
115106
fault_tolerance: None,
@@ -128,14 +119,16 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
128119
},
129120
min_storage_bytes: None,
130121
clock_resolution_usecs: Some(100_000),
131-
},
122+
})
123+
.unwrap(),
132124
program_code: "CREATE TABLE table2 ( col2 VARCHAR );".to_string(),
133125
udf_rust: "".to_string(),
134126
udf_toml: "".to_string(),
135-
program_config: ProgramConfig {
127+
program_config: serde_json::to_value(ProgramConfig {
136128
profile: Some(CompilationProfile::Unoptimized),
137129
cache: true,
138-
},
130+
})
131+
.unwrap(),
139132
program_version: Version(1),
140133
program_info: None,
141134
program_status: ProgramStatus::Pending,
@@ -152,22 +145,114 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
152145
}
153146
}
154147

148+
/// Converts the actual serialized type [`PipelineInfoInternal`] to the type the endpoint
149+
/// OpenAPI specification states it will return ([`PipelineInfo`]). The conversion for this
150+
/// example should always succeed as it uses field values that were directly serialized prior.
151+
fn pipeline_info_internal_to_external(pipeline: PipelineInfoInternal) -> PipelineInfo {
152+
PipelineInfo {
153+
id: pipeline.id,
154+
name: pipeline.name,
155+
description: pipeline.description,
156+
created_at: pipeline.created_at,
157+
version: pipeline.version,
158+
platform_version: pipeline.platform_version,
159+
runtime_config: validate_runtime_config(&pipeline.runtime_config, true)
160+
.expect("example must have a valid runtime_config"),
161+
program_code: pipeline.program_code,
162+
udf_rust: pipeline.udf_rust,
163+
udf_toml: pipeline.udf_toml,
164+
program_config: validate_program_config(&pipeline.program_config, true)
165+
.expect("example must have a valid program_config"),
166+
program_version: pipeline.program_version,
167+
program_status: pipeline.program_status,
168+
program_status_since: pipeline.program_status_since,
169+
program_info: pipeline.program_info.map(|v| {
170+
validate_program_info(&v).expect("example must have a valid program_info if specified")
171+
}),
172+
deployment_status: pipeline.deployment_status,
173+
deployment_status_since: pipeline.deployment_status_since,
174+
deployment_desired_status: pipeline.deployment_desired_status,
175+
deployment_error: pipeline.deployment_error,
176+
}
177+
}
178+
155179
pub(crate) fn pipeline_1_info() -> PipelineInfo {
156-
PipelineInfo::new(&extended_pipeline_1())
180+
pipeline_info_internal_to_external(PipelineInfoInternal::new(&extended_pipeline_1()))
181+
}
182+
183+
/// Converts the actual serialized type [`PipelineSelectedInfoInternal`] to the type the endpoint
184+
/// OpenAPI specification states it will return ([`PipelineSelectedInfo`]). The conversion for this
185+
/// example should always succeed as it uses field values that were directly serialized prior.
186+
fn pipeline_selected_info_internal_to_external(
187+
pipeline: PipelineSelectedInfoInternal,
188+
) -> PipelineSelectedInfo {
189+
PipelineSelectedInfo {
190+
id: pipeline.id,
191+
name: pipeline.name,
192+
description: pipeline.description,
193+
created_at: pipeline.created_at,
194+
version: pipeline.version,
195+
platform_version: pipeline.platform_version,
196+
runtime_config: pipeline.runtime_config.map(|v| {
197+
validate_runtime_config(&v, true).expect("example must have a valid runtime_config")
198+
}),
199+
program_code: pipeline.program_code,
200+
udf_rust: pipeline.udf_rust,
201+
udf_toml: pipeline.udf_toml,
202+
program_config: pipeline.program_config.map(|v| {
203+
validate_program_config(&v, true).expect("example must have a valid program_config")
204+
}),
205+
program_version: pipeline.program_version,
206+
program_status: pipeline.program_status,
207+
program_status_since: pipeline.program_status_since,
208+
program_info: pipeline.program_info.map(|v| {
209+
v.map(|v| {
210+
validate_program_info(&v)
211+
.expect("example must have a valid program_info if specified")
212+
})
213+
}),
214+
deployment_status: pipeline.deployment_status,
215+
deployment_status_since: pipeline.deployment_status_since,
216+
deployment_desired_status: pipeline.deployment_desired_status,
217+
deployment_error: pipeline.deployment_error,
218+
}
157219
}
158220

159221
pub(crate) fn pipeline_1_selected_info() -> PipelineSelectedInfo {
160-
PipelineSelectedInfo::new_all(&extended_pipeline_1())
222+
pipeline_selected_info_internal_to_external(PipelineSelectedInfoInternal::new_all(
223+
&extended_pipeline_1(),
224+
))
161225
}
162226

163227
pub(crate) fn pipeline_2_selected_info() -> PipelineSelectedInfo {
164-
PipelineSelectedInfo::new_all(&extended_pipeline_2())
228+
pipeline_selected_info_internal_to_external(PipelineSelectedInfoInternal::new_all(
229+
&extended_pipeline_2(),
230+
))
165231
}
166232

167233
pub(crate) fn list_pipeline_selected_info() -> Vec<PipelineSelectedInfo> {
168234
vec![pipeline_1_selected_info(), pipeline_2_selected_info()]
169235
}
170236

237+
pub(crate) fn pipeline_post_put() -> PostPutPipeline {
238+
PostPutPipeline {
239+
name: "example1".to_string(),
240+
description: Some("Description of the pipeline example1".to_string()),
241+
runtime_config: Some(RuntimeConfig {
242+
workers: 16,
243+
tracing_endpoint_jaeger: "".to_string(),
244+
..RuntimeConfig::default()
245+
}),
246+
program_code: "CREATE TABLE table1 ( col1 INT );".to_string(),
247+
udf_rust: None,
248+
udf_toml: None,
249+
program_config: Some(ProgramConfig {
250+
profile: Some(CompilationProfile::Optimized),
251+
cache: true,
252+
}),
253+
}
254+
}
255+
171256
pub(crate) fn patch_pipeline() -> PatchPipeline {
172257
PatchPipeline {
173258
name: None,

crates/pipeline-manager/src/api/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ The program version is used internally by the compiler to know when to recompile
137137
crate::auth::ProviderGoogleIdentity,
138138
139139
// Common
140-
crate::db::types::common::Version,
140+
crate::db::types::version::Version,
141141
crate::api::config_api::Configuration,
142142
143143
// Pipeline
@@ -204,6 +204,7 @@ The program version is used internally by the compiler to know when to recompile
204204
feldera_types::transport::delta_table::DeltaTableIngestMode,
205205
feldera_types::transport::delta_table::DeltaTableWriteMode,
206206
feldera_types::transport::delta_table::DeltaTableReaderConfig,
207+
feldera_types::transport::delta_table::DeltaTableWriterConfig,
207208
feldera_types::transport::iceberg::IcebergReaderConfig,
208209
feldera_types::transport::iceberg::IcebergIngestMode,
209210
feldera_types::transport::iceberg::IcebergCatalogType,

0 commit comments

Comments
 (0)