@@ -5,8 +5,7 @@ use crate::db::storage_postgres::StoragePostgres;
55use crate :: db:: types:: common:: Version ;
66use crate :: db:: types:: pipeline:: { ExtendedPipelineDescr , PipelineId } ;
77use crate :: db:: types:: program:: {
8- generate_program_info_from_schema, CompilationProfile , ProgramInfo , ProgramStatus ,
9- SqlCompilerMessage ,
8+ generate_program_info, CompilationProfile , ProgramInfo , ProgramStatus , SqlCompilerMessage ,
109} ;
1110use crate :: db:: types:: tenant:: TenantId ;
1211use crate :: error:: ManagerError ;
@@ -201,7 +200,7 @@ impl Compiler {
201200 )
202201 } ) ?;
203202
204- Compiler :: write_project_toml ( config, pipeline_id) . await ?;
203+ Compiler :: write_project_toml ( config, "" , pipeline_id) . await ?;
205204
206205 fs:: write ( & rust_file_path, "fn main() {}" )
207206 . await
@@ -364,6 +363,7 @@ inherits = "release"
364363 /// Generate project-level `Cargo.toml`.
365364 async fn write_project_toml (
366365 config : & CompilerConfig ,
366+ udf_toml : & str ,
367367 pipeline_id : PipelineId ,
368368 ) -> Result < ( ) , ManagerError > {
369369 let template_path = config. project_toml_template_path ( ) ;
@@ -385,6 +385,18 @@ inherits = "release"
385385 )
386386 . replace ( "../../crates" , & format ! ( "{p}/crates" ) )
387387 . replace ( "../lib" , & format ! ( "{}" , config. sql_lib_path( ) . display( ) ) ) ;
388+
389+ let udf_dependencies = format ! (
390+ r#"# START: UDF dependencies
391+ {}
392+ # END: UDF dependencies
393+ "# ,
394+ & udf_toml
395+ ) ;
396+ project_toml_code = project_toml_code. replace (
397+ "[dependencies]" ,
398+ & format ! ( "[dependencies]\n {udf_dependencies}" ) ,
399+ ) ;
388400 debug ! ( "TOML:\n {project_toml_code}" ) ;
389401
390402 let toml_path = config. project_toml_path ( pipeline_id) ;
@@ -712,9 +724,16 @@ inherits = "release"
712724 . map_err( |e| {
713725 ManagerError :: io_error( format!( "reading '{}'" , schema_path. display( ) ) , e)
714726 } ) ?;
727+
728+ let stub_path = config. udf_stub_path( pipeline_id) ;
729+ let stubs = fs:: read_to_string( & stub_path) . await
730+ . map_err( |e| {
731+ ManagerError :: io_error( format!( "reading '{}'" , stub_path. display( ) ) , e)
732+ } ) ?;
733+
715734 let schema: ProgramSchema = serde_json:: from_str( & schema_json)
716735 . map_err( |e| { ManagerError :: invalid_program_schema( e. to_string( ) ) } ) ?;
717- match generate_program_info_from_schema ( schema) {
736+ match generate_program_info ( schema, stubs ) {
718737 Ok ( program_info) => {
719738 // SQL compiler succeeded -- start the Rust job and update the
720739 // program schema.
@@ -940,19 +959,15 @@ impl CompilationJob {
940959 ManagerError :: io_error ( format ! ( "creating error log '{}'" , stderr_path. display( ) ) , e)
941960 } ) ?;
942961
943- // `main.rs` file.
944- let rust_file = File :: create ( & rust_file_path) . await . map_err ( |e| {
945- ManagerError :: io_error (
946- format ! ( "failed to create '{}'" , rust_file_path. display( ) ) ,
947- e,
948- )
949- } ) ?;
950-
951962 // Run compiler, direct output to `main.rs`.
963+ // It will also generate UDF stubs in the same directory as main.rs and program schema
964+ // in `schema_path`.
952965 let schema_path = config. schema_path ( pipeline_id) ;
953966 let compiler_process = Command :: new ( config. sql_compiler_path ( ) )
954967 . arg ( "-js" )
955968 . arg ( schema_path)
969+ . arg ( "-o" )
970+ . arg ( rust_file_path)
956971 . arg ( sql_file_path. as_os_str ( ) )
957972 . arg ( "-i" )
958973 . arg ( "-je" )
@@ -962,7 +977,6 @@ impl CompilationJob {
962977 . arg ( "lower" )
963978 . stdin ( Stdio :: null ( ) )
964979 . stderr ( Stdio :: from ( err_file. into_std ( ) . await ) )
965- . stdout ( Stdio :: from ( rust_file. into_std ( ) . await ) )
966980 . spawn ( )
967981 . map_err ( |e| {
968982 ManagerError :: io_error (
@@ -1000,8 +1014,15 @@ impl CompilationJob {
10001014 . map_err ( |e| ManagerError :: io_error ( format ! ( "writing '{}'" , rust_path. display( ) ) , e) ) ?;
10011015 drop ( main_rs) ;
10021016
1017+ let udf_path = config. udf_path ( pipeline_id) ;
1018+ fs:: write ( & udf_path, & job. pipeline . udf_rust )
1019+ . await
1020+ . map_err ( |e| ManagerError :: io_error ( format ! ( "writing '{}'" , udf_path. display( ) ) , e) ) ?;
1021+
10031022 // Write `project/Cargo.toml`.
1004- Compiler :: write_project_toml ( config, pipeline_id) . await ?;
1023+ Compiler :: write_project_toml ( config, & job. pipeline . udf_toml , pipeline_id) . await ?;
1024+
1025+ //Compiler::create_udf_crate(config, pipeline_id, &job.pipeline).await?;
10051026
10061027 // Write workspace `Cargo.toml`. The workspace contains SQL libs and the
10071028 // generated project crate.
@@ -1076,7 +1097,6 @@ mod test {
10761097 auth:: TenantRecord , compiler:: ProgramStatus , config:: CompilerConfig , db:: storage:: Storage ,
10771098 } ;
10781099 use feldera_types:: config:: RuntimeConfig ;
1079- use feldera_types:: program_schema:: ProgramSchema ;
10801100 use std:: { fs:: File , sync:: Arc } ;
10811101 use tempfile:: TempDir ;
10821102 use tokio:: { fs, sync:: Mutex } ;
@@ -1099,6 +1119,8 @@ mod test {
10991119 description : "Description of the pipeline" . to_string ( ) ,
11001120 runtime_config : RuntimeConfig :: from_yaml ( "" ) ,
11011121 program_code : "code-not-used" . to_string ( ) ,
1122+ udf_rust : "" . to_string ( ) ,
1123+ udf_toml : "" . to_string ( ) ,
11021124 program_config : ProgramConfig {
11031125 profile : Some ( CompilationProfile :: Unoptimized ) ,
11041126 } ,
@@ -1159,19 +1181,7 @@ mod test {
11591181 . unwrap ( ) ;
11601182 db. lock ( )
11611183 . await
1162- . transit_program_status_to_compiling_rust (
1163- tid,
1164- pid,
1165- vid,
1166- & ProgramInfo {
1167- schema : ProgramSchema {
1168- inputs : vec ! [ ] ,
1169- outputs : vec ! [ ] ,
1170- } ,
1171- input_connectors : Default :: default ( ) ,
1172- output_connectors : Default :: default ( ) ,
1173- } ,
1174- )
1184+ . transit_program_status_to_compiling_rust ( tid, pid, vid, & ProgramInfo :: default ( ) )
11751185 . await
11761186 . unwrap ( ) ;
11771187 Compiler :: reconcile_local_state ( & conf, & db) . await . unwrap ( ) ;
@@ -1185,19 +1195,7 @@ mod test {
11851195 . unwrap ( ) ;
11861196 db. lock ( )
11871197 . await
1188- . transit_program_status_to_compiling_rust (
1189- tid,
1190- pid,
1191- vid,
1192- & ProgramInfo {
1193- schema : ProgramSchema {
1194- inputs : vec ! [ ] ,
1195- outputs : vec ! [ ] ,
1196- } ,
1197- input_connectors : Default :: default ( ) ,
1198- output_connectors : Default :: default ( ) ,
1199- } ,
1200- )
1198+ . transit_program_status_to_compiling_rust ( tid, pid, vid, & ProgramInfo :: default ( ) )
12011199 . await
12021200 . unwrap ( ) ;
12031201 db. lock ( )
@@ -1236,19 +1234,7 @@ mod test {
12361234 . unwrap ( ) ;
12371235 db. lock ( )
12381236 . await
1239- . transit_program_status_to_compiling_rust (
1240- tid,
1241- pid,
1242- vid,
1243- & ProgramInfo {
1244- schema : ProgramSchema {
1245- inputs : vec ! [ ] ,
1246- outputs : vec ! [ ] ,
1247- } ,
1248- input_connectors : Default :: default ( ) ,
1249- output_connectors : Default :: default ( ) ,
1250- } ,
1251- )
1237+ . transit_program_status_to_compiling_rust ( tid, pid, vid, & ProgramInfo :: default ( ) )
12521238 . await
12531239 . unwrap ( ) ;
12541240 db. lock ( )
@@ -1268,19 +1254,7 @@ mod test {
12681254 . unwrap ( ) ;
12691255 db. lock ( )
12701256 . await
1271- . transit_program_status_to_compiling_rust (
1272- tid,
1273- pid,
1274- vid,
1275- & ProgramInfo {
1276- schema : ProgramSchema {
1277- inputs : vec ! [ ] ,
1278- outputs : vec ! [ ] ,
1279- } ,
1280- input_connectors : Default :: default ( ) ,
1281- output_connectors : Default :: default ( ) ,
1282- } ,
1283- )
1257+ . transit_program_status_to_compiling_rust ( tid, pid, vid, & ProgramInfo :: default ( ) )
12841258 . await
12851259 . unwrap ( ) ;
12861260 db. lock ( )
@@ -1347,36 +1321,12 @@ mod test {
13471321 . unwrap ( ) ;
13481322 db. lock ( )
13491323 . await
1350- . transit_program_status_to_compiling_rust (
1351- tid,
1352- pid1,
1353- v1,
1354- & ProgramInfo {
1355- schema : ProgramSchema {
1356- inputs : vec ! [ ] ,
1357- outputs : vec ! [ ] ,
1358- } ,
1359- input_connectors : Default :: default ( ) ,
1360- output_connectors : Default :: default ( ) ,
1361- } ,
1362- )
1324+ . transit_program_status_to_compiling_rust ( tid, pid1, v1, & ProgramInfo :: default ( ) )
13631325 . await
13641326 . unwrap ( ) ;
13651327 db. lock ( )
13661328 . await
1367- . transit_program_status_to_compiling_rust (
1368- tid,
1369- pid2,
1370- v2,
1371- & ProgramInfo {
1372- schema : ProgramSchema {
1373- inputs : vec ! [ ] ,
1374- outputs : vec ! [ ] ,
1375- } ,
1376- input_connectors : Default :: default ( ) ,
1377- output_connectors : Default :: default ( ) ,
1378- } ,
1379- )
1329+ . transit_program_status_to_compiling_rust ( tid, pid2, v2, & ProgramInfo :: default ( ) )
13801330 . await
13811331 . unwrap ( ) ;
13821332 Compiler :: reconcile_local_state ( & conf, & db) . await . unwrap ( ) ;
0 commit comments