Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion crates/fda/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,15 @@ pub enum PipelineAction {
)]
stdin: bool,
},
/// Copy a pipeline's program and configuration into a new pipeline.
#[clap(aliases = &["clone"])]
Copy {
/// The name of the pipeline to copy from.
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
source: String,
/// The name of the new pipeline.
destination: String,
},
/// Start a pipeline.
///
/// If the pipeline is compiling it will wait for the compilation to finish.
Expand Down Expand Up @@ -1011,7 +1020,7 @@ pub enum ConnectorAction {

#[cfg(test)]
mod tests {
use crate::cli::Cli;
use crate::cli::{Cli, Commands, PipelineAction};
use clap::Parser;

/// [clap] will panic inside `try_parse` if it finds anything invalid in the
Expand All @@ -1021,4 +1030,20 @@ mod tests {
fn basic_validation() {
let _ = Cli::try_parse();
}

#[test]
fn parse_pipeline_copy_command_and_alias() {
for command in ["copy", "clone"] {
let cli = Cli::try_parse_from(["fda", command, "source", "destination"])
.expect("copy command should parse");

assert!(matches!(
cli.command,
Commands::Pipeline(PipelineAction::Copy {
source,
destination
}) if source == "source" && destination == "destination"
));
}
}
}
78 changes: 78 additions & 0 deletions crates/fda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,84 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
std::process::exit(1);
}
}
PipelineAction::Copy {
source,
destination,
} => {
let source_pipeline = client
.get_pipeline()
.pipeline_name(source.clone())
.send()
.await
.map_err(handle_errors_fatal(
client.baseurl().clone(),
"Failed to get source pipeline",
1,
))
.unwrap()
.into_inner();

let Some(program_code) = source_pipeline.program_code else {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not for this PR, but I wonder how this can even be None, it should always have a value (maybe cc @snkas knows)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good point. afaict this should not be none here.

for this call we don't pass a selector, and the api default selector is all, so program_code should come back on a current server. same for runtime_config and program_config.

i kept the guards bc the generated client type has these as option, since selected pipeline responses can omit fields when a narrower selector is used. but imo for fda copy this is basically unreachable unless the server is old or returning a weird response.

ltm maybe the better move is to treat this as an invariant instead of showing the upgrade msg, since that msg makes it look like the user can fix it. lgtm to change if you prefer.

eprintln!(
"Source pipeline response did not include program code. {}",
UPGRADE_NOTICE
);
std::process::exit(1);
};
let Some(runtime_config) = source_pipeline.runtime_config else {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same for this one

eprintln!(
"Source pipeline response did not include runtime configuration. {}",
UPGRADE_NOTICE
);
std::process::exit(1);
};
let Some(program_config) = source_pipeline.program_config else {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and this one

eprintln!(
"Source pipeline response did not include compilation configuration. {}",
UPGRADE_NOTICE
);
std::process::exit(1);
};

let response = client
.post_pipeline()
.body(PostPutPipeline {
description: source_pipeline.description,
tags: source_pipeline.tags,
name: destination.clone(),
program_code,
udf_rust: source_pipeline.udf_rust,
udf_toml: source_pipeline.udf_toml,
program_config: Some(program_config),
runtime_config: Some(runtime_config),
})
.send()
.await
.map_err(handle_errors_fatal(
client.baseurl().clone(),
"Failed to copy pipeline",
1,
))
.unwrap();

match format {
OutputFormat::Text => {
println!("Pipeline copied successfully.");
debug!("{:#?}", response);
}
OutputFormat::Json => {
println!(
"{}",
serde_json::to_string_pretty(&response.into_inner())
.expect("Failed to serialize pipeline response")
);
}
_ => {
eprintln!("Unsupported output format: {}", format);
std::process::exit(1);
}
}
}
PipelineAction::Start {
name,
recompile,
Expand Down
2 changes: 1 addition & 1 deletion crates/fda/test.bash
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ CREATE VIEW example_count WITH ('connectors' = '[{ "name": "c", "transport": { "
EOF

fda create p1 program.sql
fda program get p1 | fda create p2 -s
fda copy p1 p2
compare_output "fda program get p1" "fda program get p2"
fda program set-config p1 --profile dev
fda program set-config p1 --profile optimized
Expand Down