Skip to content

Commit fdb549f

Browse files
committed
[fda] fixes for state-consolidation.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 764320e commit fdb549f

File tree

6 files changed

+95
-38
lines changed

6 files changed

+95
-38
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ members = [
2525
"crates/iceberg",
2626
"crates/storage",
2727
"crates/rest-api",
28-
"crates/ir"
28+
"crates/ir",
2929
]
3030
exclude = [
3131
"sql-to-dbsp-compiler/temp",

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6157,7 +6157,7 @@ impl CircuitHandle {
61576157
let start = Instant::now();
61586158
node.commit(base)?;
61596159
let elapsed = start.elapsed();
6160-
if elapsed >= Duration::from_millis(100) {
6160+
if elapsed >= Duration::from_secs(3) {
61616161
info!(
61626162
"{:?}: committing {} node took {:.2} s",
61636163
node.global_id(),

crates/fda/src/bench/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ pub(crate) async fn bench(client: Client, format: OutputFormat, args: BenchmarkA
469469
PipelineAction::Stop {
470470
name: pipeline_name.clone(),
471471
no_wait: false,
472-
no_checkpoint: true,
472+
checkpoint: false,
473473
},
474474
client.clone(),
475475
))
@@ -499,7 +499,7 @@ pub(crate) async fn bench(client: Client, format: OutputFormat, args: BenchmarkA
499499
PipelineAction::Stop {
500500
name: pipeline_name.clone(),
501501
no_wait: true,
502-
no_checkpoint: true,
502+
checkpoint: false,
503503
},
504504
client.clone(),
505505
))

crates/fda/src/cli.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,9 @@ pub enum PipelineAction {
312312
/// This is useful for dev purposes in case the Feldera source-code has changed.
313313
#[arg(long, short = 'r', default_value_t = false)]
314314
recompile: bool,
315-
/// Don't checkpoint the pipeline before restarting it.
316-
#[arg(long, short = 'n', default_value_t = false)]
317-
no_checkpoint: bool,
315+
/// Checkpoint the pipeline before restarting it.
316+
#[arg(long, short = 'c', default_value_t = false)]
317+
checkpoint: bool,
318318
/// Don't wait for pipeline to reach the status before returning.
319319
#[arg(long, short = 'n', default_value_t = false)]
320320
no_wait: bool,
@@ -325,9 +325,9 @@ pub enum PipelineAction {
325325
/// The name of the pipeline.
326326
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
327327
name: String,
328-
/// Don't checkpoint the pipeline before restarting it.
329-
#[arg(long, short = 'n', default_value_t = false)]
330-
no_checkpoint: bool,
328+
/// Checkpoint the pipeline before stopping it.
329+
#[arg(long, short = 'c', default_value_t = false)]
330+
checkpoint: bool,
331331
/// Don't wait for pipeline to reach the status before returning.
332332
#[arg(long, short = 'n', default_value_t = false)]
333333
no_wait: bool,
@@ -398,6 +398,17 @@ pub enum PipelineAction {
398398
/// The name of the pipeline.
399399
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
400400
name: String,
401+
/// Clears any associated storage first to force deletion of the pipeline.
402+
///
403+
/// EXAMPLES:
404+
///
405+
/// - fda delete --force my-pipeline
406+
///
407+
/// Is equivalent to:
408+
///
409+
/// - fda clear my-pipeline && fda delete my-pipeline
410+
#[arg(long, short = 'f')]
411+
force: bool,
401412
},
402413
/// Control an input connector belonging to a table of a pipeline.
403414
Connector {
@@ -504,6 +515,17 @@ pub enum PipelineAction {
504515
/// Or when ingesting data over HTTP.
505516
token: String,
506517
},
518+
/// Clear the storage resources of a pipeline.
519+
///
520+
/// Note that the pipeline must be stopped before clearing its storage resources.
521+
Clear {
522+
/// The name of the pipeline.
523+
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
524+
name: String,
525+
/// Don't wait for pipeline storage to clear before returning.
526+
#[arg(long, short = 'n', default_value_t = false)]
527+
no_wait: bool,
528+
},
507529
/// Benchmark the performance of a pipeline.
508530
///
509531
/// This command will perform the following steps

crates/fda/src/main.rs

Lines changed: 58 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -522,26 +522,22 @@ async fn wait_for_status(
522522
print_every_30_seconds = tokio::time::Instant::now();
523523
}
524524

525-
if wait_for != PipelineStatus::Stopped && pc.deployment_status == PipelineStatus::Stopped {
526-
if let Some(deployment_error) = &pc.deployment_error {
527-
if deployment_error.error_code == "StartFailedDueToFailedCompilation" {
528-
eprintln!("Pipeline failed to start due to the following compilation error:");
529-
eprintln!();
530-
eprintln!(
531-
"{}",
532-
deployment_error
533-
.details
534-
.as_object()
535-
.unwrap_or(&serde_json::Map::new())
536-
.get("compiler_error")
537-
.and_then(|e| e.as_str())
538-
.unwrap_or_default()
539-
);
540-
} else {
541-
eprintln!("{}", deployment_error.message);
542-
}
525+
if let Some(deployment_error) = &pc.deployment_error {
526+
if deployment_error.error_code == "StartFailedDueToFailedCompilation" {
527+
eprintln!("Pipeline failed to start due to the following compilation error:");
528+
eprintln!();
529+
eprintln!(
530+
"{}",
531+
deployment_error
532+
.details
533+
.as_object()
534+
.unwrap_or(&serde_json::Map::new())
535+
.get("compiler_error")
536+
.and_then(|e| e.as_str())
537+
.unwrap_or_default()
538+
);
543539
} else {
544-
eprintln!("Pipeline failed to reach status {:?}", wait_for);
540+
eprintln!("{}", deployment_error.message);
545541
}
546542
std::process::exit(1);
547543
}
@@ -818,7 +814,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
818814
PipelineAction::Restart {
819815
name,
820816
recompile,
821-
no_checkpoint,
817+
checkpoint,
822818
no_wait,
823819
} => {
824820
let current_status = client
@@ -836,7 +832,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
836832
let _r = client
837833
.post_pipeline_stop()
838834
.pipeline_name(name.clone())
839-
.force(no_checkpoint)
835+
.force(!checkpoint)
840836
.send()
841837
.await
842838
.map_err(handle_errors_fatal(
@@ -871,12 +867,12 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
871867
PipelineAction::Stop {
872868
name,
873869
no_wait,
874-
no_checkpoint,
870+
checkpoint,
875871
} => {
876872
let response = client
877873
.post_pipeline_stop()
878874
.pipeline_name(name.clone())
879-
.force(no_checkpoint)
875+
.force(!checkpoint)
880876
.send()
881877
.await
882878
.map_err(handle_errors_fatal(
@@ -899,6 +895,32 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
899895

900896
trace!("{:#?}", response);
901897
}
898+
PipelineAction::Clear { name, no_wait } => {
899+
let response = client
900+
.post_pipeline_clear()
901+
.pipeline_name(name.clone())
902+
.send()
903+
.await
904+
.map_err(handle_errors_fatal(
905+
client.baseurl().clone(),
906+
"Failed to clear pipeline",
907+
1,
908+
))
909+
.unwrap();
910+
911+
trace!("{:#?}", response);
912+
913+
if !no_wait {
914+
wait_for_status(
915+
&client,
916+
name.clone(),
917+
PipelineStatus::Stopped,
918+
"Clearing pipeline...",
919+
)
920+
.await;
921+
println!("Pipeline cleared successfully.");
922+
}
923+
}
902924
PipelineAction::Stats { name } => {
903925
let response = client
904926
.get_pipeline_stats()
@@ -977,7 +999,19 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
977999
}
9781000
}
9791001
}
980-
PipelineAction::Delete { name } => {
1002+
PipelineAction::Delete { name, force } => {
1003+
if force {
1004+
let _ = Box::pin(pipeline(
1005+
format,
1006+
PipelineAction::Clear {
1007+
name: name.clone(),
1008+
no_wait: false,
1009+
},
1010+
client.clone(),
1011+
))
1012+
.await;
1013+
}
1014+
9811015
let response = client
9821016
.delete_pipeline()
9831017
.pipeline_name(name)

crates/fda/test.bash

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ fda pipelines
5353

5454
# Cleanup, these commands might fail if the resources do not exist yet
5555
fda shutdown p1 || true
56-
fda delete p1 || true
57-
fda delete p2 || true
58-
fda delete pudf || true
56+
fda delete --force p1 || true
57+
fda delete --force p2 || true
58+
fda clear pudf && fda delete pudf || true
5959
fda delete unknown || true
60-
fda delete punknown || true
60+
fda delete --force punknown || true
6161
fda apikey delete a || true
6262

6363
# Tests
@@ -111,6 +111,7 @@ if $enterprise; then
111111
else
112112
enterprise_only=fail_on_success
113113
fi
114+
$enterprise_only fda clear p1
114115
$enterprise_only fda set-config p1 fault_tolerance true
115116
fda set-config p1 fault_tolerance false
116117
fda set-config p1 fault_tolerance none

0 commit comments

Comments
 (0)