Skip to content

Commit a45b70e

Browse files
authored
DPL Analysis: Spawn<> template for user-defined extended tables (#3958)
1 parent 47b1df4 commit a45b70e

6 files changed

Lines changed: 215 additions & 91 deletions

File tree

Analysis/Tutorials/src/filters.cxx

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ namespace o2::aod
1616
namespace etaphi
1717
{
1818
DECLARE_SOA_COLUMN(NPhi, nphi, float);
19+
DECLARE_SOA_EXPRESSION_COLUMN(CosPhi, cosphi, float, ncos(aod::etaphi::nphi));
1920
} // namespace etaphi
20-
DECLARE_SOA_TABLE(TPhi, "AOD", "ETAPHI",
21+
DECLARE_SOA_TABLE(TPhi, "AOD", "TPHI",
2122
etaphi::NPhi);
23+
DECLARE_SOA_EXTENDED_TABLE_USER(EPhi, TPhi, "EPHI", aod::etaphi::CosPhi);
2224
} // namespace o2::aod
2325

2426
using namespace o2;
@@ -30,17 +32,18 @@ using namespace o2::framework::expressions;
3032
// FIXME: this should really inherit from AnalysisTask but
3133
// we need GCC 7.4+ for that
3234
struct ATask {
33-
Produces<aod::TPhi> etaphi;
34-
35+
Produces<aod::TPhi> tphi;
3536
void process(aod::Tracks const& tracks)
3637
{
3738
for (auto& track : tracks) {
38-
etaphi(track.phi());
39+
tphi(track.phi());
3940
}
4041
}
4142
};
4243

4344
struct BTask {
45+
Spawns<aod::EPhi> ephi;
46+
4447
float fPI = static_cast<float>(M_PI);
4548
float ptlow = 0.5f;
4649
float ptup = 2.0f;
@@ -64,9 +67,19 @@ struct BTask {
6467
}
6568
};
6669

70+
/// Tutorial task consuming the spawned extended table: it iterates over
/// aod::Tracks joined with aod::EPhi and logs, for each track, the value of
/// the expression column cosphi next to std::cos(track.phi()) so that the
/// two can be compared by eye in the log output.
struct CTask {
71+
// The collision argument is unnamed and not used in the body.
void process(aod::Collision const&, soa::Join<aod::Tracks, aod::EPhi> const& tracks)
72+
{
73+
for (auto& track : tracks) {
74+
LOGF(INFO, "%.3f == %.3f", track.cosphi(), std::cos(track.phi()));
75+
}
76+
}
77+
};
78+
6779
WorkflowSpec defineDataProcessing(ConfigContext const&)
6880
{
6981
return WorkflowSpec{
7082
adaptAnalysisTask<ATask>("produce-normalizedphi"),
71-
adaptAnalysisTask<BTask>("consume-normalizedphi")};
83+
adaptAnalysisTask<BTask>("consume-normalizedphi"),
84+
adaptAnalysisTask<CTask>("consume-spawned")};
7285
}

Framework/Core/include/Framework/ASoA.h

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,28 +1232,34 @@ using ConcatBase = decltype(concat(std::declval<T1>(), std::declval<T2>()));
12321232
#define DECLARE_SOA_TABLE(_Name_, _Origin_, _Description_, ...) \
12331233
DECLARE_SOA_TABLE_FULL(_Name_, #_Name_, _Origin_, _Description_, __VA_ARGS__);
12341234

1235-
#define DECLARE_SOA_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \
1236-
using _Name_ = o2::soa::JoinBase<_Table_, o2::soa::Table<__VA_ARGS__>>; \
1237-
\
1238-
struct _Name_##Metadata : o2::soa::TableMetadata<_Name_##Metadata> { \
1239-
using table_t = _Name_; \
1240-
using base_table_t = _Table_; \
1241-
using expression_pack_t = framework::pack<__VA_ARGS__>; \
1242-
static constexpr char const* mLabel = #_Name_; \
1243-
static constexpr char const mOrigin[4] = "DYN"; \
1244-
static constexpr char const mDescription[16] = _Description_; \
1245-
}; \
1246-
\
1247-
template <> \
1248-
struct MetadataTrait<_Name_> { \
1249-
using metadata = _Name_##Metadata; \
1250-
}; \
1251-
\
1252-
template <> \
1253-
struct MetadataTrait<_Name_::unfiltered_iterator> { \
1254-
using metadata = _Name_##Metadata; \
1235+
/// Declares an extended table `_Name_` as the join of the base table `_Table_`
/// with a table made of the expression columns passed in the variadic part.
/// Alongside the alias it defines `_Name_##Metadata`, which records the table
/// type, the base table type, the pack of expression columns, the label
/// (stringified `_Name_`), the origin and the description, and it specializes
/// MetadataTrait for both `_Name_` and `_Name_::unfiltered_iterator`.
/// `_Origin_` initializes a char[4] and `_Description_` a char[16], so the
/// string literals passed must fit those sizes including the terminator.
#define DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, _Origin_, _Description_, ...) \
1236+
using _Name_ = o2::soa::JoinBase<_Table_, o2::soa::Table<__VA_ARGS__>>; \
1237+
\
1238+
struct _Name_##Metadata : o2::soa::TableMetadata<_Name_##Metadata> { \
1239+
using table_t = _Name_; \
1240+
using base_table_t = _Table_; \
1241+
using expression_pack_t = framework::pack<__VA_ARGS__>; \
1242+
static constexpr char const* mLabel = #_Name_; \
1243+
static constexpr char const mOrigin[4] = _Origin_; \
1244+
static constexpr char const mDescription[16] = _Description_; \
1245+
}; \
1246+
\
1247+
template <> \
1248+
struct MetadataTrait<_Name_> { \
1249+
using metadata = _Name_##Metadata; \
1250+
}; \
1251+
\
1252+
template <> \
1253+
struct MetadataTrait<_Name_::unfiltered_iterator> { \
1254+
using metadata = _Name_##Metadata; \
12551255
};
12561256

1257+
/// Declares an extended table with the origin fixed to "DYN"; all other
/// arguments are forwarded unchanged to DECLARE_SOA_EXTENDED_TABLE_FULL.
#define DECLARE_SOA_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \
1258+
DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, "DYN", _Description_, __VA_ARGS__)
1259+
1260+
/// Declares a user-defined extended table with the origin fixed to "AOD";
/// all other arguments are forwarded unchanged to
/// DECLARE_SOA_EXTENDED_TABLE_FULL.
#define DECLARE_SOA_EXTENDED_TABLE_USER(_Name_, _Table_, _Description_, ...) \
1261+
DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, "AOD", _Description_, __VA_ARGS__)
1262+
12571263
namespace o2::soa
12581264
{
12591265
template <typename... Ts>
@@ -1443,6 +1449,52 @@ auto filter(T&& t, framework::expressions::Filter const& expr)
14431449
return Filtered<T>(t.asArrowTable(), expr);
14441450
}
14451451

1452+
/// Expression-based column generator to materialize columns.
///
/// Builds a gandiva projector for the expression columns in `columns`
/// (via framework::expressions::createProjectors), evaluates it on every
/// record batch read from `atable`, and returns a new arrow table made of
/// the original columns followed by one computed column per pack entry.
///
/// @param columns pack of expression column types; also used to derive the
///                schema of the extra columns (createSchemaFromColumns)
/// @param atable  input table; dereferenced unconditionally, must not be null
/// @return the result of arrow::Table::Make on the combined schema and columns
/// @throws std::runtime_error if a batch cannot be read or the projector
///         fails to evaluate
1453+
template <typename... C>
1454+
auto spawner(framework::pack<C...> columns, arrow::Table* atable)
1455+
{
1456+
arrow::TableBatchReader reader(*atable);
1457+
std::shared_ptr<arrow::RecordBatch> batch;
1458+
arrow::ArrayVector v;
1459+
// chunks[i] accumulates the per-batch result arrays of the i-th expression column
std::vector<arrow::ArrayVector> chunks(sizeof...(C));
1460+
1461+
auto projectors = framework::expressions::createProjectors(columns, atable->schema());
1462+
while (true) {
1463+
auto s = reader.ReadNext(&batch);
1464+
if (!s.ok()) {
1465+
throw std::runtime_error(fmt::format("Cannot read batches from table {}", s.ToString()));
1466+
}
1467+
// stop once the reader hands back no batch
if (batch == nullptr) {
1468+
break;
1469+
}
1470+
s = projectors->Evaluate(*batch, arrow::default_memory_pool(), &v);
1471+
if (!s.ok()) {
1472+
throw std::runtime_error(fmt::format("Cannot apply projector {}", s.ToString()));
1473+
}
1474+
// v.at(i) throws if the projector returned fewer arrays than there are columns
for (auto i = 0u; i < sizeof...(C); ++i) {
1475+
chunks[i].emplace_back(v.at(i));
1476+
}
1477+
}
1478+
// NOTE(review): if the loop above read no batches, every chunks[i] is empty
// here and the ChunkedArray is built without an explicit type -- confirm
// that arrow::ChunkedArray supports this, or pass the field type explicitly.
std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays(sizeof...(C));
1479+
for (auto i = 0u; i < sizeof...(C); ++i) {
1480+
arrays[i] = std::make_shared<arrow::ChunkedArray>(chunks[i]);
1481+
}
1482+
1483+
// assemble the output: original fields/columns first, then the computed ones
auto extra_schema = o2::soa::createSchemaFromColumns(columns);
1484+
std::vector<std::shared_ptr<arrow::Field>> new_fields;
1485+
std::vector<std::shared_ptr<arrow::ChunkedArray>> new_columns;
1486+
auto original_columns = atable->columns();
1487+
auto original_fields = atable->schema()->fields();
1488+
std::copy(original_fields.begin(), original_fields.end(), std::back_inserter(new_fields));
1489+
std::copy(original_columns.begin(), original_columns.end(), std::back_inserter(new_columns));
1490+
for (auto i = 0u; i < framework::pack_size(columns); ++i) {
1491+
new_columns.push_back(arrays[i]);
1492+
new_fields.emplace_back(extra_schema->field(i));
1493+
}
1494+
auto new_schema = std::make_shared<arrow::Schema>(new_fields);
1495+
return arrow::Table::Make(new_schema, new_columns);
1496+
}
1497+
14461498
} // namespace o2::soa
14471499

14481500
#endif // O2_FRAMEWORK_ASOA_H_

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ struct WritingCursor<soa::Table<PC...>> {
121121
int64_t mCount = -1;
122122
};
123123

124-
/// This helper class allow you to declare things which will be crated by a
125-
/// give analysis task. Notice how the actual cursor is implemented by the
124+
/// This helper class allows you to declare things which will be created by a
125+
/// given analysis task. Notice how the actual cursor is implemented by the
126126
/// means of the WritingCursor helper class, from which produces actually
127127
/// derives.
128128
template <typename... C>
@@ -142,6 +142,58 @@ struct Produces<soa::Table<C...>> : WritingCursor<typename soa::FilterPersistent
142142
}
143143
};
144144

145+
/// This helper class allows you to declare extended tables which should be
146+
/// created by the task (as opposed to those pre-defined by data model)
///
/// T is the extended table type. Its metadata (looked up through
/// aod::MetadataTrait) supplies the base table type and the pack of
/// expression columns; the base table's own metadata supplies the label,
/// origin and description used to request it as an input.
/// The `table` member starts out as nullptr; operator->, operator* and
/// asArrowTable() dereference it and must only be used once it has been set.
147+
template <typename T>
148+
struct Spawns {
149+
using metadata = typename aod::MetadataTrait<T>::metadata;
150+
using base_table_t = typename metadata::base_table_t;
151+
using expression_pack_t = typename metadata::expression_pack_t;
152+
using base_metadata = typename aod::MetadataTrait<base_table_t>::metadata;
153+
154+
/// Returns a default-constructed pack of the expression columns of T.
constexpr expression_pack_t pack()
155+
{
156+
return expression_pack_t{};
157+
}
158+
159+
/// Builds the InputSpec for the base table from the base table's label,
/// origin and description.
InputSpec const base_spec()
160+
{
161+
return InputSpec{
162+
base_metadata::tableLabel(),
163+
header::DataOrigin{base_metadata::origin()},
164+
header::DataDescription{base_metadata::description()}};
165+
}
166+
167+
/// Returns the label of the base table.
constexpr const char* base_label()
168+
{
169+
return base_metadata::tableLabel();
170+
}
171+
172+
/// Builds the OutputSpec for the extended table from its own label,
/// origin and description.
OutputSpec const spec() const
173+
{
174+
return OutputSpec{OutputLabel{metadata::tableLabel()}, metadata::origin(), metadata::description()};
175+
}
176+
177+
/// Builds an OutputRef from the extended table's label and 0.
OutputRef ref() const
178+
{
179+
return OutputRef{metadata::tableLabel(), 0};
180+
}
181+
/// Raw pointer to the held table; null while `table` is unset.
T* operator->()
182+
{
183+
return table.get();
184+
}
185+
/// Reference to the held table; `table` must be set before calling.
T& operator*()
186+
{
187+
return *table.get();
188+
}
189+
190+
/// Forwards to the held table's asArrowTable(); `table` must be set
/// before calling.
auto asArrowTable()
191+
{
192+
return table->asArrowTable();
193+
}
194+
// Holder of the extended table; nullptr until assigned.
std::shared_ptr<T> table = nullptr;
195+
};
196+
145197
/// This helper class allows you to declare things which will be created by a
146198
/// given analysis task. Currently wrapped objects are limited to be TNamed
147199
/// descendants. Objects will be written to a ROOT file at the end of the
@@ -886,6 +938,34 @@ struct OutputManager<OutputObj<T>> {
886938
}
887939
};
888940

941+
/// OutputManager specialization for Spawns<T> members of a task: declares
/// the extended table as an output, builds it from the base table in
/// prepare() and hands it to the outputs in postRun().
template <typename T>
942+
struct OutputManager<Spawns<T>> {
943+
/// Appends the OutputSpec of the spawned table to `outputs`; the third
/// argument is ignored. Always returns true.
static bool appendOutput(std::vector<OutputSpec>& outputs, Spawns<T>& what, uint32_t)
944+
{
945+
outputs.emplace_back(what.spec());
946+
return true;
947+
}
948+
949+
/// Fetches the base table from the inputs by its label, runs
/// o2::soa::spawner on it with the expression pack of T and stores the
/// resulting table in `what.table`. Always returns true.
static bool prepare(ProcessingContext& pc, Spawns<T>& what)
950+
{
951+
auto original_table = pc.inputs().get<TableConsumer>(what.base_label())->asArrowTable();
952+
what.table = std::make_shared<T>(o2::soa::spawner(what.pack(), original_table.get()));
953+
return true;
954+
}
955+
956+
/// Nothing to do at finalize time. Always returns true.
static bool finalize(ProcessingContext&, Spawns<T>&)
957+
{
958+
return true;
959+
}
960+
961+
/// Passes the arrow table held by `what` to the end-of-stream outputs
/// under the origin and description of T's metadata. Always returns true.
// NOTE(review): what.asArrowTable() dereferences what.table -- presumably
// prepare() has always run before this point; confirm against the caller.
static bool postRun(EndOfStreamContext& eosc, Spawns<T>& what)
962+
{
963+
using metadata = typename std::decay_t<decltype(what)>::metadata;
964+
eosc.outputs().adopt(Output{metadata::origin(), metadata::description()}, what.asArrowTable());
965+
return true;
966+
}
967+
};
968+
889969
template <typename T>
890970
class has_instance
891971
{
@@ -967,6 +1047,24 @@ struct OptionManager<Configurable<T>> {
9671047
}
9681048
};
9691049

1050+
/// Manager template to facilitate extended tables spawning
///
/// The primary template is the fallback for task members that are not
/// Spawns<>: it leaves `inputs` untouched and returns false.
1051+
template <typename T>
1052+
struct SpawnManager {
1053+
static bool requestInputs(std::vector<InputSpec>&, T const&) { return false; }
1054+
};
1055+
1056+
/// Specialization for Spawns<TABLE>: makes sure the base table of the
/// extended table is requested as an input.
template <typename TABLE>
1057+
struct SpawnManager<Spawns<TABLE>> {
1058+
/// Appends the base table's InputSpec to `inputs` unless an entry with the
/// same binding is already present. Always returns true.
/// `spawns` is taken by non-const reference because base_spec() is a
/// non-const member of Spawns.
static bool requestInputs(std::vector<InputSpec>& inputs, Spawns<TABLE>& spawns)
1059+
{
1060+
auto base_spec = spawns.base_spec();
1061+
if (std::find_if(inputs.begin(), inputs.end(), [&](InputSpec const& spec) { return base_spec.binding == spec.binding; }) == inputs.end()) {
1062+
inputs.emplace_back(base_spec);
1063+
}
1064+
return true;
1065+
}
1066+
};
1067+
9701068
// SFINAE test
9711069
template <typename T>
9721070
class has_process
@@ -1047,6 +1145,11 @@ DataProcessorSpec adaptAnalysisTask(char const* name, Args&&... args)
10471145
},
10481146
tupledTask);
10491147
}
1148+
// Request the base tables needed by spawnable extended tables
1149+
std::apply([&inputs](auto&... x) {
1150+
return (SpawnManager<std::decay_t<decltype(x)>>::requestInputs(inputs, x), ...);
1151+
},
1152+
tupledTask);
10501153

10511154
std::apply([&outputs, &hash](auto&... x) { return (OutputManager<std::decay_t<decltype(x)>>::appendOutput(outputs, x, hash), ...); }, tupledTask);
10521155
std::apply([&options, &hash](auto&... x) { return (OptionManager<std::decay_t<decltype(x)>>::appendOption(options, x), ...); }, tupledTask);

Framework/Core/include/Framework/Expressions.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,25 @@ void updateExpressionInfos(expressions::Filter const& filter, std::vector<Expres
433433
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node);
434434
/// Function to create gandiva projecting expression from generic gandiva expression tree
435435
gandiva::ExpressionPtr makeExpression(gandiva::NodePtr node, gandiva::FieldPtr result);
436+
437+
/// Creates a single gandiva projector for `schema` holding one expression
/// per column type C of the pack. Each expression is built from the
/// expression tree of C::Projector() and writes into the field returned by
/// C::asArrowField().
///
/// @param schema schema the expressions are resolved against
/// @return the projector created by gandiva::Projector::Make
/// @throws std::runtime_error if gandiva::Projector::Make reports a failure
template <typename... C>
438+
std::shared_ptr<gandiva::Projector> createProjectors(framework::pack<C...>, gandiva::SchemaPtr schema)
439+
{
440+
std::shared_ptr<gandiva::Projector> projector;
441+
auto s = gandiva::Projector::Make(
442+
schema,
443+
// pack expansion: one makeExpression(...) entry per column type C
{makeExpression(
444+
framework::expressions::createExpressionTree(
445+
framework::expressions::createOperations(C::Projector()),
446+
schema),
447+
C::asArrowField())...},
448+
&projector);
449+
if (s.ok()) {
450+
return projector;
451+
} else {
452+
throw std::runtime_error(fmt::format("Failed to create projector: {}", s.ToString()));
453+
}
454+
}
436455
} // namespace o2::framework::expressions
437456

438457
#endif // O2_FRAMEWORK_EXPRESSIONS_H_

Framework/Core/src/AODReaderHelpers.cxx

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -141,39 +141,6 @@ std::vector<OutputRoute> getListOfUnknown(std::vector<OutputRoute> const& routes
141141
return unknows;
142142
}
143143

144-
/// Expression-based column generator to materialize columns
145-
template <typename... C>
146-
auto spawner(framework::pack<C...> columns, arrow::Table* atable)
147-
{
148-
arrow::TableBatchReader reader(*atable);
149-
std::shared_ptr<arrow::RecordBatch> batch;
150-
arrow::ArrayVector v;
151-
std::vector<arrow::ArrayVector> chunks(sizeof...(C));
152-
153-
auto projectors = framework::expressions::createProjectors(columns, atable->schema());
154-
while (true) {
155-
auto s = reader.ReadNext(&batch);
156-
if (!s.ok()) {
157-
throw std::runtime_error(fmt::format("Cannot read batches from table {}", s.ToString()));
158-
}
159-
if (batch == nullptr) {
160-
break;
161-
}
162-
s = projectors->Evaluate(*batch, arrow::default_memory_pool(), &v);
163-
if (!s.ok()) {
164-
throw std::runtime_error(fmt::format("Cannot apply projector {}", s.ToString()));
165-
}
166-
for (auto i = 0u; i < sizeof...(C); ++i) {
167-
chunks[i].emplace_back(v.at(i));
168-
}
169-
}
170-
std::vector<std::shared_ptr<arrow::ChunkedArray>> results(sizeof...(C));
171-
for (auto i = 0u; i < sizeof...(C); ++i) {
172-
results[i] = std::make_shared<arrow::ChunkedArray>(chunks[i]);
173-
}
174-
return results;
175-
}
176-
177144
AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec> requested)
178145
{
179146
return AlgorithmSpec::InitCallback{[requested](InitContext& ic) {
@@ -204,19 +171,8 @@ AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec> reques
204171
auto maker = [&](auto metadata) {
205172
using metadata_t = decltype(metadata);
206173
using expressions = typename metadata_t::expression_pack_t;
207-
auto extra_schema = o2::soa::createSchemaFromColumns(expressions{});
208174
auto original_table = pc.inputs().get<TableConsumer>(input.binding)->asArrowTable();
209-
auto original_fields = original_table->schema()->fields();
210-
std::vector<std::shared_ptr<arrow::Field>> fields;
211-
auto arrays = spawner(expressions{}, original_table.get());
212-
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns = original_table->columns();
213-
std::copy(original_fields.begin(), original_fields.end(), std::back_inserter(fields));
214-
for (auto i = 0u; i < framework::pack_size(expressions{}); ++i) {
215-
columns.push_back(arrays[i]);
216-
fields.emplace_back(extra_schema->field(i));
217-
}
218-
auto new_schema = std::make_shared<arrow::Schema>(fields);
219-
return arrow::Table::Make(new_schema, columns);
175+
return o2::soa::spawner(expressions{}, original_table.get());
220176
};
221177

222178
if (description == header::DataDescription{"TRACKPAR"}) {

0 commit comments

Comments
 (0)