diff --git a/bun.lock b/bun.lock index 0dd74824f8a..d4acbefa0cd 100644 --- a/bun.lock +++ b/bun.lock @@ -132,6 +132,7 @@ "@tailwindcss/forms": "0.5.11", "@tailwindcss/typography": "0.5.19", "@tailwindcss/vite": "4.2.0", + "@thisux/sveltednd": "^0.4.1", "@types/args": "5.0.4", "@types/d3-format": "3.0.4", "@types/eslint": "9.6.1", @@ -774,6 +775,8 @@ "@testing-library/svelte-core": ["@testing-library/svelte-core@1.0.0", "", { "peerDependencies": { "svelte": "^3 || ^4 || ^5 || ^5.0.0-next.0" } }, "sha512-VkUePoLV6oOYwSUvX6ShA8KLnJqZiYMIbP2JW2t0GLWLkJxKGvuH5qrrZBV/X7cXFnLGuFQEC7RheYiZOW68KQ=="], + "@thisux/sveltednd": ["@thisux/sveltednd@0.4.1", "", { "peerDependencies": { "svelte": "^5.0.0" } }, "sha512-qRcwstSmYOoGD9h0hPDI6Tm4nBXeMJpCs3vrOhnImi1ObL/D2ScfAmIbL9bMN1l2V51AVvD5l/G/ZJkx8iRbTA=="], + "@tokenizer/token": ["@tokenizer/token@0.3.0", "", {}, "sha512-OvjF+z51L3ov0OyAU0duzsYuvO01PH7x4t6DJx+guahgTnBHkhJdG7soQeTSFLWN3efnHyibZ4Z8l2EuWwJN3A=="], "@tootallnate/once": ["@tootallnate/once@2.0.0", "", {}, "sha512-XCuKFP5PS55gnMVu3dty8KPatLqUoy/ZYzDzAGCQ8JNFCkLXzmI7vNHCR+XpbZaMWQK/vQubr7PkYq8g470J/A=="], diff --git a/crates/pipeline-manager/migrations/V34__pipeline_metadata.sql b/crates/pipeline-manager/migrations/V34__pipeline_metadata.sql new file mode 100644 index 00000000000..dcefce3a31e --- /dev/null +++ b/crates/pipeline-manager/migrations/V34__pipeline_metadata.sql @@ -0,0 +1,6 @@ +-- Add metadata field to the pipeline table. +-- This is arbitrary, optional text provided by the user. +ALTER TABLE pipeline ADD COLUMN metadata TEXT NOT NULL DEFAULT ''; + +-- Copy existing description values into the metadata JSON "description" field. +UPDATE pipeline SET metadata = json_build_object('description', description)::text WHERE description != ''; diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs index f90b2549346..7627f44aff7 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_management.rs @@ -93,7 +93,10 @@ pub struct ConnectorStats { pub struct PipelineInfo { pub id: PipelineId, pub name: String, + /// Deprecated: use `metadata` instead. + #[schema(deprecated)] pub description: String, + pub metadata: String, pub created_at: DateTime, pub version: Version, pub platform_version: String, @@ -143,6 +146,7 @@ pub struct PipelineInfoInternal { pub id: PipelineId, pub name: String, pub description: String, + pub metadata: String, pub created_at: DateTime, pub version: Version, pub platform_version: String, @@ -184,6 +188,7 @@ impl PipelineInfoInternal { id: extended_pipeline.id, name: extended_pipeline.name, description: extended_pipeline.description, + metadata: extended_pipeline.metadata, created_at: extended_pipeline.created_at, version: extended_pipeline.version, platform_version: extended_pipeline.platform_version, @@ -245,7 +250,10 @@ impl PipelineInfoInternal { pub struct PipelineSelectedInfo { pub id: PipelineId, pub name: String, + /// Deprecated: use `metadata` instead. + #[schema(deprecated)] pub description: String, + pub metadata: String, pub created_at: DateTime, pub version: Version, pub platform_version: String, @@ -303,6 +311,7 @@ pub struct PipelineSelectedInfoInternal { pub id: PipelineId, pub name: String, pub description: String, + pub metadata: String, pub created_at: DateTime, pub version: Version, pub platform_version: String, @@ -355,6 +364,7 @@ impl PipelineSelectedInfoInternal { id: extended_pipeline.id, name: extended_pipeline.name, description: extended_pipeline.description, + metadata: extended_pipeline.metadata, created_at: extended_pipeline.created_at, version: extended_pipeline.version, platform_version: extended_pipeline.platform_version, @@ -418,6 +428,7 @@ impl PipelineSelectedInfoInternal { id: extended_pipeline.id, name: extended_pipeline.name, description: extended_pipeline.description, + metadata: extended_pipeline.metadata, created_at: extended_pipeline.created_at, version: extended_pipeline.version, platform_version: extended_pipeline.platform_version, @@ -490,7 +501,8 @@ pub enum PipelineFieldSelector { /// The selection includes the following fields: /// - `id` /// - `name` - /// - `description` + /// - `description` (deprecated, use `metadata`) + /// - `metadata` /// - `created_at` /// - `version` /// - `platform_version` @@ -530,7 +542,8 @@ pub enum PipelineFieldSelector { /// The selection includes the following fields: /// - `id` /// - `name` - /// - `description` + /// - `description` (deprecated, use `metadata`) + /// - `metadata` /// - `created_at` /// - `version` /// - `platform_version` @@ -591,7 +604,10 @@ pub struct GetPipelineParameters { #[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct PostPutPipeline { pub name: String, + /// Deprecated: use `metadata` instead. + #[schema(deprecated)] pub description: Option, + pub metadata: Option, pub runtime_config: Option, pub program_code: String, pub udf_rust: Option, @@ -609,6 +625,7 @@ pub struct PostPutPipeline { pub struct PostPutPipelineInternal { pub name: String, pub description: Option, + pub metadata: Option, pub runtime_config: Option, pub program_code: String, pub udf_rust: Option, @@ -623,6 +640,7 @@ impl From for PipelineDescr { PipelineDescr { name: value.name.clone(), description: value.description.clone().unwrap_or("".to_string()), + metadata: value.metadata.clone().unwrap_or("".to_string()), runtime_config: value.runtime_config.clone().unwrap_or(json!({})), program_code: value.program_code.clone(), udf_rust: value.udf_rust.clone().unwrap_or("".to_string()), @@ -641,7 +659,13 @@ impl From for PipelineDescr { #[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct PatchPipeline { pub name: Option, + /// Deprecated: use `metadata` instead. + #[schema(deprecated)] pub description: Option, + /// Free-form client-side annotation. Unlike the other fields, `metadata` + /// can be patched at any time — including while the pipeline is running — + /// because it has no effect on the deployed pipeline. + pub metadata: Option, pub runtime_config: Option, pub program_code: Option, pub udf_rust: Option, @@ -658,6 +682,7 @@ pub struct PatchPipeline { pub struct PatchPipelineInternal { pub name: Option, pub description: Option, + pub metadata: Option, pub runtime_config: Option, pub program_code: Option, pub udf_rust: Option, @@ -1151,6 +1176,7 @@ pub(crate) async fn patch_pipeline( &pipeline_name, &body.name, &body.description, + &body.metadata, &state.common_config.platform_version, false, &body.runtime_config, @@ -1240,6 +1266,7 @@ pub(crate) async fn post_update_runtime( &pipeline_name, &None, &None, + &None, &state.common_config.platform_version, true, // bump platform version. &None, diff --git a/crates/pipeline-manager/src/api/examples.rs b/crates/pipeline-manager/src/api/examples.rs index 7839f8557fe..7101bb60918 100644 --- a/crates/pipeline-manager/src/api/examples.rs +++ b/crates/pipeline-manager/src/api/examples.rs @@ -32,6 +32,7 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr { id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8")), name: "example1".to_string(), description: "Description of the pipeline example1".to_string(), + metadata: "".to_string(), created_at: Default::default(), version: Version(4), platform_version: "v0".to_string(), @@ -90,6 +91,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr { id: PipelineId(uuid!("67e55044-10b1-426f-9247-bb680e5fe0c9")), name: "example2".to_string(), description: "Description of the pipeline example2".to_string(), + metadata: "".to_string(), created_at: Default::default(), version: Version(1), platform_version: "v0".to_string(), @@ -179,6 +181,7 @@ fn pipeline_info_internal_to_external(pipeline: PipelineInfoInternal) -> Pipelin id: pipeline.id, name: pipeline.name, description: pipeline.description, + metadata: pipeline.metadata, created_at: pipeline.created_at, version: pipeline.version, platform_version: pipeline.platform_version, @@ -237,6 +240,7 @@ fn pipeline_selected_info_internal_to_external( id: pipeline.id, name: pipeline.name, description: pipeline.description, + metadata: pipeline.metadata, created_at: pipeline.created_at, version: pipeline.version, platform_version: pipeline.platform_version, @@ -319,6 +323,7 @@ pub(crate) fn pipeline_post_put() -> PostPutPipeline { PostPutPipeline { name: "example1".to_string(), description: Some("Description of the pipeline example1".to_string()), + metadata: None, runtime_config: Some(RuntimeConfig { workers: 16, tracing_endpoint_jaeger: "".to_string(), @@ -340,6 +345,7 @@ pub(crate) fn patch_pipeline() -> PatchPipeline { PatchPipeline { name: None, description: Some("This is a new description".to_string()), + metadata: None, runtime_config: None, program_code: Some("CREATE TABLE table3 ( col3 INT );".to_string()), udf_rust: None, diff --git a/crates/pipeline-manager/src/compiler/test.rs b/crates/pipeline-manager/src/compiler/test.rs index 7e74dc84a4f..f9ee8651664 100644 --- a/crates/pipeline-manager/src/compiler/test.rs +++ b/crates/pipeline-manager/src/compiler/test.rs @@ -132,6 +132,7 @@ impl CompilerTest { PipelineDescr { name: name.to_string(), description: "not-used".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: program_code.to_string(), udf_rust: udf_rust.to_string(), diff --git a/crates/pipeline-manager/src/db/listen_table.rs b/crates/pipeline-manager/src/db/listen_table.rs index c981dd54c4c..ca147b99338 100644 --- a/crates/pipeline-manager/src/db/listen_table.rs +++ b/crates/pipeline-manager/src/db/listen_table.rs @@ -384,6 +384,7 @@ mod test { PipelineDescr { name: "example".to_string(), description: "Description of example".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "CREATE TABLE example ( col1 INT );".to_string(), udf_rust: "".to_string(), @@ -414,6 +415,7 @@ mod test { "example", &Some("example-renamed".to_string()), &Some("Description of example2".to_string()), + &None, "v0", false, &None, diff --git a/crates/pipeline-manager/src/db/operations/pipeline.rs b/crates/pipeline-manager/src/db/operations/pipeline.rs index 38f0c9fc301..40646153be8 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline.rs @@ -210,7 +210,7 @@ pub(crate) async fn new_pipeline( let stmt = txn .prepare_cached( - "INSERT INTO pipeline (id, tenant_id, name, description, created_at, version, platform_version, runtime_config, + "INSERT INTO pipeline (id, tenant_id, name, description, metadata, created_at, version, platform_version, runtime_config, program_code, udf_rust, udf_toml, program_config, program_version, program_status, program_status_since, program_error, program_info, program_binary_source_checksum, program_binary_integrity_checksum, @@ -221,14 +221,14 @@ pub(crate) async fn new_pipeline( deployment_runtime_status, deployment_runtime_status_since, deployment_runtime_desired_status, deployment_runtime_desired_status_since ) - VALUES ($1, $2, $3, $4, now(), $5, $6, $7, - $8, $9, $10, $11, $12, $13, - now(), $14, NULL, + VALUES ($1, $2, $3, $4, $5, now(), $6, $7, $8, + $9, $10, $11, $12, $13, $14, + now(), $15, NULL, NULL, NULL, - NULL, NULL, NULL, TRUE, $15, $16, NULL, + NULL, NULL, NULL, TRUE, $16, $17, NULL, NULL, NULL, - $17, now(), $18, now(), + $19, now(), NULL, NULL, NULL, NULL)", ) @@ -240,25 +240,26 @@ pub(crate) async fn new_pipeline( &tenant_id.0, // $2: tenant_id &pipeline.name, // $3: name &pipeline.description, // $4: description - &Version(1).0, // $5: version - &platform_version.to_string(), // $6: platform_version - &runtime_config.to_string(), // $7: runtime_config - &pipeline.program_code, // $8: program_code - &pipeline.udf_rust, // $9: udf_rust - &pipeline.udf_toml, // $10: udf_toml - &program_config.to_string(), // $11: program_config - &Version(1).0, // $12: program_version - &ProgramStatus::Pending.to_string(), // $13: program_status - // $14: program_error + &pipeline.metadata, // $5: metadata + &Version(1).0, // $6: version + &platform_version.to_string(), // $7: platform_version + &runtime_config.to_string(), // $8: runtime_config + &pipeline.program_code, // $9: program_code + &pipeline.udf_rust, // $10: udf_rust + &pipeline.udf_toml, // $11: udf_toml + &program_config.to_string(), // $12: program_config + &Version(1).0, // $13: program_version + &ProgramStatus::Pending.to_string(), // $14: program_status + // $15: program_error &serialize_program_error(&ProgramError { sql_compilation: None, rust_compilation: None, system_error: None, })?, - &Version(1).0, // $15: refresh_version - &StorageStatus::Cleared.to_string(), // $16: storage_status - &ResourcesStatus::Stopped.to_string(), // $17: deployment_resources_status - &ResourcesDesiredStatus::Stopped.to_string(), // $18: deployment_resources_desired_status + &Version(1).0, // $16: refresh_version + &StorageStatus::Cleared.to_string(), // $17: storage_status + &ResourcesStatus::Stopped.to_string(), // $18: deployment_resources_status + &ResourcesDesiredStatus::Stopped.to_string(), // $19: deployment_resources_desired_status ], ) .await @@ -268,6 +269,47 @@ pub(crate) async fn new_pipeline( Ok((PipelineId(new_id), Version(1))) } +/// Bundle of patchable pipeline fields — i.e. the contents of a `PATCH` +/// request body. Methods that classify a patch (e.g. "is this a metadata-only +/// update?") destructure this struct exhaustively, so adding a new patchable +/// field forces every such classifier to be revisited or fail to compile. +pub(crate) struct PipelineFieldUpdates<'a> { + pub name: &'a Option, + pub description: &'a Option, + pub metadata: &'a Option, + pub runtime_config: &'a Option, + pub program_code: &'a Option, + pub udf_rust: &'a Option, + pub udf_toml: &'a Option, + pub program_config: &'a Option, +} + +impl PipelineFieldUpdates<'_> { + /// Returns true iff `metadata` is set and every other patchable field is + /// `None`. The exhaustive destructuring is load-bearing: it prevents a + /// future field from being silently classified as "not present" here. + pub fn is_metadata_only(&self) -> bool { + let Self { + name, + description, + metadata, + runtime_config, + program_code, + udf_rust, + udf_toml, + program_config, + } = self; + metadata.is_some() + && name.is_none() + && description.is_none() + && runtime_config.is_none() + && program_code.is_none() + && udf_rust.is_none() + && udf_toml.is_none() + && program_config.is_none() + } +} + /// Modify pipeline. /// /// # Arguments @@ -278,24 +320,29 @@ pub(crate) async fn new_pipeline( /// * `bump_platform_version` - if true, the platform_version of the pipeline will be updated to the /// provided `platform_version`. In addition, the platform_version will be updated unconditionally /// if the program code or program settings are getting updated by this request. -/// * Other arguments correspond to fields that can be updated. If an argument is `None`, the corresponding -/// field is not updated. -#[allow(clippy::too_many_arguments)] +/// * `updates` - patchable fields. Each `Some` value is applied; `None` leaves the corresponding +/// field unchanged. pub(crate) async fn update_pipeline( txn: &Transaction<'_>, is_compiler_update: bool, tenant_id: TenantId, original_name: &str, - name: &Option, - description: &Option, + updates: &PipelineFieldUpdates<'_>, platform_version: &str, mut bump_platform_version: bool, - runtime_config: &Option, - program_code: &Option, - udf_rust: &Option, - udf_toml: &Option, - program_config: &Option, ) -> Result { + // Dereference in the pattern so each binding has the original reference + // type (e.g. `name: &Option`) rather than `&&Option`. + let &PipelineFieldUpdates { + name, + description, + metadata, + runtime_config, + program_code, + udf_rust, + udf_toml, + program_config, + } = updates; if let Some(name) = name { validate_name(name)?; } @@ -344,23 +391,32 @@ pub(crate) async fn update_pipeline( // This will also return an error if the pipeline does not exist. let current = get_pipeline(txn, tenant_id, original_name).await?; + // `metadata` is a free-form client-side annotation that has no impact + // on the deployed pipeline, so we allow patching it regardless of deployment status. + // Requests that touch only `metadata` skip the stopped-state gate below. + let is_metadata_only_update = + !is_compiler_update && !bump_platform_version && updates.is_metadata_only(); + // Pipeline update is allowed if either: // - Current status is `Stopped` AND desired status is `Stopped` // - Current status is `Stopped` AND desired status is `Provisioned` AND it is the // compiler doing the update to bump platform version (the early start mechanism) - if !matches!( - ( - is_compiler_update, - current.deployment_resources_status, - current.deployment_resources_desired_status - ), - (_, ResourcesStatus::Stopped, ResourcesDesiredStatus::Stopped) - | ( - true, - ResourcesStatus::Stopped, - ResourcesDesiredStatus::Provisioned + // - The update only touches `metadata` (see above). + if !(is_metadata_only_update + || matches!( + ( + is_compiler_update, + current.deployment_resources_status, + current.deployment_resources_desired_status ), - ) { + (_, ResourcesStatus::Stopped, ResourcesDesiredStatus::Stopped) + | ( + true, + ResourcesStatus::Stopped, + ResourcesDesiredStatus::Provisioned + ), + )) + { return Err(DBError::UpdateRestrictedToStopped); } @@ -370,6 +426,7 @@ pub(crate) async fn update_pipeline( || (bump_platform_version && name.is_none() && description.is_none() + && metadata.is_none() && runtime_config.is_none() && program_code.is_none() && udf_rust.is_none() @@ -377,12 +434,55 @@ pub(crate) async fn update_pipeline( && program_config.is_none()) ); + // Metadata-only fast path. `metadata` is a free-form client-side annotation + // (e.g. folder paths in the UI tree view) with no deployment semantics, so + // patching it must be invisible to anything watching the pipeline's state: + // + // - `version` is *not* incremented. The runner automaton uses it as a guard + // on every resources-status transition (see `pipeline_automata.rs`); only + // `TransitionToProvisioning` retries on `OutdatedPipelineVersion`, every + // other transition surfaces it as a hard error. Bumping `version` here + // would crash mid-flight transitions whenever a metadata patch landed + // concurrently. + // - `refresh_version` is *not* incremented. It is the client-visible + // "material change happened" counter; folder moves are not material. + // - No `pipeline_monitor_event` is emitted. Such an event triggers a + // Postgres `NOTIFY` that wakes the per-pipeline runner. The runner has + // nothing to do for a metadata change, so emitting the event would be + // pure wake-up noise on every drag-and-drop. + // + // Consequently this branch issues a narrow `UPDATE pipeline SET metadata` + // (no version columns touched) and returns `current.version` unchanged. + if is_metadata_only_update { + // Short-circuit no-op patches: nothing to write, version stays put. + let new_metadata = metadata + .as_ref() + .expect("metadata-only update has Some(metadata)"); + if *new_metadata == current.metadata { + return Ok(current.version); + } + let stmt = txn + .prepare_cached( + "UPDATE pipeline + SET metadata = $1 + WHERE tenant_id = $2 AND name = $3", + ) + .await?; + let rows_affected = txn + .execute(&stmt, &[new_metadata, &tenant_id.0, &original_name]) + .await + .map_err(maybe_unique_violation)?; + assert_eq!(rows_affected, 1); // The row must exist as it has been retrieved above + return Ok(current.version); + } + // If nothing changes in any of the core fields, return the current version if (name.is_none() || name.as_ref().is_some_and(|v| *v == current.name)) && (description.is_none() || description .as_ref() .is_some_and(|v| *v == current.description)) + && (metadata.is_none() || metadata.as_ref().is_some_and(|v| *v == current.metadata)) && (!bump_platform_version || platform_version == current.platform_version.as_str()) && (runtime_config.is_none() || runtime_config @@ -476,15 +576,16 @@ pub(crate) async fn update_pipeline( "UPDATE pipeline SET name = COALESCE($1, name), description = COALESCE($2, description), - platform_version = COALESCE($3, platform_version), - runtime_config = COALESCE($4, runtime_config), - program_code = COALESCE($5, program_code), - udf_rust = COALESCE($6, udf_rust), - udf_toml = COALESCE($7, udf_toml), - program_config = COALESCE($8, program_config), + metadata = COALESCE($3, metadata), + platform_version = COALESCE($4, platform_version), + runtime_config = COALESCE($5, runtime_config), + program_code = COALESCE($6, program_code), + udf_rust = COALESCE($7, udf_rust), + udf_toml = COALESCE($8, udf_toml), + program_config = COALESCE($9, program_config), version = version + 1, refresh_version = refresh_version + 1 - WHERE tenant_id = $9 AND name = $10", + WHERE tenant_id = $10 AND name = $11", ) .await?; let rows_affected = txn @@ -493,6 +594,7 @@ pub(crate) async fn update_pipeline( &[ &name, &description, + &metadata, &if bump_platform_version { Some(platform_version.to_string()) } else { diff --git a/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs b/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs index 1ee5a679cfd..63dd61e5408 100644 --- a/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs +++ b/crates/pipeline-manager/src/db/operations/pipeline_parsing.rs @@ -23,7 +23,7 @@ use tracing::error; use uuid::Uuid; pub const PIPELINE_COLUMNS_ALL: &str = - "p.id, p.name, p.description, p.created_at, p.version, p.platform_version, p.runtime_config, + "p.id, p.name, p.description, p.metadata, p.created_at, p.version, p.platform_version, p.runtime_config, p.program_code, p.udf_rust, p.udf_toml, p.program_config, p.program_version, p.program_status, p.program_status_since, p.program_error, p.program_info, p.program_binary_source_checksum, p.program_binary_integrity_checksum, p.program_info_integrity_checksum, @@ -36,7 +36,7 @@ pub const PIPELINE_COLUMNS_ALL: &str = "; pub const PIPELINE_COLUMNS_MONITORING: &str = - "p.id, p.name, p.description, p.created_at, p.version, p.platform_version, + "p.id, p.name, p.description, p.metadata, p.created_at, p.version, p.platform_version, p.program_config, p.program_version, p.program_status, p.program_status_since, p.deployment_error, p.deployment_location, p.refresh_version, p.storage_status, p.storage_status_details, p.deployment_id, p.deployment_initial, @@ -65,6 +65,7 @@ pub fn parse_pipeline_row_all(row: &Row) -> Result Result String { row.get("description") } +fn parse_from_row_metadata(row: &Row) -> String { + row.get("metadata") +} + fn parse_from_row_created_at(row: &Row) -> DateTime { row.get("created_at") } @@ -583,6 +589,7 @@ mod tests { id: PipelineId(Uuid::from_u128(1)), name: "".to_string(), description: "".to_string(), + metadata: "".to_string(), created_at: Default::default(), version: Version(1), platform_version: "".to_string(), @@ -638,6 +645,7 @@ mod tests { id: PipelineId(Uuid::from_u128(1)), name: "".to_string(), description: "".to_string(), + metadata: "".to_string(), created_at: Default::default(), version: Version(1), platform_version: "".to_string(), diff --git a/crates/pipeline-manager/src/db/storage.rs b/crates/pipeline-manager/src/db/storage.rs index 911956e7445..13410659bc5 100644 --- a/crates/pipeline-manager/src/db/storage.rs +++ b/crates/pipeline-manager/src/db/storage.rs @@ -34,6 +34,7 @@ impl ExtendedPipelineDescrRunner { id: pipeline.id, name: pipeline.name.clone(), description: pipeline.description.clone(), + metadata: pipeline.metadata.clone(), created_at: pipeline.created_at, version: pipeline.version, platform_version: pipeline.platform_version.clone(), @@ -214,6 +215,7 @@ pub(crate) trait Storage { original_name: &str, name: &Option, description: &Option, + metadata: &Option, platform_version: &str, bump_platform_version: bool, runtime_config: &Option, diff --git a/crates/pipeline-manager/src/db/storage_postgres.rs b/crates/pipeline-manager/src/db/storage_postgres.rs index 35296cc1ae9..f389609acee 100644 --- a/crates/pipeline-manager/src/db/storage_postgres.rs +++ b/crates/pipeline-manager/src/db/storage_postgres.rs @@ -326,15 +326,18 @@ impl Storage for StoragePostgres { false, // Done by user tenant_id, original_name, - &Some(pipeline.name.clone()), - &Some(pipeline.description.clone()), + &operations::pipeline::PipelineFieldUpdates { + name: &Some(pipeline.name.clone()), + description: &Some(pipeline.description.clone()), + metadata: &Some(pipeline.metadata.clone()), + runtime_config: &Some(pipeline.runtime_config.clone()), + program_code: &Some(pipeline.program_code.clone()), + udf_rust: &Some(pipeline.udf_rust.clone()), + udf_toml: &Some(pipeline.udf_toml.clone()), + program_config: &Some(pipeline.program_config.clone()), + }, platform_version, bump_platform_version, - &Some(pipeline.runtime_config.clone()), - &Some(pipeline.program_code.clone()), - &Some(pipeline.udf_rust.clone()), - &Some(pipeline.udf_toml.clone()), - &Some(pipeline.program_config.clone()), ) .await?; false @@ -421,6 +424,7 @@ impl Storage for StoragePostgres { original_name: &str, name: &Option, description: &Option, + metadata: &Option, platform_version: &str, bump_platform_version: bool, runtime_config: &Option, @@ -438,15 +442,18 @@ impl Storage for StoragePostgres { false, // Done by user tenant_id, original_name, - name, - description, + &operations::pipeline::PipelineFieldUpdates { + name, + description, + metadata, + runtime_config, + program_code, + udf_rust, + udf_toml, + program_config, + }, platform_version, bump_platform_version, - runtime_config, - program_code, - udf_rust, - udf_toml, - program_config, ) .await?; @@ -1115,15 +1122,18 @@ impl Storage for StoragePostgres { true, // Done by compiler tenant_id, &pipeline.name, - &None, - &None, + &operations::pipeline::PipelineFieldUpdates { + name: &None, + description: &None, + metadata: &None, + runtime_config: &None, + program_code: &None, + udf_rust: &None, + udf_toml: &None, + program_config: &None, + }, platform_version, true, - &None, - &None, - &None, - &None, - &None, ) .await?; } @@ -1204,15 +1214,18 @@ impl Storage for StoragePostgres { true, // Done by compiler tenant_id, &pipeline.name, - &None, - &None, + &operations::pipeline::PipelineFieldUpdates { + name: &None, + description: &None, + metadata: &None, + runtime_config: &None, + program_code: &None, + udf_rust: &None, + udf_toml: &None, + program_config: &None, + }, platform_version, true, - &None, - &None, - &None, - &None, - &None, ) .await?; } diff --git a/crates/pipeline-manager/src/db/test.rs b/crates/pipeline-manager/src/db/test.rs index ae20859af54..e1b11e00129 100644 --- a/crates/pipeline-manager/src/db/test.rs +++ b/crates/pipeline-manager/src/db/test.rs @@ -435,6 +435,7 @@ fn limited_pipeline_descr() -> impl Strategy { .prop_map(|val| PipelineDescr { name: map_val_to_limited_pipeline_name(val.0), description: val.1, + metadata: "".to_string(), runtime_config: map_val_to_limited_runtime_config(val.2), program_code: val.3, udf_rust: val.4, @@ -730,6 +731,7 @@ async fn pipeline_creation() { let new_descriptor = PipelineDescr { name: "test1".to_string(), description: "Test description".to_string(), + metadata: "".to_string(), runtime_config: json!({ "workers": 123 }), @@ -829,6 +831,7 @@ async fn pipeline_retrieval() { PipelineDescr { name: "test1".to_string(), description: "d1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c1".to_string(), udf_rust: "r1".to_string(), @@ -872,6 +875,7 @@ async fn pipeline_retrieval() { PipelineDescr { name: "test2".to_string(), description: "d2".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c2".to_string(), udf_rust: "r2".to_string(), @@ -963,6 +967,7 @@ async fn pipeline_versioning() { PipelineDescr { name: "example".to_string(), description: "d1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c1".to_string(), udf_rust: "r1".to_string(), @@ -983,7 +988,8 @@ async fn pipeline_versioning() { handle .db .update_pipeline( - tenant_id, "example", &None, &None, "v0", false, &None, &None, &None, &None, &None, + tenant_id, "example", &None, &None, &None, "v0", false, &None, &None, &None, &None, + &None, ) .await .unwrap(); @@ -1000,6 +1006,7 @@ async fn pipeline_versioning() { "example", &None, &None, + &None, "v0", false, &None, @@ -1023,6 +1030,7 @@ async fn pipeline_versioning() { "example", &None, &Some("d1".to_string()), + &None, "v0", false, &None, @@ -1046,6 +1054,7 @@ async fn pipeline_versioning() { "example", &None, &None, + &None, "v0", false, &None, @@ -1070,6 +1079,7 @@ async fn pipeline_versioning() { "example", &None, &None, + &None, "v0", false, &None, @@ -1094,6 +1104,7 @@ async fn pipeline_versioning() { "example", &None, &None, + &None, "v0", false, &None, @@ -1118,6 +1129,7 @@ async fn pipeline_versioning() { "example", &None, &Some("d2".to_string()), + &None, "v0", false, &None, @@ -1148,6 +1160,7 @@ async fn pipeline_versioning() { "example", &None, &None, + &None, "v0", false, &None, @@ -1172,6 +1185,7 @@ async fn pipeline_versioning() { "example", &Some("example2".to_string()), &None, + &None, "v0", false, &None, @@ -1231,6 +1245,7 @@ async fn pipeline_versioning() { "example2", &None, &None, + &None, "v0", false, &Some(new_runtime_config.clone()), @@ -1262,6 +1277,7 @@ async fn pipeline_duplicate() { PipelineDescr { name: "example".to_string(), description: "d1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c1".to_string(), udf_rust: "r1".to_string(), @@ -1281,6 +1297,7 @@ async fn pipeline_duplicate() { PipelineDescr { name: "example".to_string(), description: "d2".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c2".to_string(), udf_rust: "r2".to_string(), @@ -1331,6 +1348,7 @@ async fn pipeline_program_compilation() { PipelineDescr { name: "example1".to_string(), description: "d1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c1".to_string(), udf_rust: "r1".to_string(), @@ -1349,6 +1367,7 @@ async fn pipeline_program_compilation() { PipelineDescr { name: "example2".to_string(), description: "d2".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c2".to_string(), udf_rust: "r2".to_string(), @@ -1504,6 +1523,7 @@ async fn pipeline_transition_after_quick_stop() { PipelineDescr { name: "example1".to_string(), description: "d1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c1".to_string(), udf_rust: "r1".to_string(), @@ -1725,6 +1745,7 @@ async fn pipeline_deployment() { PipelineDescr { name: "example1".to_string(), description: "d1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c1".to_string(), udf_rust: "r1".to_string(), @@ -2386,6 +2407,7 @@ async fn pipeline_provision_version_guard() { PipelineDescr { name: "example1".to_string(), description: "d1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "c1".to_string(), udf_rust: "r1".to_string(), @@ -2478,6 +2500,7 @@ async fn pipeline_provision_version_guard() { &pipeline.name, &None, &None, + &None, "v0", false, &Some( @@ -2620,6 +2643,212 @@ async fn pipeline_provision_version_guard() { .unwrap(); } +#[tokio::test] +async fn pipeline_metadata_update_while_running() { + // `metadata` is a free-form annotation with no impact on the deployed + // pipeline, so it can be patched at any time. Every other patchable + // field still requires the pipeline to be fully stopped. + let handle = test_setup().await; + let tenant_id = TenantRecord::default().id; + + let pipeline = handle + .db + .new_pipeline( + tenant_id, + Uuid::now_v7(), + "v0", + PipelineDescr { + name: "example".to_string(), + description: "d1".to_string(), + metadata: "m1".to_string(), + runtime_config: json!({}), + program_code: "c1".to_string(), + udf_rust: "r1".to_string(), + udf_toml: "t1".to_string(), + program_config: json!({}), + }, + ) + .await + .unwrap(); + + // Walk the program through to `Success` so we can set the deployment + // desired status to `Provisioned`. Mirrors `pipeline_provision_version_guard`. + handle + .db + .transit_program_status_to_compiling_sql(tenant_id, pipeline.id, Version(1)) + .await + .unwrap(); + handle + .db + .transit_program_status_to_sql_compiled( + tenant_id, + pipeline.id, + Version(1), + &SqlCompilationInfo { + exit_code: 0, + messages: vec![], + }, + &serde_json::to_value(ProgramInfo { + schema: ProgramSchema { + inputs: vec![], + outputs: vec![], + }, + main_rust: "".to_string(), + udf_stubs: "".to_string(), + input_connectors: BTreeMap::new(), + output_connectors: BTreeMap::new(), + dataflow: None, + }) + .unwrap(), + ) + .await + .unwrap(); + handle + .db + .transit_program_status_to_compiling_rust(tenant_id, pipeline.id, Version(1)) + .await + .unwrap(); + handle + .db + .transit_program_status_to_success( + tenant_id, + pipeline.id, + Version(1), + &RustCompilationInfo { + exit_code: 0, + stdout: "".to_string(), + stderr: "".to_string(), + }, + "def", + "123", + "456", + ) + .await + .unwrap(); + + // Flip desired to `Provisioned` — this is the state in which most patches + // are rejected with `UpdateRestrictedToStopped`. + handle + .db + .set_deployment_resources_desired_status_provisioned( + tenant_id, + &pipeline.name, + RuntimeDesiredStatus::Running, + BootstrapPolicy::default(), + false, + ) + .await + .unwrap(); + + // A description-only patch is blocked by the stopped-state gate. + let err = handle + .db + .update_pipeline( + tenant_id, + "example", + &None, + &Some("d2".to_string()), + &None, + "v0", + false, + &None, + &None, + &None, + &None, + &None, + ) + .await + .unwrap_err(); + assert!(matches!(err, DBError::UpdateRestrictedToStopped)); + + // Snapshot the pre-patch versions and monitor-event count so we can + // assert that the metadata patch leaves them untouched. + let before = handle.db.get_pipeline(tenant_id, "example").await.unwrap(); + let event_count_before = handle + .db + .list_pipeline_monitor_events_short(tenant_id, "example".to_string()) + .await + .unwrap() + .len(); + + // A metadata-only patch is allowed, writes the new value, and must NOT + // bump `version` / `refresh_version` nor emit a monitor event — see the + // metadata-only fast path in `db/operations/pipeline.rs::update_pipeline`. + let updated = handle + .db + .update_pipeline( + tenant_id, + "example", + &None, + &None, + &Some("m2".to_string()), + "v0", + false, + &None, + &None, + &None, + &None, + &None, + ) + .await + .unwrap(); + assert_eq!(updated.metadata, "m2"); + assert_eq!(updated.description, "d1"); + assert_eq!(updated.version, before.version); + assert_eq!(updated.refresh_version, before.refresh_version); + let event_count_after = handle + .db + .list_pipeline_monitor_events_short(tenant_id, "example".to_string()) + .await + .unwrap() + .len(); + assert_eq!(event_count_after, event_count_before); + + // Combining metadata with any other field falls back to the stopped gate. + let err = handle + .db + .update_pipeline( + tenant_id, + "example", + &None, + &Some("d2".to_string()), + &Some("m3".to_string()), + "v0", + false, + &None, + &None, + &None, + &None, + &None, + ) + .await + .unwrap_err(); + assert!(matches!(err, DBError::UpdateRestrictedToStopped)); + + // No-op metadata patches (same value) are a no-op: nothing is written, + // and version + monitor events still don't move. + let unchanged = handle + .db + .update_pipeline( + tenant_id, + "example", + &None, + &None, + &Some("m2".to_string()), + "v0", + false, + &None, + &None, + &None, + &None, + &None, + ) + .await + .unwrap(); + assert_eq!(unchanged.version, before.version); + assert_eq!(unchanged.refresh_version, before.refresh_version); +} + ////////////////////////////////////////////////////////////////////////////// ///// PROP TESTS ///// @@ -2670,6 +2899,7 @@ enum StorageAction { #[proptest(strategy = "limited_pipeline_name()")] String, #[proptest(strategy = "limited_option_pipeline_name()")] Option, Option, + Option, #[proptest(strategy = "limited_platform_version()")] String, bool, #[proptest(strategy = "limited_option_runtime_config()")] Option, @@ -3411,10 +3641,10 @@ fn db_impl_behaves_like_model() { let impl_response = handle.db.new_or_update_pipeline(tenant_id, new_id, &original_name, &platform_version, bump_platform_version, pipeline_descr.clone()).await; check_response_pipeline_with_created(i, model_response, impl_response); } - StorageAction::UpdatePipeline(tenant_id, original_name, name, description, platform_version, bump_platform_version, runtime_config, program_code, udf_rust, udf_toml, program_config) => { + StorageAction::UpdatePipeline(tenant_id, original_name, name, description, metadata, platform_version, bump_platform_version, runtime_config, program_code, udf_rust, udf_toml, program_config) => { create_tenants_if_not_exists(&model, &handle, tenant_id).await.unwrap(); - let model_response = model.update_pipeline(tenant_id, &original_name, &name, &description, &platform_version, bump_platform_version, &runtime_config, &program_code, &udf_rust, &udf_toml, &program_config).await; - let impl_response = handle.db.update_pipeline(tenant_id, &original_name, &name, &description, &platform_version, bump_platform_version, &runtime_config, &program_code, &udf_rust, &udf_toml, &program_config).await; + let model_response = model.update_pipeline(tenant_id, &original_name, &name, &description, &metadata, &platform_version, bump_platform_version, &runtime_config, &program_code, &udf_rust, &udf_toml, &program_config).await; + let impl_response = handle.db.update_pipeline(tenant_id, &original_name, &name, &description, &metadata, &platform_version, bump_platform_version, &runtime_config, &program_code, &udf_rust, &udf_toml, &program_config).await; check_response_pipeline(i, model_response, impl_response); } StorageAction::DeletePipeline(tenant_id, pipeline_name) => { @@ -3697,6 +3927,7 @@ trait ModelHelpers { original_name: &str, name: &Option, description: &Option, + metadata: &Option, platform_version: &str, bump_platform_version: bool, runtime_config: &Option, @@ -3747,6 +3978,7 @@ impl ModelHelpers for Mutex { original_name: &str, name: &Option, description: &Option, + metadata: &Option, platform_version: &str, bump_platform_version: bool, runtime_config: &Option, @@ -3778,23 +4010,61 @@ impl ModelHelpers for Mutex { // Fetch existing pipeline let mut pipeline = self.get_pipeline(tenant_id, original_name).await?; - // Pipeline must be stopped - if !matches!( - ( - pipeline.deployment_resources_status, - pipeline.deployment_resources_desired_status, - is_compiler_update - ), - (ResourcesStatus::Stopped, ResourcesDesiredStatus::Stopped, _) - | ( - ResourcesStatus::Stopped, - ResourcesDesiredStatus::Provisioned, - true - ) - ) { + // `metadata` is a free-form annotation with no deployment impact, so + // requests that touch only it skip the stopped-state gate. Mirrors + // the rule in `db/operations/pipeline.rs::update_pipeline`; the same + // `PipelineFieldUpdates::is_metadata_only` is reused so both + // implementations classify patches identically. + let updates = crate::db::operations::pipeline::PipelineFieldUpdates { + name, + description, + metadata, + runtime_config, + program_code, + udf_rust, + udf_toml, + program_config, + }; + let is_metadata_only_update = + !is_compiler_update && !bump_platform_version && updates.is_metadata_only(); + + // Pipeline must be stopped (unless this is a metadata-only update). + if !is_metadata_only_update + && !matches!( + ( + pipeline.deployment_resources_status, + pipeline.deployment_resources_desired_status, + is_compiler_update + ), + (ResourcesStatus::Stopped, ResourcesDesiredStatus::Stopped, _) + | ( + ResourcesStatus::Stopped, + ResourcesDesiredStatus::Provisioned, + true + ) + ) + { return Err(DBError::UpdateRestrictedToStopped); } + // Metadata-only fast path: see `db/operations/pipeline.rs::update_pipeline` + // for the rationale. The mirror invariants enforced here are: + // - `version` and `refresh_version` are not bumped. + // - No `pipeline_monitor_event` is appended. + if is_metadata_only_update { + let new_metadata = metadata + .as_ref() + .expect("metadata-only update has Some(metadata)"); + if *new_metadata != pipeline.metadata { + pipeline.metadata = new_metadata.clone(); + self.lock() + .await + .pipelines + .insert((tenant_id, pipeline.id), pipeline.clone()); + } + return Ok(pipeline); + } + // While stopped, certain fields are not allowed to be updated when storage is not cleared if pipeline.storage_status != StorageStatus::Cleared { let mut not_allowed = vec![]; @@ -3885,6 +4155,12 @@ impl ModelHelpers for Mutex { } pipeline.description = description.clone(); } + if let Some(metadata) = metadata { + if *metadata != pipeline.metadata { + version_increment = true; + } + pipeline.metadata = metadata.clone(); + } if *platform_version != pipeline.platform_version && bump_platform_version { version_increment = true; program_version_increment = true; @@ -3972,6 +4248,7 @@ fn convert_descriptor_to_monitoring( id: pipeline.id, name: pipeline.name.clone(), description: pipeline.description.clone(), + metadata: pipeline.metadata.clone(), created_at: pipeline.created_at, version: pipeline.version, platform_version: pipeline.platform_version.clone(), @@ -4340,6 +4617,7 @@ impl Storage for Mutex { id: pipeline_id, name: pipeline.name, description: pipeline.description, + metadata: pipeline.metadata, created_at: now, version: Version(1), platform_version: platform_version.to_string(), @@ -4413,6 +4691,7 @@ impl Storage for Mutex { original_name, &Some(pipeline.name), &Some(pipeline.description), + &Some(pipeline.metadata), platform_version, bump_platform_version, &Some(pipeline.runtime_config), @@ -4445,6 +4724,7 @@ impl Storage for Mutex { original_name: &str, name: &Option, description: &Option, + metadata: &Option, platform_version: &str, bump_platform_version: bool, runtime_config: &Option, @@ -4459,6 +4739,7 @@ impl Storage for Mutex { original_name, name, description, + metadata, platform_version, bump_platform_version, runtime_config, @@ -5376,6 +5657,7 @@ impl Storage for Mutex { &pipeline.name, &None, &None, + &None, platform_version, true, &None, @@ -5436,6 +5718,7 @@ impl Storage for Mutex { &pipeline.name, &None, &None, + &None, platform_version, true, &None, diff --git a/crates/pipeline-manager/src/db/types/pipeline.rs b/crates/pipeline-manager/src/db/types/pipeline.rs index 2ef5402fb98..156e42cf541 100644 --- a/crates/pipeline-manager/src/db/types/pipeline.rs +++ b/crates/pipeline-manager/src/db/types/pipeline.rs @@ -117,6 +117,9 @@ pub struct PipelineDescr { /// Pipeline description. pub description: String, + /// Arbitrary user-provided metadata text. + pub metadata: String, + /// Pipeline runtime configuration. pub runtime_config: serde_json::Value, @@ -140,6 +143,7 @@ impl PipelineDescr { Self { name: "test_pipeline".to_string(), description: "Test pipeline".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "CREATE TABLE test (col1 INT);".to_string(), udf_rust: "".to_string(), @@ -165,6 +169,9 @@ pub struct ExtendedPipelineDescr { /// Pipeline description. pub description: String, + /// Arbitrary user-provided metadata text. + pub metadata: String, + /// Timestamp when the pipeline was originally created. pub created_at: DateTime, @@ -296,6 +303,7 @@ pub struct ExtendedPipelineDescrMonitoring { pub id: PipelineId, pub name: String, pub description: String, + pub metadata: String, pub created_at: DateTime, pub version: Version, pub platform_version: String, diff --git a/crates/pipeline-manager/src/runner/pipeline_automata.rs b/crates/pipeline-manager/src/runner/pipeline_automata.rs index 1fdbe0e912f..04677dde557 100644 --- a/crates/pipeline-manager/src/runner/pipeline_automata.rs +++ b/crates/pipeline-manager/src/runner/pipeline_automata.rs @@ -2000,6 +2000,7 @@ mod test { PipelineDescr { name: "example1".to_string(), description: "Description of example1".to_string(), + metadata: "".to_string(), runtime_config: json!({}), program_code: "CREATE TABLE example1 ( col1 INT );".to_string(), udf_rust: "".to_string(), diff --git a/js-packages/web-console/package.json b/js-packages/web-console/package.json index 43d9e049363..334712e05a9 100644 --- a/js-packages/web-console/package.json +++ b/js-packages/web-console/package.json @@ -5,9 +5,9 @@ "@auth/sveltekit": "1.11.1", "@axa-fr/oidc-client": "7.26.4", "@biomejs/biome": "2.4.3", + "@feldera/vite-plugin-monaco-editor": "workspace:*", "@fontsource-variable/dm-sans": "5.2.8", "@fontsource/dm-mono": "5.2.7", - "@feldera/vite-plugin-monaco-editor": "workspace:*", "@fortawesome/fontawesome-free": "7.2.0", "@hey-api/client-fetch": "0.13.1", "@hey-api/openapi-ts": "0.92.4", @@ -27,6 +27,7 @@ "@tailwindcss/forms": "0.5.11", "@tailwindcss/typography": "0.5.19", "@tailwindcss/vite": "4.2.0", + "@thisux/sveltednd": "^0.4.1", "@types/args": "5.0.4", "@types/d3-format": "3.0.4", "@types/eslint": "9.6.1", diff --git a/js-packages/web-console/src/lib/components/pipelines/DraggableTreeView.svelte b/js-packages/web-console/src/lib/components/pipelines/DraggableTreeView.svelte new file mode 100644 index 00000000000..5cb7f043ee7 --- /dev/null +++ b/js-packages/web-console/src/lib/components/pipelines/DraggableTreeView.svelte @@ -0,0 +1,588 @@ + + +{#snippet gap(parentPath: string, key: string, depth: number)} + {#if isDragging} + {@const active = dragOverGap === key} +
{ + if (!isDragging) return + dragOverGap = key + dragOverFolder = null + dragOverPipeline = null + }, + onDragLeave: () => { + if (dragOverGap === key) dragOverGap = null + }, + onDrop: handleGapDrop(parentPath) + } + }} + > + +
+ {/if} +{/snippet} + +{#snippet renderNode(node: TreeNode, depth: number)} + {#if node.kind === 'folder'} + {@const dragOver = dragOverFolder === node.path} + {@const isDragged = draggedFolderPath === node.path} +
{ + if (!isDragging) return + dragOverFolder = node.path + dragOverGap = null + }, + onDragLeave: () => { + if (dragOverFolder === node.path) dragOverFolder = null + }, + onDrop: handleFolderDrop(node.path) + } + }} + > + {@render folderRow(node, { + depth, + isDragged, + leafCount: folderLeafNames(node).length, + checkState: folderCheckState(node), + toggleSelection: () => toggleFolderSelection(node), + isExpanded: isExpanded(node.path), + toggleExpanded: () => toggleExpanded(node.path), + renameFolder: (newName: string) => { + const trimmed = newName.trim() + if (!trimmed || trimmed === node.name) return + const parentPath = node.path.split('/').slice(0, -1).join('/') + const newPath = parentPath ? `${parentPath}/${trimmed}` : trimmed + onMoveFolder(node.path, newPath) + } + })} +
+ {#if isExpanded(node.path)} + {#each node.children as child, i (child.kind === 'folder' ? `f:${child.path}` : `l:${child.item.name}`)} + {@render gap(node.path, `${node.path}:${i}`, depth + 1)} + {@render renderNode(child, depth + 1)} + {/each} + {@render gap(node.path, `${node.path}:end`, depth + 1)} + {/if} + {:else} + {@const isDragged = draggedName === node.item.name} + {@const dragOver = dragOverPipeline === node.item.name} +
{ + if (!isDragging) return + dragOverPipeline = node.item.name + dragOverGap = null + }, + onDragLeave: () => { + if (dragOverPipeline === node.item.name) dragOverPipeline = null + }, + onDrop: handleLeafDrop(node.item) + } + }} + > + {@render leafRow(node.item, { depth, isDragged })} +
+ {/if} +{/snippet} + +
+ {@render header?.()} + {#each tree.children as child, i (child.kind === 'folder' ? `f:${child.path}` : `l:${child.item.name}`)} + {@render gap('', `:${i}`, 0)} + {@render renderNode(child, 0)} + {/each} + {@render gap('', ':end', 0)} +
+ + diff --git a/js-packages/web-console/src/lib/components/pipelines/List.svelte b/js-packages/web-console/src/lib/components/pipelines/List.svelte index 9c526d5a990..9fdacff070d 100644 --- a/js-packages/web-console/src/lib/components/pipelines/List.svelte +++ b/js-packages/web-console/src/lib/components/pipelines/List.svelte @@ -3,8 +3,8 @@ + +
+ + {#snippet header()} +
+
+
+ + { + selected = allSelected ? [] : items.map((p) => p.name) + }} + /> +
+
+
Pipeline name
+
Storage
+
+ Status +
+
Message
+
+ Runtime + +
+
Status changed
+
Deployed on
+
+ Errors + +
+
+ {/snippet} + + {#snippet folderRow(node, ctx)} +
+
+ ⋮⋮ + ctx.toggleSelection()} + /> +
+
+
+
+ + ctx.renameFolder(name)} + class="font-medium" + inputClass="input py-0 pl-1 -ml-1 h-6 text-base w-fit" + > + {#snippet children()} + + + ctx.toggleExpanded()} class="font-medium" + >{node.name || '(root)'} + {/snippet} + + ({ctx.leafCount}) +
+
+ {/snippet} + + {#snippet leafRow(pipeline)} +
+
+ ⋮⋮ + { + selected = selected.includes(pipeline.name) + ? selected.filter((n) => n !== pipeline.name) + : [...selected, pipeline.name] + }} + /> +
+
+ +
+
+ {match(pipeline.storageStatus) + .with('InUse', () => 'Storage in use') + .with('Clearing', () => 'Clearing storage') + .with('Cleared', () => 'Storage cleared') + .exhaustive()} +
+
+ +
+
+ + {#if pipeline.deploymentError} + {@const message = pipeline.deploymentError.message} + + +
+ {message} +
+
+ {message.slice(0, ((idx) => (idx > 0 ? idx : undefined))(message.indexOf('\n')))} + {/if} +
+
+
+
+ +
+
+
+
+ {formatElapsedTime(pipeline.lastStatusSince, 'dhm')} ago +
+
+
+
+ {pipeline.deploymentResourcesStatus === 'Provisioned' + ? formatDateTime(pipeline.deploymentResourcesStatusSince) + : ''} +
+
+
+
+ {pipeline.connectors?.numErrors ?? '-'} +
+
+ {/snippet} +
+
+ + diff --git a/js-packages/web-console/src/lib/components/pipelines/SidebarPipelineTreeView.svelte b/js-packages/web-console/src/lib/components/pipelines/SidebarPipelineTreeView.svelte new file mode 100644 index 00000000000..05f2559e928 --- /dev/null +++ b/js-packages/web-console/src/lib/components/pipelines/SidebarPipelineTreeView.svelte @@ -0,0 +1,162 @@ + + +{#snippet renderNode(node: TreeNode, depth: number)} + {#if node.kind === 'folder'} + + {#if isExpanded(node.path)} + {#each node.children as child (child.kind === 'folder' ? `f:${child.path}` : `l:${child.item.name}`)} + {@render renderNode(child, depth + 1)} + {/each} + {/if} + {:else} + + {node.item.name} + + + {/if} +{/snippet} + +
+ {#each tree.children as child (child.kind === 'folder' ? `f:${child.path}` : `l:${child.item.name}`)} + {@render renderNode(child, 0)} + {/each} +
diff --git a/js-packages/web-console/src/lib/components/pipelines/Table.svelte b/js-packages/web-console/src/lib/components/pipelines/Table.svelte index 9facab3eded..868b5eedd31 100644 --- a/js-packages/web-console/src/lib/components/pipelines/Table.svelte +++ b/js-packages/web-console/src/lib/components/pipelines/Table.svelte @@ -1,74 +1,32 @@ -
+
{#if header} @@ -109,10 +144,7 @@ table.selectAll()} - /> - Pipeline name - Storage - Status - Message - p.connectors?.numErrors} - > - - Errors - - - - - - Runtime - - - Status changed - Deployed on - - - - {#each table.rows as pipeline} - - table.select(pipeline.name)} - /> - - {pipeline.name} - -
- {match(pipeline.storageStatus) - .with('InUse', () => 'Storage in use') - .with('Clearing', () => 'Clearing storage') - .with('Cleared', () => 'Storage cleared') - .exhaustive()} - - - - - {#if pipeline.deploymentError} - {@const message = pipeline.deploymentError.message} - - -
- {message} -
-
- {message.slice(0, ((idx) => (idx > 0 ? idx : undefined))(message.indexOf('\n')))} - {/if} -
- - -
- {pipeline.connectors?.numErrors ?? '-'} -
- - -
- -
- - -
- {formatElapsedTime(pipeline.lastStatusSince, 'dhm')} ago -
- - -
- {pipeline.deploymentResourcesStatus === 'Provisioned' - ? formatDateTime(pipeline.deploymentResourcesStatusSince) - : ''} -
- - - {:else} - - - No pipelines found - - {/each} - - - + {#if visiblePipelines.length === 0} +
No pipelines found
+ {:else} + + {/if}
- - diff --git a/js-packages/web-console/src/lib/services/manager/client/utils.gen.ts b/js-packages/web-console/src/lib/services/manager/client/utils.gen.ts index ca2b3164a12..ae165e084f8 100644 --- a/js-packages/web-console/src/lib/services/manager/client/utils.gen.ts +++ b/js-packages/web-console/src/lib/services/manager/client/utils.gen.ts @@ -36,7 +36,9 @@ export const createQuerySerializer = ({ value, ...options.array }) - if (serializedArray) search.push(serializedArray) + if (serializedArray) { + search.push(serializedArray) + } } else if (typeof value === 'object') { const serializedObject = serializeObjectParam({ allowReserved: options.allowReserved, @@ -46,14 +48,18 @@ export const createQuerySerializer = ({ value: value as Record, ...options.object }) - if (serializedObject) search.push(serializedObject) + if (serializedObject) { + search.push(serializedObject) + } } else { const serializedPrimitive = serializePrimitiveParam({ allowReserved: options.allowReserved, name, value: value as string }) - if (serializedPrimitive) search.push(serializedPrimitive) + if (serializedPrimitive) { + search.push(serializedPrimitive) + } } } } diff --git a/js-packages/web-console/src/lib/services/manager/sdk.gen.ts b/js-packages/web-console/src/lib/services/manager/sdk.gen.ts index 93e6787455a..53601bae88f 100644 --- a/js-packages/web-console/src/lib/services/manager/sdk.gen.ts +++ b/js-packages/web-console/src/lib/services/manager/sdk.gen.ts @@ -871,8 +871,18 @@ export const postPipelineDismissError = ( * Subscribe to a stream of updates from a SQL view or table. * * The pipeline responds with a continuous stream of changes to the specified - * table or view, encoded using the format specified in the `?format=` - * parameter. Updates are split into `Chunk`s. + * table or view. The stream is configurable two ways: + * + * - Simple configuration of the format may be provided using query parameters. + * Use `format` to specify `csv` or `json` output and, for `json` only, `array` + * to specify whether to group updates into JSON arrays. Specify + * `backpressure` to specify behavior when the HTTP client cannot keep up. + * + * - Comprehensive configuration may be provided by providing a connector + * configuration as a JSON body. In this case, no query parameters are + * allowed. + * + * Updates are split into `Chunk`s. * * The pipeline continues sending updates until the client closes the * connection or the pipeline is stopped. @@ -899,7 +909,7 @@ export const httpOutput = ( * - Only the status details changed, and it has been 10s since the last event * - Nothing has changed for more than 10 minutes * - * This endpoint returns the most recent persisted events, up to 720. + * This endpoint returns the most recent persisted events, up to by default approximately 720. */ export const listPipelineEvents = ( options: Options @@ -919,10 +929,10 @@ export const listPipelineEvents = ( /** * Get Pipeline Event * - * Get specific pipeline monitor event. + * Get a specific pipeline monitor event. * * The identifiers of the events can be retrieved via `GET /v0/pipelines//events`. - * The most recent 720 events are retained. + * The most recent approximately 720 (default) events are retained. * This endpoint can return a 404 for an event that no longer exists due to a cleanup. */ export const getPipelineEvent = ( diff --git a/js-packages/web-console/src/lib/services/manager/types.gen.ts b/js-packages/web-console/src/lib/services/manager/types.gen.ts index b2c1b505630..d255864b616 100644 --- a/js-packages/web-console/src/lib/services/manager/types.gen.ts +++ b/js-packages/web-console/src/lib/services/manager/types.gen.ts @@ -1424,6 +1424,10 @@ export type DevTweaks = { * shards by `hash(key) & (n - 1)`. */ buffer_max_buckets?: number | null + /** + * Don't automatically start a transaction for every step. + */ + disable_auto_transaction?: boolean | null /** * Evict eagerly from buffer caches as files get deleted. * @@ -1447,6 +1451,11 @@ export type DevTweaks = { * in the future. */ eager_evict?: boolean | null + /** + * Whether file-backed batches may use roaring membership filters when the + * key type supports them. + */ + enable_roaring?: boolean | null /** * Target number of cached bytes retained in each `FBuf` slab size class. * @@ -1533,6 +1542,10 @@ export type DevTweaks = { | null | boolean | null + | boolean + | null + | boolean + | null | number | null | boolean @@ -2024,7 +2037,7 @@ export type HealthStatus = { } /** - * Configuration for reading data via HTTP. + * Configuration for data input via HTTP. * * HTTP input adapters cannot be usefully configured as part of pipeline * configuration. Instead, instantiate them through the REST API as @@ -2037,6 +2050,28 @@ export type HttpInputConfig = { name: string } +/** + * Configuration for data output via HTTP. + * + * HTTP output adapters cannot be usefully configured as part of pipeline + * configuration. Instead, instantiate them through the REST API as + * `/pipelines/{pipeline_name}/egress`. + */ +export type HttpOutputConfig = { + /** + * Apply backpressure on the pipeline when the HTTP client cannot receive + * data fast enough. + * + * When this flag is set to false (the default), the HTTP connector drops data + * chunks if the client is not keeping up with its output. This prevents + * a slow HTTP client from slowing down the entire pipeline. + * + * When the flag is set to true, the connector waits for the client to receive + * each chunk and blocks the pipeline if the client cannot keep up. + */ + backpressure?: boolean +} + export type IcebergCatalogType = 'rest' | 'glue' /** @@ -3092,7 +3127,13 @@ export type PartialProgramInfo = { * change. */ export type PatchPipeline = { + /** + * Deprecated: use `metadata` instead. + * + * @deprecated + */ description?: string | null + metadata?: string | null name?: string | null program_code?: string | null program_config?: ProgramConfig | null @@ -3374,8 +3415,14 @@ export type PipelineInfo = { deployment_runtime_status_since?: string | null deployment_status: CombinedStatus deployment_status_since: string + /** + * Deprecated: use `metadata` instead. + * + * @deprecated + */ description: string id: PipelineId + metadata: string name: string platform_version: string program_code: string @@ -3446,8 +3493,14 @@ export type PipelineSelectedInfo = { deployment_runtime_status_since?: string | null deployment_status: CombinedStatus deployment_status_since: string + /** + * Deprecated: use `metadata` instead. + * + * @deprecated + */ description: string id: PipelineId + metadata: string name: string platform_version: string program_code?: string | null @@ -3518,7 +3571,13 @@ export type PipelineTemplateConfig = { * (for strings: an empty string `""`, for objects: an empty dictionary `{}`). */ export type PostPutPipeline = { + /** + * Deprecated: use `metadata` instead. + * + * @deprecated + */ description?: string | null + metadata?: string | null name: string program_code: string program_config?: ProgramConfig | null @@ -5196,6 +5255,7 @@ export type TransportConfig = name: 'http_input' } | { + config: HttpOutputConfig name: 'http_output' } | { @@ -6256,7 +6316,7 @@ export type HttpOutputData = { } query: { /** - * Output data format, e.g., 'csv' or 'json'. + * Output data format, either 'csv' or 'json'. */ format: string /** @@ -6425,7 +6485,7 @@ export type HttpInputData = { */ force: boolean /** - * Input data format, e.g., 'csv' or 'json'. + * Input data format, either `csv' or 'json'. */ format: string /** diff --git a/js-packages/web-console/src/lib/services/pipelineManager.ts b/js-packages/web-console/src/lib/services/pipelineManager.ts index 62fabe30edc..7bdb6cce4ec 100644 --- a/js-packages/web-console/src/lib/services/pipelineManager.ts +++ b/js-packages/web-console/src/lib/services/pipelineManager.ts @@ -243,6 +243,7 @@ const toPipelineThumb = ( ) => ({ name: pipeline.name, description: pipeline.description, + metadata: pipeline.metadata ?? '', storageStatus: pipeline.storage_status, ...consolidatePipelineStatus( pipeline.program_status, @@ -273,6 +274,7 @@ const toPipeline = < ) => ({ name: pipeline.name, description: pipeline.description ?? '', + metadata: pipeline.metadata ?? '', runtimeConfig: pipeline.runtime_config, programConfig: pipeline.program_config!, programCode: pipeline.program_code ?? '', @@ -294,6 +296,7 @@ const toExtendedPipeline = ({ deploymentStatusSince: pipeline.deployment_status_since, programStatusSince: pipeline.program_status_since, description: pipeline.description, + metadata: pipeline.metadata ?? '', id: pipeline.id, name: pipeline.name, programCode: pipeline.program_code ?? '', @@ -327,6 +330,7 @@ const toExtendedPipeline = ({ const fromPipeline = >(pipeline: T) => ({ name: pipeline?.name, description: pipeline?.description, + metadata: pipeline?.metadata, runtime_config: pipeline?.runtimeConfig, program_config: pipeline?.programConfig, program_code: pipeline?.programCode, @@ -529,17 +533,11 @@ export const getCheckpointStatus = (pipeline_name: string, options?: FetchOption } export const syncCheckpoint = (pipeline_name: string, options?: FetchOptions) => { - return mapResponse( - _syncCheckpoint({ path: { pipeline_name }, ...options }), - (v) => v - ) + return mapResponse(_syncCheckpoint({ path: { pipeline_name }, ...options }), (v) => v) } export const getCheckpointSyncStatus = (pipeline_name: string, options?: FetchOptions) => { - return mapResponse( - _getCheckpointSyncStatus({ path: { pipeline_name }, ...options }), - (v) => v - ) + return mapResponse(_getCheckpointSyncStatus({ path: { pipeline_name }, ...options }), (v) => v) } export const deletePipeline = async (pipeline_name: string, options?: FetchOptions) => { diff --git a/js-packages/web-console/src/routes/(system)/(authenticated)/+page.svelte b/js-packages/web-console/src/routes/(system)/(authenticated)/+page.svelte index 7d6a452fbb4..6dcccd13268 100644 --- a/js-packages/web-console/src/routes/(system)/(authenticated)/+page.svelte +++ b/js-packages/web-console/src/routes/(system)/(authenticated)/+page.svelte @@ -139,7 +139,7 @@
{:else if pipelines.pipelines.length} {@const ps = pipelines.pipelines} - + {#snippet preHeaderEnd()} {#if !selectedPipelines.length} diff --git a/openapi.json b/openapi.json index 7afdc8f826e..b9d597bb18c 100644 --- a/openapi.json +++ b/openapi.json @@ -578,6 +578,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "name": "example1", "description": "Description of the pipeline example1", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 4, "platform_version": "v0", @@ -668,6 +669,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c9", "name": "example2", "description": "Description of the pipeline example2", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 1, "platform_version": "v0", @@ -791,6 +793,7 @@ "example": { "name": "example1", "description": "Description of the pipeline example1", + "metadata": null, "runtime_config": { "workers": 16, "max_rss_mb": null, @@ -861,6 +864,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "name": "example1", "description": "Description of the pipeline example1", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 4, "platform_version": "v0", @@ -1044,6 +1048,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "name": "example1", "description": "Description of the pipeline example1", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 4, "platform_version": "v0", @@ -1194,6 +1199,7 @@ "example": { "name": "example1", "description": "Description of the pipeline example1", + "metadata": null, "runtime_config": { "workers": 16, "max_rss_mb": null, @@ -1264,6 +1270,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "name": "example1", "description": "Description of the pipeline example1", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 4, "platform_version": "v0", @@ -1364,6 +1371,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "name": "example1", "description": "Description of the pipeline example1", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 4, "platform_version": "v0", @@ -1611,6 +1619,7 @@ "example": { "name": null, "description": "This is a new description", + "metadata": null, "runtime_config": null, "program_code": "CREATE TABLE table3 ( col3 INT );", "udf_rust": null, @@ -1633,6 +1642,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "name": "example1", "description": "Description of the pipeline example1", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 4, "platform_version": "v0", @@ -6536,6 +6546,7 @@ "id": "67e55044-10b1-426f-9247-bb680e5fe0c8", "name": "example1", "description": "Description of the pipeline example1", + "metadata": "", "created_at": "1970-01-01T00:00:00Z", "version": 4, "platform_version": "v0", @@ -10672,6 +10683,12 @@ "description": "Partially update the pipeline (PATCH).\n\nNote that the patching only applies to the main fields, not subfields.\nFor instance, it is not possible to update only the number of workers;\nit is required to again pass the whole runtime configuration with the\nchange.", "properties": { "description": { + "type": "string", + "description": "Deprecated: use `metadata` instead.", + "deprecated": true, + "nullable": true + }, + "metadata": { "type": "string", "nullable": true }, @@ -11072,6 +11089,7 @@ "id", "name", "description", + "metadata", "created_at", "version", "platform_version", @@ -11182,11 +11200,16 @@ "format": "date-time" }, "description": { - "type": "string" + "type": "string", + "description": "Deprecated: use `metadata` instead.", + "deprecated": true }, "id": { "$ref": "#/components/schemas/PipelineId" }, + "metadata": { + "type": "string" + }, "name": { "type": "string" }, @@ -11332,6 +11355,7 @@ "id", "name", "description", + "metadata", "created_at", "version", "platform_version", @@ -11444,11 +11468,16 @@ "format": "date-time" }, "description": { - "type": "string" + "type": "string", + "description": "Deprecated: use `metadata` instead.", + "deprecated": true }, "id": { "$ref": "#/components/schemas/PipelineId" }, + "metadata": { + "type": "string" + }, "name": { "type": "string" }, @@ -11559,6 +11588,12 @@ ], "properties": { "description": { + "type": "string", + "description": "Deprecated: use `metadata` instead.", + "deprecated": true, + "nullable": true + }, + "metadata": { "type": "string", "nullable": true },