1111
1212#include < boost/program_options.hpp>
1313
14+ #include " ../Framework/Core/src/ArrowSupport.h"
1415#include " Framework/WorkflowSpec.h"
1516#include " Framework/ConfigParamSpec.h"
16- #include " Framework/CommonDataProcessors.h"
1717#include " Framework/ExternalFairMQDeviceProxy.h"
1818#include " Framework/Task.h"
1919#include " Framework/DataRef.h"
@@ -34,6 +34,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
3434 workflowOptions.push_back (ConfigParamSpec{" enable-test-consumer" , o2::framework::VariantType::Bool, false , {" enable a simple test consumer for injected MC tracks" }});
3535 workflowOptions.push_back (ConfigParamSpec{" o2sim-pid" , o2::framework::VariantType::Int, -1 , {" The process id of the source o2-sim" }});
3636 workflowOptions.push_back (ConfigParamSpec{" nevents" , o2::framework::VariantType::Int, -1 , {" The number of events expected to arrive on the proxy" }});
37+ workflowOptions.push_back (ConfigParamSpec{" aggregate-timeframe" , o2::framework::VariantType::Int, -1 , {" The number of events to aggregate per timeframe" }});
3738}
3839
3940#include " Framework/runDataProcessing.h"
@@ -43,7 +44,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
4344class ConsumerTask
4445{
4546 public:
46- void init (o2::framework::InitContext& ic ) {}
47+ void init (o2::framework::InitContext& /* ic */ ) {}
4748 void run (o2::framework::ProcessingContext& pc)
4849 {
4950 LOG (debug) << " Running simple kinematics consumer client" ;
@@ -63,38 +64,96 @@ class ConsumerTask
6364
6465// / Function converting raw input data to DPL data format. Uses knowledge of how MCTracks and MCEventHeaders
6566// / are sent from the o2sim side.
66- InjectorFunction o2simKinematicsConverter (std::vector<OutputSpec> const & specs, uint64_t startTime, uint64_t step, int nevents)
67+ // / If aggregate-timeframe is set to non-negative value N, this number of events is accumulated and then sent
68+ // / as a multipart message, which is useful for AOD creation
69+ InjectorFunction o2simKinematicsConverter (std::vector<OutputSpec> const & specs, uint64_t startTime, uint64_t step, int nevents, int nPerTF)
6770{
6871 auto timesliceId = std::make_shared<size_t >(startTime);
72+ auto totalEventCounter = std::make_shared<size_t >(0 );
73+ auto eventCounter = std::make_shared<size_t >(0 );
74+ auto TFcounter = std::make_shared<size_t >(startTime);
75+ auto MCHeadersMessageCache = std::make_shared<fair::mq::Parts>();
76+ auto MCTracksMessageCache = std::make_shared<fair::mq::Parts>();
6977
70- return [timesliceId, specs, step, nevents](TimingInfo&, fair::mq::Device& device, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool & stop) {
78+ return [timesliceId, specs, step, nevents, nPerTF, totalEventCounter, eventCounter, TFcounter, MCHeadersMessageCache = MCHeadersMessageCache, MCTracksMessageCache = MCTracksMessageCache ](TimingInfo& ti , fair::mq::Device& device, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool & stop) mutable {
7179 // We iterate on all the parts and we send them two by two,
7280 // adding the appropriate O2 header.
73- static int eventcounter = 0 ;
74-
75- for (int i = 0 ; i < parts.Size (); ++i) {
76- DataHeader dh;
77- ConcreteDataMatcher matcher = DataSpecUtils::asConcreteDataMatcher (specs[i]);
78- dh.dataOrigin = matcher.origin ;
79- dh.dataDescription = matcher.description ;
80- dh.subSpecification = matcher.subSpec ;
81- dh.payloadSize = parts.At (i)->GetSize ();
82- if (i == 0 ) {
83- dh.payloadSerializationMethod = gSerializationMethodROOT ;
84- } else if (i == 1 ) {
85- dh.payloadSerializationMethod = gSerializationMethodROOT ;
86- }
81+ if (nPerTF < 0 ) {
82+ // if no aggregation requested, forward each message with the DPL header
8783 if (*timesliceId != newTimesliceId) {
8884 LOG (fatal) << " Time slice ID provided from oldestPossible mechanism " << newTimesliceId << " is out of sync with expected value " << *timesliceId;
8985 }
90- DataProcessingHeader dph{newTimesliceId, 0 };
91- // we have to move the incoming data
92- o2::header::Stack headerStack{dh, dph};
93- sendOnChannel (device, std::move (headerStack), std::move (parts.At (i)), specs[i], channelRetriever);
86+ for (auto i = 0U ; i < parts.Size (); ++i) {
87+ DataHeader dh;
88+ ConcreteDataMatcher matcher = DataSpecUtils::asConcreteDataMatcher (specs[i]);
89+ dh.dataOrigin = matcher.origin ;
90+ dh.dataDescription = matcher.description ;
91+ dh.subSpecification = matcher.subSpec ;
92+ dh.payloadSize = parts.At (i)->GetSize ();
93+ dh.payloadSerializationMethod = gSerializationMethodROOT ;
94+ DataProcessingHeader dph{newTimesliceId, 0 };
95+ // we have to move the incoming data
96+ o2::header::Stack headerStack{dh, dph};
97+ sendOnChannel (device, std::move (headerStack), std::move (parts.At (i)), specs[i], channelRetriever);
98+ }
99+ *timesliceId += step;
100+ } else {
101+ // if aggregation is requested, colelct the payloads into a multipart message
102+ ti.timeslice = *TFcounter;
103+ ti.tfCounter = *TFcounter;
104+ DataHeader headerDH;
105+ DataHeader tracksDH;
106+ auto headerSize = parts.At (0 )->GetSize ();
107+ auto tracksSize = parts.At (1 )->GetSize ();
108+
109+ DataProcessingHeader hdph{*TFcounter, 0 };
110+ ConcreteDataMatcher headerMatcher = DataSpecUtils::asConcreteDataMatcher (specs[0 ]);
111+ headerDH.dataOrigin = headerMatcher.origin ;
112+ headerDH.dataDescription = headerMatcher.description ;
113+ headerDH.subSpecification = headerMatcher.subSpec ;
114+ headerDH.payloadSize = headerSize;
115+ headerDH.payloadSerializationMethod = gSerializationMethodROOT ;
116+ headerDH.splitPayloadParts = nPerTF;
117+ headerDH.splitPayloadIndex = *eventCounter;
118+ o2::header::Stack hhs{headerDH, hdph};
119+
120+ DataProcessingHeader tdph{*TFcounter, 0 };
121+ ConcreteDataMatcher tracksMatcher = DataSpecUtils::asConcreteDataMatcher (specs[1 ]);
122+ tracksDH.dataOrigin = tracksMatcher.origin ;
123+ tracksDH.dataDescription = tracksMatcher.description ;
124+ tracksDH.subSpecification = tracksMatcher.subSpec ;
125+ tracksDH.payloadSize = tracksSize;
126+ tracksDH.payloadSerializationMethod = gSerializationMethodROOT ;
127+ tracksDH.splitPayloadParts = nPerTF;
128+ tracksDH.splitPayloadIndex = *eventCounter;
129+ o2::header::Stack ths{tracksDH, tdph};
130+
131+ appendForSending (device, std::move (hhs), *TFcounter, std::move (parts.At (0 )), specs[0 ], *MCHeadersMessageCache.get (), channelRetriever);
132+ appendForSending (device, std::move (ths), *TFcounter, std::move (parts.At (1 )), specs[1 ], *MCTracksMessageCache.get (), channelRetriever);
133+ ++(*eventCounter);
94134 }
95- *timesliceId += step;
96- eventcounter++;
97- if (eventcounter == nevents) {
135+
136+ ++(*totalEventCounter);
137+ if (nPerTF > 0 && *eventCounter == static_cast <size_t >(nPerTF)) {
138+ // if aggregation is requested, only send the accumulated vectors
139+ LOGP (info, " >> Events: {}; TF counter: {}" , *eventCounter, *TFcounter);
140+ *eventCounter = 0 ;
141+ ++(*TFcounter);
142+ sendOnChannel (device, *MCHeadersMessageCache.get (), channelRetriever (specs[0 ], *TFcounter), *TFcounter);
143+ sendOnChannel (device, *MCTracksMessageCache.get (), channelRetriever (specs[1 ], *TFcounter), *TFcounter);
144+ MCHeadersMessageCache->Clear ();
145+ MCTracksMessageCache->Clear ();
146+ }
147+
148+ if (*totalEventCounter == static_cast <size_t >(nevents)) {
149+ if (nPerTF > 0 ) {
150+ // send accumulated messages if the limit is reached
151+ ++(*TFcounter);
152+ sendOnChannel (device, *MCHeadersMessageCache.get (), channelRetriever (specs[0 ], *TFcounter), *TFcounter);
153+ sendOnChannel (device, *MCTracksMessageCache.get (), channelRetriever (specs[1 ], *TFcounter), *TFcounter);
154+ MCHeadersMessageCache->Clear ();
155+ MCTracksMessageCache->Clear ();
156+ }
98157 // I am done (I don't expect more events to convert); so tell the proxy device to shut-down
99158 stop = true ;
100159 }
@@ -114,7 +173,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
114173
115174 // fetch the number of events to expect
116175 auto nevents = configcontext.options ().get <int >(" nevents" );
117- o2::framework::InjectorFunction f = o2simKinematicsConverter (outputs, 0 , 1 , nevents);
176+ auto nEventsPerTF = configcontext.options ().get <int >(" aggregate-timeframe" );
177+ o2::framework::InjectorFunction f = o2simKinematicsConverter (outputs, 0 , 1 , nevents, nEventsPerTF);
118178
119179 // construct the input channel to listen on
120180 // use given pid
@@ -142,11 +202,16 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
142202
143203 auto proxy = specifyExternalFairMQDeviceProxy (" o2sim-mctrack-proxy" ,
144204 outputs,
145- channelspec.c_str (), f);
146- proxy.algorithm = CommonDataProcessors::wrapWithRateLimiting (proxy.algorithm );
205+ channelspec.c_str (), f, 0 , true );
206+ // add monitoring service to be able to report number of timeframes sent for the rate limiting to work
207+ proxy.requiredServices .push_back (o2::framework::ArrowSupport::arrowBackendSpec ());
208+ // if aggregation is requested, set the enumeration repetitions to aggregation size
209+ if (nEventsPerTF > 0 ) {
210+ proxy.inputs .emplace_back (InputSpec{" clock" , " enum" , " DPL" , 0 , Lifetime::Enumeration, {ConfigParamSpec{" repetitions" , VariantType::Int64, static_cast <int64_t >(nEventsPerTF), {" merged events" }}}});
211+ }
147212 specs.push_back (proxy);
148213
149- if (configcontext.options ().get <bool >(" enable-test-consumer" )) {
214+ if (configcontext.options ().get <bool >(" enable-test-consumer" ) && (nEventsPerTF < 0 ) ) {
150215 // connect a test consumer
151216 std::vector<InputSpec> inputs;
152217 inputs.emplace_back (" mctracks" , " MC" , " MCTRACKS" , 0 ., Lifetime::Timeframe);
0 commit comments