Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Analysis/Tutorials/src/histogramRegistry.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,23 @@ struct DTask {
}
};

struct ETask {
OutputObj<TH1F> phiH{TH1F("phi", "phi", 100, 0., 2. * M_PI)};
OutputObj<TH1F> etaH{TH1F("eta", "eta", 102, -2.01, 2.01)};

void process(aod::Tracks const& tracks)
{
for (auto& track : tracks) {
phiH->Fill(track.phi());
etaH->Fill(track.eta());
}
}
};

WorkflowSpec defineDataProcessing(ConfigContext const&)
{
return WorkflowSpec{
adaptAnalysisTask<ETask>("output-obj-test"),
adaptAnalysisTask<ATask>("eta-and-phi-histograms"),
adaptAnalysisTask<BTask>("filtered-histograms"),
adaptAnalysisTask<CTask>("dimension-test"),
Expand Down
5 changes: 1 addition & 4 deletions Framework/Core/include/Framework/CommonDataProcessors.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@ using outputObjects = std::vector<std::pair<uint32_t, std::vector<std::string>>>

/// Helpers to create a few general data processors
struct CommonDataProcessors {
/// Match all inputs of kind HIST and write them to a ROOT file,
/// one root file per originating task.
static DataProcessorSpec getHistogramRegistrySink(outputObjects const& objmap, const outputTasks& tskmap);
/// Match all inputs of kind ATSK and write them to a ROOT file,
/// one root file per originating task.
static DataProcessorSpec getOutputObjSink(outputObjects const& objmap, const outputTasks& tskmap);
static DataProcessorSpec getOutputObjHistSink(outputObjects const& objmap, const outputTasks& tskmap);
/// Given the list of @a danglingInputs @return a DataProcessor which does
/// a binary dump for all the dangling inputs matching the Timeframe
/// lifetime. @a unmatched will be filled with all the InputSpecs which are
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/HistogramRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ class HistogramRegistry
/// @return the associated OutputSpec
OutputSpec const spec()
{
ConcreteDataMatcher matcher{"HIST", "\0", 0};
ConcreteDataMatcher matcher{"ATSK", "\0", 0};
strncpy(matcher.description.str, mName.data(), 16);
return OutputSpec{OutputLabel{mName}, matcher};
}
Expand Down
162 changes: 19 additions & 143 deletions Framework/Core/src/CommonDataProcessors.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNa
{OutputObjHandlingPolicy::QAObject, "QAResults.root"}};

// =============================================================================
DataProcessorSpec CommonDataProcessors::getHistogramRegistrySink(outputObjects const& objmap, const outputTasks& tskmap)
DataProcessorSpec CommonDataProcessors::getOutputObjHistSink(outputObjects const& objmap, outputTasks const& tskmap)
{
auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
auto& callbacks = ic.services().get<CallbackService>();
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();

auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
LOG(DEBUG) << "Writing merged histograms to file";
LOG(DEBUG) << "Writing merged objects and histograms to file";
if (inputObjects->empty()) {
LOG(ERROR) << "Output object map is empty!";
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
Expand Down Expand Up @@ -123,149 +123,25 @@ DataProcessorSpec CommonDataProcessors::getHistogramRegistrySink(outputObjects c
};

TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
TList* outputList = (TList*)entry.obj;
outputList->SetOwner(false);

// if registry should live in dedicated folder a TNamed object is appended to the list
if (outputList->Last()->IsA() == TNamed::Class()) {
delete outputList->Last();
outputList->RemoveLast();
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
}

writeListToFile(outputList, currentDir);
outputList->SetOwner();
delete outputList;
entry.obj = nullptr;
}
}
for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
if (f[i] != nullptr) {
f[i]->Close();
}
}
LOG(INFO) << "All outputs merged in their respective target files";
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
};

callbacks.set(CallbackService::Id::EndOfStream, endofdatacb);
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
auto const& ref = pc.inputs().get("y");
if (!ref.header) {
LOG(ERROR) << "Header not found";
return;
}
if (!ref.payload) {
LOG(ERROR) << "Payload not found";
return;
}
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
if (!datah) {
LOG(ERROR) << "No data header in stack";
return;
}

auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
if (!objh) {
LOG(ERROR) << "No output object header in stack";
return;
}

FairTMessage tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
InputObject obj;
obj.kind = tm.GetClass();
if (obj.kind == nullptr) {
LOG(error) << "Cannot read class info from buffer.";
return;
}

auto policy = objh->mPolicy;
auto hash = objh->mTaskHash;

obj.obj = tm.ReadObjectAny(obj.kind);
TNamed* named = static_cast<TNamed*>(obj.obj);
obj.name = named->GetName();

auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.first == hash; });
if (hpos == tskmap.end()) {
LOG(ERROR) << "No task found for hash " << hash;
return;
}
auto taskname = hpos->second;
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.first == hash; });
if (opos == objmap.end()) {
LOG(ERROR) << "No object list found for task " << taskname << " (hash=" << hash << ")";
return;
}
auto objects = opos->second;
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
LOG(ERROR) << "No object " << obj.name << " in map for task " << taskname;
return;
}
auto nameHash = compile_time_hash(obj.name.c_str());
InputObjectRoute key{obj.name, nameHash, taskname, hash, policy};
auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
if (existing == inputObjects->end()) {
inputObjects->push_back(std::make_pair(key, obj));
return;
}
auto merger = existing->second.kind->GetMerge();
if (!merger) {
LOG(ERROR) << "Already one unmergeable object found for " << obj.name;
return;
}

TList coll;
coll.Add(static_cast<TObject*>(obj.obj));
merger(existing->second.obj, &coll, nullptr);
};
};

DataProcessorSpec spec{
"internal-dpl-global-analysis-file-sink",
{InputSpec("y", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"HIST"}))},
Outputs{},
AlgorithmSpec(writerFunction),
{}};

return spec;
}

DataProcessorSpec CommonDataProcessors::getOutputObjSink(outputObjects const& objmap, outputTasks const& tskmap)
{
auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
auto& callbacks = ic.services().get<CallbackService>();
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();

auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
LOG(DEBUG) << "Writing merged objects to file";
if (inputObjects->empty()) {
LOG(ERROR) << "Output object map is empty!";
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
return;
}
std::string currentDirectory = "";
std::string currentFile = "";
TFile* f[OutputObjHandlingPolicy::numPolicies];
for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
f[i] = nullptr;
}
for (auto& [route, entry] : *inputObjects) {
auto file = ROOTfileNames.find(route.policy);
if (file != ROOTfileNames.end()) {
auto filename = file->second;
if (f[route.policy] == nullptr) {
f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
}
auto nextDirectory = route.directory;
if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
f[route.policy]->mkdir(nextDirectory.c_str());
TNamed* named = static_cast<TNamed*>(entry.obj);
if (named->InheritsFrom(TList::Class())) {
TList* outputList = (TList*)entry.obj;
outputList->SetOwner(false);

// if registry should live in dedicated folder a TNamed object is appended to the list
if (outputList->Last()->IsA() == TNamed::Class()) {
delete outputList->Last();
outputList->RemoveLast();
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
}
currentDirectory = nextDirectory;
currentFile = filename;

writeListToFile(outputList, currentDir);
outputList->SetOwner();
delete outputList;
entry.obj = nullptr;
} else {
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
}
(f[route.policy]->GetDirectory(currentDirectory.c_str()))->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
}
}
for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
Expand Down
32 changes: 8 additions & 24 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext

std::vector<InputSpec> requestedCCDBs;
std::vector<OutputSpec> providedCCDBs;
std::vector<OutputSpec> providedOutputObj;
std::vector<OutputSpec> providedHist;
std::vector<OutputSpec> providedOutputObjHist;

outputTasks outTskMap;
outputObjects outObjMap;
outputObjects outHistMap;
outputObjects outObjHistMap;

for (size_t wi = 0; wi < workflow.size(); ++wi) {
auto& processor = workflow[wi];
Expand Down Expand Up @@ -337,18 +335,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"RN2"})) {
providedAODs.emplace_back(output);
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATSK"})) {
providedOutputObj.emplace_back(output);
auto it = std::find_if(outObjMap.begin(), outObjMap.end(), [&](auto&& x) { return x.first == hash; });
if (it == outObjMap.end()) {
outObjMap.push_back({hash, {output.binding.value}});
} else {
it->second.push_back(output.binding.value);
}
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"HIST"})) {
providedHist.emplace_back(output);
auto it = std::find_if(outHistMap.begin(), outHistMap.end(), [&](auto&& x) { return x.first == hash; });
if (it == outHistMap.end()) {
outHistMap.push_back({hash, {output.binding.value}});
providedOutputObjHist.emplace_back(output);
auto it = std::find_if(outObjHistMap.begin(), outObjHistMap.end(), [&](auto&& x) { return x.first == hash; });
if (it == outObjHistMap.end()) {
outObjHistMap.push_back({hash, {output.binding.value}});
} else {
it->second.push_back(output.binding.value);
}
Expand Down Expand Up @@ -422,14 +412,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext

// This is to inject a file sink so that any dangling ATSK object is written
// to a ROOT file.
if (providedOutputObj.empty() == false) {
auto rootSink = CommonDataProcessors::getOutputObjSink(outObjMap, outTskMap);
extraSpecs.push_back(rootSink);
}
// This is to inject a file sink so that any dangling HIST object is written
// to a ROOT file.
if (providedHist.empty() == false) {
auto rootSink = CommonDataProcessors::getHistogramRegistrySink(outHistMap, outTskMap);
if (providedOutputObjHist.empty() == false) {
auto rootSink = CommonDataProcessors::getOutputObjHistSink(outObjHistMap, outTskMap);
extraSpecs.push_back(rootSink);
}

Expand Down