Skip to content

Commit 913f6e7

Browse files
davidrohrktf
authored andcommitted
DPL: Make sure that DPL proxy ready fast channels fast-enough in multi-channel setup
1 parent 779fd6d commit 913f6e7

1 file changed

Lines changed: 24 additions & 15 deletions

File tree

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -547,23 +547,32 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
547547

548548
for (size_t ci = 0; ci < channels.size(); ++ci) {
549549
std::string const& channel = channels[ci];
550-
fair::mq::Parts parts;
551-
device->Receive(parts, channel, 0, channels.size() == 1 ? -1 : 1);
552-
// Populate TimingInfo from the first message
553-
if (parts.Size() != 0) {
554-
auto const dh = o2::header::get<DataHeader*>(parts.At(0)->GetData());
555-
auto& timingInfo = ctx.services().get<TimingInfo>();
556-
if (dh != nullptr) {
557-
timingInfo.runNumber = dh->runNumber;
558-
timingInfo.firstTForbit = dh->firstTForbit;
559-
timingInfo.tfCounter = dh->tfCounter;
550+
int waitTime = channels.size() == 1 ? -1 : 1;
551+
int maxRead = 1000;
552+
while (maxRead-- > 0) {
553+
fair::mq::Parts parts;
554+
device->Receive(parts, channel, 0, waitTime);
555+
// Populate TimingInfo from the first message
556+
unsigned int nReceived = parts.Size();
557+
if (nReceived != 0) {
558+
auto const dh = o2::header::get<DataHeader*>(parts.At(0)->GetData());
559+
auto& timingInfo = ctx.services().get<TimingInfo>();
560+
if (dh != nullptr) {
561+
timingInfo.runNumber = dh->runNumber;
562+
timingInfo.firstTForbit = dh->firstTForbit;
563+
timingInfo.tfCounter = dh->tfCounter;
564+
}
565+
auto const dph = o2::header::get<DataProcessingHeader*>(parts.At(0)->GetData());
566+
if (dph != nullptr) {
567+
timingInfo.timeslice = dph->startTime;
568+
timingInfo.creation = dph->creation;
569+
}
570+
dataHandler(timingInfo, parts, 0, ci);
560571
}
561-
auto const dph = o2::header::get<DataProcessingHeader*>(parts.At(0)->GetData());
562-
if (dph != nullptr) {
563-
timingInfo.timeslice = dph->startTime;
564-
timingInfo.creation = dph->creation;
572+
if (nReceived == 0 || channels.size() == 1) {
573+
break;
565574
}
566-
dataHandler(timingInfo, parts, 0, ci);
575+
waitTime = 0;
567576
}
568577
}
569578
};

0 commit comments

Comments
 (0)