Skip to content

Commit a5bd0b3

Browse files
committed
Optional threshold on fraction or abs.number of failed fetches
The option --fetch-failure-threshold <float> of o2-ctf-reader and o2-raw-tf-reader allows producing a fatal error if the absolute number of failed fetches (option value < 0) or their fraction w.r.t. the total number of files (option value > 0) exceeds the threshold. E.g. --fetch-failure-threshold -2.5 will produce a fatal on the 3rd fetch failure, while --fetch-failure-threshold 0.1 will produce a fatal once more than 10% of the input file fetches have failed.
1 parent d1c39c5 commit a5bd0b3

4 files changed

Lines changed: 26 additions & 3 deletions

File tree

Common/Utils/include/CommonUtils/FileFetcher.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ class FileFetcher
5858
~FileFetcher();
5959

6060
// Read-only access to the i-th entry of mInputFiles (plain operator[] indexing).
const auto& getFileRef(size_t i) const
{
  return mInputFiles[i];
}
61-
61+
// Store the fetch-failure threshold in mFailThreshold.
void setFailThreshold(float f)
{
  mFailThreshold = f;
}
62+
// Return the currently stored fetch-failure threshold.
float getFailThreshold() const
{
  return mFailThreshold;
}
6263
// Set mMaxInQueue; a requested value of 0 is replaced by 1.
void setMaxFilesInQueue(size_t s)
{
  if (s > 0) {
    mMaxInQueue = s;
  } else {
    mMaxInQueue = 1;
  }
}
6364
// Store the requested value in mMaxLoops.
void setMaxLoops(size_t v)
{
  mMaxLoops = v;
}
6465
// Report the current value of the mRunning flag.
bool isRunning() const
{
  return mRunning;
}
66+
// Report the current value of the mFailure flag (set by the fetcher when the failure threshold is exceeded).
bool isFailed() const
{
  return mFailure;
}
6567
void start();
6668
void stop();
6769
void cleanup();
@@ -100,10 +102,12 @@ class FileFetcher
100102
size_t mMaxInQueue{5};
101103
bool mRunning = false;
102104
bool mNoRemoteCopy = false;
105+
bool mFailure = false;
103106
size_t mMaxLoops = 0;
104107
size_t mNLoops = 0;
105108
size_t mNFilesProc = 0;
106109
size_t mNFilesProcOK = 0;
110+
float mFailThreshold = 0.f; // flag a failure (fatal is raised in stop()) if too many fetches fail: >0 : max fraction of failed fetches, <0 : max abs. number of failures (as -value), 0 : no threshold
107111
mutable std::mutex mMtx;
108112
std::mutex mMtxStop;
109113
std::thread mFetcherThread{};

Common/Utils/src/FileFetcher.cxx

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,9 @@ void FileFetcher::stop()
215215
if (mFetcherThread.joinable()) {
216216
mFetcherThread.join();
217217
}
218+
if (mFailure) {
219+
LOGP(fatal, "too many failures in file fetching: {} in {} attempts for {} files in {} loops, abort", mNFilesProc - mNFilesProcOK, mNFilesProc, getNFiles(), mNLoops);
220+
}
218221
}
219222

220223
//____________________________________________________________
@@ -282,6 +285,19 @@ void FileFetcher::fetcher()
282285
fileRef.copied = true;
283286
mQueue.push(fileEntry);
284287
mNFilesProcOK++;
288+
} else {
289+
if (mFailThreshold < 0.f) { // cut on abs number of failures
290+
if (float(mNFilesProc - mNFilesProcOK) > -mFailThreshold) { // negative threshold encodes the max tolerated abs. number of failures; compare against it, not against the (unsigned) success counter
291+
mFailure = true;
292+
}
293+
} else if (mFailThreshold > 0.f) {
294+
float fracFail = mNLoops ? (mNFilesProc - mNFilesProcOK) / float(mNFilesProc) : (mNFilesProc - mNFilesProcOK) / float(getNFiles());
295+
mFailure = fracFail > mFailThreshold;
296+
}
297+
if (mFailure) {
298+
mRunning = false;
299+
break;
300+
}
285301
}
286302
}
287303
}
@@ -338,7 +354,7 @@ bool FileFetcher::copyFile(size_t id)
338354
}
339355
}
340356
}
341-
if (!fs::is_regular_file(mInputFiles[id].getLocalName()) || fs::is_empty(mInputFiles[id].getLocalName())) {
357+
if (!fs::is_regular_file(mInputFiles[id].getLocalName()) || fs::is_empty(mInputFiles[id].getLocalName()) || sysRet != 0) {
342358
LOGP(alarm, "FileFetcher: failed for copy command {}", realCmd);
343359
return false;
344360
}

Detectors/CTF/workflow/src/CTFReaderSpec.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ void CTFReaderSpec::init(InitContext& ic)
147147
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
148148
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
149149
mFileFetcher->setMaxLoops(mInput.maxLoops);
150+
mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
150151
mFileFetcher->start();
151152
if (!mInput.fileIRFrames.empty()) {
152153
mIRFrameSelector.loadIRFrames(mInput.fileIRFrames);
@@ -429,6 +430,7 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
429430
options.emplace_back(ConfigParamSpec{"select-ctf-ids", VariantType::String, "", {"comma-separated list CTF IDs to inject (from cumulative counter of CTFs seen)"}});
430431
options.emplace_back(ConfigParamSpec{"impose-run-start-timstamp", VariantType::Int64, 0L, {"impose run start time stamp (ms), ignored if 0"}});
431432
options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
433+
// Threshold on failed file fetches, forwarded to FileFetcher::setFailThreshold in CTFReaderSpec::init.
options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fatal if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
432434
if (!inp.metricChannel.empty()) {
433435
options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
434436
}

Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ void TFReaderSpec::init(o2f::InitContext& ic)
9696
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
9797
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
9898
mFileFetcher->setMaxLoops(mInput.maxLoops);
99+
mFileFetcher->setFailThreshold(ic.options().get<float>("fetch-failure-threshold"));
99100
mFileFetcher->start();
100101
}
101102

@@ -458,7 +459,7 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
458459
LOGP(alarm, R"(To avoid reader filling shm buffer use "--shm-throw-bad-alloc 0 --shm-segment-id 2")");
459460
}
460461
}
461-
462+
// Threshold on failed file fetches, forwarded to FileFetcher::setFailThreshold in TFReaderSpec::init.
spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatal if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
462463
spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
463464

464465
return spec;

0 commit comments

Comments
 (0)