Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions Analysis/Tutorials/src/filters.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ namespace o2::aod
namespace etaphi
{
DECLARE_SOA_COLUMN(NPhi, nphi, float);
DECLARE_SOA_EXPRESSION_COLUMN(CosPhi, cosphi, float, ncos(aod::etaphi::nphi));
} // namespace etaphi
DECLARE_SOA_TABLE(TPhi, "AOD", "ETAPHI",
DECLARE_SOA_TABLE(TPhi, "AOD", "TPHI",
etaphi::NPhi);
DECLARE_SOA_EXTENDED_TABLE_USER(EPhi, TPhi, "EPHI", aod::etaphi::CosPhi);
} // namespace o2::aod

using namespace o2;
Expand All @@ -30,17 +32,18 @@ using namespace o2::framework::expressions;
// FIXME: this should really inherit from AnalysisTask but
// we need GCC 7.4+ for that
/// Producer task: for every track it receives, pushes the track's phi into
/// the Produces<aod::TPhi> cursor(s) declared as members.
struct ATask {
// Cursor(s) writing rows of the TPhi table (single NPhi column).
Produces<aod::TPhi> etaphi;

Produces<aod::TPhi> tphi;
/// Fills one output row per input track.
void process(aod::Tracks const& tracks)
{
for (auto& track : tracks) {
// Calling the cursor appends a row with the given column value.
etaphi(track.phi());
tphi(track.phi());
}
}
};

struct BTask {
Spawns<aod::EPhi> ephi;

float fPI = static_cast<float>(M_PI);
float ptlow = 0.5f;
float ptup = 2.0f;
Expand All @@ -64,9 +67,19 @@ struct BTask {
}
};

struct CTask {
void process(aod::Collision const&, soa::Join<aod::Tracks, aod::EPhi> const& tracks)
{
for (auto& track : tracks) {
LOGF(INFO, "%.3f == %.3f", track.cosphi(), std::cos(track.phi()));
}
}
};

/// Workflow definition: wraps the task structs of this tutorial into
/// DataProcessorSpecs via adaptAnalysisTask, giving each one a device name.
WorkflowSpec defineDataProcessing(ConfigContext const&)
{
return WorkflowSpec{
adaptAnalysisTask<ATask>("produce-normalizedphi"),
adaptAnalysisTask<BTask>("consume-normalizedphi")};
adaptAnalysisTask<BTask>("consume-normalizedphi"),
adaptAnalysisTask<CTask>("consume-spawned")};
}
92 changes: 72 additions & 20 deletions Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -1232,28 +1232,34 @@ using ConcatBase = decltype(concat(std::declval<T1>(), std::declval<T2>()));
/// Shorthand for DECLARE_SOA_TABLE_FULL that passes the stringified _Name_
/// as the second argument; all other arguments are forwarded unchanged.
#define DECLARE_SOA_TABLE(_Name_, _Origin_, _Description_, ...) \
DECLARE_SOA_TABLE_FULL(_Name_, #_Name_, _Origin_, _Description_, __VA_ARGS__);

#define DECLARE_SOA_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \
using _Name_ = o2::soa::JoinBase<_Table_, o2::soa::Table<__VA_ARGS__>>; \
\
struct _Name_##Metadata : o2::soa::TableMetadata<_Name_##Metadata> { \
using table_t = _Name_; \
using base_table_t = _Table_; \
using expression_pack_t = framework::pack<__VA_ARGS__>; \
static constexpr char const* mLabel = #_Name_; \
static constexpr char const mOrigin[4] = "DYN"; \
static constexpr char const mDescription[16] = _Description_; \
}; \
\
template <> \
struct MetadataTrait<_Name_> { \
using metadata = _Name_##Metadata; \
}; \
\
template <> \
struct MetadataTrait<_Name_::unfiltered_iterator> { \
using metadata = _Name_##Metadata; \
/// Declares an extended table type _Name_ as the join of the base table
/// _Table_ with a table made of the columns given in the variadic arguments.
///
/// Besides the alias it defines `_Name_##Metadata`, which records the table
/// type, the base table type, the pack of extra columns, the label (the
/// stringified _Name_), the origin and the description, and it specializes
/// MetadataTrait for both _Name_ and _Name_::unfiltered_iterator.
///
/// _Origin_ initializes a char[4] and _Description_ a char[16], so the string
/// literals must be at most 3 and 15 characters long respectively.
#define DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, _Origin_, _Description_, ...) \
using _Name_ = o2::soa::JoinBase<_Table_, o2::soa::Table<__VA_ARGS__>>; \
\
struct _Name_##Metadata : o2::soa::TableMetadata<_Name_##Metadata> { \
using table_t = _Name_; \
using base_table_t = _Table_; \
using expression_pack_t = framework::pack<__VA_ARGS__>; \
static constexpr char const* mLabel = #_Name_; \
static constexpr char const mOrigin[4] = _Origin_; \
static constexpr char const mDescription[16] = _Description_; \
}; \
\
template <> \
struct MetadataTrait<_Name_> { \
using metadata = _Name_##Metadata; \
}; \
\
template <> \
struct MetadataTrait<_Name_::unfiltered_iterator> { \
using metadata = _Name_##Metadata; \
};

/// Extended table declared with the fixed origin "DYN"; forwards everything
/// else to DECLARE_SOA_EXTENDED_TABLE_FULL.
#define DECLARE_SOA_EXTENDED_TABLE(_Name_, _Table_, _Description_, ...) \
DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, "DYN", _Description_, __VA_ARGS__)

/// Extended table declared with the fixed origin "AOD"; forwards everything
/// else to DECLARE_SOA_EXTENDED_TABLE_FULL.
#define DECLARE_SOA_EXTENDED_TABLE_USER(_Name_, _Table_, _Description_, ...) \
DECLARE_SOA_EXTENDED_TABLE_FULL(_Name_, _Table_, "AOD", _Description_, __VA_ARGS__)

namespace o2::soa
{
template <typename... Ts>
Expand Down Expand Up @@ -1443,6 +1449,52 @@ auto filter(T&& t, framework::expressions::Filter const& expr)
return Filtered<T>(t.asArrowTable(), expr);
}

/// Expression-based column generator to materialize columns.
///
/// Evaluates the expression columns C... on every record batch of @a atable
/// and returns a new arrow table made of the original columns followed by one
/// materialized column per expression.
///
/// @param columns pack of expression columns to evaluate
/// @param atable  table providing the inputs of the expressions; dereferenced
///                unconditionally, so it must not be null
/// @return table whose schema is the original fields plus the fields created
///         by createSchemaFromColumns for @a columns
/// @throws std::runtime_error if a batch cannot be read or the projector
///         cannot be applied
template <typename... C>
auto spawner(framework::pack<C...> columns, arrow::Table* atable)
{
  constexpr auto ncolumns = sizeof...(C);

  arrow::TableBatchReader reader(*atable);
  std::shared_ptr<arrow::RecordBatch> batch;
  arrow::ArrayVector v;
  // chunks[i] collects, batch by batch, the arrays computed for column i
  std::vector<arrow::ArrayVector> chunks(ncolumns);

  auto projectors = framework::expressions::createProjectors(columns, atable->schema());
  while (true) {
    auto s = reader.ReadNext(&batch);
    if (!s.ok()) {
      throw std::runtime_error(fmt::format("Cannot read batches from table {}", s.ToString()));
    }
    // a null batch signals the end of the table
    if (batch == nullptr) {
      break;
    }
    s = projectors->Evaluate(*batch, arrow::default_memory_pool(), &v);
    if (!s.ok()) {
      throw std::runtime_error(fmt::format("Cannot apply projector {}", s.ToString()));
    }
    for (auto i = 0u; i < ncolumns; ++i) {
      chunks[i].emplace_back(v.at(i));
    }
  }

  auto extra_schema = o2::soa::createSchemaFromColumns(columns);
  // start from the original fields/columns and append the spawned ones
  std::vector<std::shared_ptr<arrow::Field>> new_fields = atable->schema()->fields();
  std::vector<std::shared_ptr<arrow::ChunkedArray>> new_columns = atable->columns();
  for (auto i = 0u; i < ncolumns; ++i) {
    // The type is passed explicitly: if the table produced no batch the chunk
    // list is empty and ChunkedArray cannot deduce the type from chunk 0.
    new_columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks[i], extra_schema->field(i)->type()));
    new_fields.emplace_back(extra_schema->field(i));
  }
  auto new_schema = std::make_shared<arrow::Schema>(new_fields);
  return arrow::Table::Make(new_schema, new_columns);
}

} // namespace o2::soa

#endif // O2_FRAMEWORK_ASOA_H_
107 changes: 105 additions & 2 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ struct WritingCursor<soa::Table<PC...>> {
int64_t mCount = -1;
};

/// This helper class allow you to declare things which will be crated by a
/// give analysis task. Notice how the actual cursor is implemented by the
/// This helper class allows you to declare things which will be created by a
/// given analysis task. Notice how the actual cursor is implemented by the
/// means of the WritingCursor helper class, from which produces actually
/// derives.
template <typename... C>
Expand All @@ -142,6 +142,58 @@ struct Produces<soa::Table<C...>> : WritingCursor<typename soa::FilterPersistent
}
};

/// This helper class allows you to declare extended tables which should be
/// created by the task (as opposed to those pre-defined by data model).
///
/// T is the extended table type; its metadata provides the base table and the
/// pack of expression columns. The spawned table itself is held in `table`,
/// which stays null until something assigns it.
template <typename T>
struct Spawns {
  using metadata = typename aod::MetadataTrait<T>::metadata;
  using base_table_t = typename metadata::base_table_t;
  using expression_pack_t = typename metadata::expression_pack_t;
  using base_metadata = typename aod::MetadataTrait<base_table_t>::metadata;

  /// Pack of the expression columns that extend the base table.
  constexpr expression_pack_t pack() const
  {
    return expression_pack_t{};
  }

  /// Input specification of the base table (label, origin, description).
  InputSpec base_spec() const
  {
    return InputSpec{
      base_metadata::tableLabel(),
      header::DataOrigin{base_metadata::origin()},
      header::DataDescription{base_metadata::description()}};
  }

  /// Label of the base table.
  constexpr const char* base_label() const
  {
    return base_metadata::tableLabel();
  }

  /// Output specification of the extended table.
  OutputSpec spec() const
  {
    return OutputSpec{OutputLabel{metadata::tableLabel()}, metadata::origin(), metadata::description()};
  }

  /// Output reference of the extended table.
  OutputRef ref() const
  {
    return OutputRef{metadata::tableLabel(), 0};
  }

  /// Access to the spawned table; `table` must have been set beforehand.
  T* operator->()
  {
    return table.get();
  }
  T& operator*()
  {
    return *table;
  }

  /// Underlying arrow table of the spawned table; `table` must be non-null.
  auto asArrowTable()
  {
    return table->asArrowTable();
  }
  std::shared_ptr<T> table = nullptr;
};

/// This helper class allows you to declare things which will be created by a
/// given analysis task. Currently wrapped objects are limited to be TNamed
/// descendants. Objects will be written to a ROOT file at the end of the
Expand Down Expand Up @@ -886,6 +938,34 @@ struct OutputManager<OutputObj<T>> {
}
};

/// OutputManager specialization handling Spawns<T> members of a task.
template <typename T>
struct OutputManager<Spawns<T>> {
  /// Registers the output specification of the spawned table.
  static bool appendOutput(std::vector<OutputSpec>& outputs, Spawns<T>& what, uint32_t)
  {
    outputs.emplace_back(what.spec());
    return true;
  }

  /// Fetches the base table from the inputs and materializes the extended
  /// table from it, storing the result in `what.table`.
  static bool prepare(ProcessingContext& pc, Spawns<T>& what)
  {
    auto original_table = pc.inputs().get<TableConsumer>(what.base_label())->asArrowTable();
    what.table = std::make_shared<T>(o2::soa::spawner(what.pack(), original_table.get()));
    return true;
  }

  /// Nothing to do for spawned tables at this stage.
  static bool finalize(ProcessingContext&, Spawns<T>&)
  {
    return true;
  }

  /// Hands the arrow table of the spawned table over to the outputs.
  static bool postRun(EndOfStreamContext& eosc, Spawns<T>& what)
  {
    // `what.table` is only set by prepare(); if it is still null there is no
    // table to publish and dereferencing it would be undefined behaviour.
    if (what.table == nullptr) {
      return true;
    }
    using metadata = typename Spawns<T>::metadata;
    eosc.outputs().adopt(Output{metadata::origin(), metadata::description()}, what.asArrowTable());
    return true;
  }
};

template <typename T>
class has_instance
{
Expand Down Expand Up @@ -967,6 +1047,24 @@ struct OptionManager<Configurable<T>> {
}
};

/// Manager template to facilitate extended tables spawning.
/// Generic case: the member is not a Spawns<> declaration, so no input is
/// requested and false is returned.
template <typename T>
struct SpawnManager {
  static bool requestInputs(std::vector<InputSpec>&, T const&)
  {
    return false;
  }
};

/// Spawns<> case: makes sure the base table of the extended table is among
/// the requested inputs, adding it only if no input with the same binding is
/// already present. Always returns true.
template <typename TABLE>
struct SpawnManager<Spawns<TABLE>> {
  static bool requestInputs(std::vector<InputSpec>& inputs, Spawns<TABLE>& spawns)
  {
    auto wanted = spawns.base_spec();
    auto sameBinding = [&wanted](InputSpec const& known) {
      return wanted.binding == known.binding;
    };
    auto const match = std::find_if(inputs.begin(), inputs.end(), sameBinding);
    if (match == inputs.end()) {
      inputs.emplace_back(wanted);
    }
    return true;
  }
};

// SFINAE test
template <typename T>
class has_process
Expand Down Expand Up @@ -1047,6 +1145,11 @@ DataProcessorSpec adaptAnalysisTask(char const* name, Args&&... args)
},
tupledTask);
}
//request base tables for spawnable extended tables
std::apply([&inputs](auto&... x) {
return (SpawnManager<std::decay_t<decltype(x)>>::requestInputs(inputs, x), ...);
},
tupledTask);

std::apply([&outputs, &hash](auto&... x) { return (OutputManager<std::decay_t<decltype(x)>>::appendOutput(outputs, x, hash), ...); }, tupledTask);
std::apply([&options, &hash](auto&... x) { return (OptionManager<std::decay_t<decltype(x)>>::appendOption(options, x), ...); }, tupledTask);
Expand Down
19 changes: 19 additions & 0 deletions Framework/Core/include/Framework/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,25 @@ void updateExpressionInfos(expressions::Filter const& filter, std::vector<Expres
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node);
/// Function to create gandiva projecting expression from generic gandiva expression tree
gandiva::ExpressionPtr makeExpression(gandiva::NodePtr node, gandiva::FieldPtr result);

/// Builds one gandiva projector evaluating the expressions of all columns
/// C... against @a schema; each expression writes into the arrow field
/// declared by its column.
/// @throws std::runtime_error if gandiva fails to create the projector
template <typename... C>
std::shared_ptr<gandiva::Projector> createProjectors(framework::pack<C...>, gandiva::SchemaPtr schema)
{
  std::shared_ptr<gandiva::Projector> result;
  auto status = gandiva::Projector::Make(
    schema,
    {makeExpression(
      framework::expressions::createExpressionTree(
        framework::expressions::createOperations(C::Projector()),
        schema),
      C::asArrowField())...},
    &result);
  // bail out first, so the success path is the plain fall-through
  if (!status.ok()) {
    throw std::runtime_error(fmt::format("Failed to create projector: {}", status.ToString()));
  }
  return result;
}
} // namespace o2::framework::expressions

#endif // O2_FRAMEWORK_EXPRESSIONS_H_
46 changes: 1 addition & 45 deletions Framework/Core/src/AODReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -141,39 +141,6 @@ std::vector<OutputRoute> getListOfUnknown(std::vector<OutputRoute> const& routes
return unknows;
}

/// Expression-based column generator to materialize columns.
///
/// Evaluates the expression columns C... on every record batch of @a atable
/// and returns one chunked array per expression, in pack order.
///
/// @param columns pack of expression columns to evaluate
/// @param atable  table providing the inputs; dereferenced unconditionally,
///                so it must not be null
/// @throws std::runtime_error if a batch cannot be read or the projector
///         cannot be applied
template <typename... C>
auto spawner(framework::pack<C...> columns, arrow::Table* atable)
{
  arrow::TableBatchReader reader(*atable);
  std::shared_ptr<arrow::RecordBatch> batch;
  arrow::ArrayVector v;
  // chunks[i] collects, batch by batch, the arrays computed for column i
  std::vector<arrow::ArrayVector> chunks(sizeof...(C));

  auto projectors = framework::expressions::createProjectors(columns, atable->schema());
  while (true) {
    auto s = reader.ReadNext(&batch);
    if (!s.ok()) {
      throw std::runtime_error(fmt::format("Cannot read batches from table {}", s.ToString()));
    }
    // a null batch signals the end of the table
    if (batch == nullptr) {
      break;
    }
    s = projectors->Evaluate(*batch, arrow::default_memory_pool(), &v);
    if (!s.ok()) {
      throw std::runtime_error(fmt::format("Cannot apply projector {}", s.ToString()));
    }
    for (auto i = 0u; i < sizeof...(C); ++i) {
      chunks[i].emplace_back(v.at(i));
    }
  }
  // Result types come from the column declarations, so that a table yielding
  // no batch still produces valid (empty) chunked arrays: ChunkedArray cannot
  // deduce its type from an empty chunk list.
  std::vector<std::shared_ptr<arrow::DataType>> types{C::asArrowField()->type()...};
  std::vector<std::shared_ptr<arrow::ChunkedArray>> results(sizeof...(C));
  for (auto i = 0u; i < sizeof...(C); ++i) {
    results[i] = std::make_shared<arrow::ChunkedArray>(chunks[i], types[i]);
  }
  return results;
}

AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec> requested)
{
return AlgorithmSpec::InitCallback{[requested](InitContext& ic) {
Expand Down Expand Up @@ -204,19 +171,8 @@ AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec> reques
auto maker = [&](auto metadata) {
using metadata_t = decltype(metadata);
using expressions = typename metadata_t::expression_pack_t;
auto extra_schema = o2::soa::createSchemaFromColumns(expressions{});
auto original_table = pc.inputs().get<TableConsumer>(input.binding)->asArrowTable();
auto original_fields = original_table->schema()->fields();
std::vector<std::shared_ptr<arrow::Field>> fields;
auto arrays = spawner(expressions{}, original_table.get());
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns = original_table->columns();
std::copy(original_fields.begin(), original_fields.end(), std::back_inserter(fields));
for (auto i = 0u; i < framework::pack_size(expressions{}); ++i) {
columns.push_back(arrays[i]);
fields.emplace_back(extra_schema->field(i));
}
auto new_schema = std::make_shared<arrow::Schema>(fields);
return arrow::Table::Make(new_schema, columns);
return o2::soa::spawner(expressions{}, original_table.get());
};

if (description == header::DataDescription{"TRACKPAR"}) {
Expand Down
Loading