@@ -278,28 +278,43 @@ fn patch_runtime_config(
278278 Ok ( ( ) )
279279}
280280
281- async fn read_program_code ( program_path : Option < String > , stdin : bool ) -> Result < String , ( ) > {
282- if program_path. is_none ( ) && !stdin {
283- eprintln ! ( "No program code provided. Use `--stdin` to read from stdin." ) ;
284- return Err ( ( ) ) ;
281+ /// Reads a file, returning the content as a string.
282+ ///
283+ /// If the file path is `None`, returns `None`.
284+ /// If the file cannot be read, returns `Err(())`.
285+ async fn read_file ( file_path : Option < String > ) -> Result < Option < String > , ( ) > {
286+ if let Some ( path) = file_path {
287+ match tokio:: fs:: read_to_string ( path. as_str ( ) ) . await {
288+ Ok ( udf_code) => {
289+ debug ! ( "Read from file: {}" , path) ;
290+ Ok ( Some ( udf_code) )
291+ }
292+ Err ( e) => {
293+ eprintln ! ( "Failed to read '{}': {}" , path, e) ;
294+ Err ( ( ) )
295+ }
296+ }
297+ } else {
298+ Ok ( None )
285299 }
286- let program_path = program_path. unwrap_or_else ( || "" . to_string ( ) ) ;
300+ }
301+
302+ async fn read_program_code (
303+ program_path : Option < String > ,
304+ stdin : bool ,
305+ ) -> Result < Option < String > , ( ) > {
287306 if stdin {
288307 let mut program_code = String :: new ( ) ;
289308 let mut stdin = std:: io:: stdin ( ) ;
290309 if stdin. read_to_string ( & mut program_code) . is_ok ( ) {
291310 debug ! ( "Read program code from stdin" ) ;
292- Ok ( program_code)
311+ Ok ( Some ( program_code) )
293312 } else {
294313 eprintln ! ( "Failed to read program code from stdin" ) ;
295314 Err ( ( ) )
296315 }
297- } else if let Ok ( program_code) = tokio:: fs:: read_to_string ( program_path. as_str ( ) ) . await {
298- debug ! ( "Read program code from file: {}" , program_path) ;
299- Ok ( program_code)
300316 } else {
301- eprintln ! ( "Failed to read program code from file: {}" , program_path) ;
302- Err ( ( ) )
317+ read_file ( program_path) . await
303318 }
304319}
305320
@@ -347,17 +362,23 @@ async fn pipeline(action: PipelineAction, client: Client) {
347362 name,
348363 program_path,
349364 profile,
365+ udf_rs,
366+ udf_toml,
350367 stdin,
351368 } => {
352- if let Ok ( program_code) = read_program_code ( program_path, stdin) . await {
369+ if let ( Ok ( program_code) , Ok ( udf_rust) , Ok ( udf_toml) ) = (
370+ read_program_code ( program_path, stdin) . await ,
371+ read_file ( udf_rs) . await ,
372+ read_file ( udf_toml) . await ,
373+ ) {
353374 let response = client
354375 . post_pipeline ( )
355376 . body ( PipelineDescr {
356377 description : "" . to_string ( ) ,
357378 name : name. to_string ( ) ,
358- program_code,
359- udf_rust : None ,
360- udf_toml : None ,
379+ program_code : program_code . unwrap_or_default ( ) ,
380+ udf_rust,
381+ udf_toml,
361382 program_config : profile. into ( ) ,
362383 runtime_config : RuntimeConfig :: default ( ) ,
363384 } )
@@ -372,7 +393,7 @@ async fn pipeline(action: PipelineAction, client: Client) {
372393 println ! ( "Pipeline created successfully." ) ;
373394 debug ! ( "{:#?}" , response) ;
374395 } else {
375- // Already reported error in read_program_code.
396+ // Already reported error in read_program_code or read_file .
376397 std:: process:: exit ( 1 ) ;
377398 }
378399 }
@@ -919,7 +940,11 @@ async fn endpoint(
919940
920941async fn program ( action : ProgramAction , client : Client ) {
921942 match action {
922- ProgramAction :: Get { name } => {
943+ ProgramAction :: Get {
944+ name,
945+ udf_rs,
946+ udf_toml,
947+ } => {
923948 let response = client
924949 . get_pipeline ( )
925950 . pipeline_name ( name)
@@ -931,7 +956,15 @@ async fn program(action: ProgramAction, client: Client) {
931956 1 ,
932957 ) )
933958 . unwrap ( ) ;
934- println ! ( "{}" , response. program_code) ;
959+ if !udf_rs && !udf_toml {
960+ println ! ( "{}" , response. program_code) ;
961+ }
962+ if udf_rs {
963+ println ! ( "{}" , response. udf_rust) ;
964+ }
965+ if udf_toml {
966+ println ! ( "{}" , response. udf_toml) ;
967+ }
935968 }
936969 ProgramAction :: Config { name } => {
937970 let response = client
@@ -997,15 +1030,21 @@ async fn program(action: ProgramAction, client: Client) {
9971030 ProgramAction :: Set {
9981031 name,
9991032 program_path,
1033+ udf_rs,
1034+ udf_toml,
10001035 stdin,
10011036 } => {
1002- if let Ok ( program_code) = read_program_code ( program_path, stdin) . await {
1037+ if let ( Ok ( program_code) , Ok ( udf_rust) , Ok ( udf_toml) ) = (
1038+ read_program_code ( program_path, stdin) . await ,
1039+ read_file ( udf_rs) . await ,
1040+ read_file ( udf_toml) . await ,
1041+ ) {
10031042 let pp = PatchPipeline {
10041043 description : None ,
10051044 name : None ,
1006- program_code : Some ( program_code ) ,
1007- udf_rust : None ,
1008- udf_toml : None ,
1045+ program_code,
1046+ udf_rust,
1047+ udf_toml,
10091048 program_config : None ,
10101049 runtime_config : None ,
10111050 } ;
@@ -1023,7 +1062,7 @@ async fn program(action: ProgramAction, client: Client) {
10231062 . unwrap ( ) ;
10241063 println ! ( "Program updated successfully." ) ;
10251064 } else {
1026- // Already reported error in read_program_code.
1065+ // Already reported error in read_program_code or read_file .
10271066 std:: process:: exit ( 1 ) ;
10281067 }
10291068 }
0 commit comments