@@ -522,26 +522,22 @@ async fn wait_for_status(
522522 print_every_30_seconds = tokio:: time:: Instant :: now ( ) ;
523523 }
524524
525- if wait_for != PipelineStatus :: Stopped && pc. deployment_status == PipelineStatus :: Stopped {
526- if let Some ( deployment_error) = & pc. deployment_error {
527- if deployment_error. error_code == "StartFailedDueToFailedCompilation" {
528- eprintln ! ( "Pipeline failed to start due to the following compilation error:" ) ;
529- eprintln ! ( ) ;
530- eprintln ! (
531- "{}" ,
532- deployment_error
533- . details
534- . as_object( )
535- . unwrap_or( & serde_json:: Map :: new( ) )
536- . get( "compiler_error" )
537- . and_then( |e| e. as_str( ) )
538- . unwrap_or_default( )
539- ) ;
540- } else {
541- eprintln ! ( "{}" , deployment_error. message) ;
542- }
525+ if let Some ( deployment_error) = & pc. deployment_error {
526+ if deployment_error. error_code == "StartFailedDueToFailedCompilation" {
527+ eprintln ! ( "Pipeline failed to start due to the following compilation error:" ) ;
528+ eprintln ! ( ) ;
529+ eprintln ! (
530+ "{}" ,
531+ deployment_error
532+ . details
533+ . as_object( )
534+ . unwrap_or( & serde_json:: Map :: new( ) )
535+ . get( "compiler_error" )
536+ . and_then( |e| e. as_str( ) )
537+ . unwrap_or_default( )
538+ ) ;
543539 } else {
544- eprintln ! ( "Pipeline failed to reach status {:? }" , wait_for ) ;
540+ eprintln ! ( "{ }" , deployment_error . message ) ;
545541 }
546542 std:: process:: exit ( 1 ) ;
547543 }
@@ -818,7 +814,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
818814 PipelineAction :: Restart {
819815 name,
820816 recompile,
821- no_checkpoint ,
817+ checkpoint ,
822818 no_wait,
823819 } => {
824820 let current_status = client
@@ -836,7 +832,7 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
836832 let _r = client
837833 . post_pipeline_stop ( )
838834 . pipeline_name ( name. clone ( ) )
839- . force ( no_checkpoint )
835+ . force ( !checkpoint )
840836 . send ( )
841837 . await
842838 . map_err ( handle_errors_fatal (
@@ -871,12 +867,12 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
871867 PipelineAction :: Stop {
872868 name,
873869 no_wait,
874- no_checkpoint ,
870+ checkpoint ,
875871 } => {
876872 let response = client
877873 . post_pipeline_stop ( )
878874 . pipeline_name ( name. clone ( ) )
879- . force ( no_checkpoint )
875+ . force ( !checkpoint )
880876 . send ( )
881877 . await
882878 . map_err ( handle_errors_fatal (
@@ -899,6 +895,32 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
899895
900896 trace ! ( "{:#?}" , response) ;
901897 }
898+ PipelineAction :: Clear { name, no_wait } => {
899+ let response = client
900+ . post_pipeline_clear ( )
901+ . pipeline_name ( name. clone ( ) )
902+ . send ( )
903+ . await
904+ . map_err ( handle_errors_fatal (
905+ client. baseurl ( ) . clone ( ) ,
906+ "Failed to clear pipeline" ,
907+ 1 ,
908+ ) )
909+ . unwrap ( ) ;
910+
911+ trace ! ( "{:#?}" , response) ;
912+
913+ if !no_wait {
914+ wait_for_status (
915+ & client,
916+ name. clone ( ) ,
917+ PipelineStatus :: Stopped ,
918+ "Clearing pipeline..." ,
919+ )
920+ . await ;
921+ println ! ( "Pipeline cleared successfully." ) ;
922+ }
923+ }
902924 PipelineAction :: Stats { name } => {
903925 let response = client
904926 . get_pipeline_stats ( )
@@ -977,7 +999,19 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
977999 }
9781000 }
9791001 }
980- PipelineAction :: Delete { name } => {
1002+ PipelineAction :: Delete { name, force } => {
1003+ if force {
1004+ let _ = Box :: pin ( pipeline (
1005+ format,
1006+ PipelineAction :: Clear {
1007+ name : name. clone ( ) ,
1008+ no_wait : false ,
1009+ } ,
1010+ client. clone ( ) ,
1011+ ) )
1012+ . await ;
1013+ }
1014+
9811015 let response = client
9821016 . delete_pipeline ( )
9831017 . pipeline_name ( name)
0 commit comments