From 0ce462fd891ba1d093c1957932b5d2ff07056801 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 8 Jul 2020 16:16:04 +0200 Subject: [PATCH] DPL Analysis: Spawn<> template for user-defined extended tables --- Analysis/Tutorials/src/filters.cxx | 23 +++- Framework/Core/include/Framework/ASoA.h | 92 +++++++++++---- .../Core/include/Framework/AnalysisTask.h | 107 +++++++++++++++++- .../Core/include/Framework/Expressions.h | 19 ++++ Framework/Core/src/AODReaderHelpers.cxx | 46 +------- Framework/Core/src/ExpressionHelpers.h | 19 ---- 6 files changed, 215 insertions(+), 91 deletions(-) diff --git a/Analysis/Tutorials/src/filters.cxx b/Analysis/Tutorials/src/filters.cxx index 186373ef34a6c..fb7762708a1e7 100644 --- a/Analysis/Tutorials/src/filters.cxx +++ b/Analysis/Tutorials/src/filters.cxx @@ -16,9 +16,11 @@ namespace o2::aod namespace etaphi { DECLARE_SOA_COLUMN(NPhi, nphi, float); +DECLARE_SOA_EXPRESSION_COLUMN(CosPhi, cosphi, float, ncos(aod::etaphi::nphi)); } // namespace etaphi -DECLARE_SOA_TABLE(TPhi, "AOD", "ETAPHI", +DECLARE_SOA_TABLE(TPhi, "AOD", "TPHI", etaphi::NPhi); +DECLARE_SOA_EXTENDED_TABLE_USER(EPhi, TPhi, "EPHI", aod::etaphi::CosPhi); } // namespace o2::aod using namespace o2; @@ -30,17 +32,18 @@ using namespace o2::framework::expressions; // FIXME: this should really inherit from AnalysisTask but // we need GCC 7.4+ for that struct ATask { - Produces etaphi; - + Produces tphi; void process(aod::Tracks const& tracks) { for (auto& track : tracks) { - etaphi(track.phi()); + tphi(track.phi()); } } }; struct BTask { + Spawns ephi; + float fPI = static_cast(M_PI); float ptlow = 0.5f; float ptup = 2.0f; @@ -64,9 +67,19 @@ struct BTask { } }; +struct CTask { + void process(aod::Collision const&, soa::Join const& tracks) + { + for (auto& track : tracks) { + LOGF(INFO, "%.3f == %.3f", track.cosphi(), std::cos(track.phi())); + } + } +}; + WorkflowSpec defineDataProcessing(ConfigContext const&) { return WorkflowSpec{ adaptAnalysisTask("produce-normalizedphi"), - adaptAnalysisTask("consume-normalizedphi")}; + adaptAnalysisTask("consume-normalizedphi"), + adaptAnalysisTask("consume-spawned")}; } diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 9d602b2ba49fa..604bc73f7cc28 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1232,28 +1232,34 @@ using ConcatBase = decltype(concat(std::declval(), std::declval())); #define DECLARE_SOA_TABLE(_Name_, _Origin_, _Description_, ...) \ DECLARE_SOA_TABLE_FULL(_Name_, #_Name_, _Origin_, _Description_, __VA_ARGS__); -#define DECLARE_SOA_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \ - using _Name_ = o2::soa::JoinBase<_Table_, o2::soa::Table<__VA_ARGS__>>; \ - \ - struct _Name_##Metadata : o2::soa::TableMetadata<_Name_##Metadata> { \ - using table_t = _Name_; \ - using base_table_t = _Table_; \ - using expression_pack_t = framework::pack<__VA_ARGS__>; \ - static constexpr char const* mLabel = #_Name_; \ - static constexpr char const mOrigin[4] = "DYN"; \ - static constexpr char const mDescription[16] = _Description_; \ - }; \ - \ - template <> \ - struct MetadataTrait<_Name_> { \ - using metadata = _Name_##Metadata; \ - }; \ - \ - template <> \ - struct MetadataTrait<_Name_::unfiltered_iterator> { \ - using metadata = _Name_##Metadata; \ +#define DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, _Origin_, _Description_, ...) \ + using _Name_ = o2::soa::JoinBase<_Table_, o2::soa::Table<__VA_ARGS__>>; \ + \ + struct _Name_##Metadata : o2::soa::TableMetadata<_Name_##Metadata> { \ + using table_t = _Name_; \ + using base_table_t = _Table_; \ + using expression_pack_t = framework::pack<__VA_ARGS__>; \ + static constexpr char const* mLabel = #_Name_; \ + static constexpr char const mOrigin[4] = _Origin_; \ + static constexpr char const mDescription[16] = _Description_; \ + }; \ + \ + template <> \ + struct MetadataTrait<_Name_> { \ + using metadata = _Name_##Metadata; \ + }; \ + \ + template <> \ + struct MetadataTrait<_Name_::unfiltered_iterator> { \ + using metadata = _Name_##Metadata; \ }; +#define DECLARE_SOA_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \ + DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, "DYN", _Description_, __VA_ARGS__) + +#define DECLARE_SOA_EXTENDED_TABLE_USER(_Name_, _Table_, _Description_, ...) \ + DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, "AOD", _Description_, __VA_ARGS__) + namespace o2::soa { template @@ -1443,6 +1449,52 @@ auto filter(T&& t, framework::expressions::Filter const& expr) return Filtered(t.asArrowTable(), expr); } +/// Expression-based column generator to materialize columns +template +auto spawner(framework::pack columns, arrow::Table* atable) +{ + arrow::TableBatchReader reader(*atable); + std::shared_ptr batch; + arrow::ArrayVector v; + std::vector chunks(sizeof...(C)); + + auto projectors = framework::expressions::createProjectors(columns, atable->schema()); + while (true) { + auto s = reader.ReadNext(&batch); + if (!s.ok()) { + throw std::runtime_error(fmt::format("Cannot read batches from table {}", s.ToString())); + } + if (batch == nullptr) { + break; + } + s = projectors->Evaluate(*batch, arrow::default_memory_pool(), &v); + if (!s.ok()) { + throw std::runtime_error(fmt::format("Cannot apply projector {}", s.ToString())); + } + for (auto i = 0u; i < sizeof...(C); ++i) { + chunks[i].emplace_back(v.at(i)); + } + } + std::vector> arrays(sizeof...(C)); + for (auto i = 0u; i < sizeof...(C); ++i) { + arrays[i] = std::make_shared(chunks[i]); + } + + auto extra_schema = o2::soa::createSchemaFromColumns(columns); + std::vector> new_fields; + std::vector> new_columns; + auto original_columns = atable->columns(); + auto original_fields = atable->schema()->fields(); + std::copy(original_fields.begin(), original_fields.end(), std::back_inserter(new_fields)); + std::copy(original_columns.begin(), original_columns.end(), std::back_inserter(new_columns)); + for (auto i = 0u; i < framework::pack_size(columns); ++i) { + new_columns.push_back(arrays[i]); + new_fields.emplace_back(extra_schema->field(i)); + } + auto new_schema = std::make_shared(new_fields); + return arrow::Table::Make(new_schema, new_columns); +} + } // namespace o2::soa #endif // O2_FRAMEWORK_ASOA_H_ diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 169a677aecd02..cd3056424216c 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -121,8 +121,8 @@ struct WritingCursor> { int64_t mCount = -1; }; -/// This helper class allow you to declare things which will be crated by a -/// give analysis task. Notice how the actual cursor is implemented by the +/// This helper class allows you to declare things which will be created by a +/// given analysis task. Notice how the actual cursor is implemented by the /// means of the WritingCursor helper class, from which produces actually /// derives. template @@ -142,6 +142,58 @@ struct Produces> : WritingCursor +struct Spawns { + using metadata = typename aod::MetadataTrait::metadata; + using base_table_t = typename metadata::base_table_t; + using expression_pack_t = typename metadata::expression_pack_t; + using base_metadata = typename aod::MetadataTrait::metadata; + + constexpr expression_pack_t pack() + { + return expression_pack_t{}; + } + + InputSpec const base_spec() + { + return InputSpec{ + base_metadata::tableLabel(), + header::DataOrigin{base_metadata::origin()}, + header::DataDescription{base_metadata::description()}}; + } + + constexpr const char* base_label() + { + return base_metadata::tableLabel(); + } + + OutputSpec const spec() const + { + return OutputSpec{OutputLabel{metadata::tableLabel()}, metadata::origin(), metadata::description()}; + } + + OutputRef ref() const + { + return OutputRef{metadata::tableLabel(), 0}; + } + T* operator->() + { + return table.get(); + } + T& operator*() + { + return *table.get(); + } + + auto asArrowTable() + { + return table->asArrowTable(); + } + std::shared_ptr table = nullptr; +}; + /// This helper class allows you to declare things which will be created by a /// given analysis task. Currently wrapped objects are limited to be TNamed /// descendants. Objects will be written to a ROOT file at the end of the @@ -886,6 +938,34 @@ struct OutputManager> { } }; +template +struct OutputManager> { + static bool appendOutput(std::vector& outputs, Spawns& what, uint32_t) + { + outputs.emplace_back(what.spec()); + return true; + } + + static bool prepare(ProcessingContext& pc, Spawns& what) + { + auto original_table = pc.inputs().get(what.base_label())->asArrowTable(); + what.table = std::make_shared(o2::soa::spawner(what.pack(), original_table.get())); + return true; + } + + static bool finalize(ProcessingContext&, Spawns&) + { + return true; + } + + static bool postRun(EndOfStreamContext& eosc, Spawns& what) + { + using metadata = typename std::decay_t::metadata; + eosc.outputs().adopt(Output{metadata::origin(), metadata::description()}, what.asArrowTable()); + return true; + } +}; + template class has_instance { @@ -967,6 +1047,24 @@ struct OptionManager> { } }; +/// Manager template to facilitate extended tables spawning +template +struct SpawnManager { + static bool requestInputs(std::vector&, T const&) { return false; } +}; + +template +struct SpawnManager> { + static bool requestInputs(std::vector& inputs, Spawns& spawns) + { + auto base_spec = spawns.base_spec(); + if (std::find_if(inputs.begin(), inputs.end(), [&](InputSpec const& spec) { return base_spec.binding == spec.binding; }) == inputs.end()) { + inputs.emplace_back(base_spec); + } + return true; + } +}; + // SFINAE test template class has_process @@ -1047,6 +1145,11 @@ DataProcessorSpec adaptAnalysisTask(char const* name, Args&&... args) }, tupledTask); } + //request base tables for spawnable extended tables + std::apply([&inputs](auto&... x) { + return (SpawnManager>::requestInputs(inputs, x), ...); + }, + tupledTask); std::apply([&outputs, &hash](auto&... x) { return (OutputManager>::appendOutput(outputs, x, hash), ...); }, tupledTask); std::apply([&options, &hash](auto&... x) { return (OptionManager>::appendOption(options, x), ...); }, tupledTask); diff --git a/Framework/Core/include/Framework/Expressions.h b/Framework/Core/include/Framework/Expressions.h index d883155768f21..c5c5cfb972fb6 100644 --- a/Framework/Core/include/Framework/Expressions.h +++ b/Framework/Core/include/Framework/Expressions.h @@ -433,6 +433,25 @@ void updateExpressionInfos(expressions::Filter const& filter, std::vector +std::shared_ptr createProjectors(framework::pack, gandiva::SchemaPtr schema) +{ + std::shared_ptr projector; + auto s = gandiva::Projector::Make( + schema, + {makeExpression( + framework::expressions::createExpressionTree( + framework::expressions::createOperations(C::Projector()), + schema), + C::asArrowField())...}, + &projector); + if (s.ok()) { + return projector; + } else { + throw std::runtime_error(fmt::format("Failed to create projector: {}", s.ToString())); + } +} } // namespace o2::framework::expressions #endif // O2_FRAMEWORK_EXPRESSIONS_H_ diff --git a/Framework/Core/src/AODReaderHelpers.cxx b/Framework/Core/src/AODReaderHelpers.cxx index aa31efa4f0f90..c8771181e6ea4 100644 --- a/Framework/Core/src/AODReaderHelpers.cxx +++ b/Framework/Core/src/AODReaderHelpers.cxx @@ -141,39 +141,6 @@ std::vector getListOfUnknown(std::vector const& routes return unknows; } -/// Expression-based column generator to materialize columns -template -auto spawner(framework::pack columns, arrow::Table* atable) -{ - arrow::TableBatchReader reader(*atable); - std::shared_ptr batch; - arrow::ArrayVector v; - std::vector chunks(sizeof...(C)); - - auto projectors = framework::expressions::createProjectors(columns, atable->schema()); - while (true) { - auto s = reader.ReadNext(&batch); - if (!s.ok()) { - throw std::runtime_error(fmt::format("Cannot read batches from table {}", s.ToString())); - } - if (batch == nullptr) { - break; - } - s = projectors->Evaluate(*batch, arrow::default_memory_pool(), &v); - if (!s.ok()) { - throw std::runtime_error(fmt::format("Cannot apply projector {}", s.ToString())); - } - for (auto i = 0u; i < sizeof...(C); ++i) { - chunks[i].emplace_back(v.at(i)); - } - } - std::vector> results(sizeof...(C)); - for (auto i = 0u; i < sizeof...(C); ++i) { - results[i] = std::make_shared(chunks[i]); - } - return results; -} - AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector requested) { return AlgorithmSpec::InitCallback{[requested](InitContext& ic) { @@ -204,19 +171,8 @@ AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector reques auto maker = [&](auto metadata) { using metadata_t = decltype(metadata); using expressions = typename metadata_t::expression_pack_t; - auto extra_schema = o2::soa::createSchemaFromColumns(expressions{}); auto original_table = pc.inputs().get(input.binding)->asArrowTable(); - auto original_fields = original_table->schema()->fields(); - std::vector> fields; - auto arrays = spawner(expressions{}, original_table.get()); - std::vector> columns = original_table->columns(); - std::copy(original_fields.begin(), original_fields.end(), std::back_inserter(fields)); - for (auto i = 0u; i < framework::pack_size(expressions{}); ++i) { - columns.push_back(arrays[i]); - fields.emplace_back(extra_schema->field(i)); - } - auto new_schema = std::make_shared(fields); - return arrow::Table::Make(new_schema, columns); + return o2::soa::spawner(expressions{}, original_table.get()); }; if (description == header::DataDescription{"TRACKPAR"}) { diff --git a/Framework/Core/src/ExpressionHelpers.h b/Framework/Core/src/ExpressionHelpers.h index d42adc77b0f35..5797a94180b3c 100644 --- a/Framework/Core/src/ExpressionHelpers.h +++ b/Framework/Core/src/ExpressionHelpers.h @@ -96,25 +96,6 @@ struct ColumnOperationSpec { result.type = type; } }; - -template -std::shared_ptr createProjectors(framework::pack, gandiva::SchemaPtr schema) -{ - std::shared_ptr projector; - auto s = gandiva::Projector::Make( - schema, - {makeExpression( - framework::expressions::createExpressionTree( - framework::expressions::createOperations(C::Projector()), - schema), - C::asArrowField())...}, - &projector); - if (s.ok()) { - return projector; - } else { - throw std::runtime_error(fmt::format("Failed to create projector: {}", s.ToString())); - } -} } // namespace o2::framework::expressions #endif // O2_FRAMEWORK_EXPRESSIONS_HELPERS_H_