@@ -292,6 +292,35 @@ async fn read_program_code(program_path: String, stdin: bool) -> Result<String,
292292 }
293293}
294294
295+ async fn wait_for_status (
296+ client : & Client ,
297+ name : String ,
298+ wait_for : PipelineStatus ,
299+ waiting_text : & str ,
300+ ) {
301+ let mut print_every_30_seconds = tokio:: time:: Instant :: now ( ) ;
302+ let mut is_transitioning = true ;
303+ while is_transitioning {
304+ let pc = client
305+ . get_pipeline ( )
306+ . pipeline_name ( name. clone ( ) )
307+ . send ( )
308+ . await
309+ . map_err ( handle_errors_fatal (
310+ client. baseurl . clone ( ) ,
311+ "Failed to get program config" ,
312+ 1 ,
313+ ) )
314+ . unwrap ( ) ;
315+ is_transitioning = pc. deployment_status != wait_for;
316+ if print_every_30_seconds. elapsed ( ) . as_secs ( ) > 30 {
317+ info ! ( "{}" , waiting_text) ;
318+ print_every_30_seconds = tokio:: time:: Instant :: now ( ) ;
319+ }
320+ sleep ( Duration :: from_millis ( 500 ) ) . await ;
321+ }
322+ }
323+
295324async fn pipeline ( action : PipelineAction , client : Client ) {
296325 match action {
297326 PipelineAction :: Create {
@@ -410,16 +439,25 @@ async fn pipeline(action: PipelineAction, client: Client) {
410439 PipelineAction :: Pause { name } => {
411440 let response = client
412441 . post_pipeline_action ( )
413- . pipeline_name ( name)
442+ . pipeline_name ( name. clone ( ) )
414443 . action ( "pause" )
415444 . send ( )
416445 . await
417446 . map_err ( handle_errors_fatal (
418- client. baseurl ,
447+ client. baseurl . clone ( ) ,
419448 "Failed to pause pipeline" ,
420449 1 ,
421450 ) )
422451 . unwrap ( ) ;
452+
453+ wait_for_status (
454+ & client,
455+ name. clone ( ) ,
456+ PipelineStatus :: Paused ,
457+ "Pausing the pipeline..." ,
458+ )
459+ . await ;
460+
423461 println ! ( "Pipeline paused successfully." ) ;
424462 trace ! ( "{:#?}" , response) ;
425463 }
@@ -437,44 +475,39 @@ async fn pipeline(action: PipelineAction, client: Client) {
437475 ) )
438476 . unwrap ( ) ;
439477
440- let mut print_every_30_seconds = tokio:: time:: Instant :: now ( ) ;
441- let mut shutting_down = true ;
442- while shutting_down {
443- let pc = client
444- . get_pipeline ( )
445- . pipeline_name ( name. clone ( ) )
446- . send ( )
447- . await
448- . map_err ( handle_errors_fatal (
449- client. baseurl . clone ( ) ,
450- "Failed to get program config" ,
451- 1 ,
452- ) )
453- . unwrap ( ) ;
454- shutting_down = !matches ! ( pc. deployment_status, PipelineStatus :: Shutdown ) ;
455- if print_every_30_seconds. elapsed ( ) . as_secs ( ) > 30 {
456- info ! ( "Shutting down the pipeline..." ) ;
457- print_every_30_seconds = tokio:: time:: Instant :: now ( ) ;
458- }
459- sleep ( Duration :: from_millis ( 500 ) ) . await ;
460- }
461- println ! ( "Pipeline shutdown successful." ) ;
478+ wait_for_status (
479+ & client,
480+ name. clone ( ) ,
481+ PipelineStatus :: Shutdown ,
482+ "Shutting down the pipeline..." ,
483+ )
484+ . await ;
462485
486+ println ! ( "Pipeline shutdown successful." ) ;
463487 let _ = Box :: pin ( pipeline ( PipelineAction :: Start { name, recompile } , client) ) . await ;
464488 }
465489 PipelineAction :: Shutdown { name } => {
466490 let response = client
467491 . post_pipeline_action ( )
468- . pipeline_name ( name)
492+ . pipeline_name ( name. clone ( ) )
469493 . action ( "shutdown" )
470494 . send ( )
471495 . await
472496 . map_err ( handle_errors_fatal (
473- client. baseurl ,
497+ client. baseurl . clone ( ) ,
474498 "Failed to stop pipeline" ,
475499 1 ,
476500 ) )
477501 . unwrap ( ) ;
502+
503+ wait_for_status (
504+ & client,
505+ name. clone ( ) ,
506+ PipelineStatus :: Shutdown ,
507+ "Shutting down the pipeline..." ,
508+ )
509+ . await ;
510+
478511 println ! ( "Pipeline shutdown successful." ) ;
479512 trace ! ( "{:#?}" , response) ;
480513 }
0 commit comments