From dd68b9597c56f72a60a7d5586b8460905f8d4270 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse Date: Wed, 27 Feb 2019 14:32:52 +0100 Subject: [PATCH] DPL: add initial set of helpers for analysis This reduces quite a lot the verbosity of the analysis. --- Framework/Core/CMakeLists.txt | 1 + .../Core/include/Framework/AnalysisHelpers.h | 44 +++++++ Framework/Core/src/AnalysisHelpers.cxx | 45 +++++++ Framework/TestWorkflows/src/o2D0Analysis.cxx | 120 +++++++++--------- 4 files changed, 152 insertions(+), 58 deletions(-) create mode 100644 Framework/Core/include/Framework/AnalysisHelpers.h create mode 100644 Framework/Core/src/AnalysisHelpers.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index f61a51649136e..557bd45319a2d 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -21,6 +21,7 @@ endif() set(SRCS src/AODReaderHelpers.cxx + src/AnalysisHelpers.cxx src/BoostOptionsRetriever.cxx src/ConfigParamsHelper.cxx src/CompletionPolicy.cxx diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h new file mode 100644 index 0000000000000..8ce86b4996034 --- /dev/null +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -0,0 +1,44 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+#ifndef o2_framework_AnalysisHelpers_H_DEFINED +#define o2_framework_AnalysisHelpers_H_DEFINED + +#include "Framework/AnalysisHelpers.h" + +#include +#include + +using namespace ROOT::RDF; + +namespace o2 +{ +namespace framework +{ +class TableConsumer; +} + +namespace analysis +{ + +/// Do a single loop on all the entries of the @a input table +ROOT::RDataFrame doSingleLoopOn(std::unique_ptr& input); + +/// Do a double loop on all the entries with the same value for the \a grouping +/// of the @a input table, where the entries for the outer index are prefixed +/// with @a name followed by `_` while the entries for the inner loop are +/// prefixed with @a name followed by `bar_`. +ROOT::RDataFrame doSelfCombinationsWith(std::unique_ptr& input, + std::string name = "p", + std::string grouping = "eventID"); + +} // namespace analysis +} // namespace o2 + +#endif // o2_framework_AnalysisHelpers_H_DEFINED diff --git a/Framework/Core/src/AnalysisHelpers.cxx b/Framework/Core/src/AnalysisHelpers.cxx new file mode 100644 index 0000000000000..109cb5e2c4dd2 --- /dev/null +++ b/Framework/Core/src/AnalysisHelpers.cxx @@ -0,0 +1,45 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction.
+#include "Framework/AnalysisHelpers.h" +#include "Framework/RCombinedDS.h" +#include "Framework/TableBuilder.h" +#include "Framework/TableConsumer.h" + +#include +#include + +using namespace ROOT::RDF; + +namespace o2 +{ +namespace analysis +{ + +ROOT::RDataFrame doSingleLoopOn(std::unique_ptr& input) +{ + auto flat = std::make_unique(input->asArrowTable(), std::vector{}); + ROOT::RDataFrame rdf(std::move(flat)); + return rdf; +} + +ROOT::RDataFrame doSelfCombinationsWith(std::unique_ptr& input, std::string name, std::string grouping) +{ + auto table = input->asArrowTable(); + using Index = RCombinedDSBlockJoinIndex; + auto left = std::make_unique(table, std::vector{}); + auto right = std::make_unique(table, std::vector{}); + auto combined = std::make_unique(std::move(left), std::move(right), std::move(std::make_unique(grouping, true, BlockCombinationRule::StrictlyUpper)), name + "_", name + "bar_"); + + ROOT::RDataFrame rdf(std::move(combined)); + return rdf; +} + +} // namespace analysis +} // namespace o2 diff --git a/Framework/TestWorkflows/src/o2D0Analysis.cxx b/Framework/TestWorkflows/src/o2D0Analysis.cxx index b5caed4216813..b40a6d460f27d 100644 --- a/Framework/TestWorkflows/src/o2D0Analysis.cxx +++ b/Framework/TestWorkflows/src/o2D0Analysis.cxx @@ -8,85 +8,89 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "Framework/runDataProcessing.h" -#include "Framework/RCombinedDS.h" -#include "Framework/TableBuilder.h" +#include "Framework/AnalysisHelpers.h" #include -#include +using namespace ROOT::RDF; using namespace o2::framework; // A dummy workflow which creates a few of the tables proposed by Ruben, // using ARROW WorkflowSpec defineDataProcessing(ConfigContext const& specs) { + // Workflow definition. A workflow can be one or more DataProcessors + // each implementing (part of) an analysis. 
Each DataProcessor has + // (at least) a name, some Inputs, some Outputs and they get arranged + // together accordingly. WorkflowSpec workflow{ - /// Minimal analysis example + // Multiple DataProcessor specs + // can be provided per workflow DataProcessorSpec{ + // The name of my analysis "d0-analysis", - { + Inputs{ // Dangling inputs of type AOD will be automatically picked up // by DPL and an extra reader device will be instanciated to - // read them. - InputSpec{ "DZeroFlagged", "AOD", "DZEROFLAGGED" }, + // read them. In this particular case the signature + // AOD/DZEROFLAGGED is associated with Gianmichele's + // D0 candidates schema. The first string is just a label + // so that the algorithm can in principle be reused for different + // kinds of candidates. + InputSpec{ "candidates", "AOD", "DZEROFLAGGED" }, }, - {}, + // No outputs for the time being. + Outputs{}, AlgorithmSpec{ - [](InitContext& setup) { - return [](ProcessingContext& ctx) { - auto s = ctx.inputs().get("DZeroFlagged"); - /// From the handle, we construct the actual arrow table - /// which is then used as a source for the RDataFrame. - /// This is probably easy to change to a: - /// - /// auto rdf = ctx.inputs().get("xz"); - auto table = s->asArrowTable(); - using namespace ROOT::RDF; + // This is the actual per "message" loop, where a message could + // be the contents of a file or part of it. + // FIXME: Too much boilerplate. + adaptStateless([](InputRecord& inputs) { + auto input = inputs.get("candidates"); - TFile f("result.root", "RECREATE"); + // This does a single loop on all the candidates in the input message + // using a simple mask on the cand_type_ML column and does + // a simple 1D histogram of the filtered entries.
+ auto candidates = o2::analysis::doSingleLoopOn(input); - auto flatD0 = std::make_unique(table, std::vector{}); - ROOT::RDataFrame rdf1(std::move(flatD0)); - /// A single loop to do an invariant mass plot, where we use the preselected - /// candidates - auto candFilter = [](int x) -> bool { return x & 0x1; }; - auto h1 = rdf1.Filter(candFilter, { "cand_type_ML" }).Histo1D("inv_mass_ML"); - h1->SetName("InvariantMass"); - h1->Write(); + auto h1 = candidates.Filter("(bool)(cand_type_ML & 0x1)").Histo1D("inv_mass_ML"); - /// Double loops on all supported loop types. - using Index = RCombinedDSBlockJoinIndex; - auto types = { - BlockCombinationRule::Anti, - BlockCombinationRule::Full, - BlockCombinationRule::Diagonal, - BlockCombinationRule::StrictlyUpper, - BlockCombinationRule::Upper, - }; - // A few helpers - auto bothCandFilter = [](int x, int y) -> bool { return x & 0x1 && y & 0x1; }; - auto delta = [](float x, float y) { return x - y; }; + // A lambda function subtracting two quantities. This defines + // a function "delta" which can be invoked with + // + // delta(1,2) + // + // and will return 1 - 2. + auto delta = [](float x, float y) { return x - y; }; - for (auto combinationType : types) { - auto d0 = std::make_unique(table, std::vector{}); - auto d0bar = std::make_unique(table, std::vector{}); - auto d0d0bar = std::make_unique(std::move(d0), std::move(d0bar), std::move(std::make_unique("cand_evtID_ML", true, combinationType)), "d0_", "d0bar_"); + // This does all the combinations for all the candidates which have + // the same value for cand_evtID_ML (the Event ID). + // d0_ is the prefix assigned to the outer variable of the double loop. + // d0bar_ is the prefix assigned to the inner variable of the double loop. 
+ // + // The lines below will: + // * Filter the combinations according to some mask + // * Define a column delta_phi with the difference in phi between d0 and d0bar phi + // * Define a column delta_eta with the difference in eta between d0 and d0bar eta + // * Do two histograms with delta_phi, delta_eta + auto combinations = o2::analysis::doSelfCombinationsWith(input, "d0", "cand_evtID_ML"); + auto deltas = combinations.Filter("d0_cand_type_ML & 0x1 && d0bar_cand_type_ML & 0x1") + .Define("delta_phi", delta, { "d0_phi_cand_ML", "d0bar_phi_cand_ML" }) + .Define("delta_eta", delta, { "d0_eta_cand_ML", "d0bar_eta_cand_ML" }); + auto h2 = deltas.Histo1D("delta_phi"); + auto h3 = deltas.Histo1D("delta_eta"); - ROOT::RDataFrame rdf2(std::move(d0d0bar)); - auto combinatorics = rdf2.Filter(bothCandFilter, { "d0_cand_type_ML", "d0bar_cand_type_ML" }) - .Define("delta_phi", delta, { "d0_phi_cand_ML", "d0bar_phi_cand_ML" }) - .Define("delta_eta", delta, { "d0_eta_cand_ML", "d0bar_eta_cand_ML" }); - auto h2 = combinatorics.Histo1D("delta_phi"); - auto h3 = combinatorics.Histo1D("delta_eta"); - - std::string rule = RCombinedDSIndexHelpers::combinationRuleAsString(combinationType); - h2->SetName(("DeltaPhi/" + rule).c_str()); - h2->Write(); - h3->SetName(("DeltaEta/" + rule).c_str()); - h3->Write(); - } - }; - } } } + // FIXME: For the moment we hardcode saving the histograms. + // In reality it should send the results as outputs to a downstream merger + // process which merges them as wished. + TFile f("result.root", "RECREATE"); + h1->SetName("InvariantMass"); + h1->Write(); + h2->SetName("DeltaPhi"); + h2->Write(); + h3->SetName("DeltaEta"); + h3->Write(); + }) } } }; return workflow; }