Skip to content

Commit 05ab2a3

Browse files
committed
DPL: use the new adapt* helpers
Use the new helpers in a few more tests to reduce verbosity.
1 parent 66a1a58 commit 05ab2a3

2 files changed

Lines changed: 76 additions & 74 deletions

File tree

Framework/Core/test/test_SimpleRDataFrameProcessing.cxx

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
3333
//
3434
DataProcessorSpec{
3535
"rdataframe_producer", //
36-
Inputs{}, //
36+
Inputs{}, //
3737
{
3838
OutputSpec{ { "xz" }, "TES", "RFRAME" }, //
3939
},
40-
AlgorithmSpec{ [](ProcessingContext& ctx) {
40+
AlgorithmSpec{ adaptStateless([](DataAllocator& outputs) {
4141
// We ask the framework for something which can build a Table
42-
auto& out = ctx.outputs().make<TableBuilder>(Output{ "TES", "RFRAME" });
42+
auto& out = outputs.make<TableBuilder>(Output{ "TES", "RFRAME" });
4343
// We use RDataFrame to create a few columns with 100 rows.
4444
// The final action is the one which allows the user to create the
4545
// output message.
@@ -53,48 +53,48 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
5353
.Define("y", "2.f")
5454
.Define("z", "x+y");
5555
t.ForeachSlot(out.persist<float, float>({"x", "z"}), {"x", "z"});
56-
} } //
57-
}, //
56+
}) } //
57+
}, //
5858
DataProcessorSpec{
5959
"rdataframe_consumer", //
6060
{
6161
InputSpec{ "xz", "TES", "RFRAME" }, //
6262
}, //
6363
Outputs{}, //
6464
AlgorithmSpec{
65-
[](ProcessingContext& ctx) {
66-
/// This gets a table handle from the message.
67-
auto s = ctx.inputs().get<TableConsumer>("xz");
65+
adaptStateless(
66+
[](InputRecord& inputs, ControlService& control) {
67+
/// This gets a table handle from the message.
68+
auto s = inputs.get<TableConsumer>("xz");
6869

69-
/// From the handle, we construct the actual arrow table
70-
/// which is then used as a source for the RDataFrame.
71-
/// This is probably easy to change to a:
72-
///
73-
/// auto rdf = ctx.inputs().get<RDataSource>("xz");
74-
auto table = s->asArrowTable();
75-
if (table->num_rows() != 100) {
76-
LOG(ERROR) << "Wrong number of entries for the arrow table" << table->num_rows();
77-
}
70+
/// From the handle, we construct the actual arrow table
71+
/// which is then used as a source for the RDataFrame.
72+
/// This is probably easy to change to a:
73+
///
74+
/// auto rdf = ctx.inputs().get<RDataSource>("xz");
75+
auto table = s->asArrowTable();
76+
if (table->num_rows() != 100) {
77+
LOG(ERROR) << "Wrong number of entries for the arrow table" << table->num_rows();
78+
}
7879

79-
if (table->num_columns() != 2) {
80-
LOG(ERROR) << "Wrong number of columns for the arrow table" << table->num_columns();
81-
}
80+
if (table->num_columns() != 2) {
81+
LOG(ERROR) << "Wrong number of columns for the arrow table" << table->num_columns();
82+
}
8283

83-
auto source = std::make_unique<ROOT::RDF::RArrowDS>(s->asArrowTable(), std::vector<std::string>{});
84-
ROOT::RDataFrame rdf(std::move(source));
84+
auto source = std::make_unique<ROOT::RDF::RArrowDS>(s->asArrowTable(), std::vector<std::string>{});
85+
ROOT::RDataFrame rdf(std::move(source));
8586

86-
if (*rdf.Count() != 100)
87-
{
88-
LOG(ERROR) << "Wrong number of entries for the DataFrame" << *rdf.Count();
89-
}
87+
if (*rdf.Count() != 100) {
88+
LOG(ERROR) << "Wrong number of entries for the DataFrame" << *rdf.Count();
89+
}
9090

91-
if (*rdf.Mean("z") - 3.f > 0.1f) {
92-
LOG(ERROR) << "Wrong average for z";
93-
}
91+
if (*rdf.Mean("z") - 3.f > 0.1f) {
92+
LOG(ERROR) << "Wrong average for z";
93+
}
9494

95-
ctx.services().get<ControlService>().readyToQuit(true);
96-
} //
97-
} //
98-
} //
95+
control.readyToQuit(true);
96+
}) //
97+
} //
98+
} //
9999
};
100100
}

Framework/Core/test/test_SimpleStatefulProcessing01.cxx

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -32,52 +32,54 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) {
3232
// particular case, but a Singleton or a captured new object would
3333
// work as well.
3434
AlgorithmSpec{
35-
[](InitContext& setup) {
36-
static int foo = 0;
37-
static int step = 0; // incremented in registered callbacks
38-
auto startcb = []() {
39-
++step;
40-
LOG(INFO) << "start " << step;
41-
};
42-
auto stopcb = []() {
43-
++step;
44-
LOG(INFO) << "stop " << step;
45-
};
46-
auto resetcb = []() {
47-
++step;
48-
LOG(INFO) << "reset " << step;
49-
};
50-
setup.services().get<CallbackService>().set(CallbackService::Id::Start, startcb);
51-
setup.services().get<CallbackService>().set(CallbackService::Id::Stop, stopcb);
52-
setup.services().get<CallbackService>().set(CallbackService::Id::Reset, resetcb);
53-
return [](ProcessingContext& ctx) {
54-
sleep(1);
55-
auto out = ctx.outputs().newChunk({ "TES", "STATEFUL", 0 }, sizeof(int));
56-
auto outI = reinterpret_cast<int*>(out.data);
57-
outI[0] = foo++;
58-
};
59-
} //
60-
} //
61-
}, //
35+
adaptStateful(
36+
[](CallbackService& callbacks) {
37+
static int foo = 0;
38+
static int step = 0; // incremented in registered callbacks
39+
auto startcb = []() {
40+
++step;
41+
LOG(INFO) << "start " << step;
42+
};
43+
auto stopcb = []() {
44+
++step;
45+
LOG(INFO) << "stop " << step;
46+
};
47+
auto resetcb = []() {
48+
++step;
49+
LOG(INFO) << "reset " << step;
50+
};
51+
callbacks.set(CallbackService::Id::Start, startcb);
52+
callbacks.set(CallbackService::Id::Stop, stopcb);
53+
callbacks.set(CallbackService::Id::Reset, resetcb);
54+
return adaptStateless([](DataAllocator& outputs) {
55+
sleep(1);
56+
auto out = outputs.newChunk({ "TES", "STATEFUL", 0 }, sizeof(int));
57+
auto outI = reinterpret_cast<int*>(out.data);
58+
outI[0] = foo++;
59+
});
60+
}) //
61+
} //
62+
}, //
6263
DataProcessorSpec{
6364
"consumer", //
6465
{ InputSpec{ "test", "TES", "STATEFUL", 0, Lifetime::Timeframe } }, //
6566
Outputs{}, //
6667
AlgorithmSpec{
67-
[](InitContext&) {
68-
static int expected = 0;
69-
return [](ProcessingContext& ctx) {
70-
const int* in = reinterpret_cast<const int*>(ctx.inputs().get("test").payload);
68+
adaptStateful(
69+
[]() {
70+
static int expected = 0;
71+
return adaptStateless([](InputRecord& inputs, ControlService& control) {
72+
const int* in = reinterpret_cast<const int*>(inputs.get("test").payload);
7173

72-
if (*in != expected++) {
73-
LOG(ERROR) << "Expecting " << expected << " found " << *in;
74-
} else {
75-
LOG(INFO) << "Everything OK for " << expected << std::endl;
76-
ctx.services().get<ControlService>().readyToQuit(true);
77-
}
78-
};
79-
} //
80-
} //
81-
} //
74+
if (*in != expected++) {
75+
LOG(ERROR) << "Expecting " << expected << " found " << *in;
76+
} else {
77+
LOG(INFO) << "Everything OK for " << expected << std::endl;
78+
control.readyToQuit(true);
79+
}
80+
});
81+
}) //
82+
} //
83+
} //
8284
};
8385
}

0 commit comments

Comments
 (0)