Skip to content

Commit 5ad966f

Browse files
committed
Wait for status to reach desired for pause commands.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 2dfbae0 commit 5ad966f

1 file changed

Lines changed: 59 additions & 26 deletions

File tree

crates/fda/src/main.rs

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,35 @@ async fn read_program_code(program_path: String, stdin: bool) -> Result<String,
292292
}
293293
}
294294

295+
async fn wait_for_status(
296+
client: &Client,
297+
name: String,
298+
wait_for: PipelineStatus,
299+
waiting_text: &str,
300+
) {
301+
let mut print_every_30_seconds = tokio::time::Instant::now();
302+
let mut is_transitioning = true;
303+
while is_transitioning {
304+
let pc = client
305+
.get_pipeline()
306+
.pipeline_name(name.clone())
307+
.send()
308+
.await
309+
.map_err(handle_errors_fatal(
310+
client.baseurl.clone(),
311+
"Failed to get program config",
312+
1,
313+
))
314+
.unwrap();
315+
is_transitioning = pc.deployment_status != wait_for;
316+
if print_every_30_seconds.elapsed().as_secs() > 30 {
317+
info!("{}", waiting_text);
318+
print_every_30_seconds = tokio::time::Instant::now();
319+
}
320+
sleep(Duration::from_millis(500)).await;
321+
}
322+
}
323+
295324
async fn pipeline(action: PipelineAction, client: Client) {
296325
match action {
297326
PipelineAction::Create {
@@ -410,16 +439,25 @@ async fn pipeline(action: PipelineAction, client: Client) {
410439
PipelineAction::Pause { name } => {
411440
let response = client
412441
.post_pipeline_action()
413-
.pipeline_name(name)
442+
.pipeline_name(name.clone())
414443
.action("pause")
415444
.send()
416445
.await
417446
.map_err(handle_errors_fatal(
418-
client.baseurl,
447+
client.baseurl.clone(),
419448
"Failed to pause pipeline",
420449
1,
421450
))
422451
.unwrap();
452+
453+
wait_for_status(
454+
&client,
455+
name.clone(),
456+
PipelineStatus::Paused,
457+
"Pausing the pipeline...",
458+
)
459+
.await;
460+
423461
println!("Pipeline paused successfully.");
424462
trace!("{:#?}", response);
425463
}
@@ -437,44 +475,39 @@ async fn pipeline(action: PipelineAction, client: Client) {
437475
))
438476
.unwrap();
439477

440-
let mut print_every_30_seconds = tokio::time::Instant::now();
441-
let mut shutting_down = true;
442-
while shutting_down {
443-
let pc = client
444-
.get_pipeline()
445-
.pipeline_name(name.clone())
446-
.send()
447-
.await
448-
.map_err(handle_errors_fatal(
449-
client.baseurl.clone(),
450-
"Failed to get program config",
451-
1,
452-
))
453-
.unwrap();
454-
shutting_down = !matches!(pc.deployment_status, PipelineStatus::Shutdown);
455-
if print_every_30_seconds.elapsed().as_secs() > 30 {
456-
info!("Shutting down the pipeline...");
457-
print_every_30_seconds = tokio::time::Instant::now();
458-
}
459-
sleep(Duration::from_millis(500)).await;
460-
}
461-
println!("Pipeline shutdown successful.");
478+
wait_for_status(
479+
&client,
480+
name.clone(),
481+
PipelineStatus::Shutdown,
482+
"Shutting down the pipeline...",
483+
)
484+
.await;
462485

486+
println!("Pipeline shutdown successful.");
463487
let _ = Box::pin(pipeline(PipelineAction::Start { name, recompile }, client)).await;
464488
}
465489
PipelineAction::Shutdown { name } => {
466490
let response = client
467491
.post_pipeline_action()
468-
.pipeline_name(name)
492+
.pipeline_name(name.clone())
469493
.action("shutdown")
470494
.send()
471495
.await
472496
.map_err(handle_errors_fatal(
473-
client.baseurl,
497+
client.baseurl.clone(),
474498
"Failed to stop pipeline",
475499
1,
476500
))
477501
.unwrap();
502+
503+
wait_for_status(
504+
&client,
505+
name.clone(),
506+
PipelineStatus::Shutdown,
507+
"Shutting down the pipeline...",
508+
)
509+
.await;
510+
478511
println!("Pipeline shutdown successful.");
479512
trace!("{:#?}", response);
480513
}

0 commit comments

Comments
 (0)