@@ -28,8 +28,7 @@ struct f2eHeader {
2828};
2929
3030FLPSender::FLPSender ()
31- : fHeaderBuffer()
32- , fDataBuffer()
31+ : fSTFBuffer()
3332 , fArrivalTime()
3433 , fNumEPNs(0 )
3534 , fIndex(0 )
@@ -64,10 +63,10 @@ void FLPSender::receiveHeartbeats()
6463
6564 while (CheckCurrentState (RUNNING)) {
6665 try {
67- unique_ptr<FairMQMessage> hbMsg ( fTransportFactory -> CreateMessage ());
66+ unique_ptr<FairMQMessage> heartbeat ( NewMessage ());
6867
69- if (hbChannel.Receive (hbMsg ) > 0 ) {
70- string address = string (static_cast <char *>(hbMsg ->GetData ()), hbMsg ->GetSize ());
68+ if (hbChannel.Receive (heartbeat ) > 0 ) {
69+ string address = string (static_cast <char *>(heartbeat ->GetData ()), heartbeat ->GetSize ());
7170
7271 if (fHeartbeats .find (address) != fHeartbeats .end ()) {
7372 ptime now = boost::posix_time::microsec_clock::local_time ();
@@ -97,7 +96,7 @@ void FLPSender::Run()
9796 // boost::thread heartbeatReceiver(boost::bind(&FLPSender::receiveHeartbeats, this));
9897
9998 // base buffer, to be copied from for every timeframe body (zero-copy)
100- unique_ptr<FairMQMessage> baseMsg (fTransportFactory -> CreateMessage (fEventSize ));
99+ unique_ptr<FairMQMessage> baseMsg (NewMessage (fEventSize ));
101100
102101 uint16_t timeFrameId = 0 ;
103102
@@ -106,59 +105,55 @@ void FLPSender::Run()
106105
107106 while (CheckCurrentState (RUNNING)) {
108107 // initialize f2e header
109- f2eHeader* h = new f2eHeader;
108+ f2eHeader* header = new f2eHeader;
110109
111110 if (fTestMode > 0 ) {
112111 // test-mode: receive and store id part in the buffer.
113- unique_ptr<FairMQMessage> idPart ( fTransportFactory -> CreateMessage ());
114- if (dataInChannel.Receive (idPart ) > 0 ) {
115- h ->timeFrameId = *(static_cast <uint16_t *>(idPart ->GetData ()));
116- h ->flpIndex = fIndex ;
112+ unique_ptr<FairMQMessage> id ( NewMessage ());
113+ if (dataInChannel.Receive (id ) > 0 ) {
114+ header ->timeFrameId = *(static_cast <uint16_t *>(id ->GetData ()));
115+ header ->flpIndex = fIndex ;
117116 } else {
118117 // if nothing was received, try again
119- delete h ;
118+ delete header ;
120119 continue ;
121120 }
122121 } else {
123122 // regular mode: use the id generated locally
124- h ->timeFrameId = timeFrameId;
125- h ->flpIndex = fIndex ;
123+ header ->timeFrameId = timeFrameId;
124+ header ->flpIndex = fIndex ;
126125
127126 if (++timeFrameId == UINT16_MAX - 1 ) {
128127 timeFrameId = 0 ;
129128 }
130129 }
131130
132- // unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage(sizeof(f2eHeader)));
133- unique_ptr<FairMQMessage> headerPart (fTransportFactory ->CreateMessage (h, sizeof (f2eHeader), [](void * data, void * hint){ delete static_cast <f2eHeader*>(hint); }, h));
134- unique_ptr<FairMQMessage> dataPart (fTransportFactory ->CreateMessage ());
131+ FairMQParts parts;
132+
133+ parts.AddPart (NewMessage (header, sizeof (f2eHeader), [](void * data, void * hint){ delete static_cast <f2eHeader*>(hint); }, header));
134+ parts.AddPart (NewMessage ());
135135
136136 // save the arrival time of the message.
137137 fArrivalTime .push (boost::posix_time::microsec_clock::local_time ());
138138
139139 if (fTestMode > 0 ) {
140140 // test-mode: initialize and store data part in the buffer.
141- dataPart->Copy (baseMsg);
142- fHeaderBuffer .push (move (headerPart));
143- fDataBuffer .push (move (dataPart));
141+ parts.At (1 )->Copy (baseMsg);
142+ fSTFBuffer .push (move (parts));
144143 } else {
145144 // regular mode: receive data part from input
146- if (dataInChannel.Receive (dataPart) >= 0 ) {
147- fHeaderBuffer .push (move (headerPart));
148- fDataBuffer .push (move (dataPart));
145+ if (dataInChannel.Receive (parts.At (1 )) >= 0 ) {
146+ fSTFBuffer .push (move (parts));
149147 } else {
150148 // if nothing was received, try again
151149 continue ;
152150 }
153151 }
154152
155- // LOG(INFO) << fDataBuffer.size();
156-
157153 // if offset is 0 - send data out without staggering.
158- if (fSendOffset == 0 && fDataBuffer .size () > 0 ) {
154+ if (fSendOffset == 0 && fSTFBuffer .size () > 0 ) {
159155 sendFrontData ();
160- } else if (fDataBuffer .size () > 0 ) {
161- // size_t dataSize = fDataBuffer.front()->GetSize();
156+ } else if (fSTFBuffer .size () > 0 ) {
162157 ptime now = boost::posix_time::microsec_clock::local_time ();
163158 if ((now - fArrivalTime .front ()).total_milliseconds () >= (fSendDelay * fSendOffset )) {
164159 sendFrontData ();
@@ -174,8 +169,8 @@ void FLPSender::Run()
174169
175170inline void FLPSender::sendFrontData ()
176171{
177- f2eHeader h = *(static_cast <f2eHeader*>(fHeaderBuffer .front ()->GetData ()));
178- uint16_t currentTimeframeId = h .timeFrameId ;
172+ f2eHeader header = *(static_cast <f2eHeader*>(fSTFBuffer .front (). At ( 0 )->GetData ()));
173+ uint16_t currentTimeframeId = header .timeFrameId ;
179174
180175 // for which EPN is the message?
181176 int direction = currentTimeframeId % fNumEPNs ;
@@ -193,21 +188,14 @@ inline void FLPSender::sendFrontData()
193188 // if (to_simple_string(storedHeartbeat) == "not-a-date-time" ||
194189 // (currentTime - storedHeartbeat).total_milliseconds() > fHeartbeatTimeoutInMs) {
195190 // LOG(WARN) << "Heartbeat too old for EPN#" << direction << ", discarding message.";
196- // fHeaderBuffer .pop();
191+ // fSTFBuffer .pop();
197192 // fArrivalTime.pop();
198- // fDataBuffer.pop();
199193 // } else { // if the heartbeat from the corresponding EPN is within timeout period, send the data.
200- if (fChannels .at (" data-out" ).at (direction).SendPart (fHeaderBuffer .front ()) < 0 ) {
201- // TODO: replace SendPart() with SendPartAsync() after nov15 fairroot release
202- LOG (ERROR) << " Failed to queue ID part of event #" << currentTimeframeId;
203- } else {
204- if (fChannels .at (" data-out" ).at (direction).SendAsync (fDataBuffer .front ()) < 0 ) {
205- LOG (ERROR) << " Could not send message with event #" << currentTimeframeId << " without blocking" ;
206- }
194+ if (SendAsync (fSTFBuffer .front (), " data-out" , direction) < 0 ) {
195+ LOG (ERROR) << " Failed to queue sub-timeframe #" << currentTimeframeId;
207196 }
208- fHeaderBuffer .pop ();
197+ fSTFBuffer .pop ();
209198 fArrivalTime .pop ();
210- fDataBuffer .pop ();
211199 // }
212200}
213201
0 commit comments