Skip to content

Commit 17d733d

Browse files
committed
Use bufferization for raw files preprocessing
1 parent 25a6958 commit 17d733d

6 files changed

Lines changed: 60 additions & 53 deletions

File tree

Detectors/Raw/include/DetectorsRaw/RawFileReader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ class RawFileReader
180180
void setNominalSPageSize(int n = 0x1 << 20) { mNominalSPageSize = n > (0x1 << 15) ? n : (0x1 << 15); }
181181
int getNominalSPageSize() const { return mNominalSPageSize; }
182182

183+
void setBufferSize(size_t s) { mBufferSize = s < sizeof(RDHAny) ? sizeof(RDHAny) : s; }
184+
size_t getBufferSize() const { return mBufferSize; }
185+
183186
void setMaxTFToRead(uint32_t n) { mMaxTFToRead = n; }
184187
uint32_t getMaxTFToRead() const { return mMaxTFToRead; }
185188
uint32_t getNTimeFrames() const { return mNTimeFrames; }
@@ -218,6 +221,7 @@ class RawFileReader
218221
uint32_t mNextTF2Read = 0; // next TF to read
219222
uint32_t mOrbitMin = 0xffffffff; // lowest orbit seen by any link
220223
uint32_t mOrbitMax = 0; // highest orbit seen by any link
224+
size_t mBufferSize = 1024 * 1024; // size of the buffer for files preprocessing
221225
int mNominalSPageSize = 0x1 << 20; // expected super-page size in B
222226
int mCurrentFileID = 0; // current file being processed
223227
long int mPosInFile = 0; // current position in the file

Detectors/Raw/src/RawFileReader.cxx

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <cstring>
1717
#include <iostream>
1818
#include <iomanip>
19+
#include <memory>
1920
#include <sstream>
2021
#include <iostream>
2122
#include "DetectorsRaw/RawFileReader.h"
@@ -26,6 +27,8 @@
2627
#include "Framework/Logger.h"
2728

2829
#include <Common/Configuration.h>
30+
#include <TStopwatch.h>
31+
#include <fcntl.h>
2932

3033
using namespace o2::raw;
3134
namespace o2h = o2::header;
@@ -358,65 +361,58 @@ int RawFileReader::getLinkLocalID(const RDHAny& rdh, const o2::header::DataOrigi
358361
bool RawFileReader::preprocessFile(int ifl)
359362
{
360363
// preprocess file, check RDH data, build statistics
364+
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(mBufferSize);
361365
FILE* fl = mFiles[ifl];
362366
mCurrentFileID = ifl;
363-
RDHAny rdh;
364-
365367
LinkSpec_t specPrev = 0xffffffffffffffff;
366368
int lIDPrev = -1;
367369
mMultiLinkFile = false;
368370
rewind(fl);
369371
long int nr = 0;
370372
mPosInFile = 0;
371-
int nRDHread = 0;
372-
bool ok = true;
373-
int readBytes = sizeof(RDHAny);
374-
while ((nr = fread(&rdh, 1, readBytes, fl))) {
375-
if (nr < readBytes) {
376-
LOG(ERROR) << "EOF was unexpected, only " << nr << " bytes were read for RDH";
377-
ok = false;
378-
break;
379-
}
380-
if (!(ok = RDHUtils::checkRDH(rdh))) {
381-
break;
382-
}
383-
nRDHread++;
384-
LinkSpec_t spec = createSpec(mDataSpecs[mCurrentFileID].first, RDHUtils::getSubSpec(rdh));
385-
int lID = lIDPrev;
386-
if (spec != specPrev) { // link has changed
387-
specPrev = spec;
388-
if (lIDPrev != -1) {
389-
mMultiLinkFile = true;
373+
size_t nRDHread = 0, boffs;
374+
bool ok = true, readMore = true;
375+
while (readMore && (nr = fread(buffer.get(), 1, mBufferSize, fl))) {
376+
boffs = 0;
377+
while (1) {
378+
auto& rdh = *reinterpret_cast<RDHUtils::RDHAny*>(&buffer[boffs]);
379+
nRDHread++;
380+
LinkSpec_t spec = createSpec(mDataSpecs[mCurrentFileID].first, RDHUtils::getSubSpec(rdh));
381+
int lID = lIDPrev;
382+
if (spec != specPrev) { // link has changed
383+
specPrev = spec;
384+
if (lIDPrev != -1) {
385+
mMultiLinkFile = true;
386+
}
387+
lID = getLinkLocalID(rdh, mDataSpecs[mCurrentFileID].first);
390388
}
391-
lID = getLinkLocalID(rdh, mDataSpecs[mCurrentFileID].first);
392-
}
393-
bool newSPage = lID != lIDPrev;
394-
mLinksData[lID].preprocessCRUPage(rdh, newSPage);
395-
if (mLinksData[lID].nTimeFrames > mMaxTFToRead) { // limit reached, discard the last read
396-
mLinksData[lID].nTimeFrames--;
397-
mLinksData[lID].blocks.pop_back();
398-
if (mLinksData[lID].nHBFrames > 0) {
399-
mLinksData[lID].nHBFrames--;
389+
bool newSPage = lID != lIDPrev;
390+
mLinksData[lID].preprocessCRUPage(rdh, newSPage);
391+
if (mLinksData[lID].nTimeFrames > mMaxTFToRead) { // limit reached, discard the last read
392+
mLinksData[lID].nTimeFrames--;
393+
mLinksData[lID].blocks.pop_back();
394+
if (mLinksData[lID].nHBFrames > 0) {
395+
mLinksData[lID].nHBFrames--;
396+
}
397+
if (mLinksData[lID].nCRUPages > 0) {
398+
mLinksData[lID].nCRUPages--;
399+
}
400+
lIDPrev = -1; // last block is closed
401+
readMore = false;
402+
break;
400403
}
401-
if (mLinksData[lID].nCRUPages > 0) {
402-
mLinksData[lID].nCRUPages--;
404+
boffs += RDHUtils::getOffsetToNext(rdh);
405+
mPosInFile += RDHUtils::getOffsetToNext(rdh);
406+
lIDPrev = lID;
407+
if (boffs + sizeof(RDHUtils::RDHAny) >= nr) {
408+
if (fseek(fl, mPosInFile, SEEK_SET)) {
409+
readMore = false;
410+
break;
411+
}
412+
break;
403413
}
404-
lIDPrev = -1; // last block is closed
405-
break;
406414
}
407-
//
408-
mPosInFile += RDHUtils::getOffsetToNext(rdh);
409-
if (fseek(fl, mPosInFile, SEEK_SET)) {
410-
break;
411-
}
412-
lIDPrev = lID;
413415
}
414-
mPosInFile = ftell(fl);
415-
if (lIDPrev != -1) { // close last block
416-
auto& lastBlock = mLinksData[lIDPrev].blocks.back();
417-
lastBlock.size = mPosInFile - lastBlock.offset;
418-
}
419-
420416
LOGF(INFO, "File %3d : %9li bytes scanned, %6d RDH read for %4d links from %s",
421417
mCurrentFileID, mPosInFile, nRDHread, int(mLinkEntries.size()), mFileNames[mCurrentFileID]);
422418
return ok;

Detectors/Raw/src/RawFileReaderWorkflow.cxx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@ class rawReaderSpecs : public o2f::Task
4545
{
4646
public:
4747
explicit rawReaderSpecs(const std::string& config, bool tfAsMessage = false, bool outPerRoute = true, int loop = 1, uint32_t delay_us = 0,
48-
uint32_t errmap = 0xffffffff, uint32_t maxTF = 0xffffffff)
48+
uint32_t errmap = 0xffffffff, uint32_t maxTF = 0xffffffff, size_t buffSize = 1024L * 1024L)
4949
: mLoop(loop < 1 ? 1 : loop), mHBFPerMessage(!tfAsMessage), mOutPerRoute(outPerRoute), mDelayUSec(delay_us), mReader(std::make_unique<o2::raw::RawFileReader>(config))
5050
{
5151
mReader->setCheckErrors(errmap);
5252
mReader->setMaxTFToRead(maxTF);
53+
mReader->setBufferSize(buffSize);
54+
LOG(INFO) << "Will preprocess files with buffer size of " << buffSize << " bytes";
5355
LOG(INFO) << "Number of loops over whole data requested: " << mLoop;
5456
if (mHBFPerMessage) {
5557
LOG(INFO) << "Every link TF will be sent as multipart of HBF messages";
@@ -221,7 +223,7 @@ class rawReaderSpecs : public o2f::Task
221223
std::unique_ptr<o2::raw::RawFileReader> mReader; // matching engine
222224
};
223225

224-
o2f::DataProcessorSpec getReaderSpec(std::string config, bool tfAsMessage, bool outPerRoute, int loop, uint32_t delay_us, uint32_t errmap, uint32_t maxTF)
226+
o2f::DataProcessorSpec getReaderSpec(std::string config, bool tfAsMessage, bool outPerRoute, int loop, uint32_t delay_us, uint32_t errmap, uint32_t maxTF, size_t buffSize)
225227
{
226228
// check which inputs are present in files to read
227229
o2f::Outputs outputs;
@@ -238,14 +240,14 @@ o2f::DataProcessorSpec getReaderSpec(std::string config, bool tfAsMessage, bool
238240
"raw-file-reader",
239241
o2f::Inputs{},
240242
outputs,
241-
o2f::AlgorithmSpec{o2f::adaptFromTask<rawReaderSpecs>(config, tfAsMessage, outPerRoute, loop, delay_us, errmap, maxTF)},
243+
o2f::AlgorithmSpec{o2f::adaptFromTask<rawReaderSpecs>(config, tfAsMessage, outPerRoute, loop, delay_us, errmap, maxTF, buffSize)},
242244
o2f::Options{}};
243245
}
244246

245247
o2f::WorkflowSpec o2::raw::getRawFileReaderWorkflow(std::string inifile, bool tfAsMessage, bool outPerRoute,
246-
int loop, uint32_t delay_us, uint32_t errmap, uint32_t maxTF)
248+
int loop, uint32_t delay_us, uint32_t errmap, uint32_t maxTF, size_t buffSize)
247249
{
248250
o2f::WorkflowSpec specs;
249-
specs.emplace_back(getReaderSpec(inifile, tfAsMessage, outPerRoute, loop, delay_us, errmap, maxTF));
251+
specs.emplace_back(getReaderSpec(inifile, tfAsMessage, outPerRoute, loop, delay_us, errmap, maxTF, buffSize));
250252
return specs;
251253
}

Detectors/Raw/src/RawFileReaderWorkflow.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ namespace raw
2121
{
2222

2323
framework::WorkflowSpec getRawFileReaderWorkflow(std::string inifile, bool tfAsMessage = false, bool outPerRoute = true,
24-
int loop = 1, uint32_t delay_us = 0, uint32_t errMap = 0xffffffff, uint32_t maxTF = 0xffffffff);
24+
int loop = 1, uint32_t delay_us = 0, uint32_t errMap = 0xffffffff,
25+
uint32_t maxTF = 0xffffffff, size_t bufferSize = 1024L * 1024L);
2526

2627
} // namespace raw
2728
} // namespace o2

Detectors/Raw/src/rawfile-reader-workflow.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
2828
options.push_back(ConfigParamSpec{"message-per-tf", o2::framework::VariantType::Bool, false, {"send TF of each link as a single FMQ message rather than multipart with message per HB"}});
2929
options.push_back(ConfigParamSpec{"output-per-link", o2::framework::VariantType::Bool, false, {"send message per Link rather than per FMQ output route"}});
3030
options.push_back(ConfigParamSpec{"delay", o2::framework::VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
31+
options.push_back(ConfigParamSpec{"buffer-size", o2::framework::VariantType::Int64, 1024L * 1024L, {"buffer size for files preprocessing"}});
3132
options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});
3233
// options for error-check suppression
3334
options.push_back(ConfigParamSpec{RawFileReader::nochk_opt(RawFileReader::ErrWrongPacketCounterIncrement), VariantType::Bool, false, {RawFileReader::nochk_expl(RawFileReader::ErrWrongPacketCounterIncrement)}});
@@ -51,6 +52,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
5152
auto inifile = configcontext.options().get<std::string>("conf");
5253
auto loop = configcontext.options().get<int>("loop");
5354
uint32_t maxTF = uint32_t(configcontext.options().get<int64_t>("max-tf"));
55+
uint64_t buffSize = uint64_t(configcontext.options().get<int64_t>("buffer-size"));
5456
auto tfAsMessage = configcontext.options().get<bool>("message-per-tf");
5557
auto outPerRoute = !configcontext.options().get<bool>("output-per-link");
5658
o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
@@ -64,5 +66,5 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
6466
}
6567
}
6668

67-
return std::move(o2::raw::getRawFileReaderWorkflow(inifile, tfAsMessage, outPerRoute, loop, delay_us, errmap, maxTF));
69+
return std::move(o2::raw::getRawFileReaderWorkflow(inifile, tfAsMessage, outPerRoute, loop, delay_us, errmap, maxTF, buffSize));
6870
}

Detectors/Raw/src/rawfileCheck.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ int main(int argc, char* argv[])
3838
"max-tf,m", bpo::value<uint32_t>()->default_value(0xffffffff), "max.number of TF to read")(
3939
"verbosity,v", bpo::value<int>()->default_value(reader.getVerbosity()), "1: long report, 2 or 3: print or dump all RDH")(
4040
"spsize,s", bpo::value<int>()->default_value(reader.getNominalSPageSize()), "nominal super-page size in bytes")(
41+
"buffer-size,b", bpo::value<size_t>()->default_value(reader.getNominalSPageSize()), "buffer size for files preprocessing")(
4142
"configKeyValues", bpo::value(&configKeyValues)->default_value(""), "semicolon separated key=value strings")(
4243
RawFileReader::nochk_opt(RawFileReader::ErrWrongPacketCounterIncrement).c_str(), RawFileReader::nochk_expl(RawFileReader::ErrWrongPacketCounterIncrement).c_str())(
4344
RawFileReader::nochk_opt(RawFileReader::ErrWrongPageCounterIncrement).c_str(), RawFileReader::nochk_expl(RawFileReader::ErrWrongPageCounterIncrement).c_str())(
@@ -90,6 +91,7 @@ int main(int argc, char* argv[])
9091
reader.setVerbosity(vm["verbosity"].as<int>());
9192
reader.setNominalSPageSize(vm["spsize"].as<int>());
9293
reader.setMaxTFToRead(vm["max-tf"].as<uint32_t>());
94+
reader.setBufferSize(vm["buffer-size"].as<size_t>());
9395
uint32_t errmap = 0xffffffff;
9496
for (int i = RawFileReader::NErrorsDefined; i--;) {
9597
if (vm.count(RawFileReader::nochk_opt(RawFileReader::ErrTypes(i)).c_str())) {

0 commit comments

Comments
 (0)