Skip to content

Commit 9bd5576

Browse files
authored
DPL: extract helper functions (#3973)
1 parent 8af7175 commit 9bd5576

1 file changed

Lines changed: 32 additions & 26 deletions

File tree

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,38 @@ bool DataProcessingDevice::handleData(FairMQParts& parts, InputChannelInfo& info
669669
return true;
670670
}
671671

672+
namespace
673+
{
674+
auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> DataProcessingStats::InputLatency
675+
{
676+
DataProcessingStats::InputLatency result{static_cast<int>(-1), 0};
677+
678+
for (auto& item : record) {
679+
auto* header = o2::header::get<DataProcessingHeader*>(item.header);
680+
if (header == nullptr) {
681+
continue;
682+
}
683+
int partLatency = currentTime - header->creation;
684+
result.minLatency = std::min(result.minLatency, partLatency);
685+
result.maxLatency = std::max(result.maxLatency, partLatency);
686+
}
687+
return result;
688+
};
689+
690+
auto calculateTotalInputRecordSize(InputRecord const& record) -> int
691+
{
692+
size_t totalInputSize = 0;
693+
for (auto& item : record) {
694+
auto* header = o2::header::get<DataHeader*>(item.header);
695+
if (header == nullptr) {
696+
continue;
697+
}
698+
totalInputSize += header->payloadSize;
699+
}
700+
return totalInputSize;
701+
};
702+
} // namespace
703+
672704
bool DataProcessingDevice::tryDispatchComputation(std::vector<DataRelayer::RecordAction>& completed)
673705
{
674706
ZoneScopedN("DataProcessingDevice::tryDispatchComputation");
@@ -869,32 +901,6 @@ bool DataProcessingDevice::tryDispatchComputation(std::vector<DataRelayer::Recor
869901
}
870902
};
871903

872-
auto calculateInputRecordLatency = [](InputRecord const& record, uint64_t currentTime) -> DataProcessingStats::InputLatency {
873-
DataProcessingStats::InputLatency result{static_cast<int>(-1), 0};
874-
875-
for (auto& item : record) {
876-
auto* header = o2::header::get<DataProcessingHeader*>(item.header);
877-
if (header == nullptr) {
878-
continue;
879-
}
880-
int partLatency = currentTime - header->creation;
881-
result.minLatency = std::min(result.minLatency, partLatency);
882-
result.maxLatency = std::max(result.maxLatency, partLatency);
883-
}
884-
return result;
885-
};
886-
887-
auto calculateTotalInputRecordSize = [](InputRecord const& record) -> int {
888-
size_t totalInputSize = 0;
889-
for (auto& item : record) {
890-
auto* header = o2::header::get<DataHeader*>(item.header);
891-
if (header == nullptr) {
892-
continue;
893-
}
894-
totalInputSize += header->payloadSize;
895-
}
896-
return totalInputSize;
897-
};
898904

899905
auto switchState = [& control = mServiceRegistry.get<ControlService>(),
900906
&state = mState.streaming](StreamingState newState) {

0 commit comments

Comments
 (0)