Skip to content

Commit 4f0db8d

Browse files
authored
O2sim: Add rate limiting to mctracks proxy (AliceO2Group#11578)
* O2sim: rate-limited proxy * change channel to pair from pub/sub to prevent dropping events * add rate limit to example script * send EoS to proxy if we finish early
1 parent 158c44e commit 4f0db8d

4 files changed

Lines changed: 37 additions & 13 deletions

File tree

run/SimExamples/MCTrackToDPL/run.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
set -x
44

55
NEVENTS=100
6+
RATELIMIT=1
67
# launch generator process (for 100 min bias Pythia8 events; no Geant; no geometry)
78
o2-sim -j 1 -g pythia8pp -n ${NEVENTS} --noDiscOutput --forwardKine --noGeant -m CAVE -e TGeant3 &> sim.log &
89
SIMPROC=$!
910

1011
# launch a DPL process (having the right proxy configuration)
1112
# (Note that the option --o2sim-pid is not strictly necessary when only one o2-sim process is running.
1213
# The socket will than be auto-determined.)
13-
o2-sim-mctracks-proxy -b --enable-test-consumer --nevents ${NEVENTS} --o2sim-pid ${SIMPROC} &
14+
o2-sim-mctracks-proxy -b --enable-test-consumer --nevents ${NEVENTS} --o2sim-pid ${SIMPROC} --timeframes-rate-limit ${RATELIMIT} &
1415
TRACKANAPROC=$!
1516

1617
wait ${SIMPROC}

run/o2sim_mctracks_proxy.cxx

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
#include "Framework/WorkflowSpec.h"
1515
#include "Framework/ConfigParamSpec.h"
16-
#include "Framework/CompletionPolicy.h"
17-
#include "Framework/CompletionPolicyHelpers.h"
18-
#include "Framework/DeviceSpec.h"
16+
#include "Framework/CommonDataProcessors.h"
1917
#include "Framework/ExternalFairMQDeviceProxy.h"
2018
#include "Framework/Task.h"
2119
#include "Framework/DataRef.h"
@@ -53,10 +51,13 @@ class ConsumerTask
5351
auto const* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
5452
LOG(debug) << "Payload size " << dh->payloadSize << " method " << dh->payloadSerializationMethod.as<std::string>();
5553
}
56-
auto tracks = pc.inputs().get<std::vector<o2::MCTrack>>("mctracks");
57-
auto eventheader = pc.inputs().get<o2::dataformats::MCEventHeader*>("mcheader");
58-
LOG(info) << "Got " << tracks.size() << " tracks";
59-
LOG(info) << "Got " << eventheader->GetB() << " as impact parameter in the event header";
54+
try {
55+
auto tracks = pc.inputs().get<std::vector<o2::MCTrack>>("mctracks");
56+
auto eventheader = pc.inputs().get<o2::dataformats::MCEventHeader*>("mcheader");
57+
LOG(info) << "Got " << tracks.size() << " tracks";
58+
LOG(info) << "Got " << eventheader->GetB() << " as impact parameter in the event header";
59+
} catch (...) {
60+
}
6061
}
6162
};
6263

@@ -119,7 +120,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
119120
// use given pid
120121
// TODO: this could go away with a proper pipeline implementation
121122
std::string channelspec;
122-
std::string channelbase = "type=sub,method=connect,address=ipc://";
123+
std::string channelbase = "type=pair,method=connect,address=ipc://";
123124
if (configcontext.options().get<int>("o2sim-pid") != -1) {
124125
std::stringstream channelstr;
125126
channelstr << channelbase << "/tmp/o2sim-hitmerger-kineforward-" << configcontext.options().get<int>("o2sim-pid") << ",rateLogging=100";
@@ -139,9 +140,11 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
139140
channelspec = channelbase + socketlist[0] + ",rateLogging=100";
140141
}
141142

142-
specs.emplace_back(specifyExternalFairMQDeviceProxy("o2sim-mctrack-proxy",
143-
outputs,
144-
channelspec.c_str(), f));
143+
auto proxy = specifyExternalFairMQDeviceProxy("o2sim-mctrack-proxy",
144+
outputs,
145+
channelspec.c_str(), f);
146+
proxy.algorithm = CommonDataProcessors::wrapWithRateLimiting(proxy.algorithm);
147+
specs.push_back(proxy);
145148

146149
if (configcontext.options().get<bool>("enable-test-consumer")) {
147150
// connect a test consumer

run/o2sim_parallel.cxx

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
#include <unordered_map>
4242
#include <filesystem>
4343
#include <atomic>
44+
#include "Framework/SourceInfoHeader.h"
45+
#include "Headers/Stack.h"
4446

4547
#include "SimPublishChannelHelper.h"
4648
#include <CommonUtils/FileSystemUtils.h>
@@ -89,6 +91,24 @@ void remove_tmp_files()
8991

9092
void cleanup()
9193
{
94+
auto& conf = o2::conf::SimConfig::Instance();
95+
if (conf.forwardKine()) {
96+
auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
97+
auto forwardchannel = fair::mq::Channel{"kineforward", "pair", factory};
98+
auto address = std::string{"ipc:///tmp/o2sim-hitmerger-kineforward-"} + std::to_string(getpid());
99+
forwardchannel.Bind(address.c_str());
100+
forwardchannel.Validate();
101+
fair::mq::Parts parts;
102+
fair::mq::MessagePtr payload(forwardchannel.NewMessage());
103+
o2::framework::SourceInfoHeader sih;
104+
sih.state = o2::framework::InputChannelState::Completed;
105+
auto channelAlloc = o2::pmr::getTransportAllocator(forwardchannel.Transport());
106+
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
107+
parts.AddPart(std::move(header));
108+
parts.AddPart(std::move(payload));
109+
forwardchannel.Send(parts);
110+
LOGP(info, "SENDING END-OF-STREAM TO PROXY AT {}", address.c_str());
111+
}
92112
remove_tmp_files();
93113
o2::utils::ShmManager::Instance().release();
94114

run/o2simtopology_template.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@
135135
"name": "kineforward",
136136
"sockets": [
137137
{
138-
"type": "pub",
138+
"type": "pair",
139139
"method": "bind",
140140
"address": "ipc:///tmp/o2sim-hitmerger-kineforward-#PID#",
141141
"sndBufSize": 1000,

0 commit comments

Comments
 (0)