2323#include " Framework/SourceInfoHeader.h"
2424#include " Framework/ConfigParamRegistry.h"
2525#include " Framework/RateLimiter.h"
26+ #include " Framework/TimingInfo.h"
2627#include " Headers/DataHeader.h"
2728#include " Headers/Stack.h"
2829
@@ -171,7 +172,7 @@ void sendOnChannel(FairMQDevice& device, FairMQMessagePtr&& headerMessage, FairM
171172InjectorFunction o2DataModelAdaptor (OutputSpec const & spec, uint64_t startTime, uint64_t /* step*/ )
172173{
173174 auto timesliceId = std::make_shared<size_t >(startTime);
174- return [timesliceId, spec](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
175+ return [timesliceId, spec](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
175176 for (int i = 0 ; i < parts.Size () / 2 ; ++i) {
176177 auto dh = o2::header::get<DataHeader*>(parts.At (i * 2 )->GetData ());
177178
@@ -219,7 +220,7 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
219220 std::string descriptions;
220221 };
221222
222- return [filterSpecs = std::move (filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
223+ return [filterSpecs = std::move (filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](TimingInfo& timingInfo, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
223224 std::unordered_map<std::string, FairMQParts> outputs;
224225 std::vector<std::string> unmatchedDescriptions;
225226 static int64_t dplCounter = -1 ;
@@ -239,6 +240,7 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
239240 continue ;
240241 }
241242 const_cast <DataProcessingHeader*>(dph)->startTime = dplCounter;
243+ timingInfo.timeslice = dph->startTime ;
242244 LOG (debug) << msgidx << " : " << DataSpecUtils::describe (OutputSpec{dh->dataOrigin , dh->dataDescription , dh->subSpecification }) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At (msgidx + 1 )->GetSize ();
243245
244246 OutputSpec query{dh->dataOrigin , dh->dataDescription , dh->subSpecification };
@@ -337,7 +339,7 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime
337339{
338340 auto timesliceId = std::make_shared<size_t >(startTime);
339341
340- return [timesliceId, spec, step](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
342+ return [timesliceId, spec, step](TimingInfo&, FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
341343 // We iterate on all the parts and we send them two by two,
342344 // adding the appropriate O2 header.
343345 for (int i = 0 ; i < parts.Size (); ++i) {
@@ -363,7 +365,8 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime
363365DataProcessorSpec specifyExternalFairMQDeviceProxy (char const * name,
364366 std::vector<OutputSpec> const & outputs,
365367 char const * defaultChannelConfig,
366- std::function<void (FairMQDevice&,
368+ std::function<void (TimingInfo&,
369+ FairMQDevice&,
367370 FairMQParts&,
368371 ChannelRetriever)>
369372 converter,
@@ -397,7 +400,11 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
397400 ctx.services ().get <CallbackService>().set (CallbackService::Id::Start, channelConfigurationChecker);
398401 // Converter should pump messages
399402
400- auto dataHandler = [device, converter, outputRoutes = std::move (outputRoutes), control = &ctx.services ().get <ControlService>(), outputChannels = std::move (outputChannels)](FairMQParts& inputs, int ) {
403+ auto dataHandler = [device, converter,
404+ outputRoutes = std::move (outputRoutes),
405+ control = &ctx.services ().get <ControlService>(),
406+ &timingInfo = ctx.services ().get <TimingInfo>(),
407+ outputChannels = std::move (outputChannels)](FairMQParts& inputs, int ) {
401408 // pass a copy of the outputRoutes
402409 auto channelRetriever = [&outputRoutes](OutputSpec const & query, DataProcessingHeader::StartTime timeslice) -> std::string {
403410 for (auto & route : outputRoutes) {
@@ -421,7 +428,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
421428 };
422429 // we buffer the condition since the converter will forward messages by move
423430 bool doEos = checkEos ();
424- converter (*device, inputs, channelRetriever);
431+ converter (timingInfo, *device, inputs, channelRetriever);
425432
426433 if (doEos) {
427434 for (auto const & channel : outputChannels) {
0 commit comments