@@ -148,6 +148,8 @@ bool DataProcessingDevice::ConditionalRun()
148148 if (currentTime - lastSent < 5000 ) {
149149 return ;
150150 }
151+
152+ O2_SIGNPOST_START (MonitoringStatus::ID , MonitoringStatus::SEND , 0 , 0 , O2_SIGNPOST_BLUE );
151153 monitoring.send ({ (int )relayerStats.malformedInputs , " dpl/malformed_inputs" });
152154 monitoring.send ({ (int )relayerStats.droppedComputations , " dpl/dropped_computations" });
153155 monitoring.send ({ (int )relayerStats.droppedIncomingMessages , " dpl/dropped_incoming_messages" });
@@ -156,8 +158,15 @@ bool DataProcessingDevice::ConditionalRun()
156158 monitoring.send ({ (int ) stats.pendingInputs , " inputs/relayed/pending" });
157159 monitoring.send ({ (int ) stats.incomplete , " inputs/relayed/incomplete" });
158160 monitoring.send ({ (int ) stats.inputParts , " inputs/relayed/total" });
161+ monitoring.send ({ stats.lastElapsedTimeMs , " dpl/elapsed_time_ms" });
162+ monitoring.send ({ stats.lastTotalProcessedSize , " dpl/processed_input_size_bytes" });
163+ monitoring.send ({ (stats.lastTotalProcessedSize / (stats.lastElapsedTimeMs ? stats.lastElapsedTimeMs : 1 ) / 1000 ), " dpl/processing_rate_mb_s" });
164+ monitoring.send ({ stats.lastLatency .minLatency , " dpl/min_input_latency_ms" });
165+ monitoring.send ({ stats.lastLatency .maxLatency , " dpl/max_input_latency_ms" });
166+ monitoring.send ({ (stats.lastTotalProcessedSize / (stats.lastLatency .maxLatency ? stats.lastLatency .maxLatency : 1 ) / 1000 ), " dpl/input_rate_mb_s" });
159167
160168 lastSent = currentTime;
169+ O2_SIGNPOST_END (MonitoringStatus::ID , MonitoringStatus::SEND , 0 , 0 , O2_SIGNPOST_BLUE );
161170 };
162171
163172 // / This will flush metrics only once every second.
@@ -169,8 +178,11 @@ bool DataProcessingDevice::ConditionalRun()
169178 if (currentTime - lastFlushed < 1000 ) {
170179 return ;
171180 }
181+
182+ O2_SIGNPOST_START (MonitoringStatus::ID , MonitoringStatus::FLUSH , 0 , 0 , O2_SIGNPOST_RED );
172183 monitoring.flushBuffer ();
173184 lastFlushed = currentTime;
185+ O2_SIGNPOST_END (MonitoringStatus::ID , MonitoringStatus::FLUSH , 0 , 0 , O2_SIGNPOST_RED );
174186 };
175187
176188 auto now = std::chrono::high_resolution_clock::now ();
@@ -477,23 +489,16 @@ bool DataProcessingDevice::tryDispatchComputation()
477489 }
478490 };
479491
480- // We use this to keep track of the latency of the first message we get for a given input record
481- // and of the last one.
482- struct InputLatency {
483- uint64_t minLatency;
484- uint64_t maxLatency;
485- };
486-
487- auto calculateInputRecordLatency = [](InputRecord const & record, auto now) -> InputLatency {
488- InputLatency result{ static_cast <uint64_t >(-1 ), 0 };
492+ auto calculateInputRecordLatency = [](InputRecord const & record, auto now) -> DataProcessingStats::InputLatency {
493+ DataProcessingStats::InputLatency result{ static_cast <int >(-1 ), 0 };
489494
490495 auto currentTime = (uint64_t )std::chrono::duration<double , std::milli>(now.time_since_epoch ()).count ();
491496 for (auto & item : record) {
492497 auto * header = o2::header::get<DataProcessingHeader*>(item.header );
493498 if (header == nullptr ) {
494499 continue ;
495500 }
496- auto partLatency = currentTime - header->creation ;
501+ int partLatency = currentTime - header->creation ;
497502 result.minLatency = std::min (result.minLatency , partLatency);
498503 result.maxLatency = std::max (result.maxLatency , partLatency);
499504 }
@@ -546,29 +551,9 @@ bool DataProcessingDevice::tryDispatchComputation()
546551 errorHandling (e, record);
547552 }
548553 auto tEnd = std::chrono::high_resolution_clock::now ();
549- double elapsedTimeMs = std::chrono::duration<double , std::milli>(tEnd - tStart).count ();
550- auto totalProcessedSize = calculateTotalInputRecordSize (record);
551- auto latency = calculateInputRecordLatency (record, tStart);
552-
553- // / The size of all the input messages which have been processed in this iteration
554- monitoringService.send ({ totalProcessedSize, " dpl/processed_input_size_bytes" });
555- // / The time to do the processing for this iteration
556- monitoringService.send ({ elapsedTimeMs, " dpl/elapsed_time_ms" });
557- // / The rate at which processing was happening in this iteration
558- monitoringService.send ({ (int )((totalProcessedSize / elapsedTimeMs) / 1000 ), " dpl/processing_rate_mb_s" });
559- // / The smallest latency between an input message being created and its processing
560- // / starting.
561- monitoringService.send ({ (int )latency.minLatency , " dpl/min_input_latency_ms" });
562- // / The largest latency between an input message being created and its processing
563- // / starting.
564- monitoringService.send ({ (int )latency.maxLatency , " dpl/max_input_latency_ms" });
565- // / The rate at which we get inputs, i.e. the longest time between one of the inputs being
566- // / created and actually reaching the consumer device.
567- if (latency.maxLatency == 0 ) {
568- // avoid division by zero by assuming at least one ms of latency.
569- latency.maxLatency = 1 ;
570- }
571- monitoringService.send ({ (int )((totalProcessedSize / latency.maxLatency ) / 1000 ), " dpl/input_rate_mb_s" });
554+ mStats .lastElapsedTimeMs = std::chrono::duration<double , std::milli>(tEnd - tStart).count ();
555+ mStats .lastTotalProcessedSize = calculateTotalInputRecordSize (record);
556+ mStats .lastLatency = calculateInputRecordLatency (record, tStart);
572557
573558 // We forward inputs only when we consume them. If we simply Process them,
574559 // we keep them for next message arriving.
0 commit comments