2929#include < memory>
3030using namespace ALICE ::HLT ;
3131
32+ // the chrono lib needs C++11
33+ #if __cplusplus < 201103L
34+ # warning statistics measurement for WrapperDevice disabled: need C++11 standard
35+ #else
36+ # define USE_CHRONO
37+ #endif
38+ #ifdef USE_CHRONO
39+ #include < chrono>
40+ using std::chrono::system_clock;
41+ typedef std::chrono::milliseconds TimeScale;
42+ #endif // USE_CHRONO
43+
3244WrapperDevice::WrapperDevice (int argc, char ** argv)
3345 : mComponent(NULL )
3446 , mArgv()
47+ , mPollingPeriod(10 )
48+ , mSkipProcessing(0 )
49+ , mLastCalcTime(-1 )
50+ , mLastSampleTime(-1 )
51+ , mMinTimeBetweenSample(-1 )
52+ , mMaxTimeBetweenSample(-1 )
53+ , mTotalReadCycles(-1 )
54+ , mMaxReadCycles(-1 )
55+ , mNSamples(-1 )
3556{
3657 mArgv .insert (mArgv .end (), argv, argv+argc);
3758}
@@ -55,6 +76,14 @@ void WrapperDevice::Init()
5576 }
5677
5778 mComponent =component.release ();
79+ mLastCalcTime =-1 ;
80+ mLastSampleTime =-1 ;
81+ mMinTimeBetweenSample =-1 ;
82+ mMaxTimeBetweenSample =-1 ;
83+ mTotalReadCycles =0 ;
84+ mMaxReadCycles =-1 ;
85+ mNSamples =0 ;
86+
5887 FairMQDevice::Init ();
5988}
6089
@@ -63,6 +92,9 @@ void WrapperDevice::Run()
6392 // / inherited from FairMQDevice
6493 int iResult=0 ;
6594
95+ #ifdef USE_CHRONO
96+ static system_clock::time_point refTime = system_clock::now ();
97+ #endif // USE_CHRONO
6698 boost::thread rateLogger (boost::bind (&FairMQDevice::LogSocketRates, this ));
6799
68100 FairMQPoller* poller = fTransportFactory ->CreatePoller (*fPayloadInputs );
@@ -78,23 +110,70 @@ void WrapperDevice::Run()
78110 int errorCount=0 ;
79111 const int maxError=10 ;
80112
81- vector</* const*/ FairMQMessage*> inputMessages;
113+ vector</* const*/ FairMQMessage*> inputMessages (fNumInputs , NULL );
114+ int nReadCycles=0 ;
82115 while ( fState == RUNNING ) {
83116
84117 // read input messages
85- poller->Poll (100 );
118+ poller->Poll (mPollingPeriod );
119+ int inputsReceived=0 ;
120+ bool receivedAtLeastOneMessage=false ;
86121 for (int i = 0 ; i < fNumInputs ; i++) {
122+ if (inputMessages[i]!=NULL ) {
123+ inputsReceived++;
124+ continue ;
125+ }
87126 received = false ;
88127 if (poller->CheckInput (i)){
89128 auto_ptr<FairMQMessage> msg (fTransportFactory ->CreateMessage ());
90129 received = fPayloadInputs ->at (i)->Receive (msg.get ());
91130 if (received) {
92- inputMessages.push_back (msg.release ());
131+ receivedAtLeastOneMessage=true ;
132+ inputMessages[i]=msg.release ();
133+ inputsReceived++;
93134 // LOG(INFO) << "------ recieve Msg from " << i ;
94135 }
95136 }
96137 }
138+ if (receivedAtLeastOneMessage) nReadCycles++;
139+ if (inputsReceived<fNumInputs ) {
140+ continue ;
141+ }
142+ mNSamples ++;
143+ mTotalReadCycles +=nReadCycles;
144+ if (mMaxReadCycles <0 || mMaxReadCycles <nReadCycles)
145+ mMaxReadCycles =nReadCycles;
146+ // if (nReadCycles>1) {
147+ // LOG(INFO) << "------ recieved complete Msg from " << fNumInputs << " input(s) after " << nReadCycles << " read cycles" ;
148+ // }
149+ nReadCycles=0 ;
150+ #ifdef USE_CHRONO
151+ auto duration = std::chrono::duration_cast< TimeScale>(std::chrono::system_clock::now () - refTime);
152+
153+ if (mLastSampleTime >=0 ) {
154+ int sampleTimeDiff=duration.count ()-mLastSampleTime ;
155+ if (mMinTimeBetweenSample < 0 || sampleTimeDiff<mMinTimeBetweenSample )
156+ mMinTimeBetweenSample =sampleTimeDiff;
157+ if (mMaxTimeBetweenSample < 0 || sampleTimeDiff>mMaxTimeBetweenSample )
158+ mMaxTimeBetweenSample =sampleTimeDiff;
159+ }
160+ mLastSampleTime =duration.count ();
161+ if (duration.count ()-mLastCalcTime >fLogIntervalInMs ) {
162+ LOG (INFO ) << " ------ processed " << mNSamples << " sample(s) " ;
163+ if (mNSamples >0 ) {
164+ LOG (INFO ) << " ------ min " << mMinTimeBetweenSample << " ms, max " << mMaxTimeBetweenSample << " ms avrg " << (duration.count ()-mLastCalcTime )/mNSamples << " ms " ;
165+ LOG (INFO ) << " ------ avrg number of read cycles " << mTotalReadCycles /mNSamples << " max number of read cycles " << mMaxReadCycles ;
166+ }
167+ mNSamples =0 ;
168+ mTotalReadCycles =0 ;
169+ mMinTimeBetweenSample =-1 ;
170+ mMaxTimeBetweenSample =-1 ;
171+ mMaxReadCycles =-1 ;
172+ mLastCalcTime =duration.count ();
173+ }
174+ #endif // USE_CHRONO
97175
176+ if (!mSkipProcessing ) {
98177 // prepare input from messages
99178 vector<ALICE ::HLT ::Component::BufferDesc_t> dataArray;
100179 for (vector</* const*/ FairMQMessage*>::iterator msg=inputMessages.begin ();
@@ -144,12 +223,14 @@ void WrapperDevice::Run()
144223 iResult=-ENOMSG ;
145224 }
146225 }
226+ }
147227
148228 // cleanup
149229 for (vector<FairMQMessage*>::iterator mit=inputMessages.begin ();
150- mit!=inputMessages.end (); mit++)
230+ mit!=inputMessages.end (); mit++) {
151231 delete *mit;
152- inputMessages.clear ();
232+ *mit=NULL ;
233+ }
153234 }
154235
155236 delete poller;
@@ -195,3 +276,45 @@ void WrapperDevice::InitInput()
195276
196277 FairMQDevice::InitInput ();
197278}
279+
280+ void WrapperDevice::SetProperty (const int key, const string& value, const int slot)
281+ {
282+ // / inherited from FairMQDevice
283+ // / handle device specific properties and forward to FairMQDevice::SetProperty
284+ return FairMQDevice::SetProperty (key, value, slot);
285+ }
286+
287+ string WrapperDevice::GetProperty (const int key, const string& default_, const int slot)
288+ {
289+ // / inherited from FairMQDevice
290+ // / handle device specific properties and forward to FairMQDevice::GetProperty
291+ return FairMQDevice::GetProperty (key, default_, slot);
292+ }
293+
294+ void WrapperDevice::SetProperty (const int key, const int value, const int slot)
295+ {
296+ // / inherited from FairMQDevice
297+ // / handle device specific properties and forward to FairMQDevice::SetProperty
298+ switch (key) {
299+ case PollingPeriod:
300+ mPollingPeriod =value;
301+ return ;
302+ case SkipProcessing:
303+ mSkipProcessing =value;
304+ return ;
305+ }
306+ return FairMQDevice::SetProperty (key, value, slot);
307+ }
308+
309+ int WrapperDevice::GetProperty (const int key, const int default_, const int slot)
310+ {
311+ // / inherited from FairMQDevice
312+ // / handle device specific properties and forward to FairMQDevice::GetProperty
313+ switch (key) {
314+ case PollingPeriod:
315+ return mPollingPeriod ;
316+ case SkipProcessing:
317+ return mSkipProcessing ;
318+ }
319+ return FairMQDevice::GetProperty (key, default_, slot);
320+ }
0 commit comments