Skip to content

Commit 1ab7698

Browse files
committed
GPU Standalone multi-threading: handle end-of-stream over out-of-band channel
1 parent 33a28f0 commit 1ab7698

4 files changed

Lines changed: 29 additions & 10 deletions

File tree

GPU/Workflow/include/GPUWorkflow/GPUWorkflowSpec.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@
2121
#include "Framework/Task.h"
2222
#include "Framework/ConcreteDataMatcher.h"
2323
#include "Framework/InitContext.h"
24-
#include "Framework/ProcessingContext.h"
2524
#include "Framework/CompletionPolicy.h"
2625
#include "Algorithm/Parser.h"
2726
#include <string>
@@ -162,6 +161,7 @@ class GPURecoWorkflowSpec : public o2::framework::Task
162161
int handlePipeline(o2::framework::ProcessingContext& pc, GPUTrackingInOutPointers& ptrs, gpurecoworkflow_internals::GPURecoWorkflowSpec_TPCZSBuffers& tpcZSmeta, o2::gpu::GPUTrackingInOutZS& tpcZS);
163162
void RunReceiveThread();
164163
void TerminateReceiveThread();
164+
void handlePipelineEndOfStream(o2::framework::EndOfStreamContext& ec);
165165

166166
CompletionPolicyData* mPolicyData;
167167
std::unique_ptr<GPUO2Interface> mGPUReco;

GPU/Workflow/src/GPUWorkflowInternal.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,6 @@ struct GPURecoWorkflowSpec_PipelineInternals {
4949
std::thread receiveThread;
5050
std::condition_variable notifyThread;
5151
std::mutex threadMutex;
52-
volatile bool mayReceive = false;
5352
volatile bool shouldTerminate = false;
5453

5554
std::queue<std::unique_ptr<GPURecoWorkflow_QueueObject>> pipelineQueue;

GPU/Workflow/src/GPUWorkflowPipeline.cxx

Lines changed: 27 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ struct pipelinePrepareMessage {
5353
fair::mq::RegionInfo regionInfo;
5454
size_t pointerCounts[GPUTrackingInOutZS::NSLICES][GPUTrackingInOutZS::NENDPOINTS];
5555
size_t pointersTotal;
56+
bool flagEndOfStream;
5657
};
5758

5859
int GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutPointers& ptrs, GPURecoWorkflowSpec_TPCZSBuffers& tpcZSmeta, o2::gpu::GPUTrackingInOutZS& tpcZS)
@@ -94,6 +95,7 @@ int GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutP
9495
preMessage.magicWord = preMessage.MAGIC_WORD;
9596
preMessage.timeSliceId = tinfo.timeslice;
9697
preMessage.pointersTotal = ptrsTotal;
98+
preMessage.flagEndOfStream = false;
9799
memcpy((void*)&preMessage.tfSettings, (const void*)ptrs.settingsTF, sizeof(preMessage.tfSettings));
98100

99101
if (ptrsTotal) {
@@ -133,13 +135,30 @@ int GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutP
133135
return 0;
134136
}
135137

138+
/// Handles end-of-stream for the double-pipeline modes.
///
/// With enableDoublePipeline == 1 the receive thread is terminated.
/// With enableDoublePipeline == 2 a pipelinePrepareMessage with
/// flagEndOfStream set is sent over the "gpu-prepare-channel" channel.
/// For any other value nothing is done.
///
/// @param ec end-of-stream context; used to obtain the raw device from RawDeviceService.
void GPURecoWorkflowSpec::handlePipelineEndOfStream(EndOfStreamContext& ec)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    TerminateReceiveThread(); // TODO: Apparently breaks START / STOP / START
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    auto* device = ec.services().get<RawDeviceService>().device();

    // Look the channel up once and validate it before dereferencing: find()
    // yields end() for an unknown name, and operator[] on an empty
    // sub-channel list would be undefined behaviour.
    auto&& channels = device->GetChannels();
    auto channel = channels.find("gpu-prepare-channel");
    if (channel == channels.end() || channel->second.empty()) {
      LOG(error) << "Cannot send end-of-stream message: channel gpu-prepare-channel not available";
      return;
    }

    // Value-initialise so that no indeterminate bytes are put on the wire.
    pipelinePrepareMessage preMessage{};
    // RunReceiveThread() validates the magic word (and logs fatal on mismatch)
    // before it inspects flagEndOfStream, so it must be set here exactly as
    // handlePipeline() does for regular messages.
    preMessage.magicWord = preMessage.MAGIC_WORD;
    preMessage.flagEndOfStream = true;

    fair::mq::MessagePtr payload(device->NewMessage());
    LOG(info) << "Sending end-of-stream message over out-of-bands channel";
    // NOTE(review): the message is rebuilt around a stack object with no free
    // callback; presumably the transport copies or consumes the buffer before
    // this function returns — confirm for the transport in use.
    payload->Rebuild(&preMessage, sizeof(preMessage), nullptr, nullptr);
    channel->second[0].Send(payload);
  }
}
154+
136155
void GPURecoWorkflowSpec::RunReceiveThread()
137156
{
138-
std::unique_lock lk(mPipeline->threadMutex);
139-
mPipeline->notifyThread.wait(lk, [this]() { return mPipeline->shouldTerminate || mPipeline->mayReceive; });
140-
lk.unlock();
157+
auto* device = mPipeline->fmqDevice;
158+
while (device->GetCurrentState() != fair::mq::State::Running) {
159+
usleep(300000);
160+
}
141161
while (!mPipeline->shouldTerminate) {
142-
auto* device = mPipeline->fmqDevice;
143162
bool received = false;
144163
int recvTimeot = 1000;
145164
fair::mq::MessagePtr msg;
@@ -164,6 +183,10 @@ void GPURecoWorkflowSpec::RunReceiveThread()
164183
if (m->magicWord != m->MAGIC_WORD) {
165184
LOG(fatal) << "Prepare message corrupted, invalid magic word";
166185
}
186+
if (m->flagEndOfStream) {
187+
LOG(info) << "Received end-of-stream from out-of-band channel";
188+
continue;
189+
}
167190

168191
auto o = std::make_unique<GPURecoWorkflow_QueueObject>();
169192
o->timeSliceId = m->timeSliceId;

GPU/Workflow/src/GPUWorkflowSpec.cxx

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -347,7 +347,7 @@ void GPURecoWorkflowSpec::stop()
347347

348348
void GPURecoWorkflowSpec::endOfStream(EndOfStreamContext& ec)
349349
{
350-
TerminateReceiveThread(); // TODO: Apparently breaks START / STOP / START
350+
handlePipelineEndOfStream(ec);
351351
}
352352

353353
void GPURecoWorkflowSpec::finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj)
@@ -628,9 +628,6 @@ void GPURecoWorkflowSpec::run(ProcessingContext& pc)
628628
unsigned int threadIndex = mNextThreadIndex;
629629
if (mConfig->configProcessing.doublePipeline) {
630630
mNextThreadIndex = (mNextThreadIndex + 1) % 2;
631-
std::lock_guard lk(mPipeline->threadMutex);
632-
mPipeline->mayReceive = true;
633-
mPipeline->notifyThread.notify_one();
634631
}
635632

636633
if (mSpecConfig.enableDoublePipeline) {

0 commit comments

Comments
 (0)