Skip to content

Commit e3e6fa3

Browse files
Supporting header stack in the DPL output API
- Introducing optional header stack parameter to Output description - Adding helper method to access headers from the header stack at DPL input - removing code duplication This depends on PR AliceO2Group#1076
1 parent 2320449 commit e3e6fa3

6 files changed

Lines changed: 156 additions & 75 deletions

File tree

Framework/Core/include/Framework/DataAllocator.h

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class DataAllocator
6060
const AllowedOutputRoutes &routes);
6161

6262
DataChunk newChunk(const Output&, size_t);
63-
DataChunk newChunk(OutputRef const& ref, size_t size) { return newChunk(getOutputByBind(ref), size); }
63+
DataChunk newChunk(OutputRef&& ref, size_t size) { return newChunk(getOutputByBind(std::move(ref)), size); }
6464

6565
DataChunk adoptChunk(const Output&, char *, size_t, fairmq_free_fn*, void *);
6666

@@ -321,18 +321,33 @@ class DataAllocator
321321
"pointer to data type not supported by API. Please pass object by reference");
322322
}
323323

324+
/// make an object of type T and route to output specified by OutputRef
325+
/// The object is owned by the framework, returned reference can be used to fill the object.
326+
///
327+
/// OutputRef descriptors are expected to be passed as rvalue, i.e. a temporary object in the
328+
/// function call
324329
template <typename T, typename... Args>
325-
auto make(OutputRef const& ref, Args&&... args)
330+
auto make(OutputRef&& ref, Args&&... args)
326331
{
327-
return make<T>(getOutputByBind(ref), std::forward<Args>(args)...);
332+
return make<T>(getOutputByBind(std::move(ref)), std::forward<Args>(args)...);
328333
}
329334

330-
void adopt(OutputRef const& ref, TObject* obj) { return adopt(getOutputByBind(ref), obj); }
331-
335+
/// adopt an object of type T and route to output specified by OutputRef
336+
/// Framework takes ownership of the object
337+
///
338+
/// OutputRef descriptors are expected to be passed as rvalue, i.e. a temporary object in the
339+
/// function call
340+
void adopt(OutputRef&& ref, TObject* obj) { return adopt(getOutputByBind(std::move(ref)), obj); }
341+
342+
/// snapshot object and route to output specified by OutputRef
343+
/// Framework makes a (serialized) copy of object content.
344+
///
345+
/// OutputRef descriptors are expected to be passed as rvalue, i.e. a temporary object in the
346+
/// function call
332347
template <typename... Args>
333-
auto snapshot(OutputRef const& ref, Args&&... args)
348+
auto snapshot(OutputRef&& ref, Args&&... args)
334349
{
335-
return snapshot(getOutputByBind(ref), std::forward<Args>(args)...);
350+
return snapshot(getOutputByBind(std::move(ref)), std::forward<Args>(args)...);
336351
}
337352

338353
private:
@@ -343,16 +358,17 @@ class DataAllocator
343358
RootObjectContext* mRootContext;
344359

345360
std::string matchDataHeader(const Output &spec, size_t timeframeId);
346-
FairMQMessagePtr headerMessageFromOutput(Output const &spec,
347-
std::string const &channel,
348-
o2::header::SerializationMethod serializationMethod);
361+
FairMQMessagePtr headerMessageFromOutput(Output const& spec, //
362+
std::string const& channel, //
363+
o2::header::SerializationMethod serializationMethod, //
364+
size_t payloadSize); //
349365

350-
Output getOutputByBind(OutputRef const& ref)
366+
Output getOutputByBind(OutputRef&& ref)
351367
{
352368
for (size_t ri = 0, re = mAllowedOutputRoutes.size(); ri != re; ++ri) {
353369
if (mAllowedOutputRoutes[ri].matcher.binding.value == ref.label) {
354370
auto spec = mAllowedOutputRoutes[ri].matcher;
355-
return Output{ spec.origin, spec.description, ref.subSpec, spec.lifetime };
371+
return Output{ spec.origin, spec.description, ref.subSpec, spec.lifetime, std::move(ref.headerStack) };
356372
}
357373
}
358374
throw std::runtime_error("Unable to find OutputSpec with label " + ref.label);

Framework/Core/include/Framework/DataRefUtils.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,21 @@ struct DataRefUtils {
155155
static unsigned getPayloadSize(const DataRef& ref)
156156
{
157157
using DataHeader = o2::header::DataHeader;
158-
auto header = o2::header::get<const DataHeader*>(ref.header);
158+
auto* header = o2::header::get<const DataHeader*>(ref.header);
159159
if (!header) {
160160
return 0;
161161
}
162162
return header->payloadSize;
163163
}
164+
165+
template <typename T>
166+
static auto getHeader(const DataRef& ref)
167+
{
168+
using HeaderT = typename std::remove_pointer<T>::type;
169+
static_assert(std::is_pointer<T>::value && std::is_base_of<o2::header::BaseHeader, HeaderT>::value,
170+
"pointer to BaseHeader-derived type required");
171+
return o2::header::get<T>(ref.header);
172+
}
164173
};
165174

166175
} // namespace framework

Framework/Core/include/Framework/Output.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,34 @@ struct Output {
2424
header::DataDescription description;
2525
header::DataHeader::SubSpecificationType subSpec = 0;
2626
enum Lifetime lifetime = Lifetime::Timeframe;
27+
header::Stack metaHeader = {};
28+
29+
Output(header::DataOrigin o, header::DataDescription d) : origin(o), description(d) {}
30+
31+
Output(header::DataOrigin o, header::DataDescription d, header::DataHeader::SubSpecificationType s)
32+
: origin(o), description(d), subSpec(s)
33+
{
34+
}
35+
36+
Output(header::DataOrigin o, header::DataDescription d, header::DataHeader::SubSpecificationType s, Lifetime l)
37+
: origin(o), description(d), subSpec(s), lifetime(l)
38+
{
39+
}
40+
41+
Output(header::DataOrigin o, header::DataDescription d, header::DataHeader::SubSpecificationType s, Lifetime l,
42+
header::Stack&& stack)
43+
: origin(o), description(d), subSpec(s), lifetime(l), metaHeader(std::move(stack))
44+
{
45+
}
46+
47+
Output(const Output&& rhs)
48+
: origin(rhs.origin),
49+
description(rhs.description),
50+
subSpec(rhs.subSpec),
51+
lifetime(rhs.lifetime),
52+
metaHeader(std::move(rhs.metaHeader))
53+
{
54+
}
2755

2856
bool operator==(const Output& that)
2957
{

Framework/Core/include/Framework/OutputRef.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,23 @@ namespace framework
2020
{
2121

2222
/// A reference to an output spec
23+
///
24+
/// OutputRef descriptors are expected to be passed as rvalue, i.e. a temporary object in the
25+
/// function call. This is due to the fact that the header stack has no ordinary copy
26+
/// constructor but only a move constructor
2327
struct OutputRef {
2428
std::string label;
2529
header::DataHeader::SubSpecificationType subSpec;
30+
header::Stack headerStack = {};
31+
32+
OutputRef(std::string&& l, header::DataHeader::SubSpecificationType s = 0) : label(std::move(l)), subSpec(s) {}
33+
34+
OutputRef(const std::string& l, header::DataHeader::SubSpecificationType s = 0) : label(l), subSpec(s) {}
35+
36+
OutputRef(std::string&& l, header::DataHeader::SubSpecificationType s, o2::header::Stack&& stack)
37+
: label(std::move(l)), subSpec(s), headerStack(std::move(stack))
38+
{
39+
}
2640
};
2741

2842
} // namespace framework

Framework/Core/src/DataAllocator.cxx

Lines changed: 37 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,10 @@ DataChunk
6060
DataAllocator::newChunk(const Output& spec, size_t size) {
6161
std::string channel = matchDataHeader(spec, mContext->timeslice());
6262

63-
DataHeader dh;
64-
dh.dataOrigin = spec.origin;
65-
dh.dataDescription = spec.description;
66-
dh.subSpecification = spec.subSpec;
67-
dh.payloadSize = size;
68-
dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
69-
70-
DataProcessingHeader dph{mContext->timeslice(), 1};
71-
//we have to move the incoming data
72-
o2::header::Stack headerStack{dh, dph};
73-
FairMQMessagePtr headerMessage = mDevice->NewMessageFor(channel, 0,
74-
headerStack.data(),
75-
headerStack.size(),
76-
&o2::header::Stack::freefn,
77-
headerStack.data());
78-
headerStack.release();
63+
FairMQMessagePtr headerMessage = headerMessageFromOutput(spec, channel, //
64+
o2::header::gSerializationMethodNone, //
65+
size //
66+
);
7967
FairMQMessagePtr payloadMessage = mDevice->NewMessageFor(channel, 0, size);
8068
auto dataPtr = payloadMessage->GetData();
8169
auto dataSize = payloadMessage->GetSize();
@@ -95,22 +83,10 @@ DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_
9583
// queue to be sent at the end of the processing
9684
std::string channel = matchDataHeader(spec, mContext->timeslice());
9785

98-
DataHeader dh;
99-
dh.dataOrigin = spec.origin;
100-
dh.dataDescription = spec.description;
101-
dh.subSpecification = spec.subSpec;
102-
dh.payloadSize = size;
103-
dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
104-
105-
DataProcessingHeader dph{mContext->timeslice(), 1};
106-
//we have to move the incoming data
107-
o2::header::Stack headerStack{dh, dph};
108-
FairMQMessagePtr headerMessage = mDevice->NewMessageFor(channel, 0,
109-
headerStack.data(),
110-
headerStack.size(),
111-
&o2::header::Stack::freefn,
112-
headerStack.data());
113-
headerStack.release();
86+
FairMQMessagePtr headerMessage = headerMessageFromOutput(spec, channel, //
87+
o2::header::gSerializationMethodNone, //
88+
size //
89+
);
11490

11591
FairMQParts parts;
11692

@@ -125,22 +101,21 @@ DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_
125101
return DataChunk{reinterpret_cast<char *>(dataPtr), dataSize};
126102
}
127103

128-
FairMQMessagePtr
129-
DataAllocator::headerMessageFromOutput(Output const &spec,
130-
std::string const &channel,
131-
o2::header::SerializationMethod method) {
104+
FairMQMessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, //
105+
std::string const& channel, //
106+
o2::header::SerializationMethod method, //
107+
size_t payloadSize) //
108+
{
132109
DataHeader dh;
133110
dh.dataOrigin = spec.origin;
134111
dh.dataDescription = spec.description;
135112
dh.subSpecification = spec.subSpec;
136-
// the correct payload size is st later when sending the
137-
// RootObjectContext, see DataProcessor::doSend
138-
dh.payloadSize = 0;
113+
dh.payloadSize = payloadSize;
139114
dh.payloadSerializationMethod = method;
140115

141116
DataProcessingHeader dph{mContext->timeslice(), 1};
142117
//we have to move the incoming data
143-
o2::header::Stack headerStack{dh, dph};
118+
o2::header::Stack headerStack{ dh, dph, spec.metaHeader };
144119
FairMQMessagePtr headerMessage = mDevice->NewMessageFor(channel, 0,
145120
headerStack.data(),
146121
headerStack.size(),
@@ -150,31 +125,33 @@ DataAllocator::headerMessageFromOutput(Output const &spec,
150125
return std::move(headerMessage);
151126
}
152127

153-
void
154-
DataAllocator::addPartToContext(FairMQMessagePtr&& payloadMessage,
155-
const Output &spec,
156-
o2::header::SerializationMethod serializationMethod)
128+
void DataAllocator::addPartToContext(FairMQMessagePtr&& payloadMessage, const Output& spec,
129+
o2::header::SerializationMethod serializationMethod)
157130
{
158-
std::string channel = matchDataHeader(spec, mRootContext->timeslice());
159-
auto headerMessage = headerMessageFromOutput(spec, channel, serializationMethod);
160-
161-
FairMQParts parts;
162-
163-
// FIXME: this is kind of ugly, we know that we can change the content of the
164-
// header message because we have just created it, but the API declares it const
165-
const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData());
166-
DataHeader *dh = const_cast<DataHeader *>(cdh);
167-
dh->payloadSize = payloadMessage->GetSize();
168-
parts.AddPart(std::move(headerMessage));
169-
parts.AddPart(std::move(payloadMessage));
170-
mContext->addPart(std::move(parts), channel);
131+
std::string channel = matchDataHeader(spec, mRootContext->timeslice());
132+
// the correct payload size is st later when sending the
133+
// RootObjectContext, see DataProcessor::doSend
134+
auto headerMessage = headerMessageFromOutput(spec, channel, serializationMethod, 0);
135+
136+
FairMQParts parts;
137+
138+
// FIXME: this is kind of ugly, we know that we can change the content of the
139+
// header message because we have just created it, but the API declares it const
140+
const DataHeader* cdh = o2::header::get<DataHeader*>(headerMessage->GetData());
141+
DataHeader* dh = const_cast<DataHeader*>(cdh);
142+
dh->payloadSize = payloadMessage->GetSize();
143+
parts.AddPart(std::move(headerMessage));
144+
parts.AddPart(std::move(payloadMessage));
145+
mContext->addPart(std::move(parts), channel);
171146
}
172147

173-
void
174-
DataAllocator::adopt(const Output &spec, TObject*ptr) {
148+
void DataAllocator::adopt(const Output& spec, TObject* ptr)
149+
{
175150
std::unique_ptr<TObject> payload(ptr);
176151
std::string channel = matchDataHeader(spec, mRootContext->timeslice());
177-
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodROOT);
152+
// the correct payload size is set later when sending the
153+
// RootObjectContext, see DataProcessor::doSend
154+
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodROOT, 0);
178155
mRootContext->addObject(std::move(header), std::move(payload), channel);
179156
assert(payload.get() == nullptr);
180157
}

Framework/Core/test/test_DataAllocator.cxx

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,23 @@ using namespace o2::framework;
2929
LOG(ERROR) << R"(Test condition ")" #condition R"(" failed)"; \
3030
}
3131

32+
namespace test
33+
{
34+
struct MetaHeader : public o2::header::BaseHeader {
35+
// Required to do the lookup
36+
static const o2::header::HeaderType sHeaderType;
37+
static const uint32_t sVersion = 1;
38+
39+
MetaHeader(uint32_t v)
40+
: BaseHeader(sizeof(MetaHeader), sHeaderType, o2::header::gSerializationMethodNone, sVersion), secret(v)
41+
{
42+
}
43+
44+
uint64_t secret;
45+
};
46+
constexpr o2::header::HeaderType MetaHeader::sHeaderType = "MetaHead";
47+
}
48+
3249
DataProcessorSpec getTimeoutSpec()
3350
{
3451
// a timer process to terminate the workflow after a timeout
@@ -58,7 +75,9 @@ DataProcessorSpec getSourceSpec()
5875
std::vector<o2::test::Polymorphic> c{ { 0xaffe }, { 0xd00f } };
5976
// class TriviallyCopyable is both messageable and has a dictionary, the default
6077
// picked by the framework is no serialization
61-
pc.outputs().snapshot(Output{ "TST", "MESSAGEABLE", 0, Lifetime::Timeframe }, a);
78+
test::MetaHeader meta1{ 42 };
79+
test::MetaHeader meta2{ 23 };
80+
pc.outputs().snapshot(Output{ "TST", "MESSAGEABLE", 0, Lifetime::Timeframe, { meta1, meta2 } }, a);
6281
pc.outputs().snapshot(Output{ "TST", "MSGBLEROOTSRLZ", 0, Lifetime::Timeframe },
6382
o2::framework::ROOTSerialized<decltype(a)>(a));
6483
// class Polymorphic is not messageable, so the serialization type is deduced
@@ -92,13 +111,31 @@ DataProcessorSpec getSinkSpec()
92111
auto processingFct = [](ProcessingContext& pc) {
93112
using DataHeader = o2::header::DataHeader;
94113
for (auto& input : pc.inputs()) {
95-
auto dh = o2::header::get<const DataHeader*>(input.header);
114+
auto* dh = o2::header::get<const DataHeader*>(input.header);
96115
LOG(INFO) << dh->dataOrigin.str << " " << dh->dataDescription.str << " " << dh->payloadSize;
116+
117+
using DumpStackFctType = std::function<void(const o2::header::BaseHeader*)>;
118+
DumpStackFctType dumpStack = [&](const o2::header::BaseHeader* h) {
119+
o2::header::hexDump("", h, h->size());
120+
if (h->flagsNextHeader) {
121+
auto next = reinterpret_cast<const byte*>(h) + h->size();
122+
dumpStack(reinterpret_cast<const o2::header::BaseHeader*>(next));
123+
}
124+
};
125+
126+
dumpStack(dh);
97127
}
98128
// plain, unserialized object in input1 channel
99129
auto object1 = pc.inputs().get<o2::test::TriviallyCopyable>("input1");
100130
ASSERT_ERROR(object1 != nullptr);
101131
ASSERT_ERROR(*object1 == o2::test::TriviallyCopyable(42, 23, 0xdead));
132+
// check the additional header on the stack
133+
auto* metaHeader1 = DataRefUtils::getHeader<test::MetaHeader*>(pc.inputs().get("input1"));
134+
// check if there are more of the same type
135+
auto* metaHeader2 = metaHeader1 ? o2::header::get<test::MetaHeader*>(metaHeader1->next()) : nullptr;
136+
ASSERT_ERROR(metaHeader1 != nullptr);
137+
ASSERT_ERROR(metaHeader1->secret == 42);
138+
ASSERT_ERROR(metaHeader2 != nullptr && metaHeader2->secret == 23);
102139

103140
// ROOT-serialized messageable object in input2 channel
104141
auto object2 = pc.inputs().get<o2::test::TriviallyCopyable>("input2");

0 commit comments

Comments
 (0)