Skip to content

Commit 73e6cf8

Browse files
authored
O2sim: Aggregating mode for mctracks proxy (#11608)
1 parent 66d40fc commit 73e6cf8

5 files changed

Lines changed: 208 additions & 119 deletions

File tree

Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
#include "Framework/DataProcessorSpec.h"
1515
#include "Framework/OutputSpec.h"
16-
#include "Framework/DataAllocator.h"
1716
#include <fairmq/FwdDecls.h>
1817
#include <vector>
1918
#include <functional>
@@ -43,6 +42,10 @@ void sendOnChannel(fair::mq::Device& device, o2::header::Stack&& headerStack, fa
4342

4443
void sendOnChannel(fair::mq::Device& device, fair::mq::Parts& messages, std::string const& channel, size_t timeSlice);
4544

45+
/// append a header/payload part to multipart message for aggregate sending, a channel retriever
46+
/// callback is required to get the associated fair::mq::Channel
47+
void appendForSending(fair::mq::Device& device, o2::header::Stack&& headerStack, size_t timeSliceID, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, fair::mq::Parts& messageCache, ChannelRetriever& channelRetriever);
48+
4649
/// Helper function which takes a set of inputs coming from a device,
4750
/// massages them so that they are valid DPL messages using @param spec as header
4851
/// and sends them to the downstream components.
@@ -101,7 +104,8 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* label,
101104
std::vector<OutputSpec> const& outputs,
102105
const char* defaultChannelConfig,
103106
InjectorFunction converter,
104-
uint64_t minSHM = 0);
107+
uint64_t minSHM = 0,
108+
bool sendTFcounter = false);
105109

106110
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* label,
107111
Inputs const& inputSpecs,

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "Framework/RateLimiter.h"
2727
#include "Framework/TimingInfo.h"
2828
#include "Framework/DeviceState.h"
29+
#include "Framework/Monitoring.h"
2930
#include "Headers/DataHeader.h"
3031
#include "Headers/Stack.h"
3132
#include "CommonConstants/LHCConstants.h"
@@ -172,6 +173,31 @@ void sendOnChannel(fair::mq::Device& device, fair::mq::MessagePtr&& headerMessag
172173
sendOnChannel(device, out, spec, tslice, channelRetriever);
173174
}
174175

176+
void appendForSending(fair::mq::Device& device, o2::header::Stack&& headerStack, size_t timeSliceID, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, fair::mq::Parts& messageCache, ChannelRetriever& channelRetriever)
177+
{
178+
auto channelName = channelRetriever(spec, timeSliceID);
179+
constexpr auto index = 0;
180+
if (channelName.empty()) {
181+
LOG(warning) << "can not find matching channel for " << DataSpecUtils::describe(spec);
182+
return;
183+
}
184+
for (auto& channelInfo : device.GetChannels()) {
185+
if (channelInfo.first != channelName) {
186+
continue;
187+
}
188+
assert(channelInfo.second.size() == 1);
189+
// allocate the header message using the underlying transport of the channel
190+
auto channelAlloc = o2::pmr::getTransportAllocator(channelInfo.second[index].Transport());
191+
fair::mq::MessagePtr headerMessage = o2::pmr::getMessage(std::move(headerStack), channelAlloc);
192+
193+
fair::mq::Parts out;
194+
messageCache.AddPart(std::move(headerMessage));
195+
messageCache.AddPart(std::move(payloadMessage));
196+
return;
197+
}
198+
LOG(error) << "internal mismatch, can not find channel " << channelName << " in the list of channel infos of the device";
199+
}
200+
175201
InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t /*step*/)
176202
{
177203
return [spec](TimingInfo&, fair::mq::Device& device, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) {
@@ -399,7 +425,8 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
399425
std::vector<OutputSpec> const& outputs,
400426
char const* defaultChannelConfig,
401427
InjectorFunction converter,
402-
uint64_t minSHM)
428+
uint64_t minSHM,
429+
bool sendTFcounter)
403430
{
404431
DataProcessorSpec spec;
405432
spec.name = strdup(name);
@@ -411,7 +438,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
411438
// The Init method will register a new "Out of band" channel and
412439
// attach an OnData to it which is responsible for converting incoming
413440
// messages into DPL messages.
414-
spec.algorithm = AlgorithmSpec{[converter, minSHM, deviceName = spec.name](InitContext& ctx) {
441+
spec.algorithm = AlgorithmSpec{[converter, minSHM, deviceName = spec.name, sendTFcounter](InitContext& ctx) {
415442
auto* device = ctx.services().get<RawDeviceService>().device();
416443
// make a copy of the output routes and pass to the lambda by move
417444
auto outputRoutes = ctx.services().get<RawDeviceService>().spec().outputs;
@@ -567,7 +594,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
567594
}
568595
};
569596

570-
auto runHandler = [dataHandler, minSHM](ProcessingContext& ctx) {
597+
auto runHandler = [dataHandler, minSHM, sendTFcounter](ProcessingContext& ctx) {
571598
static RateLimiter limiter;
572599
auto device = ctx.services().get<RawDeviceService>().device();
573600
limiter.check(ctx, std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit")), minSHM);
@@ -598,6 +625,9 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
598625
timingInfo.creation = dph->creation;
599626
}
600627
dataHandler(timingInfo, parts, 0, ci);
628+
if (sendTFcounter) {
629+
ctx.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)timingInfo.tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
630+
}
601631
}
602632
if (nReceived == 0 || channels.size() == 1) {
603633
break;

run/SimExamples/McTracksToAOD/run.sh

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,20 @@
22

33
set -x
44

5-
NEVENTS=100
6-
# launch generator process (for 100 min bias Pythia8 events; no Geant; no geometry)
7-
o2-sim -j 1 -g pythia8pp -n ${NEVENTS} --noDiscOutput --forwardKine --noGeant -m CAVE -e TGeant3 &> sim.log &
5+
NEVENTS=10000
6+
# launch generator process (for 10000 min bias Pythia8 events; no Geant; no geometry)
7+
o2-sim -j 1 -g pythia8pp -n ${NEVENTS} --noDiscOutput --forwardKine --noGeant &> sim.log &
88
SIMPROC=$!
99

1010
# launch a DPL process (having the right proxy configuration)
1111
# (Note that the option --o2sim-pid is not strictly necessary when only one o2-sim process is running.
1212
# The socket will than be auto-determined.)
1313

14-
o2-sim-mctracks-proxy -b --nevents ${NEVENTS} --o2sim-pid ${SIMPROC} | o2-sim-mctracks-to-aod -b | o2-analysis-mctracks-to-aod-simple-task -b &
14+
# --aggregate-timeframe 200 is used to combine 200 generated events into a timeframe that is then converted to AOD tables
15+
# note that if you need special configuration for the analysis tasks, it needs to be passed to proxy and converter as well
16+
o2-sim-mctracks-proxy -b --nevents ${NEVENTS} --o2sim-pid ${SIMPROC} --aggregate-timeframe 200 |\
17+
o2-sim-mctracks-to-aod -b |\
18+
o2-analysis-mctracks-to-aod-simple-task -b &
1519
TRACKANAPROC=$!
1620

1721
wait ${SIMPROC}

run/o2sim_mctracks_proxy.cxx

Lines changed: 94 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111

1212
#include <boost/program_options.hpp>
1313

14+
#include "../Framework/Core/src/ArrowSupport.h"
1415
#include "Framework/WorkflowSpec.h"
1516
#include "Framework/ConfigParamSpec.h"
16-
#include "Framework/CommonDataProcessors.h"
1717
#include "Framework/ExternalFairMQDeviceProxy.h"
1818
#include "Framework/Task.h"
1919
#include "Framework/DataRef.h"
@@ -34,6 +34,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
3434
workflowOptions.push_back(ConfigParamSpec{"enable-test-consumer", o2::framework::VariantType::Bool, false, {"enable a simple test consumer for injected MC tracks"}});
3535
workflowOptions.push_back(ConfigParamSpec{"o2sim-pid", o2::framework::VariantType::Int, -1, {"The process id of the source o2-sim"}});
3636
workflowOptions.push_back(ConfigParamSpec{"nevents", o2::framework::VariantType::Int, -1, {"The number of events expected to arrive on the proxy"}});
37+
workflowOptions.push_back(ConfigParamSpec{"aggregate-timeframe", o2::framework::VariantType::Int, -1, {"The number of events to aggregate per timeframe"}});
3738
}
3839

3940
#include "Framework/runDataProcessing.h"
@@ -43,7 +44,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
4344
class ConsumerTask
4445
{
4546
public:
46-
void init(o2::framework::InitContext& ic) {}
47+
void init(o2::framework::InitContext& /*ic*/) {}
4748
void run(o2::framework::ProcessingContext& pc)
4849
{
4950
LOG(debug) << "Running simple kinematics consumer client";
@@ -63,38 +64,96 @@ class ConsumerTask
6364

6465
/// Function converting raw input data to DPL data format. Uses knowledge of how MCTracks and MCEventHeaders
6566
/// are sent from the o2sim side.
66-
InjectorFunction o2simKinematicsConverter(std::vector<OutputSpec> const& specs, uint64_t startTime, uint64_t step, int nevents)
67+
/// If aggregate-timeframe is set to non-negative value N, this number of events is accumulated and then sent
68+
/// as a multipart message, which is useful for AOD creation
69+
InjectorFunction o2simKinematicsConverter(std::vector<OutputSpec> const& specs, uint64_t startTime, uint64_t step, int nevents, int nPerTF)
6770
{
6871
auto timesliceId = std::make_shared<size_t>(startTime);
72+
auto totalEventCounter = std::make_shared<size_t>(0);
73+
auto eventCounter = std::make_shared<size_t>(0);
74+
auto TFcounter = std::make_shared<size_t>(startTime);
75+
auto MCHeadersMessageCache = std::make_shared<fair::mq::Parts>();
76+
auto MCTracksMessageCache = std::make_shared<fair::mq::Parts>();
6977

70-
return [timesliceId, specs, step, nevents](TimingInfo&, fair::mq::Device& device, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) {
78+
return [timesliceId, specs, step, nevents, nPerTF, totalEventCounter, eventCounter, TFcounter, MCHeadersMessageCache = MCHeadersMessageCache, MCTracksMessageCache = MCTracksMessageCache](TimingInfo& ti, fair::mq::Device& device, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) mutable {
7179
// We iterate on all the parts and we send them two by two,
7280
// adding the appropriate O2 header.
73-
static int eventcounter = 0;
74-
75-
for (int i = 0; i < parts.Size(); ++i) {
76-
DataHeader dh;
77-
ConcreteDataMatcher matcher = DataSpecUtils::asConcreteDataMatcher(specs[i]);
78-
dh.dataOrigin = matcher.origin;
79-
dh.dataDescription = matcher.description;
80-
dh.subSpecification = matcher.subSpec;
81-
dh.payloadSize = parts.At(i)->GetSize();
82-
if (i == 0) {
83-
dh.payloadSerializationMethod = gSerializationMethodROOT;
84-
} else if (i == 1) {
85-
dh.payloadSerializationMethod = gSerializationMethodROOT;
86-
}
81+
if (nPerTF < 0) {
82+
// if no aggregation requested, forward each message with the DPL header
8783
if (*timesliceId != newTimesliceId) {
8884
LOG(fatal) << "Time slice ID provided from oldestPossible mechanism " << newTimesliceId << " is out of sync with expected value " << *timesliceId;
8985
}
90-
DataProcessingHeader dph{newTimesliceId, 0};
91-
// we have to move the incoming data
92-
o2::header::Stack headerStack{dh, dph};
93-
sendOnChannel(device, std::move(headerStack), std::move(parts.At(i)), specs[i], channelRetriever);
86+
for (auto i = 0U; i < parts.Size(); ++i) {
87+
DataHeader dh;
88+
ConcreteDataMatcher matcher = DataSpecUtils::asConcreteDataMatcher(specs[i]);
89+
dh.dataOrigin = matcher.origin;
90+
dh.dataDescription = matcher.description;
91+
dh.subSpecification = matcher.subSpec;
92+
dh.payloadSize = parts.At(i)->GetSize();
93+
dh.payloadSerializationMethod = gSerializationMethodROOT;
94+
DataProcessingHeader dph{newTimesliceId, 0};
95+
// we have to move the incoming data
96+
o2::header::Stack headerStack{dh, dph};
97+
sendOnChannel(device, std::move(headerStack), std::move(parts.At(i)), specs[i], channelRetriever);
98+
}
99+
*timesliceId += step;
100+
} else {
101+
// if aggregation is requested, colelct the payloads into a multipart message
102+
ti.timeslice = *TFcounter;
103+
ti.tfCounter = *TFcounter;
104+
DataHeader headerDH;
105+
DataHeader tracksDH;
106+
auto headerSize = parts.At(0)->GetSize();
107+
auto tracksSize = parts.At(1)->GetSize();
108+
109+
DataProcessingHeader hdph{*TFcounter, 0};
110+
ConcreteDataMatcher headerMatcher = DataSpecUtils::asConcreteDataMatcher(specs[0]);
111+
headerDH.dataOrigin = headerMatcher.origin;
112+
headerDH.dataDescription = headerMatcher.description;
113+
headerDH.subSpecification = headerMatcher.subSpec;
114+
headerDH.payloadSize = headerSize;
115+
headerDH.payloadSerializationMethod = gSerializationMethodROOT;
116+
headerDH.splitPayloadParts = nPerTF;
117+
headerDH.splitPayloadIndex = *eventCounter;
118+
o2::header::Stack hhs{headerDH, hdph};
119+
120+
DataProcessingHeader tdph{*TFcounter, 0};
121+
ConcreteDataMatcher tracksMatcher = DataSpecUtils::asConcreteDataMatcher(specs[1]);
122+
tracksDH.dataOrigin = tracksMatcher.origin;
123+
tracksDH.dataDescription = tracksMatcher.description;
124+
tracksDH.subSpecification = tracksMatcher.subSpec;
125+
tracksDH.payloadSize = tracksSize;
126+
tracksDH.payloadSerializationMethod = gSerializationMethodROOT;
127+
tracksDH.splitPayloadParts = nPerTF;
128+
tracksDH.splitPayloadIndex = *eventCounter;
129+
o2::header::Stack ths{tracksDH, tdph};
130+
131+
appendForSending(device, std::move(hhs), *TFcounter, std::move(parts.At(0)), specs[0], *MCHeadersMessageCache.get(), channelRetriever);
132+
appendForSending(device, std::move(ths), *TFcounter, std::move(parts.At(1)), specs[1], *MCTracksMessageCache.get(), channelRetriever);
133+
++(*eventCounter);
94134
}
95-
*timesliceId += step;
96-
eventcounter++;
97-
if (eventcounter == nevents) {
135+
136+
++(*totalEventCounter);
137+
if (nPerTF > 0 && *eventCounter == static_cast<size_t>(nPerTF)) {
138+
// if aggregation is requested, only send the accumulated vectors
139+
LOGP(info, ">> Events: {}; TF counter: {}", *eventCounter, *TFcounter);
140+
*eventCounter = 0;
141+
++(*TFcounter);
142+
sendOnChannel(device, *MCHeadersMessageCache.get(), channelRetriever(specs[0], *TFcounter), *TFcounter);
143+
sendOnChannel(device, *MCTracksMessageCache.get(), channelRetriever(specs[1], *TFcounter), *TFcounter);
144+
MCHeadersMessageCache->Clear();
145+
MCTracksMessageCache->Clear();
146+
}
147+
148+
if (*totalEventCounter == static_cast<size_t>(nevents)) {
149+
if (nPerTF > 0) {
150+
// send accumulated messages if the limit is reached
151+
++(*TFcounter);
152+
sendOnChannel(device, *MCHeadersMessageCache.get(), channelRetriever(specs[0], *TFcounter), *TFcounter);
153+
sendOnChannel(device, *MCTracksMessageCache.get(), channelRetriever(specs[1], *TFcounter), *TFcounter);
154+
MCHeadersMessageCache->Clear();
155+
MCTracksMessageCache->Clear();
156+
}
98157
// I am done (I don't expect more events to convert); so tell the proxy device to shut-down
99158
stop = true;
100159
}
@@ -114,7 +173,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
114173

115174
// fetch the number of events to expect
116175
auto nevents = configcontext.options().get<int>("nevents");
117-
o2::framework::InjectorFunction f = o2simKinematicsConverter(outputs, 0, 1, nevents);
176+
auto nEventsPerTF = configcontext.options().get<int>("aggregate-timeframe");
177+
o2::framework::InjectorFunction f = o2simKinematicsConverter(outputs, 0, 1, nevents, nEventsPerTF);
118178

119179
// construct the input channel to listen on
120180
// use given pid
@@ -142,11 +202,16 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
142202

143203
auto proxy = specifyExternalFairMQDeviceProxy("o2sim-mctrack-proxy",
144204
outputs,
145-
channelspec.c_str(), f);
146-
proxy.algorithm = CommonDataProcessors::wrapWithRateLimiting(proxy.algorithm);
205+
channelspec.c_str(), f, 0, true);
206+
// add monitoring service to be able to report number of timeframes sent for the rate limiting to work
207+
proxy.requiredServices.push_back(o2::framework::ArrowSupport::arrowBackendSpec());
208+
// if aggregation is requested, set the enumeration repetitions to aggregation size
209+
if (nEventsPerTF > 0) {
210+
proxy.inputs.emplace_back(InputSpec{"clock", "enum", "DPL", 0, Lifetime::Enumeration, {ConfigParamSpec{"repetitions", VariantType::Int64, static_cast<int64_t>(nEventsPerTF), {"merged events"}}}});
211+
}
147212
specs.push_back(proxy);
148213

149-
if (configcontext.options().get<bool>("enable-test-consumer")) {
214+
if (configcontext.options().get<bool>("enable-test-consumer") && (nEventsPerTF < 0)) {
150215
// connect a test consumer
151216
std::vector<InputSpec> inputs;
152217
inputs.emplace_back("mctracks", "MC", "MCTRACKS", 0., Lifetime::Timeframe);

0 commit comments

Comments
 (0)