Skip to content

Commit 11029c9

Browse files
committed
Disambiguate OutputSpec usage
Currently OutputSpec has two semantically different usages. - To list the outputs which will be produced by a computation - To actually assing an output header to the outgoing messages. This PR resolves the ambiguity by renaming OutputSpec to Output in the second case, as discussed in the WP4 meeting.
1 parent d360c4b commit 11029c9

24 files changed

Lines changed: 134 additions & 92 deletions

Framework/Core/include/Framework/DataAllocator.h

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <fairmq/FairMQDevice.h>
1414
#include "Headers/DataHeader.h"
1515
#include "Framework/OutputRoute.h"
16+
#include "Framework/Output.h"
1617
#include "Framework/DataChunk.h"
1718
#include "Framework/MessageContext.h"
1819
#include "Framework/RootObjectContext.h"
@@ -52,15 +53,15 @@ class DataAllocator
5253
RootObjectContext *rootContext,
5354
const AllowedOutputsMap &outputs);
5455

55-
DataChunk newChunk(const OutputSpec &, size_t);
56-
DataChunk adoptChunk(const OutputSpec &, char *, size_t, fairmq_free_fn*, void *);
56+
DataChunk newChunk(const Output&, size_t);
57+
DataChunk adoptChunk(const Output&, char *, size_t, fairmq_free_fn*, void *);
5758

5859
// In case no extra argument is provided and the passed type is trivially
5960
// copyable and non polymorphic, the most likely wanted behavior is to create
6061
// a message with that type, and so we do.
6162
template <typename T>
6263
typename std::enable_if<is_messageable<T>::value == true, T&>::type
63-
make(const OutputSpec& spec)
64+
make(const Output& spec)
6465
{
6566
DataChunk chunk = newChunk(spec, sizeof(T));
6667
return *reinterpret_cast<T*>(chunk.data);
@@ -70,7 +71,7 @@ class DataAllocator
7071
// collection elements of that type
7172
template <typename T>
7273
typename std::enable_if<is_messageable<T>::value == true, gsl::span<T>>::type
73-
make(const OutputSpec& spec, size_t nElements)
74+
make(const Output& spec, size_t nElements)
7475
{
7576
auto size = nElements*sizeof(T);
7677
DataChunk chunk = newChunk(spec, size);
@@ -86,7 +87,7 @@ class DataAllocator
8687
/// once the processing callback completes.
8788
template <typename T, typename... Args>
8889
typename std::enable_if<std::is_base_of<TObject, T>::value == true, T&>::type
89-
make(const OutputSpec &spec, Args... args) {
90+
make(const Output& spec, Args... args) {
9091
auto obj = new T(args...);
9192
adopt(spec, obj);
9293
return *obj;
@@ -100,7 +101,7 @@ class DataAllocator
100101
std::is_base_of<TObject, T>::value == false &&
101102
is_messageable<T>::value == false,
102103
T&>::type
103-
make(const OutputSpec&)
104+
make(const Output&)
104105
{
105106
static_assert(is_messageable<T>::value == true ||
106107
std::is_base_of<TObject, T>::value == true,
@@ -116,7 +117,7 @@ class DataAllocator
116117
std::is_base_of<TObject, T>::value == false &&
117118
is_messageable<T>::value == false,
118119
gsl::span<T>>::type
119-
make(const OutputSpec&, size_t)
120+
make(const Output&, size_t)
120121
{
121122
static_assert(is_messageable<T>::value == true,
122123
"data type T not supported by API, \n specializations available for"
@@ -131,7 +132,7 @@ class DataAllocator
131132
std::is_base_of<TObject, T>::value == false &&
132133
is_messageable<T>::value == false,
133134
T&>::type
134-
make(const OutputSpec&, U, V, Args...)
135+
make(const Output&, U, V, Args...)
135136
{
136137
static_assert(is_messageable<T>::value == true || std::is_base_of<TObject, T>::value == true,
137138
"data type T not supported by API, \n specializations available for"
@@ -143,7 +144,7 @@ class DataAllocator
143144
/// Adopt a TObject in the framework and serialize / send
144145
/// it to the consumers of @a spec once done.
145146
void
146-
adopt(const OutputSpec &spec, TObject*obj);
147+
adopt(const Output& spec, TObject*obj);
147148

148149
/// Serialize a snapshot of an object with root dictionary when called,
149150
/// will then be sent once the computation ends.
@@ -155,7 +156,7 @@ class DataAllocator
155156
/// explicitely otherwise by using ROOTSerialized wrapper type.
156157
template <typename T>
157158
typename std::enable_if<has_root_dictionary<T>::value == true && is_messageable<T>::value == false, void>::type
158-
snapshot(const OutputSpec& spec, T& object)
159+
snapshot(const Output& spec, T& object)
159160
{
160161
FairMQMessagePtr payloadMessage(mDevice->NewMessage());
161162
auto* cl = TClass::GetClass(typeid(T));
@@ -173,7 +174,7 @@ class DataAllocator
173174
/// after the call will not be sent.
174175
template <typename W>
175176
typename std::enable_if<is_specialization<W, ROOTSerialized>::value == true, void>::type
176-
snapshot(const OutputSpec& spec, W wrapper)
177+
snapshot(const Output& spec, W wrapper)
177178
{
178179
using T = typename W::wrapped_type;
179180
static_assert(std::is_same<typename W::hint_type, const char>::value || //
@@ -214,7 +215,7 @@ class DataAllocator
214215
/// unserialized. Use @a ROOTSerialized type wrapper to force ROOT serialization.
215216
template <typename T>
216217
typename std::enable_if<is_messageable<T>::value == true, void>::type
217-
snapshot(const OutputSpec& spec, T const& object)
218+
snapshot(const Output& spec, T const& object)
218219
{
219220
FairMQMessagePtr payloadMessage(mDevice->NewMessage(sizeof(T)));
220221
memcpy(payloadMessage->GetData(), &object, sizeof(T));
@@ -230,7 +231,7 @@ class DataAllocator
230231
typename std::enable_if<is_specialization<C, std::vector>::value == true &&
231232
std::is_pointer<typename C::value_type>::value == false &&
232233
is_messageable<typename C::value_type>::value == true>::type
233-
snapshot(const OutputSpec& spec, C const& v)
234+
snapshot(const Output& spec, C const& v)
234235
{
235236
auto sizeInBytes = sizeof(typename C::value_type) * v.size();
236237
FairMQMessagePtr payloadMessage(mDevice->NewMessage(sizeInBytes));
@@ -250,7 +251,7 @@ class DataAllocator
250251
is_specialization<C, std::vector>::value == true &&
251252
std::is_pointer<typename C::value_type>::value == true &&
252253
is_messageable<typename std::remove_pointer<typename C::value_type>::type>::value == true>::type
253-
snapshot(const OutputSpec& spec, C const& v)
254+
snapshot(const Output& spec, C const& v)
254255
{
255256
using ElementType = typename std::remove_pointer<typename C::value_type>::type;
256257
constexpr auto elementSizeInBytes = sizeof(ElementType);
@@ -273,7 +274,7 @@ class DataAllocator
273274
is_messageable<T>::value == false && //
274275
std::is_pointer<T>::value == false && //
275276
is_specialization<T, std::vector>::value == false>::type //
276-
snapshot(const OutputSpec& spec, T const&)
277+
snapshot(const Output& spec, T const&)
277278
{
278279
static_assert(has_root_dictionary<T>::value == true ||
279280
is_specialization<T, ROOTSerialized>::value == true ||
@@ -294,7 +295,7 @@ class DataAllocator
294295
typename std::remove_pointer<typename T::value_type>::type
295296
>::value == false
296297
>::type
297-
snapshot(const OutputSpec& spec, T const&)
298+
snapshot(const Output& spec, T const&)
298299
{
299300
static_assert(is_messageable<typename std::remove_pointer<typename T::value_type>::type>::value == true,
300301
"data type T not supported by API, \n specializations available for"
@@ -306,20 +307,20 @@ class DataAllocator
306307
/// specialization to catch the case where a pointer to an object has been
307308
/// accidentally given as parameter
308309
template <typename T>
309-
typename std::enable_if<std::is_pointer<T>::value>::type snapshot(const OutputSpec& spec, T const&)
310+
typename std::enable_if<std::is_pointer<T>::value>::type snapshot(const Output& spec, T const&)
310311
{
311312
static_assert(std::is_pointer<T>::value == false,
312313
"pointer to data type not supported by API. Please pass object by reference");
313314
}
314315

315316
private:
316-
std::string matchDataHeader(const OutputSpec &spec, size_t timeframeId);
317-
FairMQMessagePtr headerMessageFromSpec(OutputSpec const &spec,
318-
std::string const &channel,
319-
o2::header::SerializationMethod serializationMethod);
317+
std::string matchDataHeader(const Output &spec, size_t timeframeId);
318+
FairMQMessagePtr headerMessageFromOutput(Output const &spec,
319+
std::string const &channel,
320+
o2::header::SerializationMethod serializationMethod);
320321

321322
void addPartToContext(FairMQMessagePtr&& payload,
322-
const OutputSpec &spec,
323+
const Output &spec,
323324
o2::header::SerializationMethod serializationMethod);
324325

325326
FairMQDevice *mDevice;

Framework/Core/include/Framework/Dispatcher.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class Dispatcher
8080

8181
/// Creates dispatcher output specification basing on input specification of the same data. Basically, it adds '_S' at
8282
/// the end of description, which makes data stream distinctive from the main flow (which is not sampled/filtered).
83-
static OutputSpec createDispatcherOutputSpec(const InputSpec& dispatcherInput)
83+
static Output createDispatcherOutput(const InputSpec& dispatcherInput)
8484
{
8585
header::DataDescription description = dispatcherInput.description;
8686
size_t len = strlen(description.str);
@@ -89,7 +89,7 @@ class Dispatcher
8989
description.str[len + 1] = 'S';
9090
}
9191

92-
return OutputSpec{ dispatcherInput.origin, description, 0, dispatcherInput.lifetime };
92+
return Output{ dispatcherInput.origin, description, 0, dispatcherInput.lifetime };
9393
}
9494

9595
protected:
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#ifndef FRAMEWORK_OUTPUT_H
11+
#define FRAMEWORK_OUTPUT_H
12+
13+
#include "Headers/DataHeader.h"
14+
#include "Framework/Lifetime.h"
15+
16+
namespace o2 {
17+
namespace framework {
18+
19+
/// A concrete description of the output to be created
20+
struct Output {
21+
header::DataOrigin origin;
22+
header::DataDescription description;
23+
header::DataHeader::SubSpecificationType subSpec = 0;
24+
enum Lifetime lifetime = Lifetime::Timeframe;
25+
26+
bool operator==(const Output& that)
27+
{
28+
return origin == that.origin && description == that.description && subSpec == that.subSpec &&
29+
lifetime == that.lifetime;
30+
};
31+
};
32+
33+
}
34+
}
35+
#endif

Framework/Core/include/Framework/SerializationMethods.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ namespace framework
2626
/// Usage: (with 'output' being the DataAllocator of the ProcessingContext)
2727
/// SomeType object;
2828
/// ROOTSerialized<decltype(object)> ref(object);
29-
/// output.snapshot(OutputSpec{}, ref);
29+
/// output.snapshot(Output{}, ref);
3030
/// - or -
31-
/// output.snapshot(OutputSpec{}, ROOTSerialized<decltype(object)>(object));
31+
/// output.snapshot(Output{}, ROOTSerialized<decltype(object)>(object));
3232
///
3333
/// The existence of the ROOT dictionary for the wrapped type can not be
3434
/// checked at compile time, a runtime check must be performed in the

Framework/Core/src/DataAllocator.cxx

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ DataAllocator::DataAllocator(FairMQDevice *device,
3333
}
3434

3535
std::string
36-
DataAllocator::matchDataHeader(const OutputSpec &spec, size_t timeslice) {
36+
DataAllocator::matchDataHeader(const Output& spec, size_t timeslice) {
3737
// FIXME: we should take timeframeId into account as well.
3838
for (auto &output : mAllowedOutputs) {
3939
if (DataSpecUtils::match(output.matcher, spec.origin, spec.description, spec.subSpec)
@@ -50,7 +50,7 @@ DataAllocator::matchDataHeader(const OutputSpec &spec, size_t timeslice) {
5050
}
5151

5252
DataChunk
53-
DataAllocator::newChunk(const OutputSpec &spec, size_t size) {
53+
DataAllocator::newChunk(const Output& spec, size_t size) {
5454
std::string channel = matchDataHeader(spec, mContext->timeslice());
5555

5656
DataHeader dh;
@@ -83,7 +83,7 @@ DataAllocator::newChunk(const OutputSpec &spec, size_t size) {
8383
}
8484

8585
DataChunk
86-
DataAllocator::adoptChunk(const OutputSpec &spec, char *buffer, size_t size, fairmq_free_fn *freefn, void *hint = nullptr) {
86+
DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_free_fn *freefn, void *hint = nullptr) {
8787
// Find a matching channel, create a new message for it and put it in the
8888
// queue to be sent at the end of the processing
8989
std::string channel = matchDataHeader(spec, mContext->timeslice());
@@ -119,9 +119,9 @@ DataAllocator::adoptChunk(const OutputSpec &spec, char *buffer, size_t size, fai
119119
}
120120

121121
FairMQMessagePtr
122-
DataAllocator::headerMessageFromSpec(OutputSpec const &spec,
123-
std::string const &channel,
124-
o2::header::SerializationMethod method) {
122+
DataAllocator::headerMessageFromOutput(Output const &spec,
123+
std::string const &channel,
124+
o2::header::SerializationMethod method) {
125125
DataHeader dh;
126126
dh.dataOrigin = spec.origin;
127127
dh.dataDescription = spec.description;
@@ -145,11 +145,11 @@ DataAllocator::headerMessageFromSpec(OutputSpec const &spec,
145145

146146
void
147147
DataAllocator::addPartToContext(FairMQMessagePtr&& payloadMessage,
148-
const OutputSpec &spec,
148+
const Output &spec,
149149
o2::header::SerializationMethod serializationMethod)
150150
{
151151
std::string channel = matchDataHeader(spec, mRootContext->timeslice());
152-
auto headerMessage = headerMessageFromSpec(spec, channel, serializationMethod);
152+
auto headerMessage = headerMessageFromOutput(spec, channel, serializationMethod);
153153

154154
FairMQParts parts;
155155

@@ -164,10 +164,10 @@ DataAllocator::addPartToContext(FairMQMessagePtr&& payloadMessage,
164164
}
165165

166166
void
167-
DataAllocator::adopt(const OutputSpec &spec, TObject*ptr) {
167+
DataAllocator::adopt(const Output &spec, TObject*ptr) {
168168
std::unique_ptr<TObject> payload(ptr);
169169
std::string channel = matchDataHeader(spec, mRootContext->timeslice());
170-
auto header = headerMessageFromSpec(spec, channel, o2::header::gSerializationMethodROOT);
170+
auto header = headerMessageFromOutput(spec, channel, o2::header::gSerializationMethodROOT);
171171
mRootContext->addObject(std::move(header), std::move(payload), channel);
172172
assert(payload.get() == nullptr);
173173
}

Framework/Core/src/DispatcherDPL.cxx

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ void DispatcherDPL::processCallback(ProcessingContext& ctx, BernoulliGenerator&
3939
if (bernoulliGenerator.drawLots()) {
4040
for (auto& input : inputs) {
4141

42-
OutputSpec outputSpec = createDispatcherOutputSpec(*input.spec);
42+
Output output = createDispatcherOutput(*input.spec);
4343

4444
const auto* inputHeader = header::get<header::DataHeader*>(input.header);
4545

@@ -48,10 +48,10 @@ void DispatcherDPL::processCallback(ProcessingContext& ctx, BernoulliGenerator&
4848
<< "', description '" << inputHeader->dataDescription.str
4949
<< "' has gSerializationMethodInvalid.";
5050
} else*/ if (inputHeader->payloadSerializationMethod == header::gSerializationMethodROOT) {
51-
ctx.outputs().adopt(outputSpec, DataRefUtils::as<TObject>(input).release());
51+
ctx.outputs().adopt(output, DataRefUtils::as<TObject>(input).release());
5252
} else { // POD
5353
// todo: use API for that when it is available
54-
ctx.outputs().adoptChunk(outputSpec, const_cast<char*>(input.payload), inputHeader->payloadSize,
54+
ctx.outputs().adoptChunk(output, const_cast<char*>(input.payload), inputHeader->payloadSize,
5555
&header::Stack::freefn, nullptr);
5656
}
5757

@@ -72,12 +72,18 @@ void DispatcherDPL::addSource(const DataProcessorSpec& externalDataProcessor, co
7272
};
7373

7474
mDataProcessorSpec.inputs.push_back(newInput);
75-
OutputSpec newOutput = createDispatcherOutputSpec(newInput);
75+
Output newOutput = createDispatcherOutput(newInput);
76+
OutputSpec spec {
77+
newOutput.origin,
78+
newOutput.description,
79+
newOutput.subSpec,
80+
newOutput.lifetime
81+
};
7682
if (mCfg.enableParallelDispatchers ||
77-
std::find(mDataProcessorSpec.outputs.begin(), mDataProcessorSpec.outputs.end(), newOutput) ==
83+
std::find(mDataProcessorSpec.outputs.begin(), mDataProcessorSpec.outputs.end(), spec) ==
7884
mDataProcessorSpec.outputs.end()) {
7985

80-
mDataProcessorSpec.outputs.push_back(newOutput);
86+
mDataProcessorSpec.outputs.push_back(spec);
8187
}
8288

8389
if (mCfg.enableTimePipeliningDispatchers &&

Framework/Core/test/test_DataAllocator.cxx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ DataProcessorSpec getTimeoutSpec()
3434
// a timer process to terminate the workflow after a timeout
3535
auto processingFct = [](ProcessingContext& pc) {
3636
static int counter = 0;
37-
pc.outputs().snapshot(OutputSpec{ "TST", "TIMER", 0, Lifetime::Timeframe }, counter);
37+
pc.outputs().snapshot(Output{ "TST", "TIMER", 0, Lifetime::Timeframe }, counter);
3838

3939
sleep(1);
4040
if (counter++ > 10) {
@@ -58,22 +58,22 @@ DataProcessorSpec getSourceSpec()
5858
std::vector<o2::test::Polymorphic> c{ { 0xaffe }, { 0xd00f } };
5959
// class TriviallyCopyable is both messageable and has a dictionary, the default
6060
// picked by the framework is no serialization
61-
pc.outputs().snapshot(OutputSpec{ "TST", "MESSAGEABLE", 0, Lifetime::Timeframe }, a);
62-
pc.outputs().snapshot(OutputSpec{ "TST", "MSGBLEROOTSRLZ", 0, Lifetime::Timeframe },
61+
pc.outputs().snapshot(Output{ "TST", "MESSAGEABLE", 0, Lifetime::Timeframe }, a);
62+
pc.outputs().snapshot(Output{ "TST", "MSGBLEROOTSRLZ", 0, Lifetime::Timeframe },
6363
o2::framework::ROOTSerialized<decltype(a)>(a));
6464
// class Polymorphic is not messageable, so the serialization type is deduced
6565
// from the fact that the type has a dictionary and can be ROOT-serialized.
66-
pc.outputs().snapshot(OutputSpec{ "TST", "ROOTNONTOBJECT", 0, Lifetime::Timeframe }, b);
66+
pc.outputs().snapshot(Output{ "TST", "ROOTNONTOBJECT", 0, Lifetime::Timeframe }, b);
6767
// vector of ROOT serializable class
68-
pc.outputs().snapshot(OutputSpec{ "TST", "ROOTVECTOR", 0, Lifetime::Timeframe }, c);
68+
pc.outputs().snapshot(Output{ "TST", "ROOTVECTOR", 0, Lifetime::Timeframe }, c);
6969
// likewise, passed anonymously with char type and class name
7070
o2::framework::ROOTSerialized<char, const char> d(*((char*)&c), "vector<o2::test::Polymorphic>");
71-
pc.outputs().snapshot(OutputSpec{ "TST", "ROOTSERLZDVEC", 0, Lifetime::Timeframe }, d);
71+
pc.outputs().snapshot(Output{ "TST", "ROOTSERLZDVEC", 0, Lifetime::Timeframe }, d);
7272
// vector of ROOT serializable class wrapped with TClass info as hint
7373
auto* cl = TClass::GetClass(typeid(decltype(c)));
7474
ASSERT_ERROR(cl != nullptr);
7575
o2::framework::ROOTSerialized<char, TClass> e(*((char*)&c), cl);
76-
pc.outputs().snapshot(OutputSpec{ "TST", "ROOTSERLZDVEC2", 0, Lifetime::Timeframe }, e);
76+
pc.outputs().snapshot(Output{ "TST", "ROOTSERLZDVEC2", 0, Lifetime::Timeframe }, e);
7777
};
7878

7979
return DataProcessorSpec{ "source", // name of the processor

0 commit comments

Comments
 (0)