Skip to content

Commit dc4a12d

Browse files
committed
CTFReader dateLimiter acts on injection rather than reading
By default the ctf-reader reads into the memory the CTF data and prepares all output messages but injects them only once the rate-limiter allows that. With the option --limit-tf-before-reading set also the preparation of the data to inject will be conditioned by the green light from the rate-limiter.
1 parent e16f7af commit dc4a12d

6 files changed

Lines changed: 23 additions & 14 deletions

File tree

Detectors/CTF/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,3 +177,6 @@ To apply TF rate limiting (make sure that no more than N TFs are in processing)
177177
too all workflows (e.g. via ARGS_ALL).
178178
The IPCID is the NUMA domain ID (usually 0 on non-EPN workflow).
179179
Additionally, one may throttle on the free SHM by providing an option to the reader `--timeframes-shm-limit <shm-size>`.
180+
181+
Note that by default the reader reads into the memory the CTF data and prepares all output messages but injects them only once the rate-limiter allows that.
182+
With the option `--limit-tf-before-reading` set also the preparation of the data to inject will be conditioned by the green light from the rate-limiter.

Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@ struct CTFReaderInp {
3434
std::vector<int> ctfIDs{};
3535
bool skipSkimmedOutTF = false;
3636
bool allowMissingDetectors = false;
37+
bool checkTFLimitBeforeReading = false;
3738
bool sup0xccdb = false;
3839
int maxFileCache = 1;
3940
int64_t delay_us = 0;
4041
int maxLoops = 0;
4142
int maxTFs = -1;
4243
unsigned int subspec = 0;
4344
unsigned int decSSpecEMC = 0;
44-
int tfRateLimit = 0;
45+
int tfRateLimit = -999;
4546
size_t minSHM = 0;
4647
};
4748

Detectors/CTF/workflow/src/CTFReaderSpec.cxx

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ void CTFReaderSpec::init(InitContext& ic)
145145
mInput.ctfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-ctf-ids"));
146146
mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
147147
mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
148+
mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
148149
mRunning = true;
149150
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
150151
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
@@ -187,11 +188,9 @@ void CTFReaderSpec::openCTFFile(const std::string& flname)
187188
///_______________________________________
188189
void CTFReaderSpec::run(ProcessingContext& pc)
189190
{
190-
static bool initOnceDone = false;
191-
if (!initOnceDone) {
191+
if (mInput.tfRateLimit == -999) {
192192
mInput.tfRateLimit = std::stoi(pc.services().get<RawDeviceService>().device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
193193
}
194-
195194
std::string tfFileName;
196195
if (mCTFCounter >= mInput.maxTFs || (!mInput.ctfIDs.empty() && mSelIDEntry >= mInput.ctfIDs.size())) { // done
197196
LOG(info) << "All CTFs from selected range were injected, stopping";
@@ -272,12 +271,16 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
272271
LOGP(info, "Skimming did not define any selection for TF [{}] : [{}]", ir0.asString(), ir1.asString());
273272
return false;
274273
} else {
275-
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
274+
if (mInput.checkTFLimitBeforeReading) {
275+
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
276+
}
276277
LOGP(info, "{} IR-Frames are selected for TF [{}] : [{}]", irSpan.size(), ir0.asString(), ir1.asString());
277278
}
278279
auto outVec = pc.outputs().make<std::vector<o2::dataformats::IRFrame>>(OutputRef{"selIRFrames"}, irSpan.begin(), irSpan.end());
279280
} else {
280-
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
281+
if (mInput.checkTFLimitBeforeReading) {
282+
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
283+
}
281284
}
282285

283286
// send CTF Header
@@ -312,14 +315,17 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
312315
checkTreeEntries();
313316
mTimer.Stop();
314317

315-
// do we need to way to respect the delay ?
318+
// do we need to wait to respect the delay ?
316319
long tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
317320
if (mCTFCounter) {
318321
auto tDiff = tNow - mLastSendTime;
319322
if (tDiff < mInput.delay_us) {
320323
pc.services().get<RawDeviceService>().waitFor((mInput.delay_us - tDiff) / 1000); // respect requested delay before sending
321324
}
322325
}
326+
if (!mInput.checkTFLimitBeforeReading) {
327+
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
328+
}
323329
tNow = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
324330
LOGP(info, "Read CTF {} {} in {:.3f} s, {:.4f} s elapsed from previous CTF", mCTFCounter, entryStr, mTimer.CpuTime() - cput, mCTFCounter ? 1e-6 * (tNow - mLastSendTime) : 0.);
325331
mLastSendTime = tNow;
@@ -443,7 +449,8 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
443449
options.emplace_back(ConfigParamSpec{"select-ctf-ids", VariantType::String, "", {"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
444450
options.emplace_back(ConfigParamSpec{"impose-run-start-timstamp", VariantType::Int64, 0L, {"impose run start time stamp (ms), ignored if 0"}});
445451
options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
446-
options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
452+
options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
453+
options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}});
447454
if (!inp.metricChannel.empty()) {
448455
options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
449456
}

Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx)
117117
if (device != mDevice) {
118118
throw std::runtime_error(fmt::format("FMQDevice has changed, old={} new={}", fmt::ptr(mDevice), fmt::ptr(device)));
119119
}
120-
static bool initOnceDone = false;
121-
if (!initOnceDone) {
120+
if (mInput.tfRateLimit == -999) {
122121
mInput.tfRateLimit = std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit"));
123122
}
124123
auto acknowledgeOutput = [this](fair::mq::Parts& parts, bool verbose = false) {

Detectors/Raw/TFReaderDD/src/TFReaderSpec.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ struct TFReaderInp {
3636
o2::detectors::DetID::mask_t detMaskRawOnly{};
3737
o2::detectors::DetID::mask_t detMaskNonRawOnly{};
3838
size_t minSHM = 0;
39-
int tfRateLimit = 0;
39+
int tfRateLimit = -999;
4040
int maxTFCache = 1;
4141
int maxFileCache = 1;
4242
int verbosity = 0;

Detectors/Raw/src/RawFileReaderWorkflow.cxx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class RawReaderSpecs : public o2f::Task
7878
uint32_t mMaxTFID = 0xffffffff; // last TF to extrct
7979
int mRunNumber = 0; // run number to pass
8080
int mVerbosity = 0;
81-
int mTFRateLimit = 0;
81+
int mTFRateLimit = -999;
8282
bool mPreferCalcTF = false;
8383
size_t mMinSHM = 0;
8484
size_t mLoopsDone = 0;
@@ -170,8 +170,7 @@ void RawReaderSpecs::run(o2f::ProcessingContext& ctx)
170170
mTimer.Start(false);
171171
auto device = ctx.services().get<o2f::RawDeviceService>().device();
172172
assert(device);
173-
static bool initOnceDone = false;
174-
if (!initOnceDone) {
173+
if (mTFRateLimit == -999) {
175174
mTFRateLimit = std::stoi(device->fConfig->GetValue<std::string>("timeframes-rate-limit"));
176175
}
177176
auto findOutputChannel = [&ctx, this](o2h::DataHeader& h) {

0 commit comments

Comments
 (0)