Skip to content

Commit b35dc2e

Browse files
committed
Add --udf-rs and --udf-toml flags to fda.
For `create`, `program set` and `program get` commands. Fixes #2676. Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 822426e commit b35dc2e

File tree

6 files changed

+158
-44
lines changed

6 files changed

+158
-44
lines changed

crates/fda/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
# fda
22

33
A CLI tool for interacting with [Feldera](https://github.com/feldera/feldera).
4-
See the [fda documentation](https://docs.feldera.com/api/cli) for more information.
4+
See the [fda documentation](https://docs.feldera.com/reference/cli/) for more information.

crates/fda/src/cli.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,16 @@ pub enum PipelineAction {
185185
name: String,
186186
/// A path to a file containing the SQL code.
187187
///
188+
/// If no path is provided, the pipeline will be created with an empty program.
188189
/// See the `stdin` flag for reading from stdin instead.
189190
#[arg(value_hint = ValueHint::FilePath, conflicts_with = "stdin")]
190191
program_path: Option<String>,
192+
/// A path to a file containing the Rust UDF functions.
193+
#[arg(short = 'u', long, value_hint = ValueHint::FilePath, conflicts_with = "stdin")]
194+
udf_rs: Option<String>,
195+
/// A path to the TOML file containing the dependencies for the UDF functions.
196+
#[arg(short = 't', long, value_hint = ValueHint::FilePath, conflicts_with = "stdin")]
197+
udf_toml: Option<String>,
191198
/// The compilation profile to use.
192199
#[arg(default_value = "optimized")]
193200
profile: Profile,
@@ -398,38 +405,45 @@ pub enum PipelineAction {
398405

399406
#[derive(Subcommand)]
400407
pub enum ProgramAction {
401-
/// Retrieve the program.
408+
/// Retrieve the program code.
409+
///
410+
/// By default, this returns the SQL code, but you can use the flags to retrieve
411+
/// the Rust UDF code instead.
402412
Get {
403413
/// The name of the pipeline.
404414
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
405415
name: String,
416+
/// Retrieve the Rust UDF code.
417+
#[arg(short = 'u', long, default_value_t = false)]
418+
udf_rs: bool,
419+
/// Retrieve the TOML dependencies file for the UDF code.
420+
#[arg(short = 't', long, default_value_t = false)]
421+
udf_toml: bool,
406422
},
407423
/// Sets a new program.
408424
Set {
409425
/// The name of the pipeline.
410426
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
411427
name: String,
412-
413428
/// A path to a file containing the SQL code.
414429
///
415430
/// See the `stdin` flag for reading from stdin instead.
416431
#[arg(value_hint = ValueHint::FilePath, conflicts_with = "stdin")]
417432
program_path: Option<String>,
418-
433+
/// A path to a file containing the Rust UDF functions.
434+
#[arg(short = 'u', long, value_hint = ValueHint::FilePath, conflicts_with = "stdin")]
435+
udf_rs: Option<String>,
436+
/// A path to the TOML file containing the dependencies for the UDF functions.
437+
#[arg(short = 't', long, value_hint = ValueHint::FilePath, conflicts_with = "stdin")]
438+
udf_toml: Option<String>,
419439
/// Read the SQL program code from stdin.
420440
///
421441
/// EXAMPLES:
422442
///
423443
/// * cat program.sql | fda program set p1 -s
424444
/// * echo "SELECT 1" | fda program set p1 -s
425445
/// * fda program get p2 | fda program set p1 -s
426-
#[arg(
427-
verbatim_doc_comment,
428-
short = 's',
429-
long,
430-
default_value_t = false,
431-
conflicts_with = "program_path"
432-
)]
446+
#[arg(verbatim_doc_comment, short = 's', long, default_value_t = false)]
433447
stdin: bool,
434448
},
435449
/// Retrieve the configuration of the program.

crates/fda/src/main.rs

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -278,28 +278,43 @@ fn patch_runtime_config(
278278
Ok(())
279279
}
280280

281-
async fn read_program_code(program_path: Option<String>, stdin: bool) -> Result<String, ()> {
282-
if program_path.is_none() && !stdin {
283-
eprintln!("No program code provided. Use `--stdin` to read from stdin.");
284-
return Err(());
281+
/// Reads a file, returning the content as a string.
282+
///
283+
/// If the file path is `None`, returns `None`.
284+
/// If the file cannot be read, returns `Err(())`.
285+
async fn read_file(file_path: Option<String>) -> Result<Option<String>, ()> {
286+
if let Some(path) = file_path {
287+
match tokio::fs::read_to_string(path.as_str()).await {
288+
Ok(udf_code) => {
289+
debug!("Read from file: {}", path);
290+
Ok(Some(udf_code))
291+
}
292+
Err(e) => {
293+
eprintln!("Failed to read '{}': {}", path, e);
294+
Err(())
295+
}
296+
}
297+
} else {
298+
Ok(None)
285299
}
286-
let program_path = program_path.unwrap_or_else(|| "".to_string());
300+
}
301+
302+
async fn read_program_code(
303+
program_path: Option<String>,
304+
stdin: bool,
305+
) -> Result<Option<String>, ()> {
287306
if stdin {
288307
let mut program_code = String::new();
289308
let mut stdin = std::io::stdin();
290309
if stdin.read_to_string(&mut program_code).is_ok() {
291310
debug!("Read program code from stdin");
292-
Ok(program_code)
311+
Ok(Some(program_code))
293312
} else {
294313
eprintln!("Failed to read program code from stdin");
295314
Err(())
296315
}
297-
} else if let Ok(program_code) = tokio::fs::read_to_string(program_path.as_str()).await {
298-
debug!("Read program code from file: {}", program_path);
299-
Ok(program_code)
300316
} else {
301-
eprintln!("Failed to read program code from file: {}", program_path);
302-
Err(())
317+
read_file(program_path).await
303318
}
304319
}
305320

@@ -347,17 +362,23 @@ async fn pipeline(action: PipelineAction, client: Client) {
347362
name,
348363
program_path,
349364
profile,
365+
udf_rs,
366+
udf_toml,
350367
stdin,
351368
} => {
352-
if let Ok(program_code) = read_program_code(program_path, stdin).await {
369+
if let (Ok(program_code), Ok(udf_rust), Ok(udf_toml)) = (
370+
read_program_code(program_path, stdin).await,
371+
read_file(udf_rs).await,
372+
read_file(udf_toml).await,
373+
) {
353374
let response = client
354375
.post_pipeline()
355376
.body(PipelineDescr {
356377
description: "".to_string(),
357378
name: name.to_string(),
358-
program_code,
359-
udf_rust: None,
360-
udf_toml: None,
379+
program_code: program_code.unwrap_or_default(),
380+
udf_rust,
381+
udf_toml,
361382
program_config: profile.into(),
362383
runtime_config: RuntimeConfig::default(),
363384
})
@@ -372,7 +393,7 @@ async fn pipeline(action: PipelineAction, client: Client) {
372393
println!("Pipeline created successfully.");
373394
debug!("{:#?}", response);
374395
} else {
375-
// Already reported error in read_program_code.
396+
// Already reported error in read_program_code or read_file.
376397
std::process::exit(1);
377398
}
378399
}
@@ -919,7 +940,11 @@ async fn endpoint(
919940

920941
async fn program(action: ProgramAction, client: Client) {
921942
match action {
922-
ProgramAction::Get { name } => {
943+
ProgramAction::Get {
944+
name,
945+
udf_rs,
946+
udf_toml,
947+
} => {
923948
let response = client
924949
.get_pipeline()
925950
.pipeline_name(name)
@@ -931,7 +956,15 @@ async fn program(action: ProgramAction, client: Client) {
931956
1,
932957
))
933958
.unwrap();
934-
println!("{}", response.program_code);
959+
if !udf_rs && !udf_toml {
960+
println!("{}", response.program_code);
961+
}
962+
if udf_rs {
963+
println!("{}", response.udf_rust);
964+
}
965+
if udf_toml {
966+
println!("{}", response.udf_toml);
967+
}
935968
}
936969
ProgramAction::Config { name } => {
937970
let response = client
@@ -997,15 +1030,21 @@ async fn program(action: ProgramAction, client: Client) {
9971030
ProgramAction::Set {
9981031
name,
9991032
program_path,
1033+
udf_rs,
1034+
udf_toml,
10001035
stdin,
10011036
} => {
1002-
if let Ok(program_code) = read_program_code(program_path, stdin).await {
1037+
if let (Ok(program_code), Ok(udf_rust), Ok(udf_toml)) = (
1038+
read_program_code(program_path, stdin).await,
1039+
read_file(udf_rs).await,
1040+
read_file(udf_toml).await,
1041+
) {
10031042
let pp = PatchPipeline {
10041043
description: None,
10051044
name: None,
1006-
program_code: Some(program_code),
1007-
udf_rust: None,
1008-
udf_toml: None,
1045+
program_code,
1046+
udf_rust,
1047+
udf_toml,
10091048
program_config: None,
10101049
runtime_config: None,
10111050
};
@@ -1023,7 +1062,7 @@ async fn program(action: ProgramAction, client: Client) {
10231062
.unwrap();
10241063
println!("Program updated successfully.");
10251064
} else {
1026-
// Already reported error in read_program_code.
1065+
// Already reported error in read_program_code or read_file.
10271066
std::process::exit(1);
10281067
}
10291068
}

crates/fda/src/shell.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Type: \h for help with fda shell commands
2121
The following fda shell commands are available in the shell
2222
to interact with the current pipeline:
2323
24+
- SQL commands: SELECT or INSERT
2425
- start
2526
- pause
2627
- restart [-r, --recompile]

crates/fda/test.bash

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,76 @@
1+
#!/bin/bash
2+
set -ex
3+
4+
fail_on_success() {
5+
# Run the command with `|| true` to prevent `set -e` from exiting the script
6+
set +e
7+
"$@"
8+
9+
# Check the exit status
10+
if [ $? -eq 0 ]; then
11+
echo "Command succeeded but was expected to fail"
12+
exit 1
13+
fi
14+
set -e
15+
}
16+
17+
compare_output() {
18+
# Capture the output of both commands
19+
output1=`$1`
20+
output2=`$2`
21+
22+
# Compare the outputs
23+
if [ "$output1" = "$output2" ]; then
24+
return 0
25+
else
26+
echo "The outputs are different."
27+
exit 1
28+
fi
29+
}
30+
131
#export FELDERA_HOST=http://localhost:8080
232
BINARY_PATH=../../target/debug/fda
333

434
cargo build
535

636
$BINARY_PATH pipelines
737

8-
# Cleanup
9-
$BINARY_PATH delete p1
10-
$BINARY_PATH delete p2
11-
$BINARY_PATH delete unknown
12-
$BINARY_PATH delete punknown
13-
$BINARY_PATH apikey delete a
38+
# Cleanup, these commands might fail if the resources do not exist yet
39+
$BINARY_PATH shutdown p1 || true
40+
$BINARY_PATH delete p1 || true
41+
$BINARY_PATH delete p2 || true
42+
$BINARY_PATH delete pudf || true
43+
$BINARY_PATH delete unknown || true
44+
$BINARY_PATH delete punknown || true
45+
$BINARY_PATH apikey delete a || true
1446

1547
# Tests
1648
$BINARY_PATH apikey create a
1749
$BINARY_PATH apikey list
1850
$BINARY_PATH apikey delete a
1951

52+
echo "base64 = '0.22.1'" > udf.toml
53+
echo "use feldera_sqllib::F32;" > udf.rs
2054
echo "CREATE TABLE example ( id INT NOT NULL PRIMARY KEY );
2155
CREATE VIEW example_count AS ( SELECT COUNT(*) AS num_rows FROM example );" > program.sql
56+
2257
$BINARY_PATH create p1 program.sql
23-
$BINARY_PATH program p1 | $BINARY_PATH create p2 -s
58+
$BINARY_PATH program get p1 | $BINARY_PATH create p2 -s
59+
compare_output "${BINARY_PATH} program get p1" "${BINARY_PATH} program get p2"
2460
$BINARY_PATH program set-config p1 dev
2561
$BINARY_PATH program config p1
2662
$BINARY_PATH program status p1
2763

64+
$BINARY_PATH create pudf program.sql --udf-toml udf.toml --udf-rs udf.rs
65+
compare_output "${BINARY_PATH} program get pudf --udf-toml" "cat udf.toml"
66+
compare_output "${BINARY_PATH} program get pudf --udf-rs" "cat udf.rs"
67+
2868
$BINARY_PATH set-config p1 storage true
69+
$BINARY_PATH program set p1 --udf-toml udf.toml --udf-rs udf.rs
70+
$BINARY_PATH program set p2 --udf-rs udf.rs
71+
compare_output "${BINARY_PATH} program get p1" "${BINARY_PATH} program get p2"
72+
compare_output "${BINARY_PATH} program get p1 --udf-rs" "${BINARY_PATH} program get p2 --udf-rs"
73+
2974
$BINARY_PATH config p1
3075

3176
$BINARY_PATH start p1
@@ -36,9 +81,16 @@ $BINARY_PATH shutdown p1
3681

3782
$BINARY_PATH delete p1
3883
$BINARY_PATH delete p2
84+
$BINARY_PATH delete pudf
3985
$BINARY_PATH shell unknown || true
4086

41-
$BINARY_PATH create set punknown --stdin || true # error: the argument '[PROGRAM_PATH]' cannot be used with '--stdin'
42-
$BINARY_PATH program set punknown file-path --stdin || true # error: the argument '[PROGRAM_PATH]' cannot be used with '--stdin'
87+
# Verify argument conflicts, these invocations should fail
88+
fail_on_success $BINARY_PATH create set punknown --stdin
89+
fail_on_success $BINARY_PATH program set punknown file-path --stdin
90+
fail_on_success $BINARY_PATH program set p1 --udf-toml udf.toml --stdin
91+
fail_on_success $BINARY_PATH program set p1 --udf-rs udf.toml --stdin
92+
fail_on_success $BINARY_PATH program set p1 --udf-rs udf.toml --udf-toml udf.toml --stdin
4393

4494
rm program.sql
95+
rm udf.toml
96+
rm udf.rs

docs/reference/cli.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Command line utility (fda)
22

33
`fda` is a command line utility for interacting with the Feldera Manager's REST API.
4+
It allows you to create, manage, and monitor pipelines. It also features an interactive
5+
[shell](#shell) for inspecting and modifying the state of tables and views using SQL commands.
46

57
## Installation
68

@@ -83,7 +85,7 @@ right. Go to **Manage API Keys** and click **Generate new key**.
8385

8486
:::
8587

86-
## Usage Examples
88+
## Examples
8789

8890
Specify the host and API key as command line arguments or environment variables:
8991

@@ -114,6 +116,12 @@ Enable storage for `p1`:
114116
fda set-config p1 storage true
115117
```
116118

119+
Add Rust UDF code to `p1`:
120+
121+
```commandline
122+
fda program set p1 --udf-toml udf.toml --udf-rs udf.rs
123+
```
124+
117125
Run the pipeline `p1`:
118126

119127
```commandline

0 commit comments

Comments
 (0)