Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/pipeline-manager/migrations/V34__pipeline_metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Add metadata field to the pipeline table.
-- This is arbitrary, optional text provided by the user.
ALTER TABLE pipeline ADD COLUMN metadata TEXT NOT NULL DEFAULT '';
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's good to have a catch-all JSON element this. We're already strictly structured, we dont have to switch to the dark side of using unstructured data on top of a relational database.


-- Copy existing description values into the metadata JSON "description" field.
UPDATE pipeline SET metadata = json_build_object('description', description)::text WHERE description != '';
31 changes: 29 additions & 2 deletions crates/pipeline-manager/src/api/endpoints/pipeline_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ pub struct ConnectorStats {
pub struct PipelineInfo {
pub id: PipelineId,
pub name: String,
/// Deprecated: use `metadata` instead.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to mark it deprecated; nothing has changed we weren't using it before we dont need to use it now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to discuss this more with Simon once he's back; I think it's benefitial to fold the "description" column into the "metadata" column model I propose here

#[schema(deprecated)]
pub description: String,
pub metadata: String,
pub created_at: DateTime<Utc>,
pub version: Version,
pub platform_version: String,
Expand Down Expand Up @@ -143,6 +146,7 @@ pub struct PipelineInfoInternal {
pub id: PipelineId,
pub name: String,
pub description: String,
pub metadata: String,
pub created_at: DateTime<Utc>,
pub version: Version,
pub platform_version: String,
Expand Down Expand Up @@ -184,6 +188,7 @@ impl PipelineInfoInternal {
id: extended_pipeline.id,
name: extended_pipeline.name,
description: extended_pipeline.description,
metadata: extended_pipeline.metadata,
created_at: extended_pipeline.created_at,
version: extended_pipeline.version,
platform_version: extended_pipeline.platform_version,
Expand Down Expand Up @@ -245,7 +250,10 @@ impl PipelineInfoInternal {
pub struct PipelineSelectedInfo {
pub id: PipelineId,
pub name: String,
/// Deprecated: use `metadata` instead.
#[schema(deprecated)]
pub description: String,
pub metadata: String,
pub created_at: DateTime<Utc>,
pub version: Version,
pub platform_version: String,
Expand Down Expand Up @@ -303,6 +311,7 @@ pub struct PipelineSelectedInfoInternal {
pub id: PipelineId,
pub name: String,
pub description: String,
pub metadata: String,
pub created_at: DateTime<Utc>,
pub version: Version,
pub platform_version: String,
Expand Down Expand Up @@ -355,6 +364,7 @@ impl PipelineSelectedInfoInternal {
id: extended_pipeline.id,
name: extended_pipeline.name,
description: extended_pipeline.description,
metadata: extended_pipeline.metadata,
created_at: extended_pipeline.created_at,
version: extended_pipeline.version,
platform_version: extended_pipeline.platform_version,
Expand Down Expand Up @@ -418,6 +428,7 @@ impl PipelineSelectedInfoInternal {
id: extended_pipeline.id,
name: extended_pipeline.name,
description: extended_pipeline.description,
metadata: extended_pipeline.metadata,
created_at: extended_pipeline.created_at,
version: extended_pipeline.version,
platform_version: extended_pipeline.platform_version,
Expand Down Expand Up @@ -490,7 +501,8 @@ pub enum PipelineFieldSelector {
/// The selection includes the following fields:
/// - `id`
/// - `name`
/// - `description`
/// - `description` (deprecated, use `metadata`)
/// - `metadata`
/// - `created_at`
/// - `version`
/// - `platform_version`
Expand Down Expand Up @@ -530,7 +542,8 @@ pub enum PipelineFieldSelector {
/// The selection includes the following fields:
/// - `id`
/// - `name`
/// - `description`
/// - `description` (deprecated, use `metadata`)
/// - `metadata`
/// - `created_at`
/// - `version`
/// - `platform_version`
Expand Down Expand Up @@ -591,7 +604,10 @@ pub struct GetPipelineParameters {
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct PostPutPipeline {
pub name: String,
/// Deprecated: use `metadata` instead.
#[schema(deprecated)]
pub description: Option<String>,
pub metadata: Option<String>,
pub runtime_config: Option<RuntimeConfig>,
pub program_code: String,
pub udf_rust: Option<String>,
Expand All @@ -609,6 +625,7 @@ pub struct PostPutPipeline {
pub struct PostPutPipelineInternal {
pub name: String,
pub description: Option<String>,
pub metadata: Option<String>,
pub runtime_config: Option<serde_json::Value>,
pub program_code: String,
pub udf_rust: Option<String>,
Expand All @@ -623,6 +640,7 @@ impl From<PostPutPipelineInternal> for PipelineDescr {
PipelineDescr {
name: value.name.clone(),
description: value.description.clone().unwrap_or("".to_string()),
metadata: value.metadata.clone().unwrap_or("".to_string()),
runtime_config: value.runtime_config.clone().unwrap_or(json!({})),
program_code: value.program_code.clone(),
udf_rust: value.udf_rust.clone().unwrap_or("".to_string()),
Expand All @@ -641,7 +659,13 @@ impl From<PostPutPipelineInternal> for PipelineDescr {
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct PatchPipeline {
pub name: Option<String>,
/// Deprecated: use `metadata` instead.
#[schema(deprecated)]
pub description: Option<String>,
/// Free-form client-side annotation. Unlike the other fields, `metadata`
/// can be patched at any time — including while the pipeline is running —
/// because it has no effect on the deployed pipeline.
pub metadata: Option<String>,
pub runtime_config: Option<RuntimeConfig>,
pub program_code: Option<String>,
pub udf_rust: Option<String>,
Expand All @@ -658,6 +682,7 @@ pub struct PatchPipeline {
pub struct PatchPipelineInternal {
pub name: Option<String>,
pub description: Option<String>,
pub metadata: Option<String>,
pub runtime_config: Option<serde_json::Value>,
pub program_code: Option<String>,
pub udf_rust: Option<String>,
Expand Down Expand Up @@ -1151,6 +1176,7 @@ pub(crate) async fn patch_pipeline(
&pipeline_name,
&body.name,
&body.description,
&body.metadata,
&state.common_config.platform_version,
false,
&body.runtime_config,
Expand Down Expand Up @@ -1240,6 +1266,7 @@ pub(crate) async fn post_update_runtime(
&pipeline_name,
&None,
&None,
&None,
&state.common_config.platform_version,
true, // bump platform version.
&None,
Expand Down
6 changes: 6 additions & 0 deletions crates/pipeline-manager/src/api/examples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr {
id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8")),
name: "example1".to_string(),
description: "Description of the pipeline example1".to_string(),
metadata: "".to_string(),
created_at: Default::default(),
version: Version(4),
platform_version: "v0".to_string(),
Expand Down Expand Up @@ -90,6 +91,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c9")),
name: "example2".to_string(),
description: "Description of the pipeline example2".to_string(),
metadata: "".to_string(),
created_at: Default::default(),
version: Version(1),
platform_version: "v0".to_string(),
Expand Down Expand Up @@ -179,6 +181,7 @@ fn pipeline_info_internal_to_external(pipeline: PipelineInfoInternal) -> Pipelin
id: pipeline.id,
name: pipeline.name,
description: pipeline.description,
metadata: pipeline.metadata,
created_at: pipeline.created_at,
version: pipeline.version,
platform_version: pipeline.platform_version,
Expand Down Expand Up @@ -237,6 +240,7 @@ fn pipeline_selected_info_internal_to_external(
id: pipeline.id,
name: pipeline.name,
description: pipeline.description,
metadata: pipeline.metadata,
created_at: pipeline.created_at,
version: pipeline.version,
platform_version: pipeline.platform_version,
Expand Down Expand Up @@ -319,6 +323,7 @@ pub(crate) fn pipeline_post_put() -> PostPutPipeline {
PostPutPipeline {
name: "example1".to_string(),
description: Some("Description of the pipeline example1".to_string()),
metadata: None,
runtime_config: Some(RuntimeConfig {
workers: 16,
tracing_endpoint_jaeger: "".to_string(),
Expand All @@ -340,6 +345,7 @@ pub(crate) fn patch_pipeline() -> PatchPipeline {
PatchPipeline {
name: None,
description: Some("This is a new description".to_string()),
metadata: None,
runtime_config: None,
program_code: Some("CREATE TABLE table3 ( col3 INT );".to_string()),
udf_rust: None,
Expand Down
1 change: 1 addition & 0 deletions crates/pipeline-manager/src/compiler/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl CompilerTest {
PipelineDescr {
name: name.to_string(),
description: "not-used".to_string(),
metadata: "".to_string(),
runtime_config: json!({}),
program_code: program_code.to_string(),
udf_rust: udf_rust.to_string(),
Expand Down
2 changes: 2 additions & 0 deletions crates/pipeline-manager/src/db/listen_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ mod test {
PipelineDescr {
name: "example".to_string(),
description: "Description of example".to_string(),
metadata: "".to_string(),
runtime_config: json!({}),
program_code: "CREATE TABLE example ( col1 INT );".to_string(),
udf_rust: "".to_string(),
Expand Down Expand Up @@ -414,6 +415,7 @@ mod test {
"example",
&Some("example-renamed".to_string()),
&Some("Description of example2".to_string()),
&None,
"v0",
false,
&None,
Expand Down
Loading
Loading