Skip to content

Commit c049e90

Browse files
committed
Update prototype devices
- Use the new Multipart API for the prototype devices. - Update the FLP/EPN examples to use the new configuration system. - Update the DDS calls with the API (dds::intercom_api instead of dds::key_value). - Remove obsolete Merger device.
1 parent cab9f85 commit c049e90

40 files changed

Lines changed: 585 additions & 1104 deletions

CCDB/src/runConditionsServer.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,8 @@ int main(int argc, char** argv)
4949
("output-type", value<string>(&outputType)->default_value("ROOT"), "Output file type");
5050

5151
config.AddToCmdLineOptions(serverOptions);
52+
config.ParseAll(argc, argv);
5253

53-
config.ParseAll(argc, argv);
54-
5554
string file = config.GetValue<string>("config-json-file");
5655
string id = config.GetValue<string>("id");
5756

Examples/flp2epn-distributed/CMakeLists.txt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,33 @@ if(FAIRMQ_DEPENDENCIES)
5151
${DEPENDENCIES}
5252
${CMAKE_THREAD_LIBS_INIT}
5353
${FAIRMQ_DEPENDENCIES}
54+
${Boost_RANDOM_LIBRARY}
55+
${Boost_CHRONO_LIBRARY}
56+
${Boost_REGEX_LIBRARY}
5457
FairMQ
5558
)
5659
else(FAIRMQ_DEPENDENCIES)
5760
set(DEPENDENCIES
5861
${DEPENDENCIES}
5962
${CMAKE_THREAD_LIBS_INIT}
60-
${Boost_DATE_TIME_LIBRARY} ${Boost_THREAD_LIBRARY} ${Boost_THREAD_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_CHRONO_LIBRARY} FairMQ
63+
${Boost_DATE_TIME_LIBRARY}
64+
${Boost_THREAD_LIBRARY}
65+
${Boost_THREAD_LIBRARY}
66+
${Boost_SYSTEM_LIBRARY}
67+
${Boost_PROGRAM_OPTIONS_LIBRARY}
68+
${Boost_CHRONO_LIBRARY}
69+
${Boost_RANDOM_LIBRARY}
70+
${Boost_REGEX_LIBRARY}
71+
FairMQ
6172
)
6273
endif(FAIRMQ_DEPENDENCIES)
6374

6475
if(DDS_FOUND)
6576
set(DEPENDENCIES
6677
${DEPENDENCIES}
67-
dds-key-value-lib
78+
${DDS_INTERCOM_LIBRARY_SHARED}
79+
${DDS_PROTOCOL_LIBRARY_SHARED} # also link the two DDS dependency libraries to avoid linking issues on some osx systems
80+
${DDS_USER_DEFAULTS_LIBRARY_SHARED}
6881
)
6982
endif()
7083

Examples/flp2epn-distributed/EPNReceiver.cxx

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ void EPNReceiver::PrintBuffer(const unordered_map<uint16_t, TFBuffer>& buffer) c
5151

5252
for (auto& it : buffer) {
5353
string stars = "";
54-
for (unsigned int j = 1; j <= (it.second).parts.size(); ++j) {
54+
for (unsigned int j = 1; j <= (it.second).parts.Size(); ++j) {
5555
stars += "*";
5656
}
5757
LOG(INFO) << setw(4) << it.first << ": " << stars;
@@ -65,10 +65,6 @@ void EPNReceiver::DiscardIncompleteTimeframes()
6565
if ((boost::posix_time::microsec_clock::local_time() - (it->second).startTime).total_milliseconds() > fBufferTimeoutInMs) {
6666
LOG(WARN) << "Timeframe #" << it->first << " incomplete after " << fBufferTimeoutInMs << " milliseconds, discarding";
6767
fDiscardedSet.insert(it->first);
68-
for (unsigned int i = 0; i < (it->second).parts.size(); ++i) {
69-
(it->second).parts.at(i).reset();
70-
}
71-
it->second.parts.clear();
7268
fTimeframeBuffer.erase(it++);
7369
LOG(WARN) << "Number of discarded timeframes: " << fDiscardedSet.size();
7470
} else {
@@ -92,19 +88,17 @@ void EPNReceiver::Run()
9288
// f2eHeader* header; // holds the header of the currently arrived message.
9389
uint16_t id = 0; // holds the timeframe id of the currently arrived sub-timeframe.
9490

95-
FairMQChannel& dataInputChannel = fChannels.at("data-in").at(0);
96-
FairMQChannel& dataOutChannel = fChannels.at("data-out").at(0);
9791
FairMQChannel& ackOutChannel = fChannels.at("ack-out").at(0);
9892

9993
while (CheckCurrentState(RUNNING)) {
10094
poller->Poll(100);
10195

10296
if (poller->CheckInput(0)) {
103-
unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage());
97+
FairMQParts parts;
10498

105-
if (dataInputChannel.Receive(headerPart) > 0) {
99+
if (Receive(parts, "data-in") > 0) {
106100
// store the received ID
107-
f2eHeader& header = *(static_cast<f2eHeader*>(headerPart->GetData()));
101+
f2eHeader& header = *(static_cast<f2eHeader*>(parts.At(0)->GetData()));
108102
id = header.timeFrameId;
109103
// LOG(INFO) << "Received sub-time frame #" << id << " from FLP" << header.flpIndex;
110104

@@ -119,60 +113,41 @@ void EPNReceiver::Run()
119113
// }
120114
// end DEBUG
121115

122-
unique_ptr<FairMQMessage> dataPart(fTransportFactory->CreateMessage());
123-
124-
// receive the data part
125-
if (dataInputChannel.Receive(dataPart) > 0)
116+
if (fDiscardedSet.find(id) == fDiscardedSet.end())
126117
{
127-
if (fDiscardedSet.find(id) == fDiscardedSet.end())
128-
{
129-
if (fTimeframeBuffer.find(id) == fTimeframeBuffer.end())
130-
{
131-
// if this is the first part with this ID, save the receive time.
132-
fTimeframeBuffer[id].startTime = boost::posix_time::microsec_clock::local_time();
133-
}
134-
// if the received ID has not previously been discarded,
135-
// store the data part in the buffer
136-
fTimeframeBuffer[id].parts.push_back(move(dataPart));
137-
// PrintBuffer(fTimeframeBuffer);
138-
}
139-
else
118+
if (fTimeframeBuffer.find(id) == fTimeframeBuffer.end())
140119
{
141-
// if received ID has been previously discarded.
142-
LOG(WARN) << "Received part from an already discarded timeframe with id " << id;
120+
// if this is the first part with this ID, save the receive time.
121+
fTimeframeBuffer[id].startTime = boost::posix_time::microsec_clock::local_time();
143122
}
123+
// if the received ID has not previously been discarded,
124+
// store the data part in the buffer
125+
fTimeframeBuffer[id].parts.AddPart(move(parts.At(1)));
126+
// PrintBuffer(fTimeframeBuffer);
144127
}
145128
else
146129
{
147-
LOG(ERROR) << "no data received from input socket";
130+
// if received ID has been previously discarded.
131+
LOG(WARN) << "Received part from an already discarded timeframe with id " << id;
148132
}
149133

150-
if (fTimeframeBuffer[id].parts.size() == fNumFLPs) {
134+
if (fTimeframeBuffer[id].parts.Size() == fNumFLPs) {
151135
// LOG(INFO) << "Collected all parts for timeframe #" << id;
152-
// when all parts are collected send all except last one with 'snd-more' flag, and last one without the flag.
153-
for (int i = 0; i < fNumFLPs - 1; ++i) {
154-
dataOutChannel.SendPart(fTimeframeBuffer[id].parts.at(i));
155-
}
156-
dataOutChannel.Send(fTimeframeBuffer[id].parts.at(fNumFLPs - 1));
136+
// when all parts are collected send then to the output channel
137+
Send(fTimeframeBuffer[id].parts, "data-out");
157138

158139
if (fTestMode > 0) {
159140
// Send an acknowledgement back to the sampler to measure the round trip time
160-
unique_ptr<FairMQMessage> ack(fTransportFactory->CreateMessage(sizeof(uint16_t)));
141+
unique_ptr<FairMQMessage> ack(NewMessage(sizeof(uint16_t)));
161142
memcpy(ack->GetData(), &id, sizeof(uint16_t));
162143

163144
if (ackOutChannel.SendAsync(ack) <= 0) {
164145
LOG(ERROR) << "Could not send acknowledgement without blocking";
165146
}
166147
}
167148

168-
// let transport know that the data is no longer needed. transport will clean up after it is sent out.
169-
for (unsigned int i = 0; i < fTimeframeBuffer[id].parts.size(); ++i) {
170-
fTimeframeBuffer[id].parts.at(i).reset();
171-
}
172-
fTimeframeBuffer[id].parts.clear();
173-
174149
// fTimeframeBuffer[id].endTime = boost::posix_time::microsec_clock::local_time();
175-
// do something with time here ...
150+
176151
fTimeframeBuffer.erase(id);
177152
}
178153

@@ -209,7 +184,7 @@ void EPNReceiver::sendHeartbeats()
209184
while (CheckCurrentState(RUNNING)) {
210185
try {
211186
for (int i = 0; i < fNumFLPs; ++i) {
212-
unique_ptr<FairMQMessage> heartbeatMsg(fTransportFactory->CreateMessage(ownAddressSize));
187+
unique_ptr<FairMQMessage> heartbeatMsg(NewMessage(ownAddressSize));
213188
memcpy(heartbeatMsg->GetData(), ownAddress.c_str(), ownAddressSize);
214189

215190
fChannels.at("heartbeat-out").at(i).Send(heartbeatMsg);

Examples/flp2epn-distributed/EPNReceiver.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace Devices {
2323

2424
struct TFBuffer
2525
{
26-
std::vector<std::unique_ptr<FairMQMessage>> parts;
26+
FairMQParts parts;
2727
boost::posix_time::ptime startTime;
2828
boost::posix_time::ptime endTime;
2929
};

Examples/flp2epn-distributed/FLPSender.cxx

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ struct f2eHeader {
2828
};
2929

3030
FLPSender::FLPSender()
31-
: fHeaderBuffer()
32-
, fDataBuffer()
31+
: fSTFBuffer()
3332
, fArrivalTime()
3433
, fNumEPNs(0)
3534
, fIndex(0)
@@ -64,10 +63,10 @@ void FLPSender::receiveHeartbeats()
6463

6564
while (CheckCurrentState(RUNNING)) {
6665
try {
67-
unique_ptr<FairMQMessage> hbMsg(fTransportFactory->CreateMessage());
66+
unique_ptr<FairMQMessage> heartbeat(NewMessage());
6867

69-
if (hbChannel.Receive(hbMsg) > 0) {
70-
string address = string(static_cast<char*>(hbMsg->GetData()), hbMsg->GetSize());
68+
if (hbChannel.Receive(heartbeat) > 0) {
69+
string address = string(static_cast<char*>(heartbeat->GetData()), heartbeat->GetSize());
7170

7271
if (fHeartbeats.find(address) != fHeartbeats.end()) {
7372
ptime now = boost::posix_time::microsec_clock::local_time();
@@ -97,7 +96,7 @@ void FLPSender::Run()
9796
// boost::thread heartbeatReceiver(boost::bind(&FLPSender::receiveHeartbeats, this));
9897

9998
// base buffer, to be copied from for every timeframe body (zero-copy)
100-
unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(fEventSize));
99+
unique_ptr<FairMQMessage> baseMsg(NewMessage(fEventSize));
101100

102101
uint16_t timeFrameId = 0;
103102

@@ -106,59 +105,55 @@ void FLPSender::Run()
106105

107106
while (CheckCurrentState(RUNNING)) {
108107
// initialize f2e header
109-
f2eHeader* h = new f2eHeader;
108+
f2eHeader* header = new f2eHeader;
110109

111110
if (fTestMode > 0) {
112111
// test-mode: receive and store id part in the buffer.
113-
unique_ptr<FairMQMessage> idPart(fTransportFactory->CreateMessage());
114-
if (dataInChannel.Receive(idPart) > 0) {
115-
h->timeFrameId = *(static_cast<uint16_t*>(idPart->GetData()));
116-
h->flpIndex = fIndex;
112+
unique_ptr<FairMQMessage> id(NewMessage());
113+
if (dataInChannel.Receive(id) > 0) {
114+
header->timeFrameId = *(static_cast<uint16_t*>(id->GetData()));
115+
header->flpIndex = fIndex;
117116
} else {
118117
// if nothing was received, try again
119-
delete h;
118+
delete header;
120119
continue;
121120
}
122121
} else {
123122
// regular mode: use the id generated locally
124-
h->timeFrameId = timeFrameId;
125-
h->flpIndex = fIndex;
123+
header->timeFrameId = timeFrameId;
124+
header->flpIndex = fIndex;
126125

127126
if (++timeFrameId == UINT16_MAX - 1) {
128127
timeFrameId = 0;
129128
}
130129
}
131130

132-
// unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage(sizeof(f2eHeader)));
133-
unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage(h, sizeof(f2eHeader), [](void* data, void* hint){ delete static_cast<f2eHeader*>(hint); }, h));
134-
unique_ptr<FairMQMessage> dataPart(fTransportFactory->CreateMessage());
131+
FairMQParts parts;
132+
133+
parts.AddPart(NewMessage(header, sizeof(f2eHeader), [](void* data, void* hint){ delete static_cast<f2eHeader*>(hint); }, header));
134+
parts.AddPart(NewMessage());
135135

136136
// save the arrival time of the message.
137137
fArrivalTime.push(boost::posix_time::microsec_clock::local_time());
138138

139139
if (fTestMode > 0) {
140140
// test-mode: initialize and store data part in the buffer.
141-
dataPart->Copy(baseMsg);
142-
fHeaderBuffer.push(move(headerPart));
143-
fDataBuffer.push(move(dataPart));
141+
parts.At(1)->Copy(baseMsg);
142+
fSTFBuffer.push(move(parts));
144143
} else {
145144
// regular mode: receive data part from input
146-
if (dataInChannel.Receive(dataPart) >= 0) {
147-
fHeaderBuffer.push(move(headerPart));
148-
fDataBuffer.push(move(dataPart));
145+
if (dataInChannel.Receive(parts.At(1)) >= 0) {
146+
fSTFBuffer.push(move(parts));
149147
} else {
150148
// if nothing was received, try again
151149
continue;
152150
}
153151
}
154152

155-
// LOG(INFO) << fDataBuffer.size();
156-
157153
// if offset is 0 - send data out without staggering.
158-
if (fSendOffset == 0 && fDataBuffer.size() > 0) {
154+
if (fSendOffset == 0 && fSTFBuffer.size() > 0) {
159155
sendFrontData();
160-
} else if (fDataBuffer.size() > 0) {
161-
// size_t dataSize = fDataBuffer.front()->GetSize();
156+
} else if (fSTFBuffer.size() > 0) {
162157
ptime now = boost::posix_time::microsec_clock::local_time();
163158
if ((now - fArrivalTime.front()).total_milliseconds() >= (fSendDelay * fSendOffset)) {
164159
sendFrontData();
@@ -174,8 +169,8 @@ void FLPSender::Run()
174169

175170
inline void FLPSender::sendFrontData()
176171
{
177-
f2eHeader h = *(static_cast<f2eHeader*>(fHeaderBuffer.front()->GetData()));
178-
uint16_t currentTimeframeId = h.timeFrameId;
172+
f2eHeader header = *(static_cast<f2eHeader*>(fSTFBuffer.front().At(0)->GetData()));
173+
uint16_t currentTimeframeId = header.timeFrameId;
179174

180175
// for which EPN is the message?
181176
int direction = currentTimeframeId % fNumEPNs;
@@ -193,21 +188,14 @@ inline void FLPSender::sendFrontData()
193188
// if (to_simple_string(storedHeartbeat) == "not-a-date-time" ||
194189
// (currentTime - storedHeartbeat).total_milliseconds() > fHeartbeatTimeoutInMs) {
195190
// LOG(WARN) << "Heartbeat too old for EPN#" << direction << ", discarding message.";
196-
// fHeaderBuffer.pop();
191+
// fSTFBuffer.pop();
197192
// fArrivalTime.pop();
198-
// fDataBuffer.pop();
199193
// } else { // if the heartbeat from the corresponding EPN is within timeout period, send the data.
200-
if (fChannels.at("data-out").at(direction).SendPart(fHeaderBuffer.front()) < 0) {
201-
// TODO: replace SendPart() with SendPartAsync() after nov15 fairroot release
202-
LOG(ERROR) << "Failed to queue ID part of event #" << currentTimeframeId;
203-
} else {
204-
if (fChannels.at("data-out").at(direction).SendAsync(fDataBuffer.front()) < 0) {
205-
LOG(ERROR) << "Could not send message with event #" << currentTimeframeId << " without blocking";
206-
}
194+
if (SendAsync(fSTFBuffer.front(), "data-out", direction) < 0) {
195+
LOG(ERROR) << "Failed to queue sub-timeframe #" << currentTimeframeId;
207196
}
208-
fHeaderBuffer.pop();
197+
fSTFBuffer.pop();
209198
fArrivalTime.pop();
210-
fDataBuffer.pop();
211199
// }
212200
}
213201

Examples/flp2epn-distributed/FLPSender.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ class FLPSender : public FairMQDevice
7575
/// Sends the "oldest" element from the sub-timeframe container
7676
void sendFrontData();
7777

78-
std::queue<std::unique_ptr<FairMQMessage>> fHeaderBuffer; ///< Stores sub-timeframe headers
79-
std::queue<std::unique_ptr<FairMQMessage>> fDataBuffer; ///< Stores sub-timeframe bodies
78+
std::queue<FairMQParts> fSTFBuffer; ///< Buffer for sub-timeframes
8079
std::queue<boost::posix_time::ptime> fArrivalTime; ///< Stores arrival times of sub-timeframes
8180

8281
int fNumEPNs; ///< Number of epnReceivers

0 commit comments

Comments
 (0)