Skip to content

Commit 5fcc887

Browse files
committed
Client: fda
Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent 869c87f commit 5fcc887

File tree

4 files changed

+179
-3
lines changed

4 files changed

+179
-3
lines changed

crates/fda/src/cli.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use std::fmt::Display;
44
use std::path::PathBuf;
55

66
use crate::make_client;
7-
use feldera_rest_api::types::{ClusterMonitorEventFieldSelector, CompilationProfile};
7+
use feldera_rest_api::types::{
8+
ClusterMonitorEventFieldSelector, CompilationProfile, PipelineMonitorEventFieldSelector,
9+
};
810

911
/// Autocompletion for pipeline names by trying to fetch them from the server.
1012
fn pipeline_names(current: &std::ffi::OsStr) -> Vec<CompletionCandidate> {
@@ -697,6 +699,23 @@ pub enum PipelineAction {
697699
#[command(flatten)]
698700
args: BenchmarkArgs,
699701
},
702+
/// Retrieves all pipeline events (status only) and prints them.
703+
Events {
704+
/// The name of the pipeline.
705+
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
706+
name: String,
707+
},
708+
/// Retrieve specific pipeline event.
709+
Event {
710+
/// The name of the pipeline.
711+
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
712+
name: String,
713+
/// Identifier (UUID) of the event or `latest`.
714+
id: String,
715+
/// Either `all` or `status` (default).
716+
#[arg(default_value = "status")]
717+
selector: PipelineMonitorEventFieldSelector,
718+
},
700719
}
701720

702721
#[derive(Args, Debug)]

crates/fda/src/main.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,6 +1864,161 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
18641864
println!("Initiated rebalancing for pipeline {name}.");
18651865
}
18661866
PipelineAction::Bench { args } => bench::bench(client, format, args).await,
1867+
PipelineAction::Events { name } => {
1868+
let response = client
1869+
.list_pipeline_events()
1870+
.pipeline_name(name.clone())
1871+
.send()
1872+
.await
1873+
.map_err(handle_errors_fatal(
1874+
client.baseurl().clone(),
1875+
"Unable to retrieve pipeline events",
1876+
1,
1877+
))
1878+
.unwrap();
1879+
match format {
1880+
OutputFormat::Text => {
1881+
let mut rows = vec![];
1882+
rows.push([
1883+
"id".to_string(),
1884+
"recorded_at".to_string(),
1885+
"resources_status".to_string(),
1886+
"resources_desired_status".to_string(),
1887+
"runtime_status".to_string(),
1888+
"runtime_desired_status".to_string(),
1889+
"program_status".to_string(),
1890+
"storage_status".to_string(),
1891+
]);
1892+
for event in response.iter() {
1893+
rows.push([
1894+
event.id.to_string(),
1895+
format!(
1896+
"{} ({:.0}s ago)",
1897+
event.recorded_at,
1898+
(Utc::now() - event.recorded_at).as_seconds_f64()
1899+
),
1900+
event.resources_status.to_string(),
1901+
event.resources_desired_status.to_string(),
1902+
event
1903+
.runtime_status
1904+
.map(|v| v.to_string())
1905+
.unwrap_or("(none)".to_string()),
1906+
event
1907+
.runtime_desired_status
1908+
.map(|v| v.to_string())
1909+
.unwrap_or("(none)".to_string()),
1910+
event.program_status.to_string(),
1911+
event.storage_status.to_string(),
1912+
]);
1913+
}
1914+
println!(
1915+
"{}",
1916+
Builder::from_iter(rows).build().with(Style::rounded())
1917+
);
1918+
}
1919+
OutputFormat::Json => {
1920+
println!(
1921+
"{}",
1922+
serde_json::to_string_pretty(&response.into_inner())
1923+
.expect("Failed to serialize pipeline events")
1924+
);
1925+
}
1926+
_ => {
1927+
eprintln!("Unsupported output format: {}", format);
1928+
std::process::exit(1);
1929+
}
1930+
}
1931+
}
1932+
PipelineAction::Event { name, id, selector } => {
1933+
let response = client
1934+
.get_pipeline_event()
1935+
.pipeline_name(name.clone())
1936+
.event_id(id)
1937+
.selector(selector)
1938+
.send()
1939+
.await
1940+
.map_err(handle_errors_fatal(
1941+
client.baseurl().clone(),
1942+
"Unable to retrieve pipeline event",
1943+
1,
1944+
))
1945+
.unwrap();
1946+
match format {
1947+
OutputFormat::Text => {
1948+
let mut rows = vec![];
1949+
rows.push(["Field".to_string(), "Value".to_string()]);
1950+
rows.push(["id".to_string(), response.id.to_string()]);
1951+
rows.push([
1952+
"recorded_at".to_string(),
1953+
format!(
1954+
"{} ({:.0}s ago)",
1955+
response.recorded_at,
1956+
(Utc::now() - response.recorded_at).as_seconds_f64()
1957+
),
1958+
]);
1959+
rows.push([
1960+
"resources_status".to_string(),
1961+
response.resources_status.to_string(),
1962+
]);
1963+
if selector == PipelineMonitorEventFieldSelector::All {
1964+
if let Some(value) = &response.resources_status_details {
1965+
rows.push(["resources_status_details".to_string(), value.to_string()]);
1966+
}
1967+
}
1968+
rows.push([
1969+
"resources_desired_status".to_string(),
1970+
response.resources_desired_status.to_string(),
1971+
]);
1972+
rows.push([
1973+
"runtime_status".to_string(),
1974+
response
1975+
.runtime_status
1976+
.map(|v| v.to_string())
1977+
.unwrap_or("(none)".to_string()),
1978+
]);
1979+
if selector == PipelineMonitorEventFieldSelector::All {
1980+
rows.push([
1981+
"runtime_status_details".to_string(),
1982+
response
1983+
.runtime_status_details
1984+
.as_ref()
1985+
.map(|v| v.to_string())
1986+
.unwrap_or("(none)".to_string()),
1987+
]);
1988+
}
1989+
rows.push([
1990+
"runtime_desired_status".to_string(),
1991+
response
1992+
.runtime_desired_status
1993+
.map(|v| v.to_string())
1994+
.unwrap_or("(none)".to_string()),
1995+
]);
1996+
rows.push([
1997+
"program_status".to_string(),
1998+
response.program_status.to_string(),
1999+
]);
2000+
rows.push([
2001+
"storage_status".to_string(),
2002+
response.storage_status.to_string(),
2003+
]);
2004+
println!(
2005+
"{}",
2006+
Builder::from_iter(rows).build().with(Style::rounded())
2007+
);
2008+
}
2009+
OutputFormat::Json => {
2010+
println!(
2011+
"{}",
2012+
serde_json::to_string_pretty(&response.into_inner())
2013+
.expect("Failed to serialize pipeline events")
2014+
);
2015+
}
2016+
_ => {
2017+
eprintln!("Unsupported output format: {}", format);
2018+
std::process::exit(1);
2019+
}
2020+
}
2021+
}
18672022
}
18682023
}
18692024

crates/fda/test.bash

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ fda program set-config p1 --profile dev
7979
fda program set-config p1 --profile optimized
8080
fda program config p1
8181
fda program status p1
82+
fda events p1
83+
fda event p1 latest
8284

8385
fda create pudf program.sql --udf-toml udf.toml --udf-rs udf.rs
8486
compare_output "fda program get pudf --udf-toml" "cat udf.toml"

openapi.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3342,7 +3342,7 @@
33423342
"Metrics & Debugging"
33433343
],
33443344
"summary": "List Pipeline Events",
3345-
"description": "Retrieve a list of retained pipeline monitor events ordered from most recent to least recent.\ncleaned up.",
3345+
"description": "Retrieve a list of retained pipeline monitor events ordered from most recent to least recent.\n\nThe returned events only have limited details, the full details can be retrieved using\nthe `GET /v0/pipelines/<pipeline>/events/<event-id>` endpoint.\n\nPipeline monitor events are collected at a periodic interval (every 10s), however only\nevery 10 minutes or if the overall health changes, does it get inserted into the database\n(and thus, served by this endpoint). The most recent 720 events are retained.",
33463346
"operationId": "list_pipeline_events",
33473347
"parameters": [
33483348
{
@@ -3402,7 +3402,7 @@
34023402
"Metrics & Debugging"
34033403
],
34043404
"summary": "Get Pipeline Event",
3405-
"description": "Get specific pipeline monitor event.",
3405+
"description": "Get specific pipeline monitor event.\n\nThe identifiers of the events can be retrieved via `GET /v0/pipelines/<pipeline>/events`.\nThe most recent 720 events are retained.\nThis endpoint can return a 404 for an event that no longer exists due to a cleanup.",
34063406
"operationId": "get_pipeline_event",
34073407
"parameters": [
34083408
{

0 commit comments

Comments
 (0)