@@ -547,23 +547,32 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
547547
548548 for (size_t ci = 0 ; ci < channels.size (); ++ci) {
549549 std::string const & channel = channels[ci];
550- fair::mq::Parts parts;
551- device->Receive (parts, channel, 0 , channels.size () == 1 ? -1 : 1 );
552- // Populate TimingInfo from the first message
553- if (parts.Size () != 0 ) {
554- auto const dh = o2::header::get<DataHeader*>(parts.At (0 )->GetData ());
555- auto & timingInfo = ctx.services ().get <TimingInfo>();
556- if (dh != nullptr ) {
557- timingInfo.runNumber = dh->runNumber ;
558- timingInfo.firstTForbit = dh->firstTForbit ;
559- timingInfo.tfCounter = dh->tfCounter ;
550+ int waitTime = channels.size () == 1 ? -1 : 1 ;
551+ int maxRead = 1000 ;
552+ while (maxRead-- > 0 ) {
553+ fair::mq::Parts parts;
554+ device->Receive (parts, channel, 0 , waitTime);
555+ // Populate TimingInfo from the first message
556+ unsigned int nReceived = parts.Size ();
557+ if (nReceived != 0 ) {
558+ auto const dh = o2::header::get<DataHeader*>(parts.At (0 )->GetData ());
559+ auto & timingInfo = ctx.services ().get <TimingInfo>();
560+ if (dh != nullptr ) {
561+ timingInfo.runNumber = dh->runNumber ;
562+ timingInfo.firstTForbit = dh->firstTForbit ;
563+ timingInfo.tfCounter = dh->tfCounter ;
564+ }
565+ auto const dph = o2::header::get<DataProcessingHeader*>(parts.At (0 )->GetData ());
566+ if (dph != nullptr ) {
567+ timingInfo.timeslice = dph->startTime ;
568+ timingInfo.creation = dph->creation ;
569+ }
570+ dataHandler (timingInfo, parts, 0 , ci);
560571 }
561- auto const dph = o2::header::get<DataProcessingHeader*>(parts.At (0 )->GetData ());
562- if (dph != nullptr ) {
563- timingInfo.timeslice = dph->startTime ;
564- timingInfo.creation = dph->creation ;
572+ if (nReceived == 0 || channels.size () == 1 ) {
573+ break ;
565574 }
566- dataHandler (timingInfo, parts, 0 , ci) ;
575+ waitTime = 0 ;
567576 }
568577 }
569578 };
0 commit comments