Skip to content

Commit e01747a

Browse files
committed
GPU: Ensure that gpu-prepare-workflow and gpu-workflow process time frames in the same order via completion policy
1 parent 8345ea3 commit e01747a

5 files changed

Lines changed: 47 additions & 9 deletions

File tree

GPU/Workflow/include/GPUWorkflow/GPUWorkflowSpec.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <array>
2828
#include <vector>
2929
#include <mutex>
30+
#include <functional>
3031

3132
class TStopwatch;
3233
namespace fair::mq
@@ -122,7 +123,7 @@ class GPURecoWorkflowSpec : public o2::framework::Task
122123
bool tpcTriggerHandling = false;
123124
};
124125

125-
GPURecoWorkflowSpec(CompletionPolicyData* policyData, Config const& specconfig, std::vector<int> const& tpcsectors, unsigned long tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr);
126+
GPURecoWorkflowSpec(CompletionPolicyData* policyData, Config const& specconfig, std::vector<int> const& tpcsectors, unsigned long tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder = nullptr);
126127
~GPURecoWorkflowSpec() override;
127128
void init(o2::framework::InitContext& ic) final;
128129
void run(o2::framework::ProcessingContext& pc) final;
@@ -162,8 +163,10 @@ class GPURecoWorkflowSpec : public o2::framework::Task
162163
void RunReceiveThread();
163164
void TerminateReceiveThread();
164165
void handlePipelineEndOfStream(o2::framework::EndOfStreamContext& ec);
166+
void initPipeline(o2::framework::InitContext& ic);
165167

166168
CompletionPolicyData* mPolicyData;
169+
std::function<bool(o2::framework::DataProcessingHeader::StartTime)> mPolicyOrder;
167170
std::unique_ptr<GPUO2Interface> mGPUReco;
168171
std::unique_ptr<GPUDisplayFrontendInterface> mDisplayFrontend;
169172
std::unique_ptr<TPCFastTransform> mFastTransform;

GPU/Workflow/src/GPUWorkflowInternal.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,17 @@ struct GPURecoWorkflowSpec_PipelineInternals {
4747
fair::mq::Device* fmqDevice;
4848

4949
std::thread receiveThread;
50-
std::condition_variable notifyThread;
5150
std::mutex threadMutex;
5251
volatile bool shouldTerminate = false;
5352

5453
std::queue<std::unique_ptr<GPURecoWorkflow_QueueObject>> pipelineQueue;
5554
std::mutex queueMutex;
5655
std::condition_variable queueNotify;
56+
57+
std::queue<o2::framework::DataProcessingHeader::StartTime> completionPolicyQueue;
58+
bool pipelineSenderTerminating = false;
59+
std::mutex completionPolicyMutex;
60+
std::condition_variable completionPolicyNotify;
5761
};
5862

5963
} // namespace gpurecoworkflow_internals

GPU/Workflow/src/GPUWorkflowPipeline.cxx

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,23 @@ struct pipelinePrepareMessage {
5656
bool flagEndOfStream;
5757
};
5858

59+
void GPURecoWorkflowSpec::initPipeline(o2::framework::InitContext& ic)
60+
{
61+
if (mSpecConfig.enableDoublePipeline == 1) {
62+
mPipeline->fmqDevice = ic.services().get<RawDeviceService>().device();
63+
mPolicyOrder = [this](o2::framework::DataProcessingHeader::StartTime timeslice) {
64+
std::unique_lock lk(mPipeline->completionPolicyMutex);
65+
mPipeline->completionPolicyNotify.wait(lk, [pipeline = mPipeline.get()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue.empty(); });
66+
if (mPipeline->completionPolicyQueue.front() == timeslice) {
67+
mPipeline->completionPolicyQueue.pop();
68+
return true;
69+
}
70+
return false;
71+
};
72+
mPipeline->receiveThread = std::thread([this]() { RunReceiveThread(); });
73+
}
74+
}
75+
5976
int GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingInOutPointers& ptrs, GPURecoWorkflowSpec_TPCZSBuffers& tpcZSmeta, o2::gpu::GPUTrackingInOutZS& tpcZS)
6077
{
6178
auto* device = pc.services().get<RawDeviceService>().device();
@@ -184,8 +201,8 @@ void GPURecoWorkflowSpec::RunReceiveThread()
184201
LOG(fatal) << "Prepare message corrupted, invalid magic word";
185202
}
186203
if (m->flagEndOfStream) {
187-
LOG(info) << "Received end-of-stream from out-of-band channel";
188-
continue;
204+
LOG(info) << "Received end-of-stream from out-of-band channel, terminating receive thread"; // TODO: Breaks START / STOP / START
205+
break;
189206
}
190207

191208
auto o = std::make_unique<GPURecoWorkflow_QueueObject>();
@@ -220,12 +237,22 @@ void GPURecoWorkflowSpec::RunReceiveThread()
220237
}
221238
}
222239
o->ptrs.tpcZS = &o->tpcZS;
240+
{
241+
std::lock_guard lk(mPipeline->completionPolicyMutex);
242+
mPipeline->completionPolicyQueue.emplace(m->timeSliceId);
243+
}
244+
mPipeline->completionPolicyNotify.notify_one();
223245
{
224246
std::lock_guard lk(mPipeline->queueMutex);
225247
mPipeline->pipelineQueue.emplace(std::move(o));
226248
}
227249
mPipeline->queueNotify.notify_one();
228250
}
251+
{
252+
std::lock_guard lk(mPipeline->completionPolicyMutex);
253+
mPipeline->pipelineSenderTerminating = true;
254+
}
255+
mPipeline->completionPolicyNotify.notify_one();
229256
}
230257

231258
void GPURecoWorkflowSpec::TerminateReceiveThread()

GPU/Workflow/src/GPUWorkflowSpec.cxx

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ using namespace o2::gpu::gpurecoworkflow_internals;
107107
namespace o2::gpu
108108
{
109109

110-
GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int> const& tpcsectors, unsigned long tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
110+
GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int> const& tpcsectors, unsigned long tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
111111
{
112112
if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC) {
113113
throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer is activated");
@@ -118,6 +118,10 @@ GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyDa
118118
mTFSettings.reset(new GPUSettingsTF);
119119
mTimer.reset(new TStopwatch);
120120
mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);
121+
122+
if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
123+
*gPolicyOrder = &mPolicyOrder;
124+
}
121125
}
122126

123127
GPURecoWorkflowSpec::~GPURecoWorkflowSpec() = default;
@@ -285,9 +289,8 @@ void GPURecoWorkflowSpec::init(InitContext& ic)
285289
}
286290
}
287291

288-
if (mSpecConfig.enableDoublePipeline == 1) {
289-
mPipeline->fmqDevice = ic.services().get<RawDeviceService>().device();
290-
mPipeline->receiveThread = std::thread([this]() { RunReceiveThread(); });
292+
if (mSpecConfig.enableDoublePipeline) {
293+
initPipeline(ic);
291294
}
292295

293296
auto& callbacks = ic.services().get<CallbackService>();

GPU/Workflow/src/gpu-reco-workflow.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ using namespace o2::gpu;
3838
using CompletionPolicyData = std::vector<InputSpec>;
3939
static CompletionPolicyData gPolicyData;
4040
static constexpr unsigned long gTpcSectorMask = 0xFFFFFFFFF;
41+
static std::function<bool(o2::framework::DataProcessingHeader::StartTime)>* gPolicyOrderCheck;
4142
static std::shared_ptr<GPURecoWorkflowSpec> gTask;
4243

4344
void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
@@ -71,7 +72,7 @@ void customize(std::vector<DispatchPolicy>& policies)
7172

7273
void customize(std::vector<CompletionPolicy>& policies)
7374
{
74-
policies.push_back(o2::tpc::TPCSectorCompletionPolicy("gpu-reconstruction.*", o2::tpc::TPCSectorCompletionPolicy::Config::RequireAll, &gPolicyData, &gTpcSectorMask)());
75+
policies.push_back(o2::tpc::TPCSectorCompletionPolicy("gpu-reconstruction.*", o2::tpc::TPCSectorCompletionPolicy::Config::RequireAll, &gPolicyData, &gTpcSectorMask, &gPolicyOrderCheck)());
7576
}
7677

7778
void customize(o2::framework::OnWorkflowTerminationHook& hook)

0 commit comments

Comments
 (0)