2727#include " Framework/CheckTypes.h"
2828#include " Framework/ServiceRegistry.h"
2929#include " Framework/RuntimeError.h"
30+ #include " Framework/RouteState.h"
3031
3132#include " Headers/DataHeader.h"
3233#include < TClass.h>
@@ -106,35 +107,35 @@ class DataAllocator
106107 // plain buffer as polymorphic spectator std::vector, which does not run constructors / destructors
107108 using ValueType = typename T::value_type;
108109 auto & timingInfo = mRegistry ->get <TimingInfo>();
109- std::string const & channel = matchDataHeader (spec, timingInfo.timeslice );
110+ auto routeIndex = matchDataHeader (spec, timingInfo.timeslice );
110111 auto & context = mRegistry ->get <MessageContext>();
111112
112113 // Note: initial payload size is 0 and will be set by the context before sending
113- FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, channel , o2::header::gSerializationMethodNone , 0 );
114+ FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, routeIndex , o2::header::gSerializationMethodNone , 0 );
114115 return context.add <MessageContext::VectorObject<ValueType, MessageContext::ContainerRefObject<std::vector<ValueType, o2::pmr::NoConstructAllocator<ValueType>>>>>(
115- std::move (headerMessage), channel , 0 , std::forward<Args>(args)...)
116+ std::move (headerMessage), routeIndex , 0 , std::forward<Args>(args)...)
116117 .get ();
117118 } else if constexpr (is_specialization_v<T, std::vector> && has_messageable_value_type<T>::value) {
118119 // this catches all std::vector objects with messageable value type before checking if is also
119120 // has a root dictionary, so non-serialized transmission is preferred
120121 using ValueType = typename T::value_type;
121122 auto & timingInfo = mRegistry ->get <TimingInfo>();
122- std::string const & channel = matchDataHeader (spec, timingInfo.timeslice );
123+ auto routeIndex = matchDataHeader (spec, timingInfo.timeslice );
123124 auto & context = mRegistry ->get <MessageContext>();
124125
125126 // Note: initial payload size is 0 and will be set by the context before sending
126- FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, channel , o2::header::gSerializationMethodNone , 0 );
127- return context.add <MessageContext::VectorObject<ValueType>>(std::move (headerMessage), channel , 0 , std::forward<Args>(args)...).get ();
127+ FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, routeIndex , o2::header::gSerializationMethodNone , 0 );
128+ return context.add <MessageContext::VectorObject<ValueType>>(std::move (headerMessage), routeIndex , 0 , std::forward<Args>(args)...).get ();
128129 } else if constexpr (has_root_dictionary<T>::value == true && is_messageable<T>::value == false ) {
129130 // Extended support for types implementing the Root ClassDef interface, both TObject
130131 // derived types and others
131132 auto & timingInfo = mRegistry ->get <TimingInfo>();
132- std::string const & channel = matchDataHeader (spec, timingInfo.timeslice );
133+ auto routeIndex = matchDataHeader (spec, timingInfo.timeslice );
133134 auto & context = mRegistry ->get <MessageContext>();
134135
135136 // Note: initial payload size is 0 and will be set by the context before sending
136- FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, channel , o2::header::gSerializationMethodROOT , 0 );
137- return context.add <MessageContext::RootSerializedObject<T>>(std::move (headerMessage), channel , std::forward<Args>(args)...).get ();
137+ FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, routeIndex , o2::header::gSerializationMethodROOT , 0 );
138+ return context.add <MessageContext::RootSerializedObject<T>>(std::move (headerMessage), routeIndex , std::forward<Args>(args)...).get ();
138139 } else if constexpr (std::is_base_of_v<std::string, T>) {
139140 auto * s = new std::string (args...);
140141 adopt (spec, s);
@@ -172,11 +173,11 @@ class DataAllocator
172173 auto [nElements] = std::make_tuple (args...);
173174 auto size = nElements * sizeof (T);
174175 auto & timingInfo = mRegistry ->get <TimingInfo>();
175- std::string const & channel = matchDataHeader (spec, timingInfo.timeslice );
176+ auto routeIndex = matchDataHeader (spec, timingInfo.timeslice );
176177 auto & context = mRegistry ->get <MessageContext>();
177178
178- FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, channel , o2::header::gSerializationMethodNone , size);
179- return context.add <MessageContext::SpanObject<T>>(std::move (headerMessage), channel , 0 , nElements).get ();
179+ FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, routeIndex , o2::header::gSerializationMethodNone , size);
180+ return context.add <MessageContext::SpanObject<T>>(std::move (headerMessage), routeIndex , 0 , nElements).get ();
180181 }
181182 } else if constexpr (std::is_same_v<FirstArg, std::shared_ptr<arrow::Schema>>) {
182183 if constexpr (std::is_base_of_v<arrow::ipc::RecordBatchWriter, T>) {
@@ -239,10 +240,10 @@ class DataAllocator
239240
240241 char * payload = reinterpret_cast <char *>(ptr);
241242 auto & timingInfo = mRegistry ->get <TimingInfo>();
242- std::string const & channel = matchDataHeader (spec, timingInfo.timeslice );
243+ auto routeIndex = matchDataHeader (spec, timingInfo.timeslice );
243244 // the correct payload size is set later when sending the
244245 // RawBufferContext, see DataProcessor::doSend
245- auto header = headerMessageFromOutput (spec, channel , o2::header::gSerializationMethodNone , 0 );
246+ auto header = headerMessageFromOutput (spec, routeIndex , o2::header::gSerializationMethodNone , 0 );
246247
247248 auto lambdaSerialize = [voidPtr = payload]() {
248249 return o2::utils::BoostSerialize<type>(*(reinterpret_cast <type*>(voidPtr)));
@@ -253,7 +254,7 @@ class DataAllocator
253254 delete tmpPtr;
254255 };
255256
256- mRegistry ->get <RawBufferContext>().addRawBuffer (std::move (header), std::move (payload), std::move (channel) , std::move (lambdaSerialize), std::move (lambdaDestructor));
257+ mRegistry ->get <RawBufferContext>().addRawBuffer (std::move (header), std::move (payload), routeIndex , std::move (lambdaSerialize), std::move (lambdaDestructor));
257258 }
258259
259260 // / Send a snapshot of an object, depending on the object type it is serialized before.
@@ -278,12 +279,13 @@ class DataAllocator
278279 template <typename T>
279280 void snapshot (const Output& spec, T const & object)
280281 {
281- auto proxy = mRegistry ->get <MessageContext>().proxy ();
282+ auto & proxy = mRegistry ->get <MessageContext>().proxy ();
282283 FairMQMessagePtr payloadMessage;
283284 auto serializationType = o2::header::gSerializationMethodNone ;
285+ RouteIndex routeIndex = matchDataHeader (spec, mRegistry ->get <TimingInfo>().timeslice );
284286 if constexpr (is_messageable<T>::value == true ) {
285287 // Serialize a snapshot of a trivially copyable, non-polymorphic object,
286- payloadMessage = proxy.createMessage (sizeof (T));
288+ payloadMessage = proxy.createMessage (routeIndex, sizeof (T));
287289 memcpy (payloadMessage->GetData (), &object, sizeof (T));
288290
289291 serializationType = o2::header::gSerializationMethodNone ;
@@ -296,7 +298,7 @@ class DataAllocator
296298 // reference object
297299 constexpr auto elementSizeInBytes = sizeof (ElementType);
298300 auto sizeInBytes = elementSizeInBytes * object.size ();
299- payloadMessage = proxy.createMessage (sizeInBytes);
301+ payloadMessage = proxy.createMessage (routeIndex, sizeInBytes);
300302
301303 if constexpr (std::is_pointer<typename T::value_type>::value == false ) {
302304 // vector of elements
@@ -324,7 +326,7 @@ class DataAllocator
324326 }
325327 } else if constexpr (has_root_dictionary<T>::value == true || is_specialization_v<T, ROOTSerialized> == true ) {
326328 // Serialize a snapshot of an object with root dictionary
327- payloadMessage = proxy.createMessage ();
329+ payloadMessage = proxy.createMessage (routeIndex );
328330 if constexpr (is_specialization_v<T, ROOTSerialized> == true ) {
329331 // Explicitely ROOT serialize a snapshot of object.
330332 // An object wrapped into type `ROOTSerialized` is explicitely marked to be ROOT serialized
@@ -403,9 +405,9 @@ class DataAllocator
403405 o2::pmr::FairMQMemoryResource* getMemoryResource (const Output& spec)
404406 {
405407 auto & timingInfo = mRegistry ->get <TimingInfo>();
406- std::string const & channel = matchDataHeader (spec, timingInfo. timeslice );
407- auto & context = mRegistry -> get <MessageContext>( );
408- return *context. proxy () .getTransport (channel );
408+ auto & proxy = mRegistry -> get <FairMQDeviceProxy>( );
409+ RouteIndex routeIndex = matchDataHeader (spec, timingInfo. timeslice );
410+ return *proxy.getTransport (routeIndex );
409411 }
410412
411413 // make a stl (pmr) vector
@@ -486,9 +488,9 @@ class DataAllocator
486488 AllowedOutputRoutes mAllowedOutputRoutes ;
487489 ServiceRegistry* mRegistry ;
488490
489- std::string const & matchDataHeader (const Output& spec, size_t timeframeId);
491+ RouteIndex matchDataHeader (const Output& spec, size_t timeframeId);
490492 FairMQMessagePtr headerMessageFromOutput (Output const & spec, //
491- std::string const & channel, //
493+ RouteIndex index, //
492494 o2::header::SerializationMethod serializationMethod, //
493495 size_t payloadSize); //
494496
@@ -504,12 +506,12 @@ DataAllocator::CacheId DataAllocator::adoptContainer(const Output& spec, Contain
504506 // Find a matching channel, extract the message for it form the container
505507 // and put it in the queue to be sent at the end of the processing
506508 auto & timingInfo = mRegistry ->get <TimingInfo>();
507- std::string const & channel = matchDataHeader (spec, timingInfo.timeslice );
509+ auto routeIndex = matchDataHeader (spec, timingInfo.timeslice );
508510
509511 auto & context = mRegistry ->get <MessageContext>();
510- FairMQMessagePtr payloadMessage = o2::pmr::getMessage (std::forward<ContainerT>(container), *context. proxy (). getTransport (channel) );
511-
512- FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, channel, //
512+ auto * transport = mRegistry -> get <FairMQDeviceProxy>(). getTransport (routeIndex );
513+ FairMQMessagePtr payloadMessage = o2::pmr::getMessage (std::forward<ContainerT>(container), *transport);
514+ FairMQMessagePtr headerMessage = headerMessageFromOutput (spec, routeIndex, //
513515 method, //
514516 payloadMessage->GetSize () //
515517 );
@@ -522,7 +524,7 @@ DataAllocator::CacheId DataAllocator::adoptContainer(const Output& spec, Contain
522524 cacheId.value = context.addToCache (payloadMessage);
523525 }
524526
525- context.add <MessageContext::TrivialObject>(std::move (headerMessage), std::move (payloadMessage), channel );
527+ context.add <MessageContext::TrivialObject>(std::move (headerMessage), std::move (payloadMessage), routeIndex );
526528 return cacheId;
527529}
528530
0 commit comments