2727#include " Headers/Stack.h"
2828
2929#include " RawFileReaderWorkflow.h" // not installed
30-
30+ # include < TStopwatch.h >
3131#include < fairmq/FairMQDevice.h>
3232
3333#include < unistd.h>
@@ -47,22 +47,27 @@ class RawReaderSpecs : public o2f::Task
4747 public:
4848 explicit RawReaderSpecs (const std::string& config, int loop = 1 , uint32_t delay_us = 0 ,
4949 uint32_t errmap = 0xffffffff , uint32_t minTF = 0 , uint32_t maxTF = 0xffffffff , bool partPerSP = true ,
50- size_t spSize = 1024L * 1024L , size_t buffSize = 1024L * 1024L ,
50+ size_t spSize = 1024L * 1024L , size_t buffSize = 5 * 1024UL ,
5151 const std::string& rawChannelName = " " )
52- : mLoop(loop < 0 ? INT_MAX : (loop < 1 ? 1 : loop)), mDelayUSec(delay_us), mMinTFID(minTF), mMaxTFID(maxTF), mPartPerSP(partPerSP), mReader(std::make_unique<o2::raw::RawFileReader>(config)), mRawChannelName(rawChannelName)
52+ : mLoop(loop < 0 ? INT_MAX : (loop < 1 ? 1 : loop)), mDelayUSec(delay_us), mMinTFID(minTF), mMaxTFID(maxTF), mPartPerSP(partPerSP), mReader(std::make_unique<o2::raw::RawFileReader>(config, 0 , buffSize )), mRawChannelName(rawChannelName)
5353 {
5454 mReader ->setCheckErrors (errmap);
5555 mReader ->setMaxTFToRead (maxTF);
56- mReader ->setBufferSize (buffSize);
5756 mReader ->setNominalSPageSize (spSize);
5857 LOG (INFO ) << " Will preprocess files with buffer size of " << buffSize << " bytes" ;
5958 LOG (INFO ) << " Number of loops over whole data requested: " << mLoop ;
59+ for (int i = NTimers; i--;) {
60+ mTimer [i].Stop ();
61+ mTimer [i].Reset ();
62+ }
6063 }
6164
6265 void init (o2f::InitContext& ic) final
6366 {
6467 assert (mReader );
68+ mTimer [TimerInit].Start ();
6569 mReader ->init ();
70+ mTimer [TimerInit].Stop ();
6671 if (mMaxTFID >= mReader ->getNTimeFrames ()) {
6772 mMaxTFID = mReader ->getNTimeFrames () - 1 ;
6873 }
@@ -71,10 +76,11 @@ class RawReaderSpecs : public o2f::Task
7176 void run (o2f::ProcessingContext& ctx) final
7277 {
7378 assert (mReader );
74- static size_t loopsDone = 0 , sentSize = 0 , sentMessages = 0 ;
7579 if (mDone ) {
7680 return ;
7781 }
82+ auto tTotStart = mTimer [TimerTotal].CpuTime (), tIOStart = mTimer [TimerIO].CpuTime ();
83+ mTimer [TimerTotal].Start (false );
7884 auto device = ctx.services ().get <o2f::RawDeviceService>().device ();
7985 assert (device);
8086
@@ -105,11 +111,15 @@ class RawReaderSpecs : public o2f::Task
105111
106112 if (tfID > mMaxTFID ) {
107113 if (mReader ->getNTimeFrames () && --mLoop ) {
108- loopsDone ++;
114+ mLoopsDone ++;
109115 tfID = 0 ;
110- LOG (INFO ) << " Starting new loop " << loopsDone << " from the beginning of data" ;
116+ LOG (INFO ) << " Starting new loop " << mLoopsDone << " from the beginning of data" ;
111117 } else {
112- LOGF (INFO , " Finished: payload of %zu bytes in %zu messages sent for %d TFs" , sentSize, sentMessages, mTFCounter );
118+ mTimer [TimerTotal].Stop ();
119+ LOGF (INFO , " Finished: payload of %zu bytes in %zu messages sent for %d TFs" , mSentSize , mSentMessages , mTFCounter );
120+ for (int i = 0 ; i < NTimers; i++) {
121+ LOGF (INFO , " Timing for %15s: Cpu: %.3e Real: %.3e s in %d slots" , TimerName[i], mTimer [i].CpuTime (), mTimer [i].RealTime (), mTimer [i].Counter () - 1 );
122+ }
113123 ctx.services ().get <o2f::ControlService>().endOfStream ();
114124 ctx.services ().get <o2f::ControlService>().readyToQuit (o2f::QuitRequest::Me);
115125 mDone = true ;
@@ -129,7 +139,7 @@ class RawReaderSpecs : public o2f::Task
129139
130140 // read next time frame
131141 size_t tfNParts = 0 , tfSize = 0 ;
132- LOG (INFO ) << " Reading TF#" << mTFCounter << " (" << tfID << " at iteration " << loopsDone << ' )' ;
142+ LOG (INFO ) << " Reading TF#" << mTFCounter << " (" << tfID << " at iteration " << mLoopsDone << ' )' ;
133143 o2::header::Stack dummyStack{o2h::DataHeader{}, o2::framework::DataProcessingHeader{0 }}; // dummy stack to just to get stack size
134144 auto hstackSize = dummyStack.size ();
135145
@@ -148,14 +158,16 @@ class RawReaderSpecs : public o2f::Task
148158 while (hdrTmpl.splitPayloadIndex < hdrTmpl.splitPayloadParts ) {
149159
150160 tfSize += hdrTmpl.payloadSize = mPartPerSP ? partsSP[hdrTmpl.splitPayloadIndex ] : link.getNextHBFSize ();
151-
152- auto hdMessage = device->NewMessage (hstackSize);
153- auto plMessage = device->NewMessage (hdrTmpl.payloadSize );
161+ auto fmqFactory = device->GetChannel (link.fairMQChannel , 0 ).Transport ();
162+ auto hdMessage = fmqFactory->CreateMessage (hstackSize, fair::mq::Alignment{64 });
163+ auto plMessage = fmqFactory->CreateMessage (hdrTmpl.payloadSize , fair::mq::Alignment{64 });
164+ mTimer [TimerIO].Start (false );
154165 auto bread = mPartPerSP ? link.readNextSuperPage (reinterpret_cast <char *>(plMessage->GetData ())) : link.readNextHBF (reinterpret_cast <char *>(plMessage->GetData ()));
155166 if (bread != hdrTmpl.payloadSize ) {
156167 LOG (ERROR ) << " Link " << il << " read " << bread << " bytes instead of " << hdrTmpl.payloadSize
157168 << " expected in TF=" << mTFCounter << " part=" << hdrTmpl.splitPayloadIndex ;
158169 }
170+ mTimer [TimerIO].Stop ();
159171 // check if the RDH to send corresponds to expected orbit
160172 if (hdrTmpl.splitPayloadIndex == 0 ) {
161173 auto ir = o2::raw::RDHUtils::getHeartBeatIR (plMessage->GetData ());
@@ -180,22 +192,23 @@ class RawReaderSpecs : public o2f::Task
180192 tfNParts++;
181193 }
182194 LOGF (DEBUG , " Added %d parts for TF#%d(%d in iteration %d) of %s/%s/0x%u" , hdrTmpl.splitPayloadParts , mTFCounter , tfID,
183- loopsDone , link.origin .as <std::string>(), link.description .as <std::string>(), link.subspec );
195+ mLoopsDone , link.origin .as <std::string>(), link.description .as <std::string>(), link.subspec );
184196 }
185197
186198 if (mTFCounter ) { // delay sending
187199 usleep (mDelayUSec );
188200 }
189-
190201 for (auto & msgIt : messagesPerRoute) {
191202 LOG (INFO ) << " Sending " << msgIt.second ->Size () / 2 << " parts to channel " << msgIt.first ;
192203 device->Send (*msgIt.second .get (), msgIt.first );
193204 }
205+ mTimer [TimerTotal].Stop ();
206+
207+ LOGF (INFO , " Sent payload of %zu bytes in %zu parts in %zu messages for TF %d | Timing (total/IO): %.3e / %.3e" , tfSize, tfNParts,
208+ messagesPerRoute.size (), mTFCounter , mTimer [TimerTotal].CpuTime () - tTotStart, mTimer [TimerIO].CpuTime () - tIOStart);
194209
195- LOGF (INFO , " Sent payload of %zu bytes in %zu parts in %zu messages for TF %d" , tfSize, tfNParts,
196- messagesPerRoute.size (), mTFCounter );
197- sentSize += tfSize;
198- sentMessages += tfNParts;
210+ mSentSize += tfSize;
211+ mSentMessages += tfNParts;
199212
200213 mReader ->setNextTFToRead (++tfID);
201214 ++mTFCounter ;
@@ -215,10 +228,20 @@ class RawReaderSpecs : public o2f::Task
215228 uint32_t mDelayUSec = 0 ; // Delay in microseconds between TFs
216229 uint32_t mMinTFID = 0 ; // 1st TF to extract
217230 uint32_t mMaxTFID = 0xffffffff ; // last TF to extrct
231+ size_t mLoopsDone = 0 ;
232+ size_t mSentSize = 0 ;
233+ size_t mSentMessages = 0 ;
218234 bool mPartPerSP = true ; // fill part per superpage
219235 bool mDone = false ; // processing is over or not
220236 std::string mRawChannelName = " " ; // name of optional non-DPL channel
221237 std::unique_ptr<o2::raw::RawFileReader> mReader ; // matching engine
238+
239+ enum TimerIDs { TimerInit,
240+ TimerTotal,
241+ TimerIO,
242+ NTimers };
243+ static constexpr std::string_view TimerName[] = {" Init" , " Total" , " IO" };
244+ TStopwatch mTimer [NTimers];
222245};
223246
224247o2f::DataProcessorSpec getReaderSpec (std::string config, int loop, uint32_t delay_us, uint32_t errmap,
0 commit comments