Skip to content

Commit b7969ee

Browse files
ktfdavidrohr
authored andcommitted
DPL: handle startTime rewriting correctly
Correctly update the timinginfo for new messages, so that multiple parallel workflows are handled correctly.
1 parent b072a5e commit b7969ee

9 files changed

Lines changed: 21 additions & 14 deletions

File tree

Detectors/CTP/workflowScalers/src/ctp-proxy.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ InjectorFunction dcs2dpl()
4545

4646
auto timesliceId = std::make_shared<size_t>(0);
4747

48-
return [timesliceId](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
48+
return [timesliceId](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
4949
// make sure just 2 messages received
5050
if (parts.Size() != 2) {
5151
LOG(error) << "received " << parts.Size() << " instead of 2 expected";

Detectors/DCS/testWorkflow/src/DCStoDPLconverter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ o2f::InjectorFunction dcs2dpl(std::unordered_map<DPID, o2h::DataDescription>& dp
5454
{
5555

5656
auto timesliceId = std::make_shared<size_t>(startTime);
57-
return [dpid2group, timesliceId, step, verbose](FairMQDevice& device, FairMQParts& parts, o2f::ChannelRetriever channelRetriever) {
57+
return [dpid2group, timesliceId, step, verbose](TimingInfo&, FairMQDevice& device, FairMQParts& parts, o2f::ChannelRetriever channelRetriever) {
5858
static std::unordered_map<DPID, DPCOM> cache; // will keep only the latest measurement in the 1-second wide window for each DPID
5959
static auto timer = std::chrono::system_clock::now();
6060

Detectors/DCS/testWorkflow/src/dcs-config-proxy.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ InjectorFunction dcs2dpl(const std::string& acknowledge)
6868

6969
auto timesliceId = std::make_shared<size_t>(0);
7070

71-
return [acknowledge, timesliceId](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
71+
return [acknowledge, timesliceId](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
7272
// make sure just 2 messages received
7373
if (parts.Size() != 2) {
7474
LOG(error) << "received " << parts.Size() << " instead of 2 expected";

Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace o2::framework
2424
/// A callback function to retrieve the FairMQChannel name to be used for sending
2525
/// messages of the specified OutputSpec
2626
using ChannelRetriever = std::function<std::string(OutputSpec const&, DataProcessingHeader::StartTime)>;
27-
using InjectorFunction = std::function<void(FairMQDevice& device, FairMQParts& inputs, ChannelRetriever)>;
27+
using InjectorFunction = std::function<void(TimingInfo&, FairMQDevice& device, FairMQParts& inputs, ChannelRetriever)>;
2828
using ChannelSelector = std::function<std::string(InputSpec const& input, const std::unordered_map<std::string, std::vector<FairMQChannel>>& channels)>;
2929

3030
struct InputChannelSpec;

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "Framework/SourceInfoHeader.h"
2424
#include "Framework/ConfigParamRegistry.h"
2525
#include "Framework/RateLimiter.h"
26+
#include "Framework/TimingInfo.h"
2627
#include "Headers/DataHeader.h"
2728
#include "Headers/Stack.h"
2829

@@ -171,7 +172,7 @@ void sendOnChannel(FairMQDevice& device, FairMQMessagePtr&& headerMessage, FairM
171172
InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t /*step*/)
172173
{
173174
auto timesliceId = std::make_shared<size_t>(startTime);
174-
return [timesliceId, spec](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
175+
return [timesliceId, spec](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
175176
for (int i = 0; i < parts.Size() / 2; ++i) {
176177
auto dh = o2::header::get<DataHeader*>(parts.At(i * 2)->GetData());
177178

@@ -219,7 +220,7 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
219220
std::string descriptions;
220221
};
221222

222-
return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
223+
return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](TimingInfo& timingInfo, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
223224
std::unordered_map<std::string, FairMQParts> outputs;
224225
std::vector<std::string> unmatchedDescriptions;
225226
static int64_t dplCounter = -1;
@@ -239,6 +240,7 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
239240
continue;
240241
}
241242
const_cast<DataProcessingHeader*>(dph)->startTime = dplCounter;
243+
timingInfo.timeslice = dph->startTime;
242244
LOG(debug) << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
243245

244246
OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
@@ -337,7 +339,7 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime
337339
{
338340
auto timesliceId = std::make_shared<size_t>(startTime);
339341

340-
return [timesliceId, spec, step](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
342+
return [timesliceId, spec, step](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
341343
// We iterate on all the parts and we send them two by two,
342344
// adding the appropriate O2 header.
343345
for (int i = 0; i < parts.Size(); ++i) {
@@ -363,7 +365,8 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime
363365
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
364366
std::vector<OutputSpec> const& outputs,
365367
char const* defaultChannelConfig,
366-
std::function<void(FairMQDevice&,
368+
std::function<void(TimingInfo&,
369+
FairMQDevice&,
367370
FairMQParts&,
368371
ChannelRetriever)>
369372
converter,
@@ -397,7 +400,11 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
397400
ctx.services().get<CallbackService>().set(CallbackService::Id::Start, channelConfigurationChecker);
398401
// Converter should pump messages
399402

400-
auto dataHandler = [device, converter, outputRoutes = std::move(outputRoutes), control = &ctx.services().get<ControlService>(), outputChannels = std::move(outputChannels)](FairMQParts& inputs, int) {
403+
auto dataHandler = [device, converter,
404+
outputRoutes = std::move(outputRoutes),
405+
control = &ctx.services().get<ControlService>(),
406+
&timingInfo = ctx.services().get<TimingInfo>(),
407+
outputChannels = std::move(outputChannels)](FairMQParts& inputs, int) {
401408
// pass a copy of the outputRoutes
402409
auto channelRetriever = [&outputRoutes](OutputSpec const& query, DataProcessingHeader::StartTime timeslice) -> std::string {
403410
for (auto& route : outputRoutes) {
@@ -421,7 +428,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
421428
};
422429
// we buffer the condition since the converter will forward messages by move
423430
bool doEos = checkEos();
424-
converter(*device, inputs, channelRetriever);
431+
converter(timingInfo, *device, inputs, channelRetriever);
425432

426433
if (doEos) {
427434
for (auto const& channel : outputChannels) {

Framework/Core/src/ReadoutAdapter.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ InjectorFunction readoutAdapter(OutputSpec const& spec)
2525
{
2626
auto counter = std::make_shared<uint64_t>(0);
2727

28-
return [spec, counter](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
28+
return [spec, counter](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
2929
for (size_t i = 0; i < parts.Size(); ++i) {
3030
DataHeader dh;
3131
// FIXME: this will have to change and extract the actual subspec from

Framework/Core/test/benchmark_ExternalFairMQDeviceProxies.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
466466
// reads the messages from the output proxy via the out-of-band channel
467467

468468
// converter callback for the external FairMQ device proxy ProcessorSpec generator
469-
auto converter = [](FairMQDevice& device, FairMQParts& inputs, ChannelRetriever channelRetriever) {
469+
auto converter = [](TimingInfo&, FairMQDevice& device, FairMQParts& inputs, ChannelRetriever channelRetriever) {
470470
ASSERT_ERROR(inputs.Size() >= 2);
471471
if (inputs.Size() < 2) {
472472
return;

Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
344344
// reads the messages from the output proxy via the out-of-band channel
345345

346346
// converter callback for the external FairMQ device proxy ProcessorSpec generator
347-
auto converter = [](FairMQDevice& device, FairMQParts& inputs, ChannelRetriever channelRetriever) {
347+
auto converter = [](TimingInfo&, FairMQDevice& device, FairMQParts& inputs, ChannelRetriever channelRetriever) {
348348
ASSERT_ERROR(inputs.Size() >= 2);
349349
if (inputs.Size() < 2) {
350350
return;

Utilities/DataSampling/src/DataSamplingReadoutAdapter.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ static std::atomic<unsigned int> blockId = 0;
2525

2626
InjectorFunction dataSamplingReadoutAdapter(OutputSpec const& spec)
2727
{
28-
return [spec](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
28+
return [spec](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
2929
for (size_t i = 0; i < parts.Size(); ++i) {
3030

3131
DataHeader dh;

0 commit comments

Comments
 (0)