Skip to content

Commit 56264a0

Browse files
committed
DPL: send more metrics only once every 5 seconds
In real world scenarios we cannot afford sending metrics every message. This moves more to the relayed (5 seconds) flush periods.
1 parent 88f21d8 commit 56264a0

3 files changed

Lines changed: 43 additions & 37 deletions

File tree

Framework/Core/include/Framework/DataProcessingStats.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,20 @@ namespace o2
1717
namespace framework
1818
{
1919

20+
/// Helper struct to hold statistics about the data processing happening.
2021
struct DataProcessingStats {
22+
// We use this to keep track of the latency of the first message we get for a given input record
23+
// and of the last one.
24+
struct InputLatency {
25+
int minLatency;
26+
int maxLatency;
27+
};
2128
int pendingInputs;
2229
int incomplete;
2330
int inputParts;
31+
int lastElapsedTimeMs;
32+
int lastTotalProcessedSize;
33+
InputLatency lastLatency;
2434
};
2535

2636
} // namespace framework

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ bool DataProcessingDevice::ConditionalRun()
148148
if (currentTime - lastSent < 5000) {
149149
return;
150150
}
151+
152+
O2_SIGNPOST_START(MonitoringStatus::ID, MonitoringStatus::SEND, 0, 0, O2_SIGNPOST_BLUE);
151153
monitoring.send({ (int)relayerStats.malformedInputs, "dpl/malformed_inputs" });
152154
monitoring.send({ (int)relayerStats.droppedComputations, "dpl/dropped_computations" });
153155
monitoring.send({ (int)relayerStats.droppedIncomingMessages, "dpl/dropped_incoming_messages" });
@@ -156,8 +158,15 @@ bool DataProcessingDevice::ConditionalRun()
156158
monitoring.send({ (int) stats.pendingInputs, "inputs/relayed/pending" });
157159
monitoring.send({ (int) stats.incomplete, "inputs/relayed/incomplete" });
158160
monitoring.send({ (int) stats.inputParts, "inputs/relayed/total" });
161+
monitoring.send({ stats.lastElapsedTimeMs, "dpl/elapsed_time_ms" });
162+
monitoring.send({ stats.lastTotalProcessedSize, "dpl/processed_input_size_bytes" });
163+
monitoring.send({ (stats.lastTotalProcessedSize / (stats.lastElapsedTimeMs ? stats.lastElapsedTimeMs : 1) / 1000), "dpl/processing_rate_mb_s" });
164+
monitoring.send({ stats.lastLatency.minLatency, "dpl/min_input_latency_ms" });
165+
monitoring.send({ stats.lastLatency.maxLatency, "dpl/max_input_latency_ms" });
166+
monitoring.send({ (stats.lastTotalProcessedSize / (stats.lastLatency.maxLatency ? stats.lastLatency.maxLatency : 1) / 1000), "dpl/input_rate_mb_s" });
159167

160168
lastSent = currentTime;
169+
O2_SIGNPOST_END(MonitoringStatus::ID, MonitoringStatus::SEND, 0, 0, O2_SIGNPOST_BLUE);
161170
};
162171

163172
/// This will flush metrics only once every second.
@@ -169,8 +178,11 @@ bool DataProcessingDevice::ConditionalRun()
169178
if (currentTime - lastFlushed < 1000) {
170179
return;
171180
}
181+
182+
O2_SIGNPOST_START(MonitoringStatus::ID, MonitoringStatus::FLUSH, 0, 0, O2_SIGNPOST_RED);
172183
monitoring.flushBuffer();
173184
lastFlushed = currentTime;
185+
O2_SIGNPOST_END(MonitoringStatus::ID, MonitoringStatus::FLUSH, 0, 0, O2_SIGNPOST_RED);
174186
};
175187

176188
auto now = std::chrono::high_resolution_clock::now();
@@ -477,23 +489,16 @@ bool DataProcessingDevice::tryDispatchComputation()
477489
}
478490
};
479491

480-
// We use this to keep track of the latency of the first message we get for a given input record
481-
// and of the last one.
482-
struct InputLatency {
483-
uint64_t minLatency;
484-
uint64_t maxLatency;
485-
};
486-
487-
auto calculateInputRecordLatency = [](InputRecord const& record, auto now) -> InputLatency {
488-
InputLatency result{ static_cast<uint64_t>(-1), 0 };
492+
auto calculateInputRecordLatency = [](InputRecord const& record, auto now) -> DataProcessingStats::InputLatency {
493+
DataProcessingStats::InputLatency result{ static_cast<int>(-1), 0 };
489494

490495
auto currentTime = (uint64_t)std::chrono::duration<double, std::milli>(now.time_since_epoch()).count();
491496
for (auto& item : record) {
492497
auto* header = o2::header::get<DataProcessingHeader*>(item.header);
493498
if (header == nullptr) {
494499
continue;
495500
}
496-
auto partLatency = currentTime - header->creation;
501+
int partLatency = currentTime - header->creation;
497502
result.minLatency = std::min(result.minLatency, partLatency);
498503
result.maxLatency = std::max(result.maxLatency, partLatency);
499504
}
@@ -546,29 +551,9 @@ bool DataProcessingDevice::tryDispatchComputation()
546551
errorHandling(e, record);
547552
}
548553
auto tEnd = std::chrono::high_resolution_clock::now();
549-
double elapsedTimeMs = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
550-
auto totalProcessedSize = calculateTotalInputRecordSize(record);
551-
auto latency = calculateInputRecordLatency(record, tStart);
552-
553-
/// The size of all the input messages which have been processed in this iteration
554-
monitoringService.send({ totalProcessedSize, "dpl/processed_input_size_bytes" });
555-
/// The time to do the processing for this iteration
556-
monitoringService.send({ elapsedTimeMs, "dpl/elapsed_time_ms" });
557-
/// The rate at which processing was happening in this iteration
558-
monitoringService.send({ (int)((totalProcessedSize / elapsedTimeMs) / 1000), "dpl/processing_rate_mb_s" });
559-
/// The smallest latency between an input message being created and its processing
560-
/// starting.
561-
monitoringService.send({ (int)latency.minLatency, "dpl/min_input_latency_ms" });
562-
/// The largest latency between an input message being created and its processing
563-
/// starting.
564-
monitoringService.send({ (int)latency.maxLatency, "dpl/max_input_latency_ms" });
565-
/// The rate at which we get inputs, i.e. the longest time between one of the inputs being
566-
/// created and actually reaching the consumer device.
567-
if (latency.maxLatency == 0) {
568-
// avoid division by zero by assuming at least one ms of latency.
569-
latency.maxLatency = 1;
570-
}
571-
monitoringService.send({ (int)((totalProcessedSize / latency.maxLatency) / 1000), "dpl/input_rate_mb_s" });
554+
mStats.lastElapsedTimeMs = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
555+
mStats.lastTotalProcessedSize = calculateTotalInputRecordSize(record);
556+
mStats.lastLatency = calculateInputRecordLatency(record, tStart);
572557

573558
// We forward inputs only when we consume them. If we simply Process them,
574559
// we keep them for next message arriving.

Framework/Core/src/DataProcessingStatus.h

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,27 @@
1010
#ifndef o2_framework_DataProcessingStatus_H_INCLUDED
1111
#define o2_framework_DataProcessingStatus_H_INCLUDED
1212

13+
#include "Framework/Signpost.h"
14+
#include <cstdint>
15+
1316
namespace o2
1417
{
1518
namespace framework
1619
{
1720

18-
enum DataProcessingStatus {
21+
/// Describe the possible states for DataProcessing
22+
enum struct DataProcessingStatus : uint32_t {
1923
ID = 0,
20-
IN_DPL_OVERHEAD = 0,
21-
IN_DPL_USER_CALLBACK = 1,
22-
IN_DPL_ERROR_CALLBACK = 2
24+
IN_DPL_OVERHEAD = O2_SIGNPOST_RED,
25+
IN_DPL_USER_CALLBACK = O2_SIGNPOST_GREEN,
26+
IN_DPL_ERROR_CALLBACK = O2_SIGNPOST_PURPLE
27+
};
28+
29+
/// Describe the possible states for Monitoring
30+
enum struct MonitoringStatus : uint32_t {
31+
ID = 1,
32+
SEND = 0,
33+
FLUSH = 1,
2334
};
2435

2536
} // namespace framework

0 commit comments

Comments
 (0)