Skip to content

Commit 5162e0c

Browse files
committed
[pipeline-manager] Add ability to declate unit tests in udf.rs
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent 542b605 commit 5162e0c

File tree

15 files changed

+243
-18
lines changed

15 files changed

+243
-18
lines changed

crates/pipeline-manager/src/api/examples.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ fn extended_pipeline_1() -> ExtendedPipelineDescr {
5757
program_error: ProgramError {
5858
sql_compilation: None,
5959
rust_compilation: None,
60+
rust_test: None,
6061
system_error: None,
6162
},
6263
program_binary_source_checksum: None,
@@ -138,6 +139,7 @@ fn extended_pipeline_2() -> ExtendedPipelineDescr {
138139
program_error: ProgramError {
139140
sql_compilation: None,
140141
rust_compilation: None,
142+
rust_test: None,
141143
system_error: None,
142144
},
143145
program_binary_source_checksum: None,

crates/pipeline-manager/src/api/support_data_collector.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,6 +1119,7 @@ mod tests {
11191119
pipeline_id,
11201120
Version(1),
11211121
&RustCompilationInfo::success(),
1122+
&None,
11221123
"test-checksum",
11231124
"test-integrity-checksum",
11241125
)
@@ -1317,6 +1318,7 @@ mod tests {
13171318
pipeline_id,
13181319
Version(1),
13191320
&RustCompilationInfo::success(),
1321+
&None,
13201322
"checksum1",
13211323
"checksum2",
13221324
)
@@ -1484,6 +1486,7 @@ mod tests {
14841486
pipeline_id,
14851487
Version(1),
14861488
&RustCompilationInfo::success(),
1489+
&None,
14871490
"checksum1",
14881491
"checksum2",
14891492
)

crates/pipeline-manager/src/compiler/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ pub async fn compiler_precompile(
182182
})?;
183183

184184
// Rust
185-
let (source_checksum, integrity_checksum, rust_duration, _) = perform_rust_compilation(
185+
let (source_checksum, integrity_checksum, rust_duration, _, _) = perform_rust_compilation(
186186
&common_config,
187187
&config,
188188
None,

crates/pipeline-manager/src/compiler/rust_compiler.rs

Lines changed: 97 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ const CLEANUP_INTERVAL: Duration = Duration::from_secs(120);
5959
/// a significant amount of time.
6060
const CLEANUP_RETENTION: Duration = Duration::from_secs(3600);
6161

62+
/// Rust compiler stack size in bytes (20MB).
63+
/// Set to 10x the default to prevent SIGSEGV when the compiler runs out of stack on large programs.
64+
const RUST_COMPILER_STACK_SIZE: &str = "20971520";
65+
6266
/// Rust compilation task that wakes up periodically.
6367
/// Sleeps inbetween ticks which affects the response time of Rust compilation.
6468
/// Note that the logic in this task assumes only one is run at a time.
@@ -194,7 +198,7 @@ async fn attempt_end_to_end_rust_compilation(
194198

195199
// (5) Update database that Rust compilation is finished
196200
match compilation_result {
197-
Ok((source_checksum, integrity_checksum, duration, compilation_info)) => {
201+
Ok((source_checksum, integrity_checksum, duration, compilation_info, test_info)) => {
198202
info!(
199203
"Rust compilation success: pipeline {} (program version: {}) (took {:.2}s; source checksum: {}; integrity checksum: {})",
200204
pipeline.id,
@@ -210,6 +214,7 @@ async fn attempt_end_to_end_rust_compilation(
210214
pipeline.id,
211215
pipeline.program_version,
212216
&compilation_info,
217+
&test_info,
213218
&source_checksum,
214219
&integrity_checksum,
215220
)
@@ -341,7 +346,7 @@ impl From<UtilError> for RustCompilationError {
341346
/// Performs the Rust compilation.
342347
///
343348
/// Returns the program binary URL, source checksum, integrity checksum,
344-
/// duration, and compilation information.
349+
/// duration, compilation information, and test information.
345350
#[allow(clippy::too_many_arguments)]
346351
pub async fn perform_rust_compilation(
347352
common_config: &CommonConfig,
@@ -355,7 +360,16 @@ pub async fn perform_rust_compilation(
355360
program_info: &Option<serde_json::Value>,
356361
udf_rust: &str,
357362
udf_toml: &str,
358-
) -> Result<(String, String, Duration, RustCompilationInfo), RustCompilationError> {
363+
) -> Result<
364+
(
365+
String,
366+
String,
367+
Duration,
368+
RustCompilationInfo,
369+
Option<RustCompilationInfo>,
370+
),
371+
RustCompilationError,
372+
> {
359373
let start = Instant::now();
360374

361375
// These must always be the same, the Rust compiler should never pick up
@@ -446,7 +460,7 @@ pub async fn perform_rust_compilation(
446460
.await?;
447461

448462
// Perform the compilation in the workspace
449-
let (compilation_info, integrity_checksum) = call_compiler(
463+
let (compilation_info, test_info, integrity_checksum) = call_compiler(
450464
db,
451465
tenant_id,
452466
pipeline_id,
@@ -463,6 +477,7 @@ pub async fn perform_rust_compilation(
463477
integrity_checksum.clone(),
464478
start.elapsed(),
465479
compilation_info,
480+
test_info,
466481
))
467482
}
468483

@@ -984,6 +999,69 @@ extern crate sync_checkpoint;"#,
984999
Ok(())
9851000
}
9861001

1002+
/// Runs cargo test on the compiled project and returns test results.
1003+
async fn run_cargo_test(
1004+
workspace_dir: &Path,
1005+
pipeline_main_crate_dir: &Path,
1006+
env_path: &std::ffi::OsStr,
1007+
optional_env_rustflags: &Option<std::ffi::OsString>,
1008+
runtime_selector: &RuntimeSelector,
1009+
profile: &CompilationProfile,
1010+
) -> Result<RustCompilationInfo, RustCompilationError> {
1011+
// Create file where stdout will be written to
1012+
let stdout_file_path = pipeline_main_crate_dir.join("test_stdout.log");
1013+
let stdout_file = create_new_file(&stdout_file_path).await?;
1014+
1015+
// Create file where stderr will be written to
1016+
let stderr_file_path = pipeline_main_crate_dir.join("test_stderr.log");
1017+
let stderr_file = create_new_file(&stderr_file_path).await?;
1018+
1019+
// Formulate command
1020+
let mut command = Command::new("cargo");
1021+
command.env_clear();
1022+
command.env("PATH", env_path);
1023+
if !runtime_selector.is_platform() {
1024+
command.env("FELDERA_RUNTIME_OVERRIDE", runtime_selector.as_commitish());
1025+
}
1026+
if let Some(ref env_rustflags) = optional_env_rustflags {
1027+
command.env("RUSTFLAGS", env_rustflags);
1028+
}
1029+
command
1030+
.env("RUST_MIN_STACK", RUST_COMPILER_STACK_SIZE)
1031+
.current_dir(workspace_dir)
1032+
.arg("test")
1033+
.arg("--workspace")
1034+
.arg("--profile")
1035+
.arg(profile.to_string())
1036+
.stdin(Stdio::null())
1037+
.stdout(Stdio::from(stdout_file.into_std().await))
1038+
.stderr(Stdio::from(stderr_file.into_std().await));
1039+
1040+
// Spawn the test command and wait for it to complete
1041+
let mut process = command.spawn().map_err(|e| {
1042+
RustCompilationError::SystemError(
1043+
UtilError::IoError("running 'cargo test'".to_string(), e).to_string(),
1044+
)
1045+
})?;
1046+
1047+
// Wait for the process to complete
1048+
let exit_status = process.wait().await.map_err(|e| {
1049+
RustCompilationError::SystemError(
1050+
UtilError::IoError("waiting for 'cargo test'".to_string(), e).to_string(),
1051+
)
1052+
})?;
1053+
1054+
// Read stdout and stderr from the log files
1055+
let stdout = read_file_content(&stdout_file_path).await?;
1056+
let stderr = read_file_content(&stderr_file_path).await?;
1057+
1058+
Ok(RustCompilationInfo {
1059+
exit_code: exit_status.code().unwrap_or(-1),
1060+
stdout,
1061+
stderr,
1062+
})
1063+
}
1064+
9871065
/// Calls the compiler on the project with the provided source checksum and using the compilation profile.
9881066
#[allow(clippy::too_many_arguments)]
9891067
async fn call_compiler(
@@ -995,7 +1073,7 @@ async fn call_compiler(
9951073
source_checksum: &str,
9961074
profile: &CompilationProfile,
9971075
runtime_selector: &RuntimeSelector,
998-
) -> Result<(RustCompilationInfo, String), RustCompilationError> {
1076+
) -> Result<(RustCompilationInfo, Option<RustCompilationInfo>, String), RustCompilationError> {
9991077
// Workspace directory
10001078
let workspace_dir = config.working_dir().join("rust-compilation");
10011079
if !workspace_dir.is_dir() {
@@ -1049,17 +1127,15 @@ async fn call_compiler(
10491127
// Formulate command
10501128
let mut command = Command::new("cargo");
10511129
command.env_clear();
1052-
command.env("PATH", env_path);
1130+
command.env("PATH", &env_path);
10531131
if !runtime_selector.is_platform() {
10541132
command.env("FELDERA_RUNTIME_OVERRIDE", runtime_selector.as_commitish());
10551133
}
1056-
if let Some(env_rustflags) = optional_env_rustflags {
1134+
if let Some(ref env_rustflags) = optional_env_rustflags {
10571135
command.env("RUSTFLAGS", env_rustflags);
10581136
}
10591137
command
1060-
// Set compiler stack size to 20MB (10x the default) to prevent
1061-
// SIGSEGV when the compiler runs out of stack on large programs.
1062-
.env("RUST_MIN_STACK", "20971520")
1138+
.env("RUST_MIN_STACK", RUST_COMPILER_STACK_SIZE)
10631139
.current_dir(&workspace_dir)
10641140
.arg("build")
10651141
.arg("--workspace")
@@ -1147,6 +1223,16 @@ async fn call_compiler(
11471223

11481224
// Compilation is successful if the return exit code is present and zero
11491225
if exit_status.success() {
1226+
// Run cargo test to test UDFs
1227+
let test_info = run_cargo_test(
1228+
&workspace_dir,
1229+
&pipeline_main_crate_dir,
1230+
&env_path,
1231+
&optional_env_rustflags,
1232+
runtime_selector,
1233+
profile,
1234+
)
1235+
.await?;
11501236
// Source file
11511237
let source_file_path = workspace_dir
11521238
.join("target")
@@ -1172,7 +1258,7 @@ async fn call_compiler(
11721258
copy_file(&source_file_path, &target_file_path).await?;
11731259

11741260
// Success
1175-
Ok((compilation_info, integrity_checksum))
1261+
Ok((compilation_info, Some(test_info), integrity_checksum))
11761262
} else {
11771263
Err(RustCompilationError::RustError(compilation_info))
11781264
}

0 commit comments

Comments
 (0)