Skip to content

Commit 399d448

Browse files
committed
[pipeline-manager] Add 'metadata' pipeline field that can be edited independent of deployment status, deprecate 'description'
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent c430039 commit 399d448

13 files changed

Lines changed: 595 additions & 101 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Add metadata field to the pipeline table.
2+
-- This is arbitrary, optional text provided by the user.
3+
ALTER TABLE pipeline ADD COLUMN metadata TEXT NOT NULL DEFAULT '';
4+
5+
-- Copy existing description values into the metadata JSON "description" field.
6+
UPDATE pipeline SET metadata = json_build_object('description', description)::text WHERE description != '';

crates/pipeline-manager/src/api/endpoints/pipeline_management.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ pub struct ConnectorStats {
9393
pub struct PipelineInfo {
9494
pub id: PipelineId,
9595
pub name: String,
96+
/// Deprecated: use `metadata` instead.
97+
#[schema(deprecated)]
9698
pub description: String,
99+
pub metadata: String,
97100
pub created_at: DateTime<Utc>,
98101
pub version: Version,
99102
pub platform_version: String,
@@ -143,6 +146,7 @@ pub struct PipelineInfoInternal {
143146
pub id: PipelineId,
144147
pub name: String,
145148
pub description: String,
149+
pub metadata: String,
146150
pub created_at: DateTime<Utc>,
147151
pub version: Version,
148152
pub platform_version: String,
@@ -184,6 +188,7 @@ impl PipelineInfoInternal {
184188
id: extended_pipeline.id,
185189
name: extended_pipeline.name,
186190
description: extended_pipeline.description,
191+
metadata: extended_pipeline.metadata,
187192
created_at: extended_pipeline.created_at,
188193
version: extended_pipeline.version,
189194
platform_version: extended_pipeline.platform_version,
@@ -245,7 +250,10 @@ impl PipelineInfoInternal {
245250
pub struct PipelineSelectedInfo {
246251
pub id: PipelineId,
247252
pub name: String,
253+
/// Deprecated: use `metadata` instead.
254+
#[schema(deprecated)]
248255
pub description: String,
256+
pub metadata: String,
249257
pub created_at: DateTime<Utc>,
250258
pub version: Version,
251259
pub platform_version: String,
@@ -303,6 +311,7 @@ pub struct PipelineSelectedInfoInternal {
303311
pub id: PipelineId,
304312
pub name: String,
305313
pub description: String,
314+
pub metadata: String,
306315
pub created_at: DateTime<Utc>,
307316
pub version: Version,
308317
pub platform_version: String,
@@ -355,6 +364,7 @@ impl PipelineSelectedInfoInternal {
355364
id: extended_pipeline.id,
356365
name: extended_pipeline.name,
357366
description: extended_pipeline.description,
367+
metadata: extended_pipeline.metadata,
358368
created_at: extended_pipeline.created_at,
359369
version: extended_pipeline.version,
360370
platform_version: extended_pipeline.platform_version,
@@ -418,6 +428,7 @@ impl PipelineSelectedInfoInternal {
418428
id: extended_pipeline.id,
419429
name: extended_pipeline.name,
420430
description: extended_pipeline.description,
431+
metadata: extended_pipeline.metadata,
421432
created_at: extended_pipeline.created_at,
422433
version: extended_pipeline.version,
423434
platform_version: extended_pipeline.platform_version,
@@ -490,7 +501,8 @@ pub enum PipelineFieldSelector {
490501
/// The selection includes the following fields:
491502
/// - `id`
492503
/// - `name`
493-
/// - `description`
504+
/// - `description` (deprecated, use `metadata`)
505+
/// - `metadata`
494506
/// - `created_at`
495507
/// - `version`
496508
/// - `platform_version`
@@ -530,7 +542,8 @@ pub enum PipelineFieldSelector {
530542
/// The selection includes the following fields:
531543
/// - `id`
532544
/// - `name`
533-
/// - `description`
545+
/// - `description` (deprecated, use `metadata`)
546+
/// - `metadata`
534547
/// - `created_at`
535548
/// - `version`
536549
/// - `platform_version`
@@ -591,7 +604,10 @@ pub struct GetPipelineParameters {
591604
#[derive(Debug, Serialize, Deserialize, ToSchema)]
592605
pub struct PostPutPipeline {
593606
pub name: String,
607+
/// Deprecated: use `metadata` instead.
608+
#[schema(deprecated)]
594609
pub description: Option<String>,
610+
pub metadata: Option<String>,
595611
pub runtime_config: Option<RuntimeConfig>,
596612
pub program_code: String,
597613
pub udf_rust: Option<String>,
@@ -609,6 +625,7 @@ pub struct PostPutPipeline {
609625
pub struct PostPutPipelineInternal {
610626
pub name: String,
611627
pub description: Option<String>,
628+
pub metadata: Option<String>,
612629
pub runtime_config: Option<serde_json::Value>,
613630
pub program_code: String,
614631
pub udf_rust: Option<String>,
@@ -623,6 +640,7 @@ impl From<PostPutPipelineInternal> for PipelineDescr {
623640
PipelineDescr {
624641
name: value.name.clone(),
625642
description: value.description.clone().unwrap_or("".to_string()),
643+
metadata: value.metadata.clone().unwrap_or("".to_string()),
626644
runtime_config: value.runtime_config.clone().unwrap_or(json!({})),
627645
program_code: value.program_code.clone(),
628646
udf_rust: value.udf_rust.clone().unwrap_or("".to_string()),
@@ -641,7 +659,13 @@ impl From<PostPutPipelineInternal> for PipelineDescr {
641659
#[derive(Debug, Serialize, Deserialize, ToSchema)]
642660
pub struct PatchPipeline {
643661
pub name: Option<String>,
662+
/// Deprecated: use `metadata` instead.
663+
#[schema(deprecated)]
644664
pub description: Option<String>,
665+
/// Free-form client-side annotation. Unlike the other fields, `metadata`
666+
/// can be patched at any time — including while the pipeline is running —
667+
/// because it has no effect on the deployed pipeline.
668+
pub metadata: Option<String>,
645669
pub runtime_config: Option<RuntimeConfig>,
646670
pub program_code: Option<String>,
647671
pub udf_rust: Option<String>,
@@ -658,6 +682,7 @@ pub struct PatchPipeline {
658682
pub struct PatchPipelineInternal {
659683
pub name: Option<String>,
660684
pub description: Option<String>,
685+
pub metadata: Option<String>,
661686
pub runtime_config: Option<serde_json::Value>,
662687
pub program_code: Option<String>,
663688
pub udf_rust: Option<String>,
@@ -1151,6 +1176,7 @@ pub(crate) async fn patch_pipeline(
11511176
&pipeline_name,
11521177
&body.name,
11531178
&body.description,
1179+
&body.metadata,
11541180
&state.common_config.platform_version,
11551181
false,
11561182
&body.runtime_config,
@@ -1240,6 +1266,7 @@ pub(crate) async fn post_update_runtime(
12401266
&pipeline_name,
12411267
&None,
12421268
&None,
1269+
&None,
12431270
&state.common_config.platform_version,
12441271
true, // bump platform version.
12451272
&None,

crates/pipeline-manager/src/api/examples.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr {
3232
id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8")),
3333
name: "example1".to_string(),
3434
description: "Description of the pipeline example1".to_string(),
35+
metadata: "".to_string(),
3536
created_at: Default::default(),
3637
version: Version(4),
3738
platform_version: "v0".to_string(),
@@ -90,6 +91,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
9091
id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c9")),
9192
name: "example2".to_string(),
9293
description: "Description of the pipeline example2".to_string(),
94+
metadata: "".to_string(),
9395
created_at: Default::default(),
9496
version: Version(1),
9597
platform_version: "v0".to_string(),
@@ -179,6 +181,7 @@ fn pipeline_info_internal_to_external(pipeline: PipelineInfoInternal) -> Pipelin
179181
id: pipeline.id,
180182
name: pipeline.name,
181183
description: pipeline.description,
184+
metadata: pipeline.metadata,
182185
created_at: pipeline.created_at,
183186
version: pipeline.version,
184187
platform_version: pipeline.platform_version,
@@ -237,6 +240,7 @@ fn pipeline_selected_info_internal_to_external(
237240
id: pipeline.id,
238241
name: pipeline.name,
239242
description: pipeline.description,
243+
metadata: pipeline.metadata,
240244
created_at: pipeline.created_at,
241245
version: pipeline.version,
242246
platform_version: pipeline.platform_version,
@@ -319,6 +323,7 @@ pub(crate) fn pipeline_post_put() -> PostPutPipeline {
319323
PostPutPipeline {
320324
name: "example1".to_string(),
321325
description: Some("Description of the pipeline example1".to_string()),
326+
metadata: None,
322327
runtime_config: Some(RuntimeConfig {
323328
workers: 16,
324329
tracing_endpoint_jaeger: "".to_string(),
@@ -340,6 +345,7 @@ pub(crate) fn patch_pipeline() -> PatchPipeline {
340345
PatchPipeline {
341346
name: None,
342347
description: Some("This is a new description".to_string()),
348+
metadata: None,
343349
runtime_config: None,
344350
program_code: Some("CREATE TABLE table3 ( col3 INT );".to_string()),
345351
udf_rust: None,

crates/pipeline-manager/src/compiler/test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ impl CompilerTest {
132132
PipelineDescr {
133133
name: name.to_string(),
134134
description: "not-used".to_string(),
135+
metadata: "".to_string(),
135136
runtime_config: json!({}),
136137
program_code: program_code.to_string(),
137138
udf_rust: udf_rust.to_string(),

crates/pipeline-manager/src/db/listen_table.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ mod test {
384384
PipelineDescr {
385385
name: "example".to_string(),
386386
description: "Description of example".to_string(),
387+
metadata: "".to_string(),
387388
runtime_config: json!({}),
388389
program_code: "CREATE TABLE example ( col1 INT );".to_string(),
389390
udf_rust: "".to_string(),
@@ -414,6 +415,7 @@ mod test {
414415
"example",
415416
&Some("example-renamed".to_string()),
416417
&Some("Description of example2".to_string()),
418+
&None,
417419
"v0",
418420
false,
419421
&None,

0 commit comments

Comments
 (0)