Skip to content

Commit d8b6c57

Browse files
committed
[adapters] Add metrics for additional memory used by connectors.
Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent a89983d commit d8b6c57

9 files changed

Lines changed: 125 additions & 24 deletions

File tree

crates/adapterlib/src/format.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,14 @@ pub trait OutputConsumer: Send {
302302
num_records: usize,
303303
);
304304
fn batch_end(&mut self);
305+
306+
/// Returns the approximate amount of memory used by the connector's
307+
/// underlying implementation. For the Kafka connectors, for example, this
308+
/// is the amount of memory used by librdkafka. Not all connectors use a
309+
/// substantial amount of memory, so the default implementation returns 0.
310+
fn memory(&self) -> usize {
311+
0
312+
}
305313
}
306314

307315
/// The largest weight of a record that can be output using

crates/adapterlib/src/transport.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,14 @@ pub trait InputReader: Send + Sync {
620620
fn disconnect(&self) {
621621
self.request(InputReaderCommand::Disconnect);
622622
}
623+
624+
/// Returns the approximate amount of memory used by the connector's
625+
/// underlying implementation. For the Kafka connectors, for example, this
626+
/// is the amount of memory used by librdkafka. Not all connectors use a
627+
/// substantial amount of memory, so the default implementation returns 0.
628+
fn memory(&self) -> usize {
629+
0
630+
}
623631
}
624632

625633
/// Position in an input stream, including the timestamp when the data was ingested
@@ -991,6 +999,14 @@ pub trait OutputEndpoint: Send {
991999

9921000
/// Whether this endpoint is [fault tolerant](crate#fault-tolerance).
9931001
fn is_fault_tolerant(&self) -> bool;
1002+
1003+
/// Returns the approximate amount of memory used by the connector's
1004+
/// underlying implementation. For the Kafka connectors, for example, this
1005+
/// is the amount of memory used by librdkafka. Not all connectors use a
1006+
/// substantial amount of memory, so the default implementation returns 0.
1007+
fn memory(&self) -> usize {
1008+
0
1009+
}
9941010
}
9951011

9961012
/// An [UnboundedReceiver] wrapper for [InputReaderCommand] for fault-tolerant connectors.

crates/adapters/src/controller.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::controller::sync::{
3232
};
3333
use crate::samply::SamplySpan;
3434
use crate::server::metrics::{
35-
HistogramDiv, LabelStack, MetricsFormatter, MetricsWriter, ValueType,
35+
HistogramDiv, LabelStack, MetricsFormatter, MetricsWriter, Value, ValueType,
3636
};
3737
use crate::server::{InitializationState, ServerState};
3838
use crate::transport::clock::now_endpoint_config;
@@ -1172,7 +1172,7 @@ impl Controller {
11721172
&CHECKPOINT_SYNC_PULL_FAILURES,
11731173
);
11741174

1175-
fn write_input_metric<F, M>(
1175+
fn write_input_metric<F, M, T>(
11761176
metrics: &mut MetricsWriter<F>,
11771177
labels: &LabelStack,
11781178
status: &ControllerStatus,
@@ -1182,14 +1182,12 @@ impl Controller {
11821182
func: M,
11831183
) where
11841184
F: MetricsFormatter,
1185-
M: Fn(&InputEndpointMetrics) -> &AtomicU64,
1185+
M: Fn(&InputEndpointStatus) -> T,
1186+
T: Value,
11861187
{
11871188
metrics.values(name, help, value_type, |w| {
11881189
for input in status.input_status().values() {
1189-
w.write_value(
1190-
&labels.with("endpoint", &input.endpoint_name),
1191-
func(&input.metrics),
1192-
);
1190+
w.write_value(&labels.with("endpoint", &input.endpoint_name), func(input));
11931191
}
11941192
});
11951193
}
@@ -1200,7 +1198,7 @@ impl Controller {
12001198
"input_connector_bytes_total",
12011199
"Total number of bytes received by an input connector.",
12021200
ValueType::Counter,
1203-
|m| &m.total_bytes,
1201+
|s| s.metrics.total_bytes.load(Ordering::Relaxed),
12041202
);
12051203
write_input_metric(
12061204
metrics,
@@ -1209,7 +1207,7 @@ impl Controller {
12091207
"input_connector_records_total",
12101208
"Total number of records received by an input connector.",
12111209
ValueType::Counter,
1212-
|m| &m.total_records,
1210+
|m| m.metrics.total_records.load(Ordering::Relaxed),
12131211
);
12141212
write_input_metric(
12151213
metrics,
@@ -1218,7 +1216,7 @@ impl Controller {
12181216
"input_connector_buffered_records",
12191217
"Amount of data currently buffered by an input connector, in records.",
12201218
ValueType::Gauge,
1221-
|m| &m.buffered_records,
1219+
|s| s.metrics.buffered_records.load(Ordering::Relaxed),
12221220
);
12231221
write_input_metric(
12241222
metrics,
@@ -1227,7 +1225,7 @@ impl Controller {
12271225
"input_connector_buffered_records_bytes",
12281226
"Amount of data currently buffered by an input connector, in bytes.",
12291227
ValueType::Gauge,
1230-
|m| &m.buffered_bytes,
1228+
|s| s.metrics.buffered_bytes.load(Ordering::Relaxed),
12311229
);
12321230
write_input_metric(
12331231
metrics,
@@ -1236,7 +1234,7 @@ impl Controller {
12361234
"input_connector_errors_transport_total",
12371235
"Total number of errors encountered by the input connector at the transport layer.",
12381236
ValueType::Counter,
1239-
|m| &m.num_transport_errors,
1237+
|s| s.metrics.num_transport_errors.load(Ordering::Relaxed),
12401238
);
12411239
write_input_metric(
12421240
metrics,
@@ -1245,7 +1243,16 @@ impl Controller {
12451243
"input_connector_errors_parse_total",
12461244
"Total number of errors encountered parsing records received by the input connector.",
12471245
ValueType::Counter,
1248-
|m| &m.num_parse_errors,
1246+
|s| s.metrics.num_parse_errors.load(Ordering::Relaxed),
1247+
);
1248+
write_input_metric(
1249+
metrics,
1250+
labels,
1251+
status,
1252+
"input_connector_extra_memory_bytes",
1253+
"Additional memory used by an input connector beyond that used for buffered records.",
1254+
ValueType::Gauge,
1255+
|s| s.reader.as_ref().map_or(0, |reader| reader.memory()),
12491256
);
12501257

12511258
fn write_input_histogram<F, M>(
@@ -1378,6 +1385,15 @@ impl Controller {
13781385
ValueType::Counter,
13791386
|m| &m.num_encode_errors,
13801387
);
1388+
write_output_metric(
1389+
metrics,
1390+
labels,
1391+
status,
1392+
"output_connector_extra_memory_bytes",
1393+
"Additional memory used by an output connector beyond that used for buffered records.",
1394+
ValueType::Gauge,
1395+
|m| &m.memory,
1396+
);
13811397
}
13821398

13831399
/// Execute a SQL query over materialized tables and views;
@@ -4649,6 +4665,9 @@ impl ControllerInner {
46494665
return;
46504666
}
46514667

4668+
controller
4669+
.status
4670+
.update_output_memory(endpoint_id, encoder.consumer().memory());
46524671
if output_buffer.flush_needed(&output_buffer_config) {
46534672
// One of the triggering conditions for flushing the output buffer is satisfied:
46544673
// go ahead and flush the buffer; we will check for more messages at the next iteration
@@ -5535,6 +5554,10 @@ impl OutputConsumer for OutputProbe {
55355554
);
55365555
})
55375556
}
5557+
5558+
fn memory(&self) -> usize {
5559+
self.endpoint.memory()
5560+
}
55385561
}
55395562

55405563
/// An in-progress checkpoint.

crates/adapters/src/controller/stats.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,15 @@ impl ControllerStatus {
10041004
self.update_total_completed_records();
10051005
}
10061006

1007+
pub fn update_output_memory(&self, endpoint_id: EndpointId, memory: usize) {
1008+
if let Some(endpoint_stats) = self.output_status().get(&endpoint_id) {
1009+
endpoint_stats
1010+
.metrics
1011+
.memory
1012+
.store(memory as u64, Ordering::Release);
1013+
}
1014+
}
1015+
10071016
pub fn output_buffered_batches(&self, endpoint_id: EndpointId, total_processed_records: u64) {
10081017
if let Some(endpoint_stats) = self.output_status().get(&endpoint_id) {
10091018
endpoint_stats.output_buffered_batches(total_processed_records);
@@ -1952,6 +1961,10 @@ pub struct OutputEndpointMetrics {
19521961
/// of this endpoint is equal to the output of the circuit after
19531962
/// processing `total_processed_input_records` records.
19541963
pub total_processed_input_records: AtomicU64,
1964+
1965+
/// Extra memory in use beyond that used for queuing records. Not all
1966+
/// output connectors report this.
1967+
pub memory: AtomicU64,
19551968
}
19561969

19571970
impl OutputEndpointMetrics {
@@ -1970,6 +1983,7 @@ impl OutputEndpointMetrics {
19701983
num_encode_errors: AtomicU64::new(initial_statistics.num_encode_errors),
19711984
num_transport_errors: AtomicU64::new(initial_statistics.num_transport_errors),
19721985
total_processed_input_records: AtomicU64::new(total_processed_input_records),
1986+
memory: AtomicU64::new(0),
19731987
}
19741988
}
19751989

@@ -1988,6 +2002,7 @@ impl OutputEndpointMetrics {
19882002
total_processed_input_records: this
19892003
.total_processed_input_records
19902004
.load(Ordering::Relaxed),
2005+
memory: this.memory.load(Ordering::Relaxed),
19912006
}
19922007
}
19932008

@@ -2020,6 +2035,7 @@ pub struct OutputEndpointMetricsSnapshot {
20202035
pub num_encode_errors: u64,
20212036
pub num_transport_errors: u64,
20222037
pub total_processed_input_records: u64,
2038+
pub memory: u64,
20232039
}
20242040

20252041
/// Output endpoint status information.

crates/adapters/src/transport/kafka.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -370,38 +370,47 @@ struct MemoryUseReporter {
370370
/// will take some time to reach what we hope is a steady state.
371371
start: Instant,
372372

373-
/// The memory use we last reported and when we reported it.
373+
/// The most recently measured memory use, in bytes.
374+
current: u64,
375+
376+
/// The peak memory use we last reported, in bytes, and when we reported it.
374377
///
375378
/// This is a peak value: we only ever report a new value when the usage
376379
/// increases substantially from the previously reported value.
377-
last_report: Option<(Instant, u64)>,
380+
peak: Option<(Instant, u64)>,
378381
}
379382

380383
impl MemoryUseReporter {
381384
fn new() -> Self {
382385
Self {
383386
start: Instant::now(),
384-
last_report: None,
387+
current: 0,
388+
peak: None,
385389
}
386390
}
391+
/// The most recently measured memory use, in bytes.
392+
fn current(&self) -> usize {
393+
self.current as usize
394+
}
387395
fn update(&mut self, statistics: &Statistics) {
388396
/// Minimum time before first report.
389397
const REPORT_DELAY: Duration = Duration::from_secs(60);
390398

391399
/// Minimum amount of memory to report on.
392400
const MIN_MEMORY: u64 = 1024 * 1024;
393401

394-
if self.start.elapsed() < REPORT_DELAY {
395-
return;
396-
}
397-
398402
let mut memory = 0;
399403
for topic in statistics.topics.values() {
400404
for partition in topic.partitions.values() {
401405
memory += partition.msgq_bytes + partition.xmit_msgq_bytes + partition.fetchq_size;
402406
}
403407
}
404-
match &self.last_report {
408+
self.current = memory;
409+
if self.start.elapsed() < REPORT_DELAY {
410+
return;
411+
}
412+
413+
match &self.peak {
405414
None if memory > MIN_MEMORY => {
406415
info!(
407416
"Buffered {} after {} seconds",
@@ -420,6 +429,6 @@ impl MemoryUseReporter {
420429
}
421430
_ => return,
422431
}
423-
self.last_report = Some((Instant::now(), memory));
432+
self.peak = Some((Instant::now(), memory));
424433
}
425434
}

crates/adapters/src/transport/kafka/ft/input.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl KafkaFtInputEndpoint {
7474
}
7575

7676
struct KafkaFtInputReader {
77-
_inner: Arc<KafkaFtInputReaderInner>,
77+
inner: Arc<KafkaFtInputReaderInner>,
7878
command_sender: UnboundedSender<InputReaderCommand>,
7979
poller_thread: Thread,
8080
}
@@ -757,7 +757,7 @@ impl KafkaFtInputReader {
757757
.expect("failed to spawn Kafka input poller thread");
758758
let poller_thread = poller_handle.thread().clone();
759759
Ok(KafkaFtInputReader {
760-
_inner: inner,
760+
inner,
761761
command_sender,
762762
poller_thread,
763763
})
@@ -795,6 +795,15 @@ impl InputReader for KafkaFtInputReader {
795795
fn is_closed(&self) -> bool {
796796
self.command_sender.is_closed()
797797
}
798+
fn memory(&self) -> usize {
799+
self.inner
800+
.kafka_consumer
801+
.context()
802+
.memory_use_reporter
803+
.lock()
804+
.unwrap()
805+
.current()
806+
}
798807
}
799808

800809
impl Drop for KafkaFtInputReader {

crates/adapters/src/transport/kafka/ft/output.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,15 @@ impl OutputEndpoint for KafkaOutputEndpoint {
321321
fn is_fault_tolerant(&self) -> bool {
322322
true
323323
}
324+
325+
fn memory(&self) -> usize {
326+
self.kafka_producer
327+
.context()
328+
.memory_use_reporter
329+
.lock()
330+
.unwrap()
331+
.current()
332+
}
324333
}
325334

326335
struct DataProducerContext {

crates/adapters/src/transport/kafka/nonft/output.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,15 @@ impl OutputEndpoint for KafkaOutputEndpoint {
250250
fn is_fault_tolerant(&self) -> bool {
251251
false
252252
}
253+
254+
fn memory(&self) -> usize {
255+
self.kafka_producer
256+
.context()
257+
.memory_use_reporter
258+
.lock()
259+
.unwrap()
260+
.current()
261+
}
253262
}
254263

255264
#[cfg(test)]

docs.feldera.com/docs/operations/metrics.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ invisible to users unless a pause or checkpoint happens mid-batch.
139139
| <a name='input_connector_completion_latency_seconds'>`input_connector_completion_latency_seconds`</a> |histogram | Time between when the connector receives new data and when the pipeline processes this data, computes output updates, and sends these updates to all output connectors, over the last 600 seconds or 10,000 samples. |
140140
| <a name='input_connector_errors_parse_total'>`input_connector_errors_parse_total`</a> |counter | Total number of errors encountered parsing records received by the input connector. |
141141
| <a name='input_connector_errors_transport_total'>`input_connector_errors_transport_total`</a> |counter | Total number of errors encountered by the input connector at the transport layer. |
142+
| <a name='input_connector_extra_memory_bytes'>`input_connector_extra_memory_bytes`</a> |gauge | Additional memory used by an input connector beyond that used for buffered records. |
142143
| <a name='input_connector_processing_latency_seconds'>`input_connector_processing_latency_seconds`</a> |histogram | Time between when the connector receives new data and when the pipeline processes this data and computes output updates, over the last 600 seconds or 10,000 samples. |
143144
| <a name='input_connector_records_total'>`input_connector_records_total`</a> |counter | Total number of records received by an input connector. |
144145

@@ -158,5 +159,6 @@ These metrics accumulate across checkpoint and resume.
158159
| <a name='output_connector_bytes_total'>`output_connector_bytes_total`</a> |counter | Total number of bytes of records sent by the output connector. |
159160
| <a name='output_connector_errors_encode_total'>`output_connector_errors_encode_total`</a> |counter | Total number of errors encountered encoding records to send. |
160161
| <a name='output_connector_errors_transport_total'>`output_connector_errors_transport_total`</a> |counter | Total number of errors encountered at the transport layer sending records. |
162+
| <a name='output_connector_extra_memory_bytes'>`output_connector_extra_memory_bytes`</a> |gauge | Additional memory used by an output connector beyond that used for buffered records. |
161163
| <a name='output_connector_records_total'>`output_connector_records_total`</a> |counter | Total number of records sent by the output connector. |
162164

0 commit comments

Comments
 (0)