Skip to content

Commit f5e9c9d

Browse files
committed
Allow referencing outputs by label
This should reduce the verbosity of the output API by allowing assigning a label to the OutputSpec and use such a label to refer to it.
1 parent b17f721 commit f5e9c9d

7 files changed

Lines changed: 138 additions & 33 deletions

File tree

Framework/Core/include/Framework/DataAllocator.h

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212

1313
#include <fairmq/FairMQDevice.h>
1414
#include "Headers/DataHeader.h"
15-
#include "Framework/OutputRoute.h"
1615
#include "Framework/Output.h"
16+
#include "Framework/OutputRef.h"
17+
#include "Framework/OutputRoute.h"
1718
#include "Framework/DataChunk.h"
1819
#include "Framework/MessageContext.h"
1920
#include "Framework/RootObjectContext.h"
@@ -29,6 +30,7 @@
2930
#include <utility>
3031
#include <type_traits>
3132
#include <gsl/span>
33+
#include <utility>
3234

3335
#include <TClass.h>
3436

@@ -42,7 +44,7 @@ namespace framework {
4244
class DataAllocator
4345
{
4446
public:
45-
using AllowedOutputsMap = std::vector<OutputRoute>;
47+
using AllowedOutputRoutes = std::vector<OutputRoute>;
4648
using DataHeader = o2::header::DataHeader;
4749
using DataOrigin = o2::header::DataOrigin;
4850
using DataDescription = o2::header::DataDescription;
@@ -51,7 +53,7 @@ class DataAllocator
5153
DataAllocator(FairMQDevice *device,
5254
MessageContext *context,
5355
RootObjectContext *rootContext,
54-
const AllowedOutputsMap &outputs);
56+
const AllowedOutputRoutes &routes);
5557

5658
DataChunk newChunk(const Output&, size_t);
5759
DataChunk adoptChunk(const Output&, char *, size_t, fairmq_free_fn*, void *);
@@ -313,20 +315,43 @@ class DataAllocator
313315
"pointer to data type not supported by API. Please pass object by reference");
314316
}
315317

318+
template <typename T, typename... Args>
319+
auto make(OutputRef const& ref, Args&&... args)
320+
{
321+
return make<T>(getOutputByBind(ref), std::forward<Args>(args)...);
322+
}
316323
private:
324+
FairMQDevice *mDevice;
325+
AllowedOutputRoutes mAllowedOutputRoutes;
326+
MessageContext *mContext;
327+
RootObjectContext *mRootContext;
328+
317329
std::string matchDataHeader(const Output &spec, size_t timeframeId);
318330
FairMQMessagePtr headerMessageFromOutput(Output const &spec,
319331
std::string const &channel,
320332
o2::header::SerializationMethod serializationMethod);
321333

334+
Output getOutputByBind(OutputRef const &ref)
335+
{
336+
for (size_t ri = 0, re = mAllowedOutputRoutes.size(); ri != re; ++ri) {
337+
if (mAllowedOutputRoutes[ri].matcher.binding.value == ref.label) {
338+
auto spec = mAllowedOutputRoutes[ri].matcher;
339+
return Output{
340+
spec.origin,
341+
spec.description,
342+
ref.subSpec,
343+
spec.lifetime
344+
};
345+
}
346+
}
347+
throw std::runtime_error("Unable to find OutputSpec with label " + ref.label);
348+
assert(false);
349+
}
350+
322351
void addPartToContext(FairMQMessagePtr&& payload,
323352
const Output &spec,
324353
o2::header::SerializationMethod serializationMethod);
325354

326-
FairMQDevice *mDevice;
327-
AllowedOutputsMap mAllowedOutputs;
328-
MessageContext *mContext;
329-
RootObjectContext *mRootContext;
330355
};
331356

332357
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#ifndef FRAMEWORK_OUTPUTREF_H
11+
#define FRAMEWORK_OUTPUTREF_H
12+
13+
#include "Headers/DataHeader.h"
14+
15+
#include <string>
16+
17+
namespace o2
18+
{
19+
namespace framework
20+
{
21+
22+
/// A reference to an output spec
23+
struct OutputRef {
24+
std::string label;
25+
header::DataHeader::SubSpecificationType subSpec;
26+
};
27+
28+
} // namespace framework
29+
} // namespace o2
30+
#endif

Framework/Core/include/Framework/OutputSpec.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,64 @@
1616
namespace o2 {
1717
namespace framework {
1818

19+
struct OutputLabel {
20+
std::string value;
21+
};
22+
1923
/// A selector for some kind of data being processed, either in
2024
/// input or in output. This can be used, for example to match
2125
/// specific payloads in a timeframe.
2226
struct OutputSpec {
27+
OutputLabel binding;
2328
header::DataOrigin origin;
2429
header::DataDescription description;
2530
header::DataHeader::SubSpecificationType subSpec = 0;
2631
enum Lifetime lifetime = Lifetime::Timeframe;
2732

33+
OutputSpec(OutputLabel const&inBinding,
34+
header::DataOrigin inOrigin,
35+
header::DataDescription inDescription,
36+
header::DataHeader::SubSpecificationType inSubSpec,
37+
enum Lifetime inLifetime = Lifetime::Timeframe)
38+
: binding{inBinding},
39+
origin{inOrigin},
40+
description{inDescription},
41+
subSpec{inSubSpec},
42+
lifetime{inLifetime}
43+
{}
44+
45+
OutputSpec(header::DataOrigin inOrigin,
46+
header::DataDescription inDescription,
47+
header::DataHeader::SubSpecificationType inSubSpec,
48+
enum Lifetime inLifetime = Lifetime::Timeframe)
49+
: binding{OutputLabel{""}},
50+
origin{inOrigin},
51+
description{inDescription},
52+
subSpec{inSubSpec},
53+
lifetime{inLifetime}
54+
{}
55+
56+
OutputSpec(OutputLabel const &inBinding,
57+
header::DataOrigin inOrigin,
58+
header::DataDescription inDescription,
59+
enum Lifetime inLifetime = Lifetime::Timeframe)
60+
: binding{inBinding},
61+
origin{inOrigin},
62+
description{inDescription},
63+
subSpec{0},
64+
lifetime{inLifetime}
65+
{}
66+
67+
OutputSpec(header::DataOrigin inOrigin,
68+
header::DataDescription inDescription,
69+
enum Lifetime inLifetime = Lifetime::Timeframe)
70+
: binding{OutputLabel{""}},
71+
origin{inOrigin},
72+
description{inDescription},
73+
subSpec{0},
74+
lifetime{inLifetime}
75+
{}
76+
2877
bool operator==(const OutputSpec& that)
2978
{
3079
return origin == that.origin && description == that.description && subSpec == that.subSpec &&

Framework/Core/src/DataAllocator.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ using DataProcessingHeader = o2::framework::DataProcessingHeader;
2424
DataAllocator::DataAllocator(FairMQDevice *device,
2525
MessageContext *context,
2626
RootObjectContext *rootContext,
27-
const AllowedOutputsMap &outputs)
27+
const AllowedOutputRoutes &routes)
2828
: mDevice{device},
29-
mAllowedOutputs{outputs},
29+
mAllowedOutputRoutes{routes},
3030
mContext{context},
3131
mRootContext{rootContext}
3232
{
@@ -35,7 +35,7 @@ DataAllocator::DataAllocator(FairMQDevice *device,
3535
std::string
3636
DataAllocator::matchDataHeader(const Output& spec, size_t timeslice) {
3737
// FIXME: we should take timeframeId into account as well.
38-
for (auto &output : mAllowedOutputs) {
38+
for (auto &output : mAllowedOutputRoutes) {
3939
if (DataSpecUtils::match(output.matcher, spec.origin, spec.description, spec.subSpec)
4040
&& ((timeslice % output.maxTimeslices) == output.timeslice)) {
4141
return output.channel;

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,12 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
187187
assert(edge.outputGlobalIndex < outputsMatchers.size());
188188

189189
if (edge.isForward == false) {
190-
OutputRoute route;
191-
route.matcher = outputsMatchers[edge.outputGlobalIndex];
192-
route.timeslice = edge.timeIndex;
193-
route.maxTimeslices = consumer.maxInputTimeslices;
194-
route.channel = channel.name;
190+
OutputRoute route{
191+
edge.timeIndex,
192+
consumer.maxInputTimeslices,
193+
outputsMatchers[edge.outputGlobalIndex],
194+
channel.name
195+
};
195196
device.outputs.emplace_back(route);
196197
} else {
197198
ForwardRoute route;

Framework/TestWorkflows/src/o2DiamondWorkflow.cxx

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111

1212
using namespace o2::framework;
1313

14-
AlgorithmSpec simplePipe(o2::header::DataDescription what) {
14+
AlgorithmSpec simplePipe(std::string const &what) {
1515
return AlgorithmSpec{ [what](ProcessingContext& ctx) {
16-
auto bData = ctx.outputs().make<int>(Output{ "TST", what, 0 }, 1);
16+
auto bData = ctx.outputs().make<int>(OutputRef{what}, 1);
1717
} };
1818
}
1919

@@ -24,28 +24,28 @@ void defineDataProcessing(WorkflowSpec &specs) {
2424
"A",
2525
Inputs{},
2626
{
27-
OutputSpec{"TST", "A1"},
28-
OutputSpec{"TST", "A2"}
27+
OutputSpec{{"a1"}, "TST", "A1"},
28+
OutputSpec{{"a2"}, "TST", "A2"}
2929
},
3030
AlgorithmSpec{
3131
[](ProcessingContext &ctx) {
3232
sleep(1);
33-
auto aData = ctx.outputs().make<int>(Output{ "TST", "A1", 0 }, 1);
34-
auto bData = ctx.outputs().make<int>(Output{ "TST", "A2", 0 }, 1);
33+
auto aData = ctx.outputs().make<int>(OutputRef{ "a1" }, 1);
34+
auto bData = ctx.outputs().make<int>(OutputRef{ "a2" }, 1);
3535
}
3636
}
3737
},
3838
{
3939
"B",
4040
{InputSpec{"x", "TST", "A1"}},
41-
{OutputSpec{"TST", "B1"}},
42-
simplePipe(o2::header::DataDescription{"B1"})
41+
{OutputSpec{{"b1"}, "TST", "B1"}},
42+
simplePipe("b1")
4343
},
4444
{
4545
"C",
4646
Inputs{InputSpec{"x", "TST", "A2"}},
47-
Outputs{OutputSpec{"TST", "C1"}},
48-
simplePipe(o2::header::DataDescription{"C1"})
47+
Outputs{OutputSpec{{"c1"}, "TST", "C1"}},
48+
simplePipe("c1")
4949
},
5050
{
5151
"D",

Framework/TestWorkflows/src/o2DummyWorkflow.cxx

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ void defineDataProcessing(std::vector<DataProcessorSpec> &specs) {
3535
"reader",
3636
Inputs{},
3737
{
38-
OutputSpec{"TPC", "CLUSTERS"},
39-
OutputSpec{"ITS", "CLUSTERS"}
38+
OutputSpec{{"tpc"}, "TPC", "CLUSTERS"},
39+
OutputSpec{{"its"}, "ITS", "CLUSTERS"}
4040
},
4141
AlgorithmSpec{
4242
[](ProcessingContext &ctx) {
4343
sleep(1);
4444
// Creates a new message of size 1000 which
4545
// has "TPC" as data origin and "CLUSTERS" as data description.
46-
auto tpcClusters = ctx.outputs().make<FakeCluster>(Output{ "TPC", "CLUSTERS", 0 }, 1000);
46+
auto tpcClusters = ctx.outputs().make<FakeCluster>(OutputRef{"tpc"}, 1000);
4747
int i = 0;
4848

4949
for (auto &cluster : tpcClusters) {
@@ -55,7 +55,7 @@ void defineDataProcessing(std::vector<DataProcessorSpec> &specs) {
5555
i++;
5656
}
5757

58-
auto itsClusters = ctx.outputs().make<FakeCluster>(Output{ "ITS", "CLUSTERS", 0 }, 1000);
58+
auto itsClusters = ctx.outputs().make<FakeCluster>(OutputRef{"its"}, 1000);
5959
i = 0;
6060
for (auto &cluster : itsClusters) {
6161
assert(i < 1000);
@@ -73,9 +73,9 @@ void defineDataProcessing(std::vector<DataProcessorSpec> &specs) {
7373
DataProcessorSpec tpcClusterSummary{
7474
"tpc-cluster-summary",
7575
{ InputSpec{ "clusters", "TPC", "CLUSTERS"} },
76-
{ OutputSpec{ "TPC", "SUMMARY"} },
76+
{ OutputSpec{ {"summary"}, "TPC", "SUMMARY"} },
7777
AlgorithmSpec{ [](ProcessingContext& ctx) {
78-
auto tpcSummary = ctx.outputs().make<Summary>(Output{ "TPC", "SUMMARY", 0 }, 1);
78+
auto tpcSummary = ctx.outputs().make<Summary>(OutputRef{"summary"}, 1);
7979
tpcSummary.at(0).inputCount = ctx.inputs().size();
8080
} },
8181
{ ConfigParamSpec{ "some-cut", VariantType::Float, 1.0f, { "some cut" } } },
@@ -86,10 +86,10 @@ void defineDataProcessing(std::vector<DataProcessorSpec> &specs) {
8686
"its-cluster-summary",
8787
{ InputSpec{ "clusters", "ITS", "CLUSTERS" } },
8888
{
89-
OutputSpec{ "ITS", "SUMMARY" },
89+
OutputSpec{ {"summary"}, "ITS", "SUMMARY" },
9090
},
9191
AlgorithmSpec{ [](ProcessingContext& ctx) {
92-
auto itsSummary = ctx.outputs().make<Summary>(Output{ "ITS", "SUMMARY", 0 }, 1);
92+
auto itsSummary = ctx.outputs().make<Summary>(OutputRef{"summary"}, 1);
9393
itsSummary.at(0).inputCount = ctx.inputs().size();
9494
} },
9595
{ ConfigParamSpec{ "some-cut", VariantType::Float, 1.0f, { "some cut" } } },

0 commit comments

Comments
 (0)