Skip to content

Commit 14e13cc

Browse files
Leonid Ryzhykryzhyk
authored andcommitted
[manager] Add support for UDF to pipeline manager.
* Add two new fields to the pipeline descriptor and related sructs: `udf_rust` and `udf_toml`, which, together with the SQL code (`program_code`) now constitute a complete description of the program to run. * Add the `udf_stubs` field to `ProgramInfo`. This stucts contains program metadata generated by the compiler. * The compiler server generates a Rust project, including UDF definitions. The project stucture is: - `Cargo.toml` - generated from template by adding the contents of `udf_toml` to the dependencies section - `src/main.rs` - generated by the SQL compiler; includes `stubs.rs` and `udf.rs` as submodules. - `src/stubs.rs` - generated by the compiler - `src/udf.rs` - provided by the user via `udf_rust`. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent f082ffb commit 14e13cc

File tree

16 files changed

+353
-176
lines changed

16 files changed

+353
-176
lines changed

crates/fda/src/main.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ async fn pipeline(action: PipelineAction, client: Client) {
351351
description: "".to_string(),
352352
name: name.to_string(),
353353
program_code,
354+
udf_rust: None,
355+
udf_toml: None,
354356
program_config: profile.into(),
355357
runtime_config: RuntimeConfig::default(),
356358
})
@@ -401,6 +403,8 @@ async fn pipeline(action: PipelineAction, client: Client) {
401403
description: None,
402404
name: None,
403405
program_code: Some(new_program),
406+
udf_rust: None,
407+
udf_toml: None,
404408
program_config: None,
405409
runtime_config: None,
406410
})
@@ -713,6 +717,8 @@ async fn pipeline(action: PipelineAction, client: Client) {
713717
description: None,
714718
name: None,
715719
program_code: None,
720+
udf_rust: None,
721+
udf_toml: None,
716722
program_config: None,
717723
runtime_config: Some(rc),
718724
})
@@ -945,6 +951,8 @@ async fn program(name: String, action: Option<ProgramAction>, client: Client) {
945951
description: None,
946952
name: None,
947953
program_code: None,
954+
udf_rust: None,
955+
udf_toml: None,
948956
program_config: Some(ProgramConfig {
949957
profile: Some(profile.into()),
950958
}),
@@ -990,6 +998,8 @@ async fn program(name: String, action: Option<ProgramAction>, client: Client) {
990998
description: None,
991999
name: None,
9921000
program_code: Some(program_code),
1001+
udf_rust: None,
1002+
udf_toml: None,
9931003
program_config: None,
9941004
runtime_config: None,
9951005
};

crates/feldera-types/src/program_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl Display for SqlIdentifier {
136136
/// A struct containing the tables (inputs) and views for a program.
137137
///
138138
/// Parse from the JSON data-type of the DDL generated by the SQL compiler.
139-
#[derive(Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone)]
139+
#[derive(Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone, Default)]
140140
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
141141
pub struct ProgramSchema {
142142
#[cfg_attr(
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE pipeline
2+
ADD COLUMN udf_rust VARCHAR NOT NULL DEFAULT '',
3+
ADD COLUMN udf_toml VARCHAR NOT NULL DEFAULT '';

crates/pipeline-manager/src/api/examples.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ pub(crate) fn pipeline_1() -> PipelineDescr {
5656
clock_resolution_usecs: Some(100_000),
5757
},
5858
program_code: "CREATE TABLE table1 ( col1 INT );".to_string(),
59+
udf_rust: "".to_string(),
60+
udf_toml: "".to_string(),
5961
program_config: ProgramConfig {
6062
profile: Some(CompilationProfile::Optimized),
6163
},
@@ -82,6 +84,8 @@ pub(crate) fn extended_pipeline_1() -> ExtendedPipelineDescr {
8284
clock_resolution_usecs: Some(100_000),
8385
},
8486
program_code: "CREATE TABLE table1 ( col1 INT );".to_string(),
87+
udf_rust: "".to_string(),
88+
udf_toml: "".to_string(),
8589
program_config: ProgramConfig {
8690
profile: Some(CompilationProfile::Optimized),
8791
},
@@ -126,6 +130,8 @@ pub(crate) fn extended_pipeline_2() -> ExtendedPipelineDescr {
126130
clock_resolution_usecs: Some(100_000),
127131
},
128132
program_code: "CREATE TABLE table2 ( col2 VARCHAR );".to_string(),
133+
udf_rust: "".to_string(),
134+
udf_toml: "".to_string(),
129135
program_config: ProgramConfig {
130136
profile: Some(CompilationProfile::Unoptimized),
131137
},
@@ -156,6 +162,8 @@ pub(crate) fn patch_pipeline() -> PatchPipeline {
156162
description: Some("This is a new description".to_string()),
157163
runtime_config: None,
158164
program_code: Some("CREATE TABLE table3 ( col3 INT );".to_string()),
165+
udf_rust: None,
166+
udf_toml: None,
159167
program_config: None,
160168
}
161169
}

crates/pipeline-manager/src/api/pipeline.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub struct ExtendedPipelineDescrOptionalCode {
3838
pub created_at: DateTime<Utc>,
3939
pub runtime_config: RuntimeConfig,
4040
pub program_code: Option<String>,
41+
pub udf_rust: Option<String>,
42+
pub udf_toml: Option<String>,
4143
pub program_config: ProgramConfig,
4244
pub program_version: Version,
4345
pub program_status: ProgramStatus,
@@ -66,6 +68,16 @@ impl ExtendedPipelineDescrOptionalCode {
6668
} else {
6769
None
6870
},
71+
udf_rust: if include_code {
72+
Some(extended_pipeline.udf_rust)
73+
} else {
74+
None
75+
},
76+
udf_toml: if include_code {
77+
Some(extended_pipeline.udf_toml)
78+
} else {
79+
None
80+
},
6981
program_config: extended_pipeline.program_config,
7082
program_version: extended_pipeline.program_version,
7183
program_status: extended_pipeline.program_status,
@@ -109,6 +121,8 @@ pub struct PatchPipeline {
109121
pub description: Option<String>,
110122
pub runtime_config: Option<RuntimeConfig>,
111123
pub program_code: Option<String>,
124+
pub udf_rust: Option<String>,
125+
pub udf_toml: Option<String>,
112126
pub program_config: Option<ProgramConfig>,
113127
}
114128

@@ -332,6 +346,8 @@ pub(crate) async fn patch_pipeline(
332346
&body.description,
333347
&body.runtime_config,
334348
&body.program_code,
349+
&body.udf_rust,
350+
&body.udf_toml,
335351
&body.program_config,
336352
)
337353
.await?;

crates/pipeline-manager/src/compiler.rs

Lines changed: 43 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ use crate::db::storage_postgres::StoragePostgres;
55
use crate::db::types::common::Version;
66
use crate::db::types::pipeline::{ExtendedPipelineDescr, PipelineId};
77
use crate::db::types::program::{
8-
generate_program_info_from_schema, CompilationProfile, ProgramInfo, ProgramStatus,
9-
SqlCompilerMessage,
8+
generate_program_info, CompilationProfile, ProgramInfo, ProgramStatus, SqlCompilerMessage,
109
};
1110
use crate::db::types::tenant::TenantId;
1211
use crate::error::ManagerError;
@@ -201,7 +200,7 @@ impl Compiler {
201200
)
202201
})?;
203202

204-
Compiler::write_project_toml(config, pipeline_id).await?;
203+
Compiler::write_project_toml(config, "", pipeline_id).await?;
205204

206205
fs::write(&rust_file_path, "fn main() {}")
207206
.await
@@ -364,6 +363,7 @@ inherits = "release"
364363
/// Generate project-level `Cargo.toml`.
365364
async fn write_project_toml(
366365
config: &CompilerConfig,
366+
udf_toml: &str,
367367
pipeline_id: PipelineId,
368368
) -> Result<(), ManagerError> {
369369
let template_path = config.project_toml_template_path();
@@ -385,6 +385,18 @@ inherits = "release"
385385
)
386386
.replace("../../crates", &format!("{p}/crates"))
387387
.replace("../lib", &format!("{}", config.sql_lib_path().display()));
388+
389+
let udf_dependencies = format!(
390+
r#"# START: UDF dependencies
391+
{}
392+
# END: UDF dependencies
393+
"#,
394+
&udf_toml
395+
);
396+
project_toml_code = project_toml_code.replace(
397+
"[dependencies]",
398+
&format!("[dependencies]\n{udf_dependencies}"),
399+
);
388400
debug!("TOML:\n{project_toml_code}");
389401

390402
let toml_path = config.project_toml_path(pipeline_id);
@@ -712,9 +724,16 @@ inherits = "release"
712724
.map_err(|e| {
713725
ManagerError::io_error(format!("reading '{}'", schema_path.display()), e)
714726
})?;
727+
728+
let stub_path = config.udf_stub_path(pipeline_id);
729+
let stubs = fs::read_to_string(&stub_path).await
730+
.map_err(|e| {
731+
ManagerError::io_error(format!("reading '{}'", stub_path.display()), e)
732+
})?;
733+
715734
let schema: ProgramSchema = serde_json::from_str(&schema_json)
716735
.map_err(|e| { ManagerError::invalid_program_schema(e.to_string()) })?;
717-
match generate_program_info_from_schema(schema) {
736+
match generate_program_info(schema, stubs) {
718737
Ok(program_info) => {
719738
// SQL compiler succeeded -- start the Rust job and update the
720739
// program schema.
@@ -940,19 +959,15 @@ impl CompilationJob {
940959
ManagerError::io_error(format!("creating error log '{}'", stderr_path.display()), e)
941960
})?;
942961

943-
// `main.rs` file.
944-
let rust_file = File::create(&rust_file_path).await.map_err(|e| {
945-
ManagerError::io_error(
946-
format!("failed to create '{}'", rust_file_path.display()),
947-
e,
948-
)
949-
})?;
950-
951962
// Run compiler, direct output to `main.rs`.
963+
// It will also generate UDF stubs in the same directory as main.rs and program schema
964+
// in `schema_path`.
952965
let schema_path = config.schema_path(pipeline_id);
953966
let compiler_process = Command::new(config.sql_compiler_path())
954967
.arg("-js")
955968
.arg(schema_path)
969+
.arg("-o")
970+
.arg(rust_file_path)
956971
.arg(sql_file_path.as_os_str())
957972
.arg("-i")
958973
.arg("-je")
@@ -962,7 +977,6 @@ impl CompilationJob {
962977
.arg("lower")
963978
.stdin(Stdio::null())
964979
.stderr(Stdio::from(err_file.into_std().await))
965-
.stdout(Stdio::from(rust_file.into_std().await))
966980
.spawn()
967981
.map_err(|e| {
968982
ManagerError::io_error(
@@ -1000,8 +1014,15 @@ impl CompilationJob {
10001014
.map_err(|e| ManagerError::io_error(format!("writing '{}'", rust_path.display()), e))?;
10011015
drop(main_rs);
10021016

1017+
let udf_path = config.udf_path(pipeline_id);
1018+
fs::write(&udf_path, &job.pipeline.udf_rust)
1019+
.await
1020+
.map_err(|e| ManagerError::io_error(format!("writing '{}'", udf_path.display()), e))?;
1021+
10031022
// Write `project/Cargo.toml`.
1004-
Compiler::write_project_toml(config, pipeline_id).await?;
1023+
Compiler::write_project_toml(config, &job.pipeline.udf_toml, pipeline_id).await?;
1024+
1025+
//Compiler::create_udf_crate(config, pipeline_id, &job.pipeline).await?;
10051026

10061027
// Write workspace `Cargo.toml`. The workspace contains SQL libs and the
10071028
// generated project crate.
@@ -1076,7 +1097,6 @@ mod test {
10761097
auth::TenantRecord, compiler::ProgramStatus, config::CompilerConfig, db::storage::Storage,
10771098
};
10781099
use feldera_types::config::RuntimeConfig;
1079-
use feldera_types::program_schema::ProgramSchema;
10801100
use std::{fs::File, sync::Arc};
10811101
use tempfile::TempDir;
10821102
use tokio::{fs, sync::Mutex};
@@ -1099,6 +1119,8 @@ mod test {
10991119
description: "Description of the pipeline".to_string(),
11001120
runtime_config: RuntimeConfig::from_yaml(""),
11011121
program_code: "code-not-used".to_string(),
1122+
udf_rust: "".to_string(),
1123+
udf_toml: "".to_string(),
11021124
program_config: ProgramConfig {
11031125
profile: Some(CompilationProfile::Unoptimized),
11041126
},
@@ -1159,19 +1181,7 @@ mod test {
11591181
.unwrap();
11601182
db.lock()
11611183
.await
1162-
.transit_program_status_to_compiling_rust(
1163-
tid,
1164-
pid,
1165-
vid,
1166-
&ProgramInfo {
1167-
schema: ProgramSchema {
1168-
inputs: vec![],
1169-
outputs: vec![],
1170-
},
1171-
input_connectors: Default::default(),
1172-
output_connectors: Default::default(),
1173-
},
1174-
)
1184+
.transit_program_status_to_compiling_rust(tid, pid, vid, &ProgramInfo::default())
11751185
.await
11761186
.unwrap();
11771187
Compiler::reconcile_local_state(&conf, &db).await.unwrap();
@@ -1185,19 +1195,7 @@ mod test {
11851195
.unwrap();
11861196
db.lock()
11871197
.await
1188-
.transit_program_status_to_compiling_rust(
1189-
tid,
1190-
pid,
1191-
vid,
1192-
&ProgramInfo {
1193-
schema: ProgramSchema {
1194-
inputs: vec![],
1195-
outputs: vec![],
1196-
},
1197-
input_connectors: Default::default(),
1198-
output_connectors: Default::default(),
1199-
},
1200-
)
1198+
.transit_program_status_to_compiling_rust(tid, pid, vid, &ProgramInfo::default())
12011199
.await
12021200
.unwrap();
12031201
db.lock()
@@ -1236,19 +1234,7 @@ mod test {
12361234
.unwrap();
12371235
db.lock()
12381236
.await
1239-
.transit_program_status_to_compiling_rust(
1240-
tid,
1241-
pid,
1242-
vid,
1243-
&ProgramInfo {
1244-
schema: ProgramSchema {
1245-
inputs: vec![],
1246-
outputs: vec![],
1247-
},
1248-
input_connectors: Default::default(),
1249-
output_connectors: Default::default(),
1250-
},
1251-
)
1237+
.transit_program_status_to_compiling_rust(tid, pid, vid, &ProgramInfo::default())
12521238
.await
12531239
.unwrap();
12541240
db.lock()
@@ -1268,19 +1254,7 @@ mod test {
12681254
.unwrap();
12691255
db.lock()
12701256
.await
1271-
.transit_program_status_to_compiling_rust(
1272-
tid,
1273-
pid,
1274-
vid,
1275-
&ProgramInfo {
1276-
schema: ProgramSchema {
1277-
inputs: vec![],
1278-
outputs: vec![],
1279-
},
1280-
input_connectors: Default::default(),
1281-
output_connectors: Default::default(),
1282-
},
1283-
)
1257+
.transit_program_status_to_compiling_rust(tid, pid, vid, &ProgramInfo::default())
12841258
.await
12851259
.unwrap();
12861260
db.lock()
@@ -1347,36 +1321,12 @@ mod test {
13471321
.unwrap();
13481322
db.lock()
13491323
.await
1350-
.transit_program_status_to_compiling_rust(
1351-
tid,
1352-
pid1,
1353-
v1,
1354-
&ProgramInfo {
1355-
schema: ProgramSchema {
1356-
inputs: vec![],
1357-
outputs: vec![],
1358-
},
1359-
input_connectors: Default::default(),
1360-
output_connectors: Default::default(),
1361-
},
1362-
)
1324+
.transit_program_status_to_compiling_rust(tid, pid1, v1, &ProgramInfo::default())
13631325
.await
13641326
.unwrap();
13651327
db.lock()
13661328
.await
1367-
.transit_program_status_to_compiling_rust(
1368-
tid,
1369-
pid2,
1370-
v2,
1371-
&ProgramInfo {
1372-
schema: ProgramSchema {
1373-
inputs: vec![],
1374-
outputs: vec![],
1375-
},
1376-
input_connectors: Default::default(),
1377-
output_connectors: Default::default(),
1378-
},
1379-
)
1329+
.transit_program_status_to_compiling_rust(tid, pid2, v2, &ProgramInfo::default())
13801330
.await
13811331
.unwrap();
13821332
Compiler::reconcile_local_state(&conf, &db).await.unwrap();

0 commit comments

Comments
 (0)