From 95a36c5e080404da726f46acbd6c06371d242b4e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 24 Jun 2026 15:04:09 +0200 Subject: [PATCH] DPL: add support for writing general Metadata to AO2D --- .../AnalysisSupport/src/AODWriterHelpers.cxx | 47 ++++++++++++++++ .../AnalysisSupport/src/AODWriterHelpers.h | 1 + Framework/AnalysisSupport/src/Plugin.cxx | 8 +++ .../Framework/AnalysisSupportHelpers.h | 3 + Framework/Core/src/AnalysisSupportHelpers.cxx | 19 +++++++ Framework/Core/src/CompletionPolicy.cxx | 1 + Framework/Core/src/WorkflowHelpers.cxx | 9 +++ .../TestWorkflows/src/o2TestHistograms.cxx | 55 +++++++++++++------ 8 files changed, 125 insertions(+), 18 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx index 5b5829d96a1de..cd7aaf55abf76 100644 --- a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -16,6 +16,7 @@ #include "Framework/EndOfStreamContext.h" #include "Framework/ProcessingContext.h" #include "Framework/InitContext.h" +#include "Framework/Output.h" #include "Framework/CallbackService.h" #include "Framework/AnalysisSupportHelpers.h" #include "Framework/TableConsumer.h" @@ -31,6 +32,10 @@ #include #include #include +#include +#include +#include +#include O2_DECLARE_DYNAMIC_LOG(histogram_registry); @@ -477,4 +482,46 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ct }; }}; } + +AlgorithmSpec AODWriterHelpers::getMetadataCollector(ConfigContext const& /*ctx*/) +{ + return AlgorithmSpec{[](InitContext&) -> std::function { + // Accumulated metadata, last writer wins per key. + auto merged = std::make_shared>>(); + return [merged](ProcessingContext& pc) -> void { + auto nParts = pc.inputs().getNofParts(0); + for (auto pi = 0U; pi < nParts; ++pi) { + auto part = pc.inputs().get("meta", pi); + if (!part) { + continue; + } + TIter next(part.get()); + while (TObject* key = next()) { + TObject* value = part->GetValue(key); + std::string k = key->GetName(); + std::string v = value != nullptr ? value->GetName() : ""; + auto it = std::find_if(merged->begin(), merged->end(), + [&k](auto const& e) { return e.first == k; }); + if (it != merged->end()) { + it->second = std::move(v); + } else { + merged->emplace_back(std::move(k), std::move(v)); + } + } + } + // Emit the keys/vals vectors the AOD writer already turns into the AO2D + // metaData TMap, so no special handling is needed there. + std::vector keys, vals; + keys.reserve(merged->size()); + vals.reserve(merged->size()); + for (auto const& [k, v] : *merged) { + keys.emplace_back(k); + vals.emplace_back(v); + } + LOG(debug) << "metadata-collector: emitting " << keys.size() << " aggregated metadata entries"; + pc.outputs().snapshot(Output{"AMD", "AODMetadataKeys", 0}, keys); + pc.outputs().snapshot(Output{"AMD", "AODMetadataVals", 0}, vals); + }; + }}; +} } // namespace o2::framework::writers diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.h b/Framework/AnalysisSupport/src/AODWriterHelpers.h index 7ae59a5cf3b01..65b068cb630a7 100644 --- a/Framework/AnalysisSupport/src/AODWriterHelpers.h +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.h @@ -21,6 +21,7 @@ namespace o2::framework::writers struct AODWriterHelpers { static AlgorithmSpec getOutputObjHistWriter(ConfigContext const& context); static AlgorithmSpec getOutputTTreeWriter(ConfigContext const& context); + static AlgorithmSpec getMetadataCollector(ConfigContext const& context); }; } // namespace o2::framework::writers diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index 5f61a236cbd58..f20475a1a2bd2 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -54,6 +54,13 @@ struct ROOTTTreeWriter : o2::framework::AlgorithmPlugin { } }; +struct ROOTMetadataCollector : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::writers::AODWriterHelpers::getMetadataCollector(config); + } +}; + using namespace o2::framework; struct RunSummary : o2::framework::ServicePlugin { o2::framework::ServiceSpec* create() final @@ -286,6 +293,7 @@ DEFINE_DPL_PLUGINS_BEGIN DEFINE_DPL_PLUGIN_INSTANCE(ROOTFileReader, CustomAlgorithm); DEFINE_DPL_PLUGIN_INSTANCE(ROOTObjWriter, CustomAlgorithm); DEFINE_DPL_PLUGIN_INSTANCE(ROOTTTreeWriter, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(ROOTMetadataCollector, CustomAlgorithm); DEFINE_DPL_PLUGIN_INSTANCE(RunSummary, CustomService); DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAOD, ConfigDiscovery); DEFINE_DPL_PLUGINS_END diff --git a/Framework/Core/include/Framework/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h index c1968123e765d..8eaaa4f1a61a6 100644 --- a/Framework/Core/include/Framework/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -48,6 +48,9 @@ struct AnalysisSupportHelpers { static DataProcessorSpec getOutputObjHistSink(ConfigContext const&); /// writes inputs of kind AOD to file static DataProcessorSpec getGlobalAODSink(ConfigContext const&); + /// Match all inputs of kind META, merge them and republish as the AOD + /// metadata keys/vals consumed by the AOD writer. + static DataProcessorSpec getMetadataCollectorSink(ConfigContext const&); /// Get the data director static std::shared_ptr getDataOutputDirector(ConfigContext const& ctx); }; diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index c16a1da61ae8a..ba6cb23e8652f 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -220,4 +220,23 @@ DataProcessorSpec return spec; } + +DataProcessorSpec + AnalysisSupportHelpers::getMetadataCollectorSink(ConfigContext const& ctx) +{ + // Lifetime is sporadic because META messages are not produced every + // timeframe. The oldest-possible-timeframe completion policy (registered + // in CompletionPolicy::createDefaultPolicies) decides when the collected + // parts are merged and republished as the AOD metadata keys/vals that the + // AOD writer turns into the AO2D metaData object. + DataProcessorSpec spec{ + .name = "internal-dpl-metadata-collector", + .inputs = {InputSpec("meta", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"META"}), Lifetime::Sporadic)}, + .outputs = {OutputSpec{OutputLabel{"keys"}, header::DataOrigin{"AMD"}, header::DataDescription{"AODMetadataKeys"}, 0, Lifetime::Sporadic}, + OutputSpec{OutputLabel{"vals"}, header::DataOrigin{"AMD"}, header::DataDescription{"AODMetadataVals"}, 0, Lifetime::Sporadic}}, + .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTMetadataCollector", ctx), + }; + + return spec; +} } // namespace o2::framework diff --git a/Framework/Core/src/CompletionPolicy.cxx b/Framework/Core/src/CompletionPolicy.cxx index a09028b9249f3..27b5485207978 100644 --- a/Framework/Core/src/CompletionPolicy.cxx +++ b/Framework/Core/src/CompletionPolicy.cxx @@ -27,6 +27,7 @@ std::vector return { CompletionPolicyHelpers::consumeWhenAllOrdered("internal-dpl-aod-writer"), CompletionPolicyHelpers::consumeWhenAnyZeroCount("internal-dpl-injected-dummy-sink", [](DeviceSpec const& s) { return s.name.find("internal-dpl-injected-dummy-sink") != std::string::npos; }), + CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe("internal-dpl-metadata-collector", [](DeviceSpec const& s) { return s.name == "internal-dpl-metadata-collector"; }), CompletionPolicyHelpers::consumeWhenAll()}; } diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 188b6653c6a43..df689d961245a 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -236,6 +236,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext std::vector requestedCCDBs; std::vector providedCCDBs; + bool hasMetaOutput = false; for (size_t wi = 0; wi < workflow.size(); ++wi) { auto& processor = workflow[wi]; @@ -392,6 +393,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext } else { it->bindings.push_back(output.binding.value); } + } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"META"})) { + hasMetaOutput = true; } if (output.lifetime == Lifetime::Condition) { @@ -583,6 +586,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext extraSpecs.push_back(rootSink); } + // Inject a collector which merges all META messages and republishes them as + // the AOD metadata keys/vals the AOD writer writes into the AO2D file. + if (hasMetaOutput) { + extraSpecs.push_back(AnalysisSupportHelpers::getMetadataCollectorSink(ctx)); + } + workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); extraSpecs.clear(); diff --git a/Framework/TestWorkflows/src/o2TestHistograms.cxx b/Framework/TestWorkflows/src/o2TestHistograms.cxx index 9c2cba35b9156..e4e0549f866c3 100644 --- a/Framework/TestWorkflows/src/o2TestHistograms.cxx +++ b/Framework/TestWorkflows/src/o2TestHistograms.cxx @@ -16,11 +16,18 @@ #include "Framework/runDataProcessing.h" #include "Framework/AnalysisTask.h" #include +#include +#include +#include using namespace o2; using namespace o2::framework; using namespace o2::framework::expressions; +// pT cut applied when producing the skimmed derived dataset; the same value is +// published as run metadata so it lands in the derived AO2D's metaData object. +static constexpr float kSkimPtCut = 1.5f; + namespace o2::aod { O2ORIGIN("EMB"); @@ -38,8 +45,8 @@ DECLARE_SOA_TABLE(SkimmedExampleTrack, "AOD", "SKIMEXTRK", //! struct EtaAndClsHistogramsSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - Configurable trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"}; - Filter trackFilter = o2::aod::track::pt < 10.f; + Configurable trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"}; + Filter trackFilter = o2::aod::track::pt < kSkimPtCut; HistogramRegistry registry{ "registry", @@ -88,8 +95,8 @@ struct EtaAndClsHistogramsSimple { struct EtaAndClsHistogramsIUSimple { OutputObj etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)}; Produces skimEx; - Configurable trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"}; - Filter trackFilter = o2::aod::track::pt < 10.f; + Configurable trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"}; + Filter trackFilter = o2::aod::track::pt < kSkimPtCut; HistogramRegistry registry{ "registry", @@ -136,8 +143,8 @@ struct EtaAndClsHistogramsFull { } // }; - Configurable trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"}; - Filter trackFilter = o2::aod::track::pt < 10.f; + Configurable trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"}; + Filter trackFilter = o2::aod::track::pt < kSkimPtCut; void init(InitContext&) { @@ -183,25 +190,37 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) LOGP(info, "- {} present.", table); } // Notice it's important for the tasks to use the same name, otherwise topology generation will be confused. + WorkflowSpec specs; if (runType == "2" || !hasTrackCov) { LOGP(info, "Using only tracks {}", runType); if (hasTrackIU) { - return WorkflowSpec{ - adaptAnalysisTask(cfgc, TaskName{"simple-histos"}), - }; + specs = WorkflowSpec{adaptAnalysisTask(cfgc, TaskName{"simple-histos"})}; + } else { + specs = WorkflowSpec{adaptAnalysisTask(cfgc, TaskName{"simple-histos"})}; } - return WorkflowSpec{ - adaptAnalysisTask(cfgc, TaskName{"simple-histos"}), - }; } else { LOGP(info, "Using tracks extra {}", runType); if (hasTrackIU) { - return WorkflowSpec{ - adaptAnalysisTask(cfgc, TaskName{"simple-histos"}), - }; + specs = WorkflowSpec{adaptAnalysisTask(cfgc, TaskName{"simple-histos"})}; + } else { + specs = WorkflowSpec{adaptAnalysisTask(cfgc, TaskName{"simple-histos"})}; } - return WorkflowSpec{ - adaptAnalysisTask(cfgc, TaskName{"simple-histos"}), - }; } + + // Publish the skimming pT cut as run metadata, once per data frame so it is + // aligned with the derived tables. The auto-injected metadata collector merges + // all META messages (oldest-possible completion) and the AOD writer stores the + // result as the metaData object of the derived AO2D file. + specs.push_back(DataProcessorSpec{ + .name = "skim-metadata", + .inputs = {InputSpec{"tfn", "TFN", "TFNumber"}}, + .outputs = {OutputSpec{{"meta"}, "META", "SKIMINFO", 0, Lifetime::Sporadic}}, + .algorithm = adaptStateless([](ProcessingContext& pc) { + TMap m; + m.SetOwnerKeyValue(); + m.Add(new TObjString("SkimTrackPtCut"), new TObjString(TString::Format("%g", kSkimPtCut))); + pc.outputs().snapshot(Output{"META", "SKIMINFO", 0}, m); + }), + }); + return specs; }