Skip to content

Commit 13fef42

Browse files
matthiasrichterktf
authored andcommitted
Improved testing of the DPL proxies
The default input and output proxies can not be tested together in the same workflow configuration because data descriptions on the producer and consumer side can not be the same. There are now two workflow configurations: 1. For testing the default input proxy, the producer is opening the out of band channel directly and the output proxy is skipped. 2. For testing the default output proxy, the input proxy is created with a custom converter function to adjust data description. A specific unit test is added with the necessary command line option
1 parent 9e85643 commit 13fef42

2 files changed

Lines changed: 35 additions & 10 deletions

File tree

Framework/Core/CMakeLists.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,3 +348,17 @@ o2_add_test(
348348
--consumer
349349
"--global-config consumer-config --local-option hello-aliceo2 --a-boolean3 --an-int2 20 --a-double2 22. --an-int64-2 50000000000000"
350350
)
351+
352+
# the test is compiled from the ExternalFairMQDeviceWorkflow test and run with
353+
# command line option to include the output proxy
354+
o2_add_test(
355+
ExternalFairMQOutputProxyWorkflow NAME test_Framework_test_ExternalFairMQOutputProxyWorkflow
356+
SOURCES test/test_ExternalFairMQDeviceWorkflow.cxx
357+
COMPONENT_NAME Framework
358+
LABELS framework workflow
359+
TIMEOUT 60
360+
PUBLIC_LINK_LIBRARIES O2::Framework
361+
NO_BOOST_TEST
362+
COMMAND_LINE_ARGS
363+
--proxy-mode all --run ${DPL_WORKFLOW_TESTS_EXTRA_OPTIONS}
364+
)

Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -373,17 +373,28 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
373373
if (channelName.empty()) {
374374
return;
375375
}
376-
// make a copy of the header message, get the data header and change origin
377-
auto outHeaderMessage = device.NewMessageFor(channelName, 0, inputs.At(msgidx)->GetSize());
378-
memcpy(outHeaderMessage->GetData(), inputs.At(msgidx)->GetData(), inputs.At(msgidx)->GetSize());
379-
// this we obviously need to fix in the get API, const'ness of the returned header pointer
380-
// should depend on const'ness of the buffer
381-
auto odh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(outHeaderMessage->GetData()));
382-
odh->dataOrigin = o2::header::DataOrigin("PRX");
383376
FairMQParts output;
384-
output.AddPart(std::move(outHeaderMessage));
385-
output.AddPart(std::move(inputs.At(msgidx + 1)));
386-
LOG(debug) << "sending " << DataSpecUtils::describe(OutputSpec{odh->dataOrigin, odh->dataDescription, odh->subSpecification});
377+
for (; msgidx < inputs.Size(); ++msgidx) {
378+
auto const* dh = o2::header::get<o2::header::DataHeader*>(inputs.At(msgidx)->GetData());
379+
if (dh) {
380+
LOGP(debug, "{}/{}/{} with {} part(s), index {}",
381+
dh->dataOrigin.as<std::string>(),
382+
dh->dataDescription.as<std::string>(),
383+
dh->subSpecification,
384+
dh->splitPayloadParts,
385+
dh->splitPayloadIndex);
386+
// make a copy of the header message, get the data header and change origin
387+
auto outHeaderMessage = device.NewMessageFor(channelName, 0, inputs.At(msgidx)->GetSize());
388+
memcpy(outHeaderMessage->GetData(), inputs.At(msgidx)->GetData(), inputs.At(msgidx)->GetSize());
389+
// this we obviously need to fix in the get API, const'ness of the returned header pointer
390+
// should depend on const'ness of the buffer
391+
auto odh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(outHeaderMessage->GetData()));
392+
odh->dataOrigin = o2::header::DataOrigin("PRX");
393+
output.AddPart(std::move(outHeaderMessage));
394+
} else {
395+
output.AddPart(std::move(inputs.At(msgidx)));
396+
}
397+
}
387398
o2::framework::sendOnChannel(device, output, channelName);
388399
};
389400

0 commit comments

Comments
 (0)