@@ -53,6 +53,7 @@ struct pipelinePrepareMessage {
5353 fair::mq::RegionInfo regionInfo;
5454 size_t pointerCounts[GPUTrackingInOutZS::NSLICES ][GPUTrackingInOutZS::NENDPOINTS ];
5555 size_t pointersTotal;
56+ bool flagEndOfStream;
5657};
5758
5859int GPURecoWorkflowSpec::handlePipeline (ProcessingContext& pc, GPUTrackingInOutPointers& ptrs, GPURecoWorkflowSpec_TPCZSBuffers& tpcZSmeta, o2::gpu::GPUTrackingInOutZS& tpcZS)
@@ -94,6 +95,7 @@ int GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutP
9495 preMessage.magicWord = preMessage.MAGIC_WORD ;
9596 preMessage.timeSliceId = tinfo.timeslice ;
9697 preMessage.pointersTotal = ptrsTotal;
98+ preMessage.flagEndOfStream = false ;
9799 memcpy ((void *)&preMessage.tfSettings , (const void *)ptrs.settingsTF , sizeof (preMessage.tfSettings ));
98100
99101 if (ptrsTotal) {
@@ -133,13 +135,30 @@ int GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutP
133135 return 0 ;
134136}
135137
138+ void GPURecoWorkflowSpec::handlePipelineEndOfStream (EndOfStreamContext& ec)
139+ {
140+ if (mSpecConfig .enableDoublePipeline == 1 ) {
141+ TerminateReceiveThread (); // TODO: Apparently breaks START / STOP / START
142+ }
143+ if (mSpecConfig .enableDoublePipeline == 2 ) {
144+ auto * device = ec.services ().get <RawDeviceService>().device ();
145+ pipelinePrepareMessage preMessage;
146+ preMessage.flagEndOfStream = true ;
147+ auto channel = device->GetChannels ().find (" gpu-prepare-channel" );
148+ fair::mq::MessagePtr payload (device->NewMessage ());
149+ LOG (info) << " Sending end-of-stream message over out-of-bands channel" ;
150+ payload->Rebuild (&preMessage, sizeof (preMessage), nullptr , nullptr );
151+ channel->second [0 ].Send (payload);
152+ }
153+ }
154+
136155void GPURecoWorkflowSpec::RunReceiveThread ()
137156{
138- std::unique_lock lk (mPipeline ->threadMutex );
139- mPipeline ->notifyThread .wait (lk, [this ]() { return mPipeline ->shouldTerminate || mPipeline ->mayReceive ; });
140- lk.unlock ();
157+ auto * device = mPipeline ->fmqDevice ;
158+ while (device->GetCurrentState () != fair::mq::State::Running) {
159+ usleep (300000 );
160+ }
141161 while (!mPipeline ->shouldTerminate ) {
142- auto * device = mPipeline ->fmqDevice ;
143162 bool received = false ;
144163 int recvTimeot = 1000 ;
145164 fair::mq::MessagePtr msg;
@@ -164,6 +183,10 @@ void GPURecoWorkflowSpec::RunReceiveThread()
164183 if (m->magicWord != m->MAGIC_WORD ) {
165184 LOG (fatal) << " Prepare message corrupted, invalid magic word" ;
166185 }
186+ if (m->flagEndOfStream ) {
187+ LOG (info) << " Received end-of-stream from out-of-band channel" ;
188+ continue ;
189+ }
167190
168191 auto o = std::make_unique<GPURecoWorkflow_QueueObject>();
169192 o->timeSliceId = m->timeSliceId ;
0 commit comments