Skip to content

Commit db3ce9b

Browse files
committed
Multiple fixes in RawFileReader
* Create messages with FMQ channel transort * Create buffer for every input file * In part-per-super-page mode read s-page as single chunk * Add timers
1 parent a45b70e commit db3ce9b

4 files changed

Lines changed: 99 additions & 70 deletions

File tree

Detectors/Raw/include/DetectorsRaw/RawFileReader.h

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ class RawFileReader
103103
StartHB = 0x1 << 1,
104104
StartSP = 0x1 << 2,
105105
EndHB = 0x1 << 3 };
106-
size_t offset = 0; // where data of the block starts
107-
uint32_t size = 0; // block size
108-
uint32_t tfID = 0; // tf counter (from 0)
109-
IR ir = 0; // ir starting the block
110-
uint16_t fileID = 0; // file id where the block is located
111-
uint8_t flags = 0; // different flags
106+
size_t offset = 0; //! where data of the block starts
107+
uint32_t size = 0; //! block size
108+
uint32_t tfID = 0; //! tf counter (from 0)
109+
IR ir = 0; //! ir starting the block
110+
uint16_t fileID = 0; //! file id where the block is located
111+
uint8_t flags = 0; //! different flags
112112
LinkBlock() = default;
113113
LinkBlock(int fid, size_t offs) : offset(offs), fileID(fid) {}
114114
void setFlag(uint8_t fl, bool v = true)
@@ -124,26 +124,26 @@ class RawFileReader
124124

125125
//=====================================================================================
126126
struct LinkData {
127-
RDHAny rdhl; // RDH with the running info of the last RDH seen
128-
LinkSpec_t spec = 0; // Link subspec augmented by its origin
129-
LinkSubSpec_t subspec = 0; // subspec according to DataDistribution
130-
uint32_t nTimeFrames = 0;
131-
uint32_t nHBFrames = 0;
132-
uint32_t nSPages = 0;
133-
uint64_t nCRUPages = 0;
134-
bool cruDetector = true; // CRU vs RORC detector
135-
bool continuousRO = true;
136-
137-
o2::header::DataOrigin origin = o2::header::gDataOriginInvalid;
138-
o2::header::DataDescription description = o2::header::gDataDescriptionInvalid;
139-
std::string fairMQChannel{}; // name of the fairMQ channel for the output
140-
int nErrors = 0;
141-
std::vector<LinkBlock> blocks;
127+
RDHAny rdhl; //! RDH with the running info of the last RDH seen
128+
LinkSpec_t spec = 0; //! Link subspec augmented by its origin
129+
LinkSubSpec_t subspec = 0; //! subspec according to DataDistribution
130+
uint32_t nTimeFrames = 0; //!
131+
uint32_t nHBFrames = 0; //!
132+
uint32_t nSPages = 0; //!
133+
uint64_t nCRUPages = 0; //!
134+
bool cruDetector = true; //! CRU vs RORC detector
135+
bool continuousRO = true; //!
136+
137+
o2::header::DataOrigin origin = o2::header::gDataOriginInvalid; //!
138+
o2::header::DataDescription description = o2::header::gDataDescriptionInvalid; //!
139+
std::string fairMQChannel{}; //! name of the fairMQ channel for the output
140+
int nErrors = 0; //!
141+
std::vector<LinkBlock> blocks; //!
142142
//
143143
// transient info during processing
144-
bool openHB = false;
145-
int nHBFinTF = 0;
146-
int nextBlock2Read = 0; // next block which should be read
144+
bool openHB = false; //!
145+
int nHBFinTF = 0; //!
146+
int nextBlock2Read = 0; //! next block which should be read
147147

148148
LinkData() = default;
149149
template <typename H>
@@ -170,12 +170,12 @@ class RawFileReader
170170
std::string describe() const;
171171

172172
private:
173-
const RawFileReader* reader = nullptr;
173+
const RawFileReader* reader = nullptr; //!
174174
};
175175

176176
//=====================================================================================
177177

178-
RawFileReader(const std::string& config = "", int verbosity = 0);
178+
RawFileReader(const std::string& config = "", int verbosity = 0, size_t buffsize = 50 * 1024UL);
179179
~RawFileReader() { clear(); }
180180

181181
void loadFromInputsMap(const InputsMap& inp);
@@ -239,28 +239,29 @@ class RawFileReader
239239
static constexpr o2::header::DataDescription DEFDataDescription = o2::header::gDataDescriptionRawData;
240240
static constexpr ReadoutCardType DEFCardType = CRU;
241241

242-
o2::header::DataOrigin mDefDataOrigin = DEFDataOrigin;
243-
o2::header::DataDescription mDefDataDescription = DEFDataDescription;
244-
ReadoutCardType mDefCardType = CRU;
242+
o2::header::DataOrigin mDefDataOrigin = DEFDataOrigin; //!
243+
o2::header::DataDescription mDefDataDescription = DEFDataDescription; //!
244+
ReadoutCardType mDefCardType = CRU; //!
245245

246-
std::vector<std::string> mFileNames; // input file names
247-
std::vector<FILE*> mFiles; // input file handlers
248-
std::vector<OrigDescCard> mDataSpecs; // data origin and description for every input file + readout card type
246+
std::vector<std::string> mFileNames; //! input file names
247+
std::vector<FILE*> mFiles; //! input file handlers
248+
std::vector<std::unique_ptr<char[]>> mFileBuffers; //! buffers for input files
249+
std::vector<OrigDescCard> mDataSpecs; //! data origin and description for every input file + readout card type
249250
bool mInitDone = false;
250-
std::unordered_map<LinkSpec_t, int> mLinkEntries; // mapping between RDH specs and link entry in the mLinksData
251-
std::vector<LinkData> mLinksData; // info on links data in the files
252-
std::vector<int> mOrderedIDs; // links entries ordered in Specs
253-
uint32_t mMaxTFToRead = 0xffffffff; // max TFs to process
254-
uint32_t mNTimeFrames = 0; // total number of time frames
255-
uint32_t mNextTF2Read = 0; // next TF to read
256-
uint32_t mOrbitMin = 0xffffffff; // lowest orbit seen by any link
257-
uint32_t mOrbitMax = 0; // highest orbit seen by any link
258-
size_t mBufferSize = 1024 * 1024; // size of the buffer for files preprocessing
259-
int mNominalSPageSize = 0x1 << 20; // expected super-page size in B
260-
int mCurrentFileID = 0; // current file being processed
261-
long int mPosInFile = 0; // current position in the file
262-
bool mMultiLinkFile = false; // was > than 1 link seen in the file?
263-
uint32_t mCheckErrors = 0; // mask for errors to check
251+
std::unordered_map<LinkSpec_t, int> mLinkEntries; //! mapping between RDH specs and link entry in the mLinksData
252+
std::vector<LinkData> mLinksData; //! info on links data in the files
253+
std::vector<int> mOrderedIDs; //! links entries ordered in Specs
254+
uint32_t mMaxTFToRead = 0xffffffff; //! max TFs to process
255+
uint32_t mNTimeFrames = 0; //! total number of time frames
256+
uint32_t mNextTF2Read = 0; //! next TF to read
257+
uint32_t mOrbitMin = 0xffffffff; //! lowest orbit seen by any link
258+
uint32_t mOrbitMax = 0; //! highest orbit seen by any link
259+
size_t mBufferSize = 5 * 1024UL; //! size of the buffer for files reading
260+
int mNominalSPageSize = 0x1 << 20; //! expected super-page size in B
261+
int mCurrentFileID = 0; //! current file being processed
262+
long int mPosInFile = 0; //! current position in the file
263+
bool mMultiLinkFile = false; //! was > than 1 link seen in the file?
264+
uint32_t mCheckErrors = 0; //! mask for errors to check
264265
int mVerbosity = 0;
265266

266267
ClassDefNV(RawFileReader, 1);

Detectors/Raw/src/RawFileReader.cxx

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,15 @@ size_t RawFileReader::LinkData::readNextSuperPage(char* buff)
243243
break;
244244
}
245245
ibl++;
246-
auto fl = reader->mFiles[blc.fileID];
247-
if (fseek(fl, blc.offset, SEEK_SET) || fread(buff + sz, 1, blc.size, fl) != blc.size) {
246+
sz += blc.size;
247+
}
248+
if (sz) {
249+
auto fl = reader->mFiles[blocks[nextBlock2Read].fileID];
250+
if (fseek(fl, blocks[nextBlock2Read].offset, SEEK_SET) || fread(buff, 1, sz, fl) != sz) {
248251
LOGF(ERROR, "Failed to read for the %s a bloc:", describe());
249-
blc.print();
252+
blocks[nextBlock2Read].print();
250253
error = true;
251254
}
252-
sz += blc.size;
253255
}
254256
nextBlock2Read = ibl;
255257
return error ? 0 : sz; // in case of the error we ignore the data
@@ -446,7 +448,7 @@ bool RawFileReader::LinkData::preprocessCRUPage(const RDHAny& rdh, bool newSPage
446448
//====================== methods of RawFileReader ========================
447449

448450
//_____________________________________________________________________
449-
RawFileReader::RawFileReader(const std::string& config, int verbosity) : mVerbosity(verbosity)
451+
RawFileReader::RawFileReader(const std::string& config, int verbosity, size_t buffSize) : mVerbosity(verbosity), mBufferSize(buffSize)
450452
{
451453
if (!config.empty()) {
452454
auto inp = parseInput(config);
@@ -573,7 +575,10 @@ bool RawFileReader::addFile(const std::string& sname, o2::header::DataOrigin ori
573575
LOG(ERROR) << "Cannot add new files after initialization";
574576
return false;
575577
}
578+
mFileBuffers.push_back(std::make_unique<char[]>(mBufferSize));
576579
auto inFile = fopen(sname.c_str(), "rb");
580+
setvbuf(inFile, mFileBuffers.back().get(), _IOFBF, mBufferSize);
581+
577582
bool ok = true;
578583
if (!inFile) {
579584
LOG(ERROR) << "Failed to open input file " << sname;

Detectors/Raw/src/RawFileReaderWorkflow.cxx

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
#include "Headers/Stack.h"
2828

2929
#include "RawFileReaderWorkflow.h" // not installed
30-
30+
#include <TStopwatch.h>
3131
#include <fairmq/FairMQDevice.h>
3232

3333
#include <unistd.h>
@@ -47,22 +47,27 @@ class RawReaderSpecs : public o2f::Task
4747
public:
4848
explicit RawReaderSpecs(const std::string& config, int loop = 1, uint32_t delay_us = 0,
4949
uint32_t errmap = 0xffffffff, uint32_t minTF = 0, uint32_t maxTF = 0xffffffff, bool partPerSP = true,
50-
size_t spSize = 1024L * 1024L, size_t buffSize = 1024L * 1024L,
50+
size_t spSize = 1024L * 1024L, size_t buffSize = 5 * 1024UL,
5151
const std::string& rawChannelName = "")
52-
: mLoop(loop < 0 ? INT_MAX : (loop < 1 ? 1 : loop)), mDelayUSec(delay_us), mMinTFID(minTF), mMaxTFID(maxTF), mPartPerSP(partPerSP), mReader(std::make_unique<o2::raw::RawFileReader>(config)), mRawChannelName(rawChannelName)
52+
: mLoop(loop < 0 ? INT_MAX : (loop < 1 ? 1 : loop)), mDelayUSec(delay_us), mMinTFID(minTF), mMaxTFID(maxTF), mPartPerSP(partPerSP), mReader(std::make_unique<o2::raw::RawFileReader>(config, 0, buffSize)), mRawChannelName(rawChannelName)
5353
{
5454
mReader->setCheckErrors(errmap);
5555
mReader->setMaxTFToRead(maxTF);
56-
mReader->setBufferSize(buffSize);
5756
mReader->setNominalSPageSize(spSize);
5857
LOG(INFO) << "Will preprocess files with buffer size of " << buffSize << " bytes";
5958
LOG(INFO) << "Number of loops over whole data requested: " << mLoop;
59+
for (int i = NTimers; i--;) {
60+
mTimer[i].Stop();
61+
mTimer[i].Reset();
62+
}
6063
}
6164

6265
void init(o2f::InitContext& ic) final
6366
{
6467
assert(mReader);
68+
mTimer[TimerInit].Start();
6569
mReader->init();
70+
mTimer[TimerInit].Stop();
6671
if (mMaxTFID >= mReader->getNTimeFrames()) {
6772
mMaxTFID = mReader->getNTimeFrames() - 1;
6873
}
@@ -71,10 +76,11 @@ class RawReaderSpecs : public o2f::Task
7176
void run(o2f::ProcessingContext& ctx) final
7277
{
7378
assert(mReader);
74-
static size_t loopsDone = 0, sentSize = 0, sentMessages = 0;
7579
if (mDone) {
7680
return;
7781
}
82+
auto tTotStart = mTimer[TimerTotal].CpuTime(), tIOStart = mTimer[TimerIO].CpuTime();
83+
mTimer[TimerTotal].Start(false);
7884
auto device = ctx.services().get<o2f::RawDeviceService>().device();
7985
assert(device);
8086

@@ -105,11 +111,15 @@ class RawReaderSpecs : public o2f::Task
105111

106112
if (tfID > mMaxTFID) {
107113
if (mReader->getNTimeFrames() && --mLoop) {
108-
loopsDone++;
114+
mLoopsDone++;
109115
tfID = 0;
110-
LOG(INFO) << "Starting new loop " << loopsDone << " from the beginning of data";
116+
LOG(INFO) << "Starting new loop " << mLoopsDone << " from the beginning of data";
111117
} else {
112-
LOGF(INFO, "Finished: payload of %zu bytes in %zu messages sent for %d TFs", sentSize, sentMessages, mTFCounter);
118+
mTimer[TimerTotal].Stop();
119+
LOGF(INFO, "Finished: payload of %zu bytes in %zu messages sent for %d TFs", mSentSize, mSentMessages, mTFCounter);
120+
for (int i = 0; i < NTimers; i++) {
121+
LOGF(INFO, "Timing for %15s: Cpu: %.3e Real: %.3e s in %d slots", TimerName[i], mTimer[i].CpuTime(), mTimer[i].RealTime(), mTimer[i].Counter() - 1);
122+
}
113123
ctx.services().get<o2f::ControlService>().endOfStream();
114124
ctx.services().get<o2f::ControlService>().readyToQuit(o2f::QuitRequest::Me);
115125
mDone = true;
@@ -129,7 +139,7 @@ class RawReaderSpecs : public o2f::Task
129139

130140
// read next time frame
131141
size_t tfNParts = 0, tfSize = 0;
132-
LOG(INFO) << "Reading TF#" << mTFCounter << " (" << tfID << " at iteration " << loopsDone << ')';
142+
LOG(INFO) << "Reading TF#" << mTFCounter << " (" << tfID << " at iteration " << mLoopsDone << ')';
133143
o2::header::Stack dummyStack{o2h::DataHeader{}, o2::framework::DataProcessingHeader{0}}; // dummy stack to just to get stack size
134144
auto hstackSize = dummyStack.size();
135145

@@ -148,14 +158,16 @@ class RawReaderSpecs : public o2f::Task
148158
while (hdrTmpl.splitPayloadIndex < hdrTmpl.splitPayloadParts) {
149159

150160
tfSize += hdrTmpl.payloadSize = mPartPerSP ? partsSP[hdrTmpl.splitPayloadIndex] : link.getNextHBFSize();
151-
152-
auto hdMessage = device->NewMessage(hstackSize);
153-
auto plMessage = device->NewMessage(hdrTmpl.payloadSize);
161+
auto fmqFactory = device->GetChannel(link.fairMQChannel, 0).Transport();
162+
auto hdMessage = fmqFactory->CreateMessage(hstackSize, fair::mq::Alignment{64});
163+
auto plMessage = fmqFactory->CreateMessage(hdrTmpl.payloadSize, fair::mq::Alignment{64});
164+
mTimer[TimerIO].Start(false);
154165
auto bread = mPartPerSP ? link.readNextSuperPage(reinterpret_cast<char*>(plMessage->GetData())) : link.readNextHBF(reinterpret_cast<char*>(plMessage->GetData()));
155166
if (bread != hdrTmpl.payloadSize) {
156167
LOG(ERROR) << "Link " << il << " read " << bread << " bytes instead of " << hdrTmpl.payloadSize
157168
<< " expected in TF=" << mTFCounter << " part=" << hdrTmpl.splitPayloadIndex;
158169
}
170+
mTimer[TimerIO].Stop();
159171
// check if the RDH to send corresponds to expected orbit
160172
if (hdrTmpl.splitPayloadIndex == 0) {
161173
auto ir = o2::raw::RDHUtils::getHeartBeatIR(plMessage->GetData());
@@ -180,22 +192,23 @@ class RawReaderSpecs : public o2f::Task
180192
tfNParts++;
181193
}
182194
LOGF(DEBUG, "Added %d parts for TF#%d(%d in iteration %d) of %s/%s/0x%u", hdrTmpl.splitPayloadParts, mTFCounter, tfID,
183-
loopsDone, link.origin.as<std::string>(), link.description.as<std::string>(), link.subspec);
195+
mLoopsDone, link.origin.as<std::string>(), link.description.as<std::string>(), link.subspec);
184196
}
185197

186198
if (mTFCounter) { // delay sending
187199
usleep(mDelayUSec);
188200
}
189-
190201
for (auto& msgIt : messagesPerRoute) {
191202
LOG(INFO) << "Sending " << msgIt.second->Size() / 2 << " parts to channel " << msgIt.first;
192203
device->Send(*msgIt.second.get(), msgIt.first);
193204
}
205+
mTimer[TimerTotal].Stop();
206+
207+
LOGF(INFO, "Sent payload of %zu bytes in %zu parts in %zu messages for TF %d | Timing (total/IO): %.3e / %.3e", tfSize, tfNParts,
208+
messagesPerRoute.size(), mTFCounter, mTimer[TimerTotal].CpuTime() - tTotStart, mTimer[TimerIO].CpuTime() - tIOStart);
194209

195-
LOGF(INFO, "Sent payload of %zu bytes in %zu parts in %zu messages for TF %d", tfSize, tfNParts,
196-
messagesPerRoute.size(), mTFCounter);
197-
sentSize += tfSize;
198-
sentMessages += tfNParts;
210+
mSentSize += tfSize;
211+
mSentMessages += tfNParts;
199212

200213
mReader->setNextTFToRead(++tfID);
201214
++mTFCounter;
@@ -215,10 +228,20 @@ class RawReaderSpecs : public o2f::Task
215228
uint32_t mDelayUSec = 0; // Delay in microseconds between TFs
216229
uint32_t mMinTFID = 0; // 1st TF to extract
217230
uint32_t mMaxTFID = 0xffffffff; // last TF to extrct
231+
size_t mLoopsDone = 0;
232+
size_t mSentSize = 0;
233+
size_t mSentMessages = 0;
218234
bool mPartPerSP = true; // fill part per superpage
219235
bool mDone = false; // processing is over or not
220236
std::string mRawChannelName = ""; // name of optional non-DPL channel
221237
std::unique_ptr<o2::raw::RawFileReader> mReader; // matching engine
238+
239+
enum TimerIDs { TimerInit,
240+
TimerTotal,
241+
TimerIO,
242+
NTimers };
243+
static constexpr std::string_view TimerName[] = {"Init", "Total", "IO"};
244+
TStopwatch mTimer[NTimers];
222245
};
223246

224247
o2f::DataProcessorSpec getReaderSpec(std::string config, int loop, uint32_t delay_us, uint32_t errmap,

Detectors/Raw/src/rawfile-reader-workflow.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
2828
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int64, 0xffffffffL, {"max TF ID to process"}});
2929
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 1, {"loop N times (infinite for N<0)"}});
3030
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
31-
options.push_back(ConfigParamSpec{"buffer-size", VariantType::Int64, 1024L * 1024L, {"buffer size for files preprocessing"}});
31+
options.push_back(ConfigParamSpec{"buffer-size", VariantType::Int64, 5 * 1024L, {"buffer size for files preprocessing"}});
3232
options.push_back(ConfigParamSpec{"super-page-size", VariantType::Int64, 1024L * 1024L, {"super-page size for FMQ parts definition"}});
3333
options.push_back(ConfigParamSpec{"part-per-hbf", VariantType::Bool, false, {"FMQ parts per superpage (default) of HBF"}});
3434
options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}});

0 commit comments

Comments
 (0)