Skip to content

Commit afbc160

Browse files
committed
[adapters] Add metrics and statistics for pipeline backpressure stalls.
I noticed these showing up in the log and immediately wanted them to show up elsewhere, since they're important. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent c4b5c59 commit afbc160

File tree

6 files changed

+75
-3
lines changed

6 files changed

+75
-3
lines changed

crates/adapters/src/controller.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1230,6 +1230,16 @@ impl Controller {
12301230
labels,
12311231
COMPACTION_STALL_TIME_NANOSECONDS.load(Ordering::Relaxed) as f64 / 1_000_000_000.0,
12321232
);
1233+
metrics.counter(
1234+
"output_stall_seconds_total",
1235+
"Time in seconds that the pipeline was stalled because one or more output connectors' output buffers were full.\n\nThis value is greater than or equal to `output_stall_seconds`.",
1236+
labels,
1237+
status.global_metrics.total_output_stall_duration().as_secs_f64());
1238+
metrics.gauge(
1239+
"output_stall_seconds",
1240+
"If the pipeline is currently stalled because one or more output connectors' output buffers were full, this is the time in seconds for which it has been stalled.\n\nIf the pipeline is not currently stalled, this is zero.\n\nIf this is nonzero, then the output connectors causing the stall can be identified by observing which values of `output_connector_queued_records` are greater than or equal to the configured maximum (which defaults to 1,000,000).",
1241+
labels,
1242+
status.global_metrics.current_output_stall_duration().as_secs_f64());
12331243
metrics.counter(
12341244
"files_created_total",
12351245
"Total number of files created.",
@@ -2440,7 +2450,12 @@ impl CircuitThread {
24402450

24412451
// Backpressure in the output pipeline: wait for room in output buffers to
24422452
// become available.
2443-
if self.controller.output_buffers_full() {
2453+
let stalled = self.controller.output_buffers_full();
2454+
self.controller
2455+
.status
2456+
.global_metrics
2457+
.update_output_stall_start(stalled);
2458+
if stalled {
24442459
debug!("circuit thread: park waiting for output buffer space");
24452460
let warning = output_backpressure_warning
24462461
.get_or_insert_with(|| LongOperationWarning::new(Duration::from_secs(1)));

crates/adapters/src/controller/stats.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ use std::{
7474
Mutex,
7575
atomic::{AtomicBool, AtomicU64, Ordering},
7676
},
77-
time::Duration,
77+
time::{Duration, Instant},
7878
};
7979
use tokio::sync::{broadcast, watch};
8080
use tracing::{debug, error, info, warn};
@@ -237,6 +237,14 @@ pub struct GlobalControllerMetrics {
237237
/// all outputs derived from it have been processed by all output connectors.
238238
pub total_completed_records: AtomicU64,
239239

240+
/// If the pipeline is stalled because one or more output connectors' output
241+
/// buffers are full, this is the time at which the stall began.
242+
pub output_stall_start: Mutex<Option<Instant>>,
243+
244+
/// The amount of time the pipeline has stalled due to output connectors,
245+
/// excluding any current stall in `output_stall_start`.
246+
pub accumulated_output_stall: Mutex<Duration>,
247+
240248
/// Number of steps that have been initiated.
241249
///
242250
/// # Interpretation
@@ -297,6 +305,8 @@ impl GlobalControllerMetrics {
297305
total_processed_records: AtomicU64::new(processed_records),
298306
total_processed_bytes: AtomicU64::new(0),
299307
total_completed_records: AtomicU64::new(processed_records),
308+
output_stall_start: Mutex::new(None),
309+
accumulated_output_stall: Mutex::new(Duration::ZERO),
300310
step_requested: AtomicBool::new(false),
301311
total_initiated_steps: Atomic::new(0),
302312
total_completed_steps: Atomic::new(0),
@@ -395,6 +405,28 @@ impl GlobalControllerMetrics {
395405
pub fn set_commit_progress(&self, commit_progress: Option<CommitProgressSummary>) {
396406
*self.commit_progress.lock().unwrap() = commit_progress;
397407
}
408+
409+
pub fn update_output_stall_start(&self, stalled: bool) {
410+
let mut output_stall_start = self.output_stall_start.lock().unwrap();
411+
if stalled != output_stall_start.is_some() {
412+
if let Some(start) = &*output_stall_start {
413+
*self.accumulated_output_stall.lock().unwrap() += start.elapsed();
414+
}
415+
*output_stall_start = stalled.then(Instant::now);
416+
}
417+
}
418+
419+
pub fn current_output_stall_duration(&self) -> Duration {
420+
self.output_stall_start
421+
.lock()
422+
.unwrap()
423+
.map(|instant| instant.elapsed())
424+
.unwrap_or_default()
425+
}
426+
427+
pub fn total_output_stall_duration(&self) -> Duration {
428+
self.current_output_stall_duration() + *self.accumulated_output_stall.lock().unwrap()
429+
}
398430
}
399431

400432
// `ShardedLock` is a read/write lock optimized for fast reads.
@@ -1201,6 +1233,12 @@ impl ControllerStatus {
12011233
.global_metrics
12021234
.total_completed_records
12031235
.load(Ordering::Acquire),
1236+
output_stall_msecs: self
1237+
.global_metrics
1238+
.current_output_stall_duration()
1239+
.as_millis()
1240+
.try_into()
1241+
.unwrap_or(u64::MAX),
12041242
total_initiated_steps: self.global_metrics.total_initiated_steps(),
12051243
total_completed_steps: self.global_metrics.total_completed_steps(),
12061244
pipeline_complete,

crates/feldera-types/src/adapter_stats.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,16 @@ pub struct ExternalGlobalControllerMetrics {
283283
pub total_processed_bytes: u64,
284284
/// Total number of input records processed to completion.
285285
pub total_completed_records: u64,
286+
/// If the pipeline is stalled because one or more output connectors' output
287+
/// buffers are full, this is the number of milliseconds that the current
288+
/// stall has lasted.
289+
///
290+
/// If this is nonzero, then the output connectors causing the stall can be
291+
/// identified by noticing `ExternalOutputEndpointMetrics::queued_records`
292+
/// is greater than or equal to `ConnectorConfig::max_queued_records`.
293+
///
294+
/// In the ordinary case, the pipeline is not stalled, and this value is 0.
295+
pub output_stall_msecs: u64,
286296
/// Number of steps that have been initiated.
287297
///
288298
/// # Interpretation

docs.feldera.com/docs/operations/metrics.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ which Feldera is built.
7171
| <a name='dbsp_runtime_elapsed_seconds_total'>`dbsp_runtime_elapsed_seconds_total`</a> |counter | Time elapsed while the pipeline is executing a step, multiplied by the number of foreground and background threads, in seconds. |
7272
| <a name='dbsp_step_latency_seconds'>`dbsp_step_latency_seconds`</a> |histogram | Latency of DBSP steps over the last 60 seconds or 1000 steps, whichever is less, in seconds |
7373
| <a name='dbsp_steps_total'>`dbsp_steps_total`</a> |counter | Total number of DBSP steps executed. |
74+
| <a name='output_stall_seconds'>`output_stall_seconds`</a> |gauge | If the pipeline is currently stalled because one or more output connectors' output buffers were full, this is the time in seconds for which it has been stalled.<br/><br/>If the pipeline is not currently stalled, this is zero.<br/><br/>If this is nonzero, then the output connectors causing the stall can be identified by observing which values of `output_connector_queued_records` are greater than or equal to the configured maximum (which defaults to 1,000,000). |
75+
| <a name='output_stall_seconds_total'>`output_stall_seconds_total`</a> |counter | Time in seconds that the pipeline was stalled because one or more output connectors' output buffers were full.<br/><br/>This value is greater than or equal to `output_stall_seconds`. |
7476

7577
## Record Processing
7678

docs.feldera.com/docs/operations/metrics.md.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ which Feldera is built.
5656

5757
[DBSP]: https://docs.rs/dbsp/latest/dbsp/
5858

59-
{{dbsp_|compaction_}}
59+
{{dbsp_|compaction_|output_stall_}}
6060

6161
## Record Processing
6262

openapi.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8266,6 +8266,7 @@
82668266
"total_processed_records",
82678267
"total_processed_bytes",
82688268
"total_completed_records",
8269+
"output_stall_msecs",
82698270
"total_initiated_steps",
82708271
"total_completed_steps",
82718272
"pipeline_complete"
@@ -8312,6 +8313,12 @@
83128313
"description": "Time at which the pipeline process from which we resumed started, in seconds since the epoch.",
83138314
"minimum": 0
83148315
},
8316+
"output_stall_msecs": {
8317+
"type": "integer",
8318+
"format": "int64",
8319+
"description": "If the pipeline is stalled because one or more output connectors' output\nbuffers are full, this is the number of milliseconds that the current\nstall has lasted.\n\nIf this is nonzero, then the output connectors causing the stall can be\nidentified by noticing `ExternalOutputEndpointMetrics::queued_records`\nis greater than or equal to `ConnectorConfig::max_queued_records`.\n\nIn the ordinary case, the pipeline is not stalled, and this value is 0.",
8320+
"minimum": 0
8321+
},
83158322
"pipeline_complete": {
83168323
"type": "boolean",
83178324
"description": "True if the pipeline has processed all input data to completion."

0 commit comments

Comments
 (0)