Skip to content

Commit 38fb609

Browse files
committed
Optionally pause CTF writing if disk space is low
1 parent 6e4b3e4 commit 38fb609

2 files changed

Lines changed: 64 additions & 1 deletion

File tree

Detectors/CTF/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,22 @@ o2-ctf-reader-workflow --ctf-input input.lst --onlyDet ITS,TPC,TOF --its-entropy
5151

5252
See below for the details of `--ctf-dict` option.
5353

54+
One can pause the writing if available disk space is low using a combination of following options:
55+
```bash
56+
--require-free-disk <float>: pause writing operation if available disk space is below this margin in bytes (if > 0) or this fraction of total (if < 0)
57+
```
58+
59+
```bash
60+
--wait-for-free-disk <float seconds>: if paused due to the low disk space, recheck after this time (in s)
61+
```
62+
63+
```bash
64+
--max-wait-for-free-disk <float seconds>: produce fatal if paused due to the low disk space for more than this amount( in s).
65+
```
66+
67+
68+
69+
5470
## CTF reader workflow
5571
5672
`o2-ctf-reader-workflow` should be the 1st workflow in the piped chain of CTF processing.

Detectors/CTF/workflow/src/CTFWriterSpec.cxx

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ class CTFWriterSpec : public o2::framework::Task
125125
bool mStoreMetaFile = false;
126126
bool mRejectCurrentTF = false;
127127
bool mFallBackDirUsed = false;
128+
bool mFallBackDirProvided = false;
128129
int mReportInterval = -1;
129130
int mVerbosity = 0;
130131
int mSaveDictAfter = 0; // if positive and mWriteCTF==true, save dictionary after each mSaveDictAfter TFs processed
@@ -138,6 +139,9 @@ class CTFWriterSpec : public o2::framework::Task
138139
size_t mNCTF = 0; // total number of CTFs written
139140
size_t mNCTFPrevDict = 0; // total number of CTFs used for previous dictionary version
140141
size_t mNAccCTF = 0; // total number of CTFs accumulated in the current file
142+
int mWaitDiskFull = 0; // if mCheckDiskFull triggers, pause for this amount of ms before new attempt
143+
int mWaitDiskFullMax = -1; // produce fatal mCheckDiskFull block the workflow for more than this time (in ms)
144+
float mCheckDiskFull = 0.; // wait for if available abs. disk space is < mCheckDiskFull (if >0) or if its fraction is < -mCheckDiskFull (if <0)
141145
long mCTFAutoSave = 0; // if > 0, autosave after so many TFs
142146
size_t mNCTFFiles = 0; // total number of CTF files written
143147
int mMaxCTFPerFile = 0; // max CTFs per files to store
@@ -229,6 +233,7 @@ void CTFWriterSpec::init(InitContext& ic)
229233
mCTFDirFallBack = ic.options().get<std::string>("output-dir-alt");
230234
if (mCTFDirFallBack != "/dev/null") {
231235
mCTFDirFallBack = o2::utils::Str::rectifyDirectory(mCTFDirFallBack);
236+
mFallBackDirProvided = true;
232237
}
233238
mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
234239
mMinSize = ic.options().get<int64_t>("min-file-size");
@@ -249,6 +254,11 @@ void CTFWriterSpec::init(InitContext& ic)
249254
}
250255
}
251256
}
257+
258+
mCheckDiskFull = ic.options().get<float>("require-free-disk");
259+
mWaitDiskFull = 1000 * ic.options().get<float>("wait-for-free-disk");
260+
mWaitDiskFullMax = 1000 * ic.options().get<float>("max-wait-for-free-disk");
261+
252262
mChkSize = std::max(size_t(mMinSize * 1.1), mMaxSize);
253263
o2::utils::createDirectoriesIfAbsent(LOCKFileDir);
254264

@@ -418,6 +428,40 @@ void CTFWriterSpec::run(ProcessingContext& pc)
418428
mCurrCTFSize = estimateCTFSize(pc);
419429
if (mWriteCTF && !mRejectCurrentTF) {
420430
prepareTFTreeAndFile();
431+
432+
int totalWait = 0, nwaitCycles = 0;
433+
while ((mFallBackDirUsed || !mFallBackDirProvided) && mCheckDiskFull) { // we are on the physical disk and not on the RAM disk
434+
constexpr size_t MB = 1024 * 1024;
435+
constexpr int showFirstN = 10, prsecaleWarnings = 50;
436+
try {
437+
const auto si = std::filesystem::space(mCTFFileOut->GetName());
438+
std::string wmsg{};
439+
if (mCheckDiskFull > 0.f && si.available < mCheckDiskFull) {
440+
nwaitCycles++;
441+
wmsg = fmt::format("Disk has {} MB available while at least {} MB is requested, wait for {} ms (on top of {} ms)", si.available / MB, size_t(mCheckDiskFull) / MB, mWaitDiskFull, totalWait);
442+
} else if (mCheckDiskFull < 0.f && float(si.available) / si.capacity < -mCheckDiskFull) { // relative margin requested
443+
nwaitCycles++;
444+
wmsg = fmt::format("Disk has {:.3f}% available while at least {:.3f}% is requested, wait for {} ms (on top of {} ms)", si.capacity ? float(si.available) / si.capacity * 100.f : 0., -mCheckDiskFull, mWaitDiskFull, totalWait);
445+
} else {
446+
nwaitCycles = 0;
447+
}
448+
if (nwaitCycles) {
449+
if (mWaitDiskFullMax > 0 && totalWait > mWaitDiskFullMax) {
450+
closeTFTreeAndFile(); // try to save whatever we have
451+
LOGP(fatal, "Disk has {} MB available out of {} MB after waiting for {} ms", si.available / MB, si.capacity / MB, mWaitDiskFullMax);
452+
}
453+
if (nwaitCycles < showFirstN + 1 || (prsecaleWarnings && (nwaitCycles % prsecaleWarnings) == 0)) {
454+
LOG(alarm) << wmsg;
455+
}
456+
pc.services().get<RawDeviceService>().waitFor((unsigned int)(mWaitDiskFull));
457+
totalWait += mWaitDiskFull;
458+
continue;
459+
}
460+
} catch (std::exception const& e) {
461+
LOG(fatal) << "unable to query disk space info for path " << mCurrentCTFFileNameFull << ", reason: " << e.what();
462+
}
463+
break;
464+
}
421465
}
422466
// create header
423467
CTFHeader header{mTimingInfo.runNumber, mTimingInfo.creation, mTimingInfo.firstTForbit, mTimingInfo.tfCounter};
@@ -518,7 +562,7 @@ void CTFWriterSpec::prepareTFTreeAndFile()
518562
closeTFTreeAndFile();
519563
mFallBackDirUsed = false;
520564
auto ctfDir = mCTFDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mCTFDir;
521-
if (mChkSize > 0 && (mCTFDirFallBack != "/dev/null")) {
565+
if (mChkSize > 0 && mFallBackDirProvided) {
522566
createLockFile(0);
523567
auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage
524568
if (sz < mChkSize) {
@@ -750,6 +794,9 @@ DataProcessorSpec getCTFWriterSpec(DetID::mask_t dets, const std::string& outTyp
750794
{"max-ctf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested CTFs per file"}},
751795
{"ctf-rejection", VariantType::Int, 0, {">0: percentage to reject randomly, <0: reject if timeslice%|value|!=0"}},
752796
{"ctf-file-compression", VariantType::Int, 0, {"if >= 0: impose CTF file compression level"}},
797+
{"require-free-disk", VariantType::Float, 0.f, {"pause writing op. if available disk space is below this margin, in bytes if >0, as a fraction of total if <0"}},
798+
{"wait-for-free-disk", VariantType::Float, 10.f, {"if paused due to the low disk space, recheck after this time (in s)"}},
799+
{"max-wait-for-free-disk", VariantType::Float, 60.f, {"produce fatal if paused due to the low disk space for more than this amount in s."}},
753800
{"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}}}};
754801
}
755802

0 commit comments

Comments
 (0)