@@ -60,22 +60,10 @@ DataChunk
6060DataAllocator::newChunk (const Output& spec, size_t size) {
6161 std::string channel = matchDataHeader (spec, mContext ->timeslice ());
6262
63- DataHeader dh;
64- dh.dataOrigin = spec.origin ;
65- dh.dataDescription = spec.description ;
66- dh.subSpecification = spec.subSpec ;
67- dh.payloadSize = size;
68- dh.payloadSerializationMethod = o2::header::gSerializationMethodNone ;
69-
70- DataProcessingHeader dph{mContext ->timeslice (), 1 };
71- // we have to move the incoming data
72- o2::header::Stack headerStack{dh, dph};
73- FairMQMessagePtr headerMessage = mDevice ->NewMessageFor (channel, 0 ,
74- headerStack.data (),
75- headerStack.size (),
76- &o2::header::Stack::freefn,
77- headerStack.data ());
78- headerStack.release ();
63+ FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, channel, //
64+ o2::header::gSerializationMethodNone , //
65+ size //
66+ );
7967 FairMQMessagePtr payloadMessage = mDevice ->NewMessageFor (channel, 0 , size);
8068 auto dataPtr = payloadMessage->GetData ();
8169 auto dataSize = payloadMessage->GetSize ();
@@ -95,22 +83,10 @@ DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_
9583 // queue to be sent at the end of the processing
9684 std::string channel = matchDataHeader (spec, mContext ->timeslice ());
9785
98- DataHeader dh;
99- dh.dataOrigin = spec.origin ;
100- dh.dataDescription = spec.description ;
101- dh.subSpecification = spec.subSpec ;
102- dh.payloadSize = size;
103- dh.payloadSerializationMethod = o2::header::gSerializationMethodNone ;
104-
105- DataProcessingHeader dph{mContext ->timeslice (), 1 };
106- // we have to move the incoming data
107- o2::header::Stack headerStack{dh, dph};
108- FairMQMessagePtr headerMessage = mDevice ->NewMessageFor (channel, 0 ,
109- headerStack.data (),
110- headerStack.size (),
111- &o2::header::Stack::freefn,
112- headerStack.data ());
113- headerStack.release ();
86+ FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, channel, //
87+ o2::header::gSerializationMethodNone , //
88+ size //
89+ );
11490
11591 FairMQParts parts;
11692
@@ -125,22 +101,21 @@ DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_
125101 return DataChunk{reinterpret_cast <char *>(dataPtr), dataSize};
126102}
127103
128- FairMQMessagePtr
129- DataAllocator::headerMessageFromOutput (Output const &spec,
130- std::string const &channel,
131- o2::header::SerializationMethod method) {
104+ FairMQMessagePtr DataAllocator::headerMessageFromOutput (Output const & spec, //
105+ std::string const & channel, //
106+ o2::header::SerializationMethod method, //
107+ size_t payloadSize) //
108+ {
132109 DataHeader dh;
133110 dh.dataOrigin = spec.origin ;
134111 dh.dataDescription = spec.description ;
135112 dh.subSpecification = spec.subSpec ;
136- // the correct payload size is st later when sending the
137- // RootObjectContext, see DataProcessor::doSend
138- dh.payloadSize = 0 ;
113+ dh.payloadSize = payloadSize;
139114 dh.payloadSerializationMethod = method;
140115
141116 DataProcessingHeader dph{mContext ->timeslice (), 1 };
142117 // we have to move the incoming data
143- o2::header::Stack headerStack{dh, dph};
118+ o2::header::Stack headerStack{ dh, dph, spec. metaHeader };
144119 FairMQMessagePtr headerMessage = mDevice ->NewMessageFor (channel, 0 ,
145120 headerStack.data (),
146121 headerStack.size (),
@@ -150,31 +125,33 @@ DataAllocator::headerMessageFromOutput(Output const &spec,
150125 return std::move (headerMessage);
151126}
152127
153- void
154- DataAllocator::addPartToContext (FairMQMessagePtr&& payloadMessage,
155- const Output &spec,
156- o2::header::SerializationMethod serializationMethod)
128+ void DataAllocator::addPartToContext (FairMQMessagePtr&& payloadMessage, const Output& spec,
129+ o2::header::SerializationMethod serializationMethod)
157130{
158- std::string channel = matchDataHeader (spec, mRootContext ->timeslice ());
159- auto headerMessage = headerMessageFromOutput (spec, channel, serializationMethod);
160-
161- FairMQParts parts;
162-
163- // FIXME: this is kind of ugly, we know that we can change the content of the
164- // header message because we have just created it, but the API declares it const
165- const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData ());
166- DataHeader *dh = const_cast <DataHeader *>(cdh);
167- dh->payloadSize = payloadMessage->GetSize ();
168- parts.AddPart (std::move (headerMessage));
169- parts.AddPart (std::move (payloadMessage));
170- mContext ->addPart (std::move (parts), channel);
131+ std::string channel = matchDataHeader (spec, mRootContext ->timeslice ());
132+ // the correct payload size is st later when sending the
133+ // RootObjectContext, see DataProcessor::doSend
134+ auto headerMessage = headerMessageFromOutput (spec, channel, serializationMethod, 0 );
135+
136+ FairMQParts parts;
137+
138+ // FIXME: this is kind of ugly, we know that we can change the content of the
139+ // header message because we have just created it, but the API declares it const
140+ const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData ());
141+ DataHeader* dh = const_cast <DataHeader*>(cdh);
142+ dh->payloadSize = payloadMessage->GetSize ();
143+ parts.AddPart (std::move (headerMessage));
144+ parts.AddPart (std::move (payloadMessage));
145+ mContext ->addPart (std::move (parts), channel);
171146}
172147
173- void
174- DataAllocator::adopt ( const Output &spec, TObject*ptr) {
148+ void DataAllocator::adopt ( const Output& spec, TObject* ptr)
149+ {
175150 std::unique_ptr<TObject> payload (ptr);
176151 std::string channel = matchDataHeader (spec, mRootContext ->timeslice ());
177- auto header = headerMessageFromOutput (spec, channel, o2::header::gSerializationMethodROOT );
152+ // the correct payload size is set later when sending the
153+ // RootObjectContext, see DataProcessor::doSend
154+ auto header = headerMessageFromOutput (spec, channel, o2::header::gSerializationMethodROOT , 0 );
178155 mRootContext ->addObject (std::move (header), std::move (payload), channel);
179156 assert (payload.get () == nullptr );
180157}
0 commit comments