Skip to content

Commit 1c8a065

Browse files
snkas authored and ryzhyk committed
pipeline-manager: pipeline interaction metrics endpoint
Exposes the retrieval of circuit metrics to the user via `GET /v0/pipelines/{pipeline_name}/metrics`. The output format can be specified via the `format` query parameter, with two formats currently being supported: `prometheus` (default) and `json`.

This separate retrieval functionality is needed because metrics are no longer included in the `/stats` endpoint response.

This commit does not change the underlying snapshot mechanism which supplies the metrics, and as such does not improve or affect the existing performance overhead it incurs.

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent 1b2f565 commit 1c8a065

File tree

11 files changed

+391
-26
lines changed

11 files changed

+391
-26
lines changed

benchmark/feldera-sql/run.py

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -443,8 +443,18 @@ def main():
443443
print("Failed to get stats: ", req)
444444
if req.status_code == 400:
445445
break
446-
447446
stats = req.json()
447+
448+
req = requests.get(
449+
f"{api_url}/v0/pipelines/{full_name}/metrics?format=json",
450+
headers=headers,
451+
)
452+
if req.status_code != 200:
453+
print("Failed to get metrics: ", req)
454+
if req.status_code == 400:
455+
break
456+
metrics_json = req.json()
457+
448458
# for input in stats["inputs"]:
449459
# print(input["endpoint_name"], input["metrics"]["end_of_input"])
450460
elapsed = time.time() - start
@@ -458,7 +468,7 @@ def main():
458468
for key, value in global_metrics.items():
459469
metrics_seen.add(key)
460470
metrics_dict[key] = value
461-
for s in stats["metrics"]:
471+
for s in metrics_json:
462472
key = s["key"].replace(".", "_")
463473
value = s["value"]
464474
if "Counter" in value and value["Counter"] is not None:

crates/adapters/src/server/mod.rs

Lines changed: 25 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ use actix_web::{
2323
use clap::Parser;
2424
use colored::{ColoredString, Colorize};
2525
use dbsp::{circuit::CircuitConfig, DBSPHandle};
26+
use feldera_types::query_params::{MetricsFormat, MetricsParameters};
2627
use feldera_types::{
2728
config::{default_max_batch_size, TransportConfig},
2829
transport::http::HttpInputConfig,
@@ -586,26 +587,32 @@ async fn stats(state: WebData<ServerState>) -> impl Responder {
586587
}
587588
}
588589

589-
/// Outputs controller metrics in the format expected by Prometheus, as
590-
/// documented at
591-
/// <https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md>.
590+
/// Retrieve circuit metrics.
592591
#[get("/metrics")]
593-
async fn metrics(state: WebData<ServerState>) -> impl Responder {
592+
async fn metrics(
593+
state: WebData<ServerState>,
594+
query_params: web::Query<MetricsParameters>,
595+
) -> impl Responder {
594596
match &*state.controller.lock().unwrap() {
595-
Some(controller) => match state
596-
.prometheus
597-
.read()
598-
.unwrap()
599-
.as_ref()
600-
.unwrap()
601-
.metrics(controller)
602-
{
603-
Ok(metrics) => Ok(HttpResponse::Ok()
604-
.content_type(mime::TEXT_PLAIN)
605-
.body(metrics)),
606-
Err(e) => Err(PipelineError::PrometheusError {
607-
error: e.to_string(),
608-
}),
597+
Some(controller) => match &query_params.format {
598+
MetricsFormat::Prometheus => {
599+
match state
600+
.prometheus
601+
.read()
602+
.unwrap()
603+
.as_ref()
604+
.unwrap()
605+
.metrics(controller)
606+
{
607+
Ok(metrics) => Ok(HttpResponse::Ok()
608+
.content_type(mime::TEXT_PLAIN)
609+
.body(metrics)),
610+
Err(e) => Err(PipelineError::PrometheusError {
611+
error: e.to_string(),
612+
}),
613+
}
614+
}
615+
MetricsFormat::Json => Ok(HttpResponse::Ok().json(&controller.status().metrics)),
609616
},
610617
None => Err(missing_controller_error(&state)),
611618
}

crates/feldera-types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ pub mod error;
33
pub mod format;
44
pub mod program_schema;
55
pub mod query;
6+
pub mod query_params;
67
pub mod secret_ref;
78
pub mod serde_with_context;
89
pub mod transport;

crates/feldera-types/src/query_params.rs

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,27 @@
1+
//! Types for the query parameters of the pipeline endpoints.
2+
3+
use serde::Deserialize;
4+
use utoipa::{IntoParams, ToSchema};
5+
6+
/// Circuit metrics output format.
7+
/// - `prometheus`: format expected by Prometheus, as documented at:
8+
/// <https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md>
9+
/// - `json`: JSON format
10+
#[derive(Debug, Deserialize, ToSchema)]
11+
#[serde(rename_all = "snake_case")]
12+
pub enum MetricsFormat {
13+
Prometheus,
14+
Json,
15+
}
16+
17+
/// Returns default metrics format.
18+
fn default_metrics_format() -> MetricsFormat {
19+
MetricsFormat::Prometheus
20+
}
21+
22+
/// Query parameters to retrieve pipeline circuit metrics.
23+
#[derive(Debug, Deserialize, IntoParams, ToSchema)]
24+
pub struct MetricsParameters {
25+
#[serde(default = "default_metrics_format")]
26+
pub format: MetricsFormat,
27+
}

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 56 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ use actix_web::{
1616
HttpRequest, HttpResponse,
1717
};
1818
use feldera_types::program_schema::SqlIdentifier;
19+
use feldera_types::query_params::MetricsParameters;
1920
use log::{debug, info};
2021
use std::time::Duration;
2122

@@ -503,7 +504,7 @@ pub(crate) async fn get_pipeline_logs(
503504
.await
504505
}
505506

506-
/// Retrieve statistics (e.g., metrics, performance counters) of a running or paused pipeline.
507+
/// Retrieve statistics (e.g., performance counters) of a running or paused pipeline.
507508
#[utoipa::path(
508509
context_path = "/v0",
509510
security(("JSON web token (JWT) or API key" = [])),
@@ -514,7 +515,7 @@ pub(crate) async fn get_pipeline_logs(
514515
// TODO: implement `ToSchema` for `ControllerStatus`, which is the
515516
// actual type returned by this endpoint and move it to feldera-types.
516517
(status = OK
517-
, description = "Pipeline metrics retrieved successfully"
518+
, description = "Pipeline statistics retrieved successfully"
518519
, body = Object),
519520
(status = NOT_FOUND
520521
, description = "Pipeline with that name does not exist"
@@ -556,6 +557,59 @@ pub(crate) async fn get_pipeline_stats(
556557
.await
557558
}
558559

560+
/// Retrieve circuit metrics of a running or paused pipeline.
561+
#[utoipa::path(
562+
context_path = "/v0",
563+
security(("JSON web token (JWT) or API key" = [])),
564+
params(
565+
("pipeline_name" = String, Path, description = "Unique pipeline name"),
566+
MetricsParameters
567+
),
568+
responses(
569+
(status = OK
570+
, description = "Pipeline circuit metrics retrieved successfully"
571+
, body = Object),
572+
(status = NOT_FOUND
573+
, description = "Pipeline with that name does not exist"
574+
, body = ErrorResponse
575+
, example = json!(examples::error_unknown_pipeline_name())),
576+
(status = SERVICE_UNAVAILABLE
577+
, body = ErrorResponse
578+
, examples(
579+
("Pipeline is not deployed" = (value = json!(examples::error_pipeline_interaction_not_deployed()))),
580+
("Pipeline is currently unavailable" = (value = json!(examples::error_pipeline_interaction_currently_unavailable()))),
581+
("Disconnected during response" = (value = json!(examples::error_pipeline_interaction_disconnected()))),
582+
("Response timeout" = (value = json!(examples::error_pipeline_interaction_timeout())))
583+
)
584+
),
585+
(status = INTERNAL_SERVER_ERROR, body = ErrorResponse),
586+
),
587+
tag = "Pipeline interaction"
588+
)]
589+
#[get("/pipelines/{pipeline_name}/metrics")]
590+
pub(crate) async fn get_pipeline_metrics(
591+
state: WebData<ServerState>,
592+
client: WebData<awc::Client>,
593+
tenant_id: ReqData<TenantId>,
594+
path: web::Path<String>,
595+
_query: web::Query<MetricsParameters>,
596+
request: HttpRequest,
597+
) -> Result<HttpResponse, ManagerError> {
598+
let pipeline_name = path.into_inner();
599+
state
600+
.runner
601+
.forward_http_request_to_pipeline_by_name(
602+
client.as_ref(),
603+
*tenant_id,
604+
&pipeline_name,
605+
Method::GET,
606+
"metrics",
607+
request.query_string(),
608+
None,
609+
)
610+
.await
611+
}
612+
559613
/// Retrieve the circuit performance profile of a running or paused pipeline.
560614
#[utoipa::path(
561615
context_path = "/v0",

crates/pipeline-manager/src/api/main.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,7 @@ only the program-related core fields, and is used by the compiler to discern whe
8080
endpoints::pipeline_interaction::get_pipeline_output_connector_status,
8181
endpoints::pipeline_interaction::get_pipeline_logs,
8282
endpoints::pipeline_interaction::get_pipeline_stats,
83+
endpoints::pipeline_interaction::get_pipeline_metrics,
8384
endpoints::pipeline_interaction::get_pipeline_circuit_profile,
8485
endpoints::pipeline_interaction::get_pipeline_heap_profile,
8586
endpoints::pipeline_interaction::pipeline_adhoc_sql,
@@ -205,6 +206,8 @@ only the program-related core fields, and is used by the compiler to discern whe
205206
feldera_types::program_schema::SourcePosition,
206207
feldera_types::program_schema::PropertyValue,
207208
feldera_types::program_schema::SqlIdentifier,
209+
feldera_types::query_params::MetricsFormat,
210+
feldera_types::query_params::MetricsParameters,
208211
feldera_types::error::ErrorResponse,
209212
),),
210213
tags(
@@ -254,6 +257,7 @@ fn api_scope() -> Scope {
254257
.service(endpoints::pipeline_interaction::get_pipeline_output_connector_status)
255258
.service(endpoints::pipeline_interaction::get_pipeline_logs)
256259
.service(endpoints::pipeline_interaction::get_pipeline_stats)
260+
.service(endpoints::pipeline_interaction::get_pipeline_metrics)
257261
.service(endpoints::pipeline_interaction::get_pipeline_circuit_profile)
258262
.service(endpoints::pipeline_interaction::get_pipeline_heap_profile)
259263
.service(endpoints::pipeline_interaction::pipeline_adhoc_sql)

crates/pipeline-manager/tests/integration_test.rs

Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3129,3 +3129,43 @@ async fn refresh_version() {
31293129
let value: Value = response.json().await.unwrap();
31303130
assert_eq!(value["refresh_version"], json!(4));
31313131
}
3132+
3133+
/// Tests that circuit metrics can be retrieved from the pipeline.
3134+
#[actix_web::test]
3135+
#[serial]
3136+
async fn pipeline_metrics() {
3137+
let config = setup().await;
3138+
create_and_deploy_test_pipeline(&config, "").await;
3139+
3140+
// Retrieve metrics in default format
3141+
let mut response = config.get("/v0/pipelines/test/metrics").await;
3142+
assert_eq!(response.status(), StatusCode::OK);
3143+
let body = response.body().await.unwrap();
3144+
let metrics_default = std::str::from_utf8(&body).unwrap();
3145+
3146+
// Retrieve metrics in Prometheus format
3147+
let mut response = config
3148+
.get("/v0/pipelines/test/metrics?format=prometheus")
3149+
.await;
3150+
assert_eq!(response.status(), StatusCode::OK);
3151+
let body = response.body().await.unwrap();
3152+
let metrics_prometheus = std::str::from_utf8(&body).unwrap();
3153+
3154+
// Retrieve metrics in JSON format
3155+
let mut response = config.get("/v0/pipelines/test/metrics?format=json").await;
3156+
assert_eq!(response.status(), StatusCode::OK);
3157+
let body = response.body().await.unwrap();
3158+
let metrics_json = std::str::from_utf8(&body).unwrap();
3159+
let _metrics_json_value: Value = serde_json::from_str(metrics_json).unwrap();
3160+
3161+
// Unknown format
3162+
let response = config
3163+
.get("/v0/pipelines/test/metrics?format=does-not-exist")
3164+
.await;
3165+
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
3166+
3167+
// Minimally check formats
3168+
assert!(metrics_default.contains("# TYPE total_processed_records gauge"));
3169+
assert!(metrics_prometheus.contains("# TYPE total_processed_records gauge"));
3170+
assert!(metrics_json.contains("\"key\":\"total_input_records\""));
3171+
}

0 commit comments

Comments
 (0)