@@ -33,13 +33,13 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
3333 //
3434 DataProcessorSpec{
3535 " rdataframe_producer" , //
36- Inputs{}, //
36+ Inputs{}, //
3737 {
3838 OutputSpec{ { " xz" }, " TES" , " RFRAME" }, //
3939 },
40- AlgorithmSpec{ [](ProcessingContext& ctx ) {
40+ AlgorithmSpec{ adaptStateless ( [](DataAllocator& outputs ) {
4141 // We ask the framework for something which can build a Table
42- auto & out = ctx. outputs () .make <TableBuilder>(Output{ " TES" , " RFRAME" });
42+ auto & out = outputs.make <TableBuilder>(Output{ " TES" , " RFRAME" });
4343 // We use RDataFrame to create a few columns with 100 rows.
4444 // The final action is the one which allows the user to create the
4545 // output message.
@@ -53,48 +53,48 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
5353 .Define (" y" , " 2.f" )
5454 .Define (" z" , " x+y" );
5555 t.ForeachSlot (out.persist <float , float >({" x" , " z" }), {" x" , " z" });
56- } } //
57- }, //
56+ }) } //
57+ }, //
5858 DataProcessorSpec{
5959 " rdataframe_consumer" , //
6060 {
6161 InputSpec{ " xz" , " TES" , " RFRAME" }, //
6262 }, //
6363 Outputs{}, //
6464 AlgorithmSpec{
65- [](ProcessingContext& ctx) {
66- // / This gets a table handle from the message.
67- auto s = ctx.inputs ().get <TableConsumer>(" xz" );
65+ adaptStateless (
66+ [](InputRecord& inputs, ControlService& control) {
67+ // / This gets a table handle from the message.
68+ auto s = inputs.get <TableConsumer>(" xz" );
6869
69- // / From the handle, we construct the actual arrow table
70- // / which is then used as a source for the RDataFrame.
71- // / This is probably easy to change to a:
72- // /
73- // / auto rdf = ctx.inputs().get<RDataSource>("xz");
74- auto table = s->asArrowTable ();
75- if (table->num_rows () != 100 ) {
76- LOG (ERROR) << " Wrong number of entries for the arrow table" << table->num_rows ();
77- }
70+ // / From the handle, we construct the actual arrow table
71+ // / which is then used as a source for the RDataFrame.
72+ // / This is probably easy to change to a:
73+ // /
74+ // / auto rdf = ctx.inputs().get<RDataSource>("xz");
75+ auto table = s->asArrowTable ();
76+ if (table->num_rows () != 100 ) {
77+ LOG (ERROR) << " Wrong number of entries for the arrow table" << table->num_rows ();
78+ }
7879
79- if (table->num_columns () != 2 ) {
80- LOG (ERROR) << " Wrong number of columns for the arrow table" << table->num_columns ();
81- }
80+ if (table->num_columns () != 2 ) {
81+ LOG (ERROR) << " Wrong number of columns for the arrow table" << table->num_columns ();
82+ }
8283
83- auto source = std::make_unique<ROOT::RDF::RArrowDS>(s->asArrowTable (), std::vector<std::string>{});
84- ROOT::RDataFrame rdf (std::move (source));
84+ auto source = std::make_unique<ROOT::RDF::RArrowDS>(s->asArrowTable (), std::vector<std::string>{});
85+ ROOT::RDataFrame rdf (std::move (source));
8586
86- if (*rdf.Count () != 100 )
87- {
88- LOG (ERROR) << " Wrong number of entries for the DataFrame" << *rdf.Count ();
89- }
87+ if (*rdf.Count () != 100 ) {
88+ LOG (ERROR) << " Wrong number of entries for the DataFrame" << *rdf.Count ();
89+ }
9090
91- if (*rdf.Mean (" z" ) - 3 .f > 0 .1f ) {
92- LOG (ERROR) << " Wrong average for z" ;
93- }
91+ if (*rdf.Mean (" z" ) - 3 .f > 0 .1f ) {
92+ LOG (ERROR) << " Wrong average for z" ;
93+ }
9494
95- ctx. services (). get <ControlService>() .readyToQuit (true );
96- } //
97- } //
98- } //
95+ control .readyToQuit (true );
96+ }) //
97+ } //
98+ } //
9999 };
100100}
0 commit comments