@@ -56,6 +56,23 @@ struct pipelinePrepareMessage {
5656 bool flagEndOfStream;
5757};
5858
59+ void GPURecoWorkflowSpec::initPipeline (o2::framework::InitContext& ic)
60+ {
61+ if (mSpecConfig .enableDoublePipeline == 1 ) {
62+ mPipeline ->fmqDevice = ic.services ().get <RawDeviceService>().device ();
63+ mPolicyOrder = [this ](o2::framework::DataProcessingHeader::StartTime timeslice) {
64+ std::unique_lock lk (mPipeline ->completionPolicyMutex );
65+ mPipeline ->completionPolicyNotify .wait (lk, [pipeline = mPipeline .get ()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue .empty (); });
66+ if (mPipeline ->completionPolicyQueue .front () == timeslice) {
67+ mPipeline ->completionPolicyQueue .pop ();
68+ return true ;
69+ }
70+ return false ;
71+ };
72+ mPipeline ->receiveThread = std::thread ([this ]() { RunReceiveThread (); });
73+ }
74+ }
75+
5976int GPURecoWorkflowSpec::handlePipeline (ProcessingContext& pc, GPUTrackingInOutPointers& ptrs, GPURecoWorkflowSpec_TPCZSBuffers& tpcZSmeta, o2::gpu::GPUTrackingInOutZS& tpcZS)
6077{
6178 auto * device = pc.services ().get <RawDeviceService>().device ();
@@ -184,8 +201,8 @@ void GPURecoWorkflowSpec::RunReceiveThread()
184201 LOG (fatal) << " Prepare message corrupted, invalid magic word" ;
185202 }
186203 if (m->flagEndOfStream ) {
187- LOG (info) << " Received end-of-stream from out-of-band channel" ;
188- continue ;
204+ LOG (info) << " Received end-of-stream from out-of-band channel, terminating receive thread " ; // TODO: Breaks START / STOP / START
205+ break ;
189206 }
190207
191208 auto o = std::make_unique<GPURecoWorkflow_QueueObject>();
@@ -220,12 +237,22 @@ void GPURecoWorkflowSpec::RunReceiveThread()
220237 }
221238 }
222239 o->ptrs .tpcZS = &o->tpcZS ;
240+ {
241+ std::lock_guard lk (mPipeline ->completionPolicyMutex );
242+ mPipeline ->completionPolicyQueue .emplace (m->timeSliceId );
243+ }
244+ mPipeline ->completionPolicyNotify .notify_one ();
223245 {
224246 std::lock_guard lk (mPipeline ->queueMutex );
225247 mPipeline ->pipelineQueue .emplace (std::move (o));
226248 }
227249 mPipeline ->queueNotify .notify_one ();
228250 }
251+ {
252+ std::lock_guard lk (mPipeline ->completionPolicyMutex );
253+ mPipeline ->pipelineSenderTerminating = true ;
254+ }
255+ mPipeline ->completionPolicyNotify .notify_one ();
229256}
230257
231258void GPURecoWorkflowSpec::TerminateReceiveThread ()
0 commit comments