Skip to content

Commit bb4f977

Browse files
implemented synchronized input, statistics and options for polling and logging period
1 parent 6239c30 commit bb4f977

2 files changed

Lines changed: 163 additions & 7 deletions

File tree

devices/aliceHLTwrapper/WrapperDevice.cxx

Lines changed: 128 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,30 @@
2929
#include <memory>
3030
using namespace ALICE::HLT;
3131

32+
// the chrono lib needs C++11
33+
#if __cplusplus < 201103L
34+
# warning statistics measurement for WrapperDevice disabled: need C++11 standard
35+
#else
36+
# define USE_CHRONO
37+
#endif
38+
#ifdef USE_CHRONO
39+
#include <chrono>
40+
using std::chrono::system_clock;
41+
typedef std::chrono::milliseconds TimeScale;
42+
#endif //USE_CHRONO
43+
3244
WrapperDevice::WrapperDevice(int argc, char** argv)
3345
: mComponent(NULL)
3446
, mArgv()
47+
, mPollingPeriod(10)
48+
, mSkipProcessing(0)
49+
, mLastCalcTime(-1)
50+
, mLastSampleTime(-1)
51+
, mMinTimeBetweenSample(-1)
52+
, mMaxTimeBetweenSample(-1)
53+
, mTotalReadCycles(-1)
54+
, mMaxReadCycles(-1)
55+
, mNSamples(-1)
3556
{
3657
mArgv.insert(mArgv.end(), argv, argv+argc);
3758
}
@@ -55,6 +76,14 @@ void WrapperDevice::Init()
5576
}
5677

5778
mComponent=component.release();
79+
mLastCalcTime=-1;
80+
mLastSampleTime=-1;
81+
mMinTimeBetweenSample=-1;
82+
mMaxTimeBetweenSample=-1;
83+
mTotalReadCycles=0;
84+
mMaxReadCycles=-1;
85+
mNSamples=0;
86+
5887
FairMQDevice::Init();
5988
}
6089

@@ -63,6 +92,9 @@ void WrapperDevice::Run()
6392
/// inherited from FairMQDevice
6493
int iResult=0;
6594

95+
#ifdef USE_CHRONO
96+
static system_clock::time_point refTime = system_clock::now();
97+
#endif //USE_CHRONO
6698
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
6799

68100
FairMQPoller* poller = fTransportFactory->CreatePoller(*fPayloadInputs);
@@ -78,23 +110,70 @@ void WrapperDevice::Run()
78110
int errorCount=0;
79111
const int maxError=10;
80112

81-
vector</*const*/ FairMQMessage*> inputMessages;
113+
vector</*const*/ FairMQMessage*> inputMessages(fNumInputs, NULL);
114+
int nReadCycles=0;
82115
while ( fState == RUNNING ) {
83116

84117
// read input messages
85-
poller->Poll(100);
118+
poller->Poll(mPollingPeriod);
119+
int inputsReceived=0;
120+
bool receivedAtLeastOneMessage=false;
86121
for(int i = 0; i < fNumInputs; i++) {
122+
if (inputMessages[i]!=NULL) {
123+
inputsReceived++;
124+
continue;
125+
}
87126
received = false;
88127
if (poller->CheckInput(i)){
89128
auto_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
90129
received = fPayloadInputs->at(i)->Receive(msg.get());
91130
if (received) {
92-
inputMessages.push_back(msg.release());
131+
receivedAtLeastOneMessage=true;
132+
inputMessages[i]=msg.release();
133+
inputsReceived++;
93134
//LOG(INFO) << "------ recieve Msg from " << i ;
94135
}
95136
}
96137
}
138+
if (receivedAtLeastOneMessage) nReadCycles++;
139+
if (inputsReceived<fNumInputs) {
140+
continue;
141+
}
142+
mNSamples++;
143+
mTotalReadCycles+=nReadCycles;
144+
if (mMaxReadCycles<0 || mMaxReadCycles<nReadCycles)
145+
mMaxReadCycles=nReadCycles;
146+
// if (nReadCycles>1) {
147+
// LOG(INFO) << "------ recieved complete Msg from " << fNumInputs << " input(s) after " << nReadCycles << " read cycles" ;
148+
// }
149+
nReadCycles=0;
150+
#ifdef USE_CHRONO
151+
auto duration = std::chrono::duration_cast< TimeScale>(std::chrono::system_clock::now() - refTime);
152+
153+
if (mLastSampleTime>=0) {
154+
int sampleTimeDiff=duration.count()-mLastSampleTime;
155+
if (mMinTimeBetweenSample < 0 || sampleTimeDiff<mMinTimeBetweenSample)
156+
mMinTimeBetweenSample=sampleTimeDiff;
157+
if (mMaxTimeBetweenSample < 0 || sampleTimeDiff>mMaxTimeBetweenSample)
158+
mMaxTimeBetweenSample=sampleTimeDiff;
159+
}
160+
mLastSampleTime=duration.count();
161+
if (duration.count()-mLastCalcTime>fLogIntervalInMs) {
162+
LOG(INFO) << "------ processed " << mNSamples << " sample(s) ";
163+
if (mNSamples>0) {
164+
LOG(INFO) << "------ min " << mMinTimeBetweenSample << "ms, max " << mMaxTimeBetweenSample << "ms avrg " << (duration.count()-mLastCalcTime)/mNSamples << "ms ";
165+
LOG(INFO) << "------ avrg number of read cycles " << mTotalReadCycles/mNSamples << " max number of read cycles " << mMaxReadCycles;
166+
}
167+
mNSamples=0;
168+
mTotalReadCycles=0;
169+
mMinTimeBetweenSample=-1;
170+
mMaxTimeBetweenSample=-1;
171+
mMaxReadCycles=-1;
172+
mLastCalcTime=duration.count();
173+
}
174+
#endif //USE_CHRONO
97175

176+
if (!mSkipProcessing) {
98177
// prepare input from messages
99178
vector<ALICE::HLT::Component::BufferDesc_t> dataArray;
100179
for (vector</*const*/ FairMQMessage*>::iterator msg=inputMessages.begin();
@@ -144,12 +223,14 @@ void WrapperDevice::Run()
144223
iResult=-ENOMSG;
145224
}
146225
}
226+
}
147227

148228
// cleanup
149229
for (vector<FairMQMessage*>::iterator mit=inputMessages.begin();
150-
mit!=inputMessages.end(); mit++)
230+
mit!=inputMessages.end(); mit++) {
151231
delete *mit;
152-
inputMessages.clear();
232+
*mit=NULL;
233+
}
153234
}
154235

155236
delete poller;
@@ -195,3 +276,45 @@ void WrapperDevice::InitInput()
195276

196277
FairMQDevice::InitInput();
197278
}
279+
280+
void WrapperDevice::SetProperty(const int key, const string& value, const int slot)
281+
{
282+
/// inherited from FairMQDevice
283+
/// handle device specific properties and forward to FairMQDevice::SetProperty
284+
return FairMQDevice::SetProperty(key, value, slot);
285+
}
286+
287+
string WrapperDevice::GetProperty(const int key, const string& default_, const int slot)
288+
{
289+
/// inherited from FairMQDevice
290+
/// handle device specific properties and forward to FairMQDevice::GetProperty
291+
return FairMQDevice::GetProperty(key, default_, slot);
292+
}
293+
294+
void WrapperDevice::SetProperty(const int key, const int value, const int slot)
295+
{
296+
/// inherited from FairMQDevice
297+
/// handle device specific properties and forward to FairMQDevice::SetProperty
298+
switch (key) {
299+
case PollingPeriod:
300+
mPollingPeriod=value;
301+
return;
302+
case SkipProcessing:
303+
mSkipProcessing=value;
304+
return;
305+
}
306+
return FairMQDevice::SetProperty(key, value, slot);
307+
}
308+
309+
int WrapperDevice::GetProperty(const int key, const int default_, const int slot)
310+
{
311+
/// inherited from FairMQDevice
312+
/// handle device specific properties and forward to FairMQDevice::GetProperty
313+
switch (key) {
314+
case PollingPeriod:
315+
return mPollingPeriod;
316+
case SkipProcessing:
317+
return mSkipProcessing;
318+
}
319+
return FairMQDevice::GetProperty(key, default_, slot);
320+
}

devices/aliceHLTwrapper/WrapperDevice.h

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,29 @@ namespace ALICE
5050
virtual void InitOutput();
5151
/// inherited from FairMQDevice
5252
virtual void InitInput();
53+
/// inherited from FairMQDevice
54+
/// handle device specific properties and forward to FairMQDevice::SetProperty
55+
virtual void SetProperty(const int key, const string& value, const int slot = 0);
56+
/// inherited from FairMQDevice
57+
/// handle device specific properties and forward to FairMQDevice::GetProperty
58+
virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0);
59+
/// inherited from FairMQDevice
60+
/// handle device specific properties and forward to FairMQDevice::SetProperty
61+
virtual void SetProperty(const int key, const int value, const int slot = 0);
62+
/// inherited from FairMQDevice
63+
/// handle device specific properties and forward to FairMQDevice::GetProperty
64+
virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0);
65+
66+
/////////////////////////////////////////////////////////////////
67+
// device property identifier
68+
enum
69+
{
70+
Id = FairMQDevice::Last,
71+
PollingPeriod,
72+
SkipProcessing,
73+
Last
74+
};
75+
5376
protected:
5477

5578
private:
@@ -58,8 +81,18 @@ namespace ALICE
5881
// assignment operator prohibited
5982
WrapperDevice& operator=(const WrapperDevice&);
6083

61-
Component* mComponent;
62-
vector<char*> mArgv;
84+
Component* mComponent; // component instance
85+
vector<char*> mArgv; // array of arguments for the component
86+
87+
int mPollingPeriod; // period of polling on input sockets in ms
88+
int mSkipProcessing; // skip component processing
89+
int mLastCalcTime; // start time of current statistic period
90+
int mLastSampleTime; // time of last data sample
91+
int mMinTimeBetweenSample; // min time between data samples in statistic period
92+
int mMaxTimeBetweenSample; // max time between data samples in statistic period
93+
int mTotalReadCycles; // tot number of read cycles in statistic period
94+
int mMaxReadCycles; // max number of read cycles in statistic period
95+
int mNSamples; // number of samples in statistic period
6396
};
6497

6598
} // namespace hlt

0 commit comments

Comments
 (0)