@@ -669,6 +669,38 @@ bool DataProcessingDevice::handleData(FairMQParts& parts, InputChannelInfo& info
669669 return true ;
670670}
671671
672+ namespace
673+ {
674+ auto calculateInputRecordLatency (InputRecord const & record, uint64_t currentTime) -> DataProcessingStats::InputLatency
675+ {
676+ DataProcessingStats::InputLatency result{static_cast <int >(-1 ), 0 };
677+
678+ for (auto & item : record) {
679+ auto * header = o2::header::get<DataProcessingHeader*>(item.header );
680+ if (header == nullptr ) {
681+ continue ;
682+ }
683+ int partLatency = currentTime - header->creation ;
684+ result.minLatency = std::min (result.minLatency , partLatency);
685+ result.maxLatency = std::max (result.maxLatency , partLatency);
686+ }
687+ return result;
688+ };
689+
690+ auto calculateTotalInputRecordSize (InputRecord const & record) -> int
691+ {
692+ size_t totalInputSize = 0 ;
693+ for (auto & item : record) {
694+ auto * header = o2::header::get<DataHeader*>(item.header );
695+ if (header == nullptr ) {
696+ continue ;
697+ }
698+ totalInputSize += header->payloadSize ;
699+ }
700+ return totalInputSize;
701+ };
702+ } // namespace
703+
672704bool DataProcessingDevice::tryDispatchComputation (std::vector<DataRelayer::RecordAction>& completed)
673705{
674706 ZoneScopedN (" DataProcessingDevice::tryDispatchComputation" );
@@ -869,32 +901,6 @@ bool DataProcessingDevice::tryDispatchComputation(std::vector<DataRelayer::Recor
869901 }
870902 };
871903
872- auto calculateInputRecordLatency = [](InputRecord const & record, uint64_t currentTime) -> DataProcessingStats::InputLatency {
873- DataProcessingStats::InputLatency result{static_cast <int >(-1 ), 0 };
874-
875- for (auto & item : record) {
876- auto * header = o2::header::get<DataProcessingHeader*>(item.header );
877- if (header == nullptr ) {
878- continue ;
879- }
880- int partLatency = currentTime - header->creation ;
881- result.minLatency = std::min (result.minLatency , partLatency);
882- result.maxLatency = std::max (result.maxLatency , partLatency);
883- }
884- return result;
885- };
886-
887- auto calculateTotalInputRecordSize = [](InputRecord const & record) -> int {
888- size_t totalInputSize = 0 ;
889- for (auto & item : record) {
890- auto * header = o2::header::get<DataHeader*>(item.header );
891- if (header == nullptr ) {
892- continue ;
893- }
894- totalInputSize += header->payloadSize ;
895- }
896- return totalInputSize;
897- };
898904
899905 auto switchState = [& control = mServiceRegistry .get <ControlService>(),
900906 &state = mState .streaming ](StreamingState newState) {
0 commit comments