Skip to content

Commit aeb8b34

Browse files
committed
pipeline-manager: pipeline monitoring events
Both at a regular interval and whenever the status of the pipeline changes, store this as an event in the database. At most 720 events are stored for each pipeline. If there are no changes, every 10 minutes it stores a duplicate event. If there are only changes to the details, it only updates it every 10 seconds. As such, the retained events span approximately between 2 hours and 5 days. API endpoints: - `GET /v0/pipelines/<pipeline>/events` - `GET /v0/pipelines/<pipeline>/event/latest?selector=all/status` - `GET /v0/pipelines/<pipeline>/event/<event-id>?selector=all/status` The clients have been updated with corresponding functionality: - CLI: - `fda events <pipeline>` - `fda event <pipeline> latest <selector>` - `fda event <pipeline> <event-id> <selector>` - Python: - `pipeline.events()` - `pipeline.event("latest", selector)` - `pipeline.event(event_id, selector)` Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent ca35538 commit aeb8b34

File tree

24 files changed

+1834
-27
lines changed

24 files changed

+1834
-27
lines changed

crates/fda/src/cli.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use std::fmt::Display;
44
use std::path::PathBuf;
55

66
use crate::make_client;
7-
use feldera_rest_api::types::{ClusterMonitorEventFieldSelector, CompilationProfile};
7+
use feldera_rest_api::types::{
8+
ClusterMonitorEventFieldSelector, CompilationProfile, PipelineMonitorEventFieldSelector,
9+
};
810

911
/// Autocompletion for pipeline names by trying to fetch them from the server.
1012
fn pipeline_names(current: &std::ffi::OsStr) -> Vec<CompletionCandidate> {
@@ -697,6 +699,23 @@ pub enum PipelineAction {
697699
#[command(flatten)]
698700
args: BenchmarkArgs,
699701
},
702+
/// Retrieves all pipeline events (status only) and prints them.
703+
Events {
704+
/// The name of the pipeline.
705+
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
706+
name: String,
707+
},
708+
/// Retrieve specific pipeline event.
709+
Event {
710+
/// The name of the pipeline.
711+
#[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))]
712+
name: String,
713+
/// Identifier (UUID) of the event or `latest`.
714+
id: String,
715+
/// Either `all` or `status` (default).
716+
#[arg(default_value = "status")]
717+
selector: PipelineMonitorEventFieldSelector,
718+
},
700719
}
701720

702721
#[derive(Args, Debug)]

crates/fda/src/main.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,6 +1864,161 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client)
18641864
println!("Initiated rebalancing for pipeline {name}.");
18651865
}
18661866
PipelineAction::Bench { args } => bench::bench(client, format, args).await,
1867+
PipelineAction::Events { name } => {
1868+
let response = client
1869+
.list_pipeline_events()
1870+
.pipeline_name(name.clone())
1871+
.send()
1872+
.await
1873+
.map_err(handle_errors_fatal(
1874+
client.baseurl().clone(),
1875+
"Unable to retrieve pipeline events",
1876+
1,
1877+
))
1878+
.unwrap();
1879+
match format {
1880+
OutputFormat::Text => {
1881+
let mut rows = vec![];
1882+
rows.push([
1883+
"id".to_string(),
1884+
"recorded_at".to_string(),
1885+
"resources_status".to_string(),
1886+
"resources_desired_status".to_string(),
1887+
"runtime_status".to_string(),
1888+
"runtime_desired_status".to_string(),
1889+
"program_status".to_string(),
1890+
"storage_status".to_string(),
1891+
]);
1892+
for event in response.iter() {
1893+
rows.push([
1894+
event.id.to_string(),
1895+
format!(
1896+
"{} ({:.0}s ago)",
1897+
event.recorded_at,
1898+
(Utc::now() - event.recorded_at).as_seconds_f64()
1899+
),
1900+
event.resources_status.to_string(),
1901+
event.resources_desired_status.to_string(),
1902+
event
1903+
.runtime_status
1904+
.map(|v| v.to_string())
1905+
.unwrap_or("(none)".to_string()),
1906+
event
1907+
.runtime_desired_status
1908+
.map(|v| v.to_string())
1909+
.unwrap_or("(none)".to_string()),
1910+
event.program_status.to_string(),
1911+
event.storage_status.to_string(),
1912+
]);
1913+
}
1914+
println!(
1915+
"{}",
1916+
Builder::from_iter(rows).build().with(Style::rounded())
1917+
);
1918+
}
1919+
OutputFormat::Json => {
1920+
println!(
1921+
"{}",
1922+
serde_json::to_string_pretty(&response.into_inner())
1923+
.expect("Failed to serialize pipeline events")
1924+
);
1925+
}
1926+
_ => {
1927+
eprintln!("Unsupported output format: {}", format);
1928+
std::process::exit(1);
1929+
}
1930+
}
1931+
}
1932+
PipelineAction::Event { name, id, selector } => {
1933+
let response = client
1934+
.get_pipeline_event()
1935+
.pipeline_name(name.clone())
1936+
.event_id(id)
1937+
.selector(selector)
1938+
.send()
1939+
.await
1940+
.map_err(handle_errors_fatal(
1941+
client.baseurl().clone(),
1942+
"Unable to retrieve pipeline event",
1943+
1,
1944+
))
1945+
.unwrap();
1946+
match format {
1947+
OutputFormat::Text => {
1948+
let mut rows = vec![];
1949+
rows.push(["Field".to_string(), "Value".to_string()]);
1950+
rows.push(["id".to_string(), response.id.to_string()]);
1951+
rows.push([
1952+
"recorded_at".to_string(),
1953+
format!(
1954+
"{} ({:.0}s ago)",
1955+
response.recorded_at,
1956+
(Utc::now() - response.recorded_at).as_seconds_f64()
1957+
),
1958+
]);
1959+
rows.push([
1960+
"resources_status".to_string(),
1961+
response.resources_status.to_string(),
1962+
]);
1963+
if selector == PipelineMonitorEventFieldSelector::All
1964+
&& let Some(value) = &response.resources_status_details
1965+
{
1966+
rows.push(["resources_status_details".to_string(), value.to_string()]);
1967+
}
1968+
rows.push([
1969+
"resources_desired_status".to_string(),
1970+
response.resources_desired_status.to_string(),
1971+
]);
1972+
rows.push([
1973+
"runtime_status".to_string(),
1974+
response
1975+
.runtime_status
1976+
.map(|v| v.to_string())
1977+
.unwrap_or("(none)".to_string()),
1978+
]);
1979+
if selector == PipelineMonitorEventFieldSelector::All {
1980+
rows.push([
1981+
"runtime_status_details".to_string(),
1982+
response
1983+
.runtime_status_details
1984+
.as_ref()
1985+
.map(|v| v.to_string())
1986+
.unwrap_or("(none)".to_string()),
1987+
]);
1988+
}
1989+
rows.push([
1990+
"runtime_desired_status".to_string(),
1991+
response
1992+
.runtime_desired_status
1993+
.map(|v| v.to_string())
1994+
.unwrap_or("(none)".to_string()),
1995+
]);
1996+
rows.push([
1997+
"program_status".to_string(),
1998+
response.program_status.to_string(),
1999+
]);
2000+
rows.push([
2001+
"storage_status".to_string(),
2002+
response.storage_status.to_string(),
2003+
]);
2004+
println!(
2005+
"{}",
2006+
Builder::from_iter(rows).build().with(Style::rounded())
2007+
);
2008+
}
2009+
OutputFormat::Json => {
2010+
println!(
2011+
"{}",
2012+
serde_json::to_string_pretty(&response.into_inner())
2013+
.expect("Failed to serialize pipeline events")
2014+
);
2015+
}
2016+
_ => {
2017+
eprintln!("Unsupported output format: {}", format);
2018+
std::process::exit(1);
2019+
}
2020+
}
2021+
}
18672022
}
18682023
}
18692024

crates/fda/test.bash

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ fda program set-config p1 --profile dev
7979
fda program set-config p1 --profile optimized
8080
fda program config p1
8181
fda program status p1
82+
fda events p1
83+
fda event p1 latest
8284

8385
fda create pudf program.sql --udf-toml udf.toml --udf-rs udf.rs
8486
compare_output "fda program get pudf --udf-toml" "cat udf.toml"
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- Regularly the status of the pipeline is observed and stored in this table.
2+
-- The rows in this table are regularly cleaned up to prevent it growing unbound.
3+
CREATE TABLE IF NOT EXISTS pipeline_monitor_event (
4+
id UUID PRIMARY KEY NOT NULL, -- Unique event identifier.
5+
pipeline_id UUID NOT NULL, -- Identifier of the pipeline the event corresponds to.
6+
recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, -- Timestamp when status recording started.
7+
resources_status VARCHAR NOT NULL, -- Resources status.
8+
resources_status_details VARCHAR NOT NULL, -- Resources status details.
9+
resources_desired_status VARCHAR NOT NULL, -- Resources desired status.
10+
runtime_status VARCHAR NULL, -- Runtime status (only set during deployment).
11+
runtime_status_details VARCHAR NULL, -- Runtime status details (only set during deployment).
12+
runtime_desired_status VARCHAR NULL, -- Runtime desired status (only set during deployment).
13+
program_status VARCHAR NOT NULL, -- Program status.
14+
storage_status VARCHAR NOT NULL, -- Storage status.
15+
FOREIGN KEY (pipeline_id) REFERENCES pipeline(id) ON DELETE CASCADE
16+
);

crates/pipeline-manager/proptest-regressions/db/test.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,5 @@ cc c0ddd7741bf1667b3d70ea81ca4c298875235277256ee70f1fe2b6a9a50303ba # shrinks to
4545
cc 126dfeb51d3f8f4707aaf4d12a3d6f90a0ac9fe7f12cffcaa014e8604dfb5f3c # shrinks to [NewPipeline(TenantId(01000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, "v0", PipelineDescr { name: "pipeline-4", description: "", runtime_config: Object {"workers": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "dev_tweaks": Object {}}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false)} }), TransitProgramStatusToCompilingSql(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToSqlCompiled(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), SqlCompilationInfo { exit_code: 0, messages: [] }, Object {"schema": Object {"inputs": Array [], "outputs": Array []}, "main_rust": String("main-rust-0"), "udf_stubs": String("udf-stubs-0"), "dataflow": Null, "input_connectors": Object {}, "output_connectors": Object {}}), TransitProgramStatusToCompilingRust(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToSuccess(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), RustCompilationInfo { exit_code: 0, stdout: "stdout-0", stderr: "stderr-0" }, "source_checksum_0", "integrity_checksum_0", ""), NewOrUpdatePipeline(TenantId(01000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "pipeline-4", "v0", PipelineDescr { name: "pipeline-3", description: "\u{11cb6}^ $gn𑍐dÊ'kڦ=~\u{1cf42}>𖭯ℶ🛟q", runtime_config: Object {"workers": Number(9614), "storage": Object {"backend": Object {"name": String("default")}, "min_storage_bytes": Null, "min_step_storage_bytes": Null, "compression": String("default"), "cache_mib": Null}, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(true), "tracing": Bool(false), "tracing_endpoint_jaeger": String("\\&🕴"), "min_batch_size_records": Number(9821004243620302662), "max_buffering_delay_usecs": Number(409483829293609342), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Number(2119326179280987330), "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Number(7151796549280257912), "storage_class": Null}, "clock_resolution_usecs": Number(9929288629303979490), "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(true), "dev_tweaks": Object {}}, program_code: "ெ:ඍ𑎄2𝒪]₄&&𑶨<𑃠/🏿ꬵஓP", udf_rust: "=ঊോ🈕স?T𐀇ὙὛ*lૐ>X\u{1e00b}(ȺਠE🕴=Ѩ﷏;1", udf_toml: ".5oy�%ᏺj𖮎𐨓,", program_config: Object {"profile": Null, "cache": Bool(true)} }), TransitProgramStatusToSystemError(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(2), "יּ?!\u{113c7}<%\u{1e023},&ꠑÐ𐑠"), GetPipelineById(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000))]
4646
cc 849625f53652fb9e6c8f1438bcc512d6d2f5a8abe49286048fce5fd6a1a71c87
4747
cc 4058ca3fe2e46f2fb8bf26a0f86d8e2036b07b1ddd469b6af343004ae67373c0 # shrinks to [NewOrUpdatePipeline(TenantId(01000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, "pipeline-3", "v0", false, PipelineDescr { name: "pipeline-3", description: "", runtime_config: Object {"workers": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null, "service_account_name": Null, "namespace": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "http_workers": Null, "io_workers": Null, "dev_tweaks": Object {}, "logging": Null}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false), "runtime_version": Null} }), TransitProgramStatusToCompilingSql(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToSqlCompiled(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), SqlCompilationInfo { exit_code: 1, messages: [] }, Object {"schema": Object {"inputs": Array [], "outputs": Array []}, "main_rust": String("main-rust-0"), "udf_stubs": String("udf-stubs-0"), "dataflow": Null, "input_connectors": Object {}, "output_connectors": Object {}}), TransitProgramStatusToCompilingRust(TenantId(01000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), ClearOngoingRustCompilation("v0"), GetNextRustCompilation("v0")]
48+
cc 4e812fc686724fb39aff5c4f0dbde67e36f477522457ca2b9fa5e5374c9ce61f # shrinks to [NewOrUpdatePipeline(TenantId(03000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, "pipeline-4", "v2", false, PipelineDescr { name: "pipeline-4", description: "", runtime_config: Object {"workers": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null, "service_account_name": Null, "namespace": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "http_workers": Null, "io_workers": Null, "dev_tweaks": Object {}, "logging": Null, "pipeline_template_configmap": Null}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false), "runtime_version": Null} }), ClearOngoingSqlCompilation("v0"), SetDeploymentResourcesDesiredStatusProvisioned(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-4", Paused), TransitDeploymentResourcesStatusToStopping(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(2), None, None), ListPipelineMonitorEvents(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-4")]
49+
cc db90e416d47dfca52aa161e45eb457083c8b2009dcce37f072d3173889981432 # shrinks to [NewPipeline(TenantId(03000000-0000-0000-0000-000000000000), 01000000-0000-0000-0000-000000000000, "v0", PipelineDescr { name: "pipeline-1", description: "", runtime_config: Object {"workers": Number(0), "hosts": Number(0), "storage": Null, "fault_tolerance": Object {"model": String("none"), "checkpoint_interval_secs": Number(60)}, "cpu_profiler": Bool(false), "tracing": Bool(false), "tracing_endpoint_jaeger": String(""), "min_batch_size_records": Number(0), "max_buffering_delay_usecs": Number(0), "resources": Object {"cpu_cores_min": Null, "cpu_cores_max": Null, "memory_mb_min": Null, "memory_mb_max": Null, "storage_mb_max": Null, "storage_class": Null, "service_account_name": Null, "namespace": Null}, "clock_resolution_usecs": Null, "pin_cpus": Array [], "provisioning_timeout_secs": Null, "max_parallel_connector_init": Null, "init_containers": Null, "checkpoint_during_suspend": Bool(false), "http_workers": Null, "io_workers": Null, "dev_tweaks": Object {}, "logging": Null, "pipeline_template_configmap": Null}, program_code: "", udf_rust: "", udf_toml: "", program_config: Object {"profile": String("optimized"), "cache": Bool(false), "runtime_version": Null} }), UpdatePipeline(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-1", None, None, "v0", false, None, None, None, None, None), ListPipelineMonitorEvents(TenantId(03000000-0000-0000-0000-000000000000), "pipeline-1")]

crates/pipeline-manager/src/api/endpoints/pipeline_management.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod pipeline_events;
2+
13
use crate::api::error::ApiError;
24
use crate::api::examples;
35
use crate::api::main::ServerState;

0 commit comments

Comments
 (0)