Skip to content

Commit 23dc96d

Browse files
committed
CTF writer: optionally accumulate CTFs until min-file-size obtained
o2-ctf-writer called with options --min-file-size <min> --max-file-size <max> will accumulate CTFs in entries of the same tree/file until its size exceeds min-file-size (but not max-file-size). o2-ctf-reader will read all entries of each CTF file.
1 parent fa5e4f0 commit 23dc96d

6 files changed

Lines changed: 240 additions & 100 deletions

File tree

DataFormats/Detectors/Common/include/DetectorsCommonDataFormats/EncodedBlocks.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ class EncodedBlocks
382382
void copyToFlat(void* base) { fillFlatCopy(create(base, estimateSize())); }
383383

384384
/// attach to tree
385-
void appendToTree(TTree& tree, const std::string& name) const;
385+
size_t appendToTree(TTree& tree, const std::string& name) const;
386386

387387
/// read from tree to non-flat object
388388
void readFromTree(TTree& tree, const std::string& name, int ev = 0);
@@ -446,7 +446,7 @@ class EncodedBlocks
446446

447447
/// add and fill single branch
448448
template <typename D>
449-
static void fillTreeBranch(TTree& tree, const std::string& brname, D& dt, int compLevel, int splitLevel = 99);
449+
static size_t fillTreeBranch(TTree& tree, const std::string& brname, D& dt, int compLevel, int splitLevel = 99);
450450

451451
/// read single branch
452452
template <typename D>
@@ -485,14 +485,16 @@ void EncodedBlocks<H, N, W>::readFromTree(VD& vec, TTree& tree, const std::strin
485485
///_____________________________________________________________________________
486486
/// attach to tree
487487
template <typename H, int N, typename W>
488-
void EncodedBlocks<H, N, W>::appendToTree(TTree& tree, const std::string& name) const
488+
size_t EncodedBlocks<H, N, W>::appendToTree(TTree& tree, const std::string& name) const
489489
{
490-
fillTreeBranch(tree, o2::utils::Str::concat_string(name, "_wrapper."), const_cast<base&>(*this), WrappersCompressionLevel, WrappersSplitLevel);
490+
long s = 0;
491+
s += fillTreeBranch(tree, o2::utils::Str::concat_string(name, "_wrapper."), const_cast<base&>(*this), WrappersCompressionLevel, WrappersSplitLevel);
491492
for (int i = 0; i < N; i++) {
492493
int compression = mMetadata[i].opt == Metadata::OptStore::ROOTCompression ? 1 : 0;
493-
fillTreeBranch(tree, o2::utils::Str::concat_string(name, "_block.", std::to_string(i), "."), const_cast<Block<W>&>(mBlocks[i]), compression);
494+
s += fillTreeBranch(tree, o2::utils::Str::concat_string(name, "_block.", std::to_string(i), "."), const_cast<Block<W>&>(mBlocks[i]), compression);
494495
}
495496
tree.SetEntries(tree.GetEntries() + 1);
497+
return s;
496498
}
497499

498500
///_____________________________________________________________________________
@@ -513,11 +515,14 @@ inline void EncodedBlocks<H, N, W>::readTreeBranch(TTree& tree, const std::strin
513515
/// add and fill single branch
514516
template <typename H, int N, typename W>
515517
template <typename D>
516-
inline void EncodedBlocks<H, N, W>::fillTreeBranch(TTree& tree, const std::string& brname, D& dt, int compLevel, int splitLevel)
518+
inline size_t EncodedBlocks<H, N, W>::fillTreeBranch(TTree& tree, const std::string& brname, D& dt, int compLevel, int splitLevel)
517519
{
518-
auto* br = tree.Branch(brname.c_str(), &dt, 512, splitLevel);
519-
br->SetCompressionLevel(compLevel);
520-
br->Fill();
520+
auto* br = tree.GetBranch(brname.c_str());
521+
if (!br) {
522+
br = tree.Branch(brname.c_str(), &dt, 512, splitLevel);
523+
br->SetCompressionLevel(compLevel);
524+
}
525+
return br->Fill();
521526
}
522527

523528
///_____________________________________________________________________________

Detectors/CTF/README.md

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,28 @@ Example of usage:
1616
o2-its-reco-workflow --entropy-encoding | o2-ctf-writer-workflow --onlyDet ITS
1717
```
1818

19+
For the storage optimization reason one can request multiple CTFs stored in the same output file (as entries of the `ctf` tree):
20+
```bash
21+
o2-ctf-writer --min-file-size <min> --max-file-size <max> ...
22+
```
23+
will accumulate CTFs in entries of the same tree/file until its size fits exceeds `min` and does not exceed `max` (`max` check is disabled if `max<=min`) or EOS received.
24+
1925
## CTF reader workflow
2026

2127
`o2-ctf-reader-workflow` should be the 1st workflow in the piped chain of CTF processing.
22-
At the moment accepts as an input a single file produced by the `o2-ctf-writer-workflow`, reads data for all detectors present in it
23-
(the list can be narrowd by `--onlyDet arg (=none)` and `--skipDet arg (=none)` comma-separated lists), decode them using decoder provided
24-
by detector and injects to DPL.
28+
At the moment accepts as an input a comma-separated list of CTF files produced by the `o2-ctf-writer-workflow`, reads data for all detectors present in it
29+
(the list can be narrowed by `--onlyDet arg (=none)` and `--skipDet arg (=none)` comma-separated lists), decode them using decoder provided
30+
by detector and injects to DPL. In case of multiple entries in the CTF tree, they all will be read in row.
2531

2632
Example of usage:
2733
```bash
2834
o2-ctf-reader-workflow --onlyDet ITS --ctf-input o2_ctf_0000000000.root | o2-its-reco-workflow --trackerCA --clusters-from-upstream --disable-mc
2935
```
3036

37+
With `--delay <s>` a delay of `s` seconds will be introduced between injections of consecutive CTFs (if >1).
38+
One can loop over the input by providing `--loop <N=1>` option.
39+
40+
3141
## Support for externally provided encoding dictionaries
3242

3343
By default encoding with generate for every TF and store in the CTF the dictionary information necessary to decode the CTF.

Detectors/CTF/workflow/include/CTFWorkflow/CTFWriterSpec.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ namespace ctf
2323
{
2424

2525
/// create a processor spec
26-
framework::DataProcessorSpec getCTFWriterSpec(o2::detectors::DetID::mask_t dets, uint64_t run, bool doCTF = true, bool doDict = false, bool dictPerDet = false);
26+
framework::DataProcessorSpec getCTFWriterSpec(o2::detectors::DetID::mask_t dets, uint64_t run, bool doCTF = true,
27+
bool doDict = false, bool dictPerDet = false, size_t smn = 0, size_t szmx = 0);
2728

2829
} // namespace ctf
2930
} // namespace o2

Detectors/CTF/workflow/src/CTFReaderSpec.cxx

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,15 @@ class CTFReaderSpec : public o2::framework::Task
7272
void run(o2::framework::ProcessingContext& pc) final;
7373

7474
private:
75+
void openCTFFile(const std::string& flname);
76+
7577
DetID::mask_t mDets; // detectors
7678
std::vector<std::string> mInput; // input files
77-
uint32_t mTFCounter = 0;
79+
std::unique_ptr<TFile> mCTFFile;
80+
std::unique_ptr<TTree> mCTFTree;
81+
uint32_t mCTFCounter = 0;
7882
size_t mNextToProcess = 0;
83+
int mCurrEntry = 0;
7984
int mLoops = 1;
8085
int mLoopsCounter = 0;
8186
int mDelayMUS = 0;
@@ -97,43 +102,52 @@ void CTFReaderSpec::init(InitContext& ic)
97102
mCTFDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("input-dir"));
98103
}
99104

105+
///_______________________________________
106+
void CTFReaderSpec::openCTFFile(const std::string& flname)
107+
{
108+
mCTFFile.reset(TFile::Open(flname.c_str()));
109+
if (!mCTFFile->IsOpen() || mCTFFile->IsZombie()) {
110+
LOG(ERROR) << "Failed to open file " << flname;
111+
throw std::runtime_error("failed to open CTF file");
112+
}
113+
mCTFTree.reset((TTree*)mCTFFile->Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()));
114+
if (!mCTFTree) {
115+
throw std::runtime_error("failed to load CTF tree");
116+
}
117+
mCurrEntry = 0;
118+
}
119+
100120
///_______________________________________
101121
void CTFReaderSpec::run(ProcessingContext& pc)
102122
{
103123
if (mNextToProcess >= mInput.size()) {
104124
return;
105125
}
106-
if (mDelayMUS && mTFCounter > 0) {
126+
if (mDelayMUS && mCTFCounter > 0) {
107127
usleep(mDelayMUS);
108128
}
109129

110130
auto cput = mTimer.CpuTime();
111131
mTimer.Start(false);
112-
std::string inputFile = o2::utils::Str::concat_string(mCTFDir, mInput[mNextToProcess]);
113-
LOG(INFO) << "Reading CTF input " << mNextToProcess << ' ' << inputFile;
114132

115-
TFile flIn(inputFile.c_str());
116-
if (!flIn.IsOpen() || flIn.IsZombie()) {
117-
LOG(ERROR) << "Failed to open file " << inputFile;
118-
throw std::runtime_error("failed to open CTF file");
119-
}
120-
std::unique_ptr<TTree> tree((TTree*)flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()));
121-
if (!tree) {
122-
throw std::runtime_error("failed to load CTF tree");
133+
if (!mCTFTree) { // there is still a tree open with multiple entries
134+
std::string inputFile = o2::utils::Str::concat_string(mCTFDir, mInput[mNextToProcess]);
135+
LOG(INFO) << "Reading CTF input " << mNextToProcess << ' ' << inputFile;
136+
openCTFFile(inputFile);
123137
}
124138
CTFHeader ctfHeader;
125-
if (!readFromTree(*tree, "CTFHeader", ctfHeader)) {
139+
if (!readFromTree(*(mCTFTree.get()), "CTFHeader", ctfHeader, mCurrEntry)) {
126140
throw std::runtime_error("did not find CTFHeader");
127141
}
128142
LOG(INFO) << ctfHeader;
129143

130-
auto setFirstTFOrbit = [&](const std::string& label) {
144+
auto setFirstTFOrbit = [&pc, &ctfHeader, this](const std::string& label) {
131145
auto* hd = pc.outputs().findMessageHeader({label});
132146
if (!hd) {
133147
throw std::runtime_error(o2::utils::Str::concat_string("failed to find output message header for ", label));
134148
}
135149
hd->firstTForbit = ctfHeader.firstTForbit;
136-
hd->tfCounter = mTFCounter;
150+
hd->tfCounter = this->mCTFCounter;
137151
};
138152

139153
// send CTF Header
@@ -146,128 +160,134 @@ void CTFReaderSpec::run(ProcessingContext& pc)
146160
det = DetID::ITS;
147161
if (detsTF[det]) {
148162
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::itsmft::CTF));
149-
o2::itsmft::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
163+
o2::itsmft::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
150164
setFirstTFOrbit(det.getName());
151165
}
152166

153167
det = DetID::MFT;
154168
if (detsTF[det]) {
155169
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::itsmft::CTF));
156-
o2::itsmft::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
170+
o2::itsmft::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
157171
setFirstTFOrbit(det.getName());
158172
}
159173

160174
det = DetID::TPC;
161175
if (detsTF[det]) {
162176
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::tpc::CTF));
163-
o2::tpc::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
177+
o2::tpc::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
164178
setFirstTFOrbit(det.getName());
165179
}
166180

167181
det = DetID::TRD;
168182
if (detsTF[det]) {
169183
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::trd::CTF));
170-
o2::trd::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
184+
o2::trd::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
171185
setFirstTFOrbit(det.getName());
172186
}
173187

174188
det = DetID::FT0;
175189
if (detsTF[det]) {
176190
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::ft0::CTF));
177-
o2::ft0::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
191+
o2::ft0::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
178192
setFirstTFOrbit(det.getName());
179193
}
180194

181195
det = DetID::FV0;
182196
if (detsTF[det]) {
183197
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::fv0::CTF));
184-
o2::fv0::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
198+
o2::fv0::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
185199
setFirstTFOrbit(det.getName());
186200
}
187201

188202
det = DetID::FDD;
189203
if (detsTF[det]) {
190204
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::fdd::CTF));
191-
o2::fdd::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
205+
o2::fdd::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
192206
setFirstTFOrbit(det.getName());
193207
}
194208

195209
det = DetID::TOF;
196210
if (detsTF[det]) {
197211
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::tof::CTF));
198-
o2::tof::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
212+
o2::tof::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
199213
setFirstTFOrbit(det.getName());
200214
}
201215

202216
det = DetID::MID;
203217
if (detsTF[det]) {
204218
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::mid::CTF));
205-
o2::mid::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
219+
o2::mid::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
206220
setFirstTFOrbit(det.getName());
207221
}
208222

209223
det = DetID::MCH;
210224
if (detsTF[det]) {
211225
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::mch::CTF));
212-
o2::mch::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
226+
o2::mch::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
213227
setFirstTFOrbit(det.getName());
214228
}
215229

216230
det = DetID::EMC;
217231
if (detsTF[det]) {
218232
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::emcal::CTF));
219-
o2::emcal::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
233+
o2::emcal::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
220234
setFirstTFOrbit(det.getName());
221235
}
222236

223237
det = DetID::PHS;
224238
if (detsTF[det]) {
225239
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::phos::CTF));
226-
o2::phos::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
240+
o2::phos::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
227241
setFirstTFOrbit(det.getName());
228242
}
229243

230244
det = DetID::CPV;
231245
if (detsTF[det]) {
232246
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::cpv::CTF));
233-
o2::cpv::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
247+
o2::cpv::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
234248
setFirstTFOrbit(det.getName());
235249
}
236250

237251
det = DetID::ZDC;
238252
if (detsTF[det]) {
239253
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::zdc::CTF));
240-
o2::zdc::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
254+
o2::zdc::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
241255
setFirstTFOrbit(det.getName());
242256
}
243257

244258
det = DetID::HMP;
245259
if (detsTF[det]) {
246260
auto& bufVec = pc.outputs().make<std::vector<o2::ctf::BufferType>>({det.getName()}, sizeof(o2::hmpid::CTF));
247-
o2::hmpid::CTF::readFromTree(bufVec, *(tree.get()), det.getName());
261+
o2::hmpid::CTF::readFromTree(bufVec, *(mCTFTree.get()), det.getName(), mCurrEntry);
248262
setFirstTFOrbit(det.getName());
249263
}
250264

251265
mTimer.Stop();
252-
LOGF(INFO, "Read CTF %s in %.3e s", inputFile.c_str(), mTimer.CpuTime() - cput);
253-
254-
bool moreToProcess = true;
255-
if (++mNextToProcess >= mInput.size()) {
256-
if (++mLoopsCounter >= mLoops) {
257-
moreToProcess = false;
258-
} else {
259-
mNextToProcess = 0;
260-
LOG(INFO) << "Starting new loop " << mNextToProcess << " of " << mLoops;
266+
LOGP(INFO, "Read CTF#{} ({} of {} in {}) in {:.3f} s", mCTFCounter, mCurrEntry, mCTFTree->GetEntries(), mCTFFile->GetName(), mTimer.CpuTime() - cput);
267+
268+
bool moreToProcess = (++mCurrEntry < mCTFTree->GetEntries());
269+
if (!moreToProcess) { // this file is done, check if there are other files
270+
mCTFTree.reset();
271+
mCTFFile->Close();
272+
mCTFFile.reset();
273+
moreToProcess = true;
274+
if (++mNextToProcess >= mInput.size()) {
275+
if (++mLoopsCounter >= mLoops) {
276+
moreToProcess = false;
277+
} else {
278+
mNextToProcess = 0;
279+
LOG(INFO) << "Starting new loop " << mNextToProcess << " of " << mLoops;
280+
}
261281
}
262282
}
263283

264-
mTFCounter++;
284+
mCTFCounter++;
265285

266286
if (!moreToProcess) {
267287
pc.services().get<ControlService>().endOfStream();
268288
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
269-
LOGF(INFO, "CTF reading total timing: Cpu: %.3e Real: %.3e s for %u TFs in %d loops",
270-
mTimer.CpuTime(), mTimer.RealTime(), mTFCounter, mLoops);
289+
LOGP(INFO, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs in {} loops",
290+
mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mLoops);
271291
}
272292
}
273293

0 commit comments

Comments
 (0)