Skip to content

Commit 298e880

Browse files
authored
Merged hist and output obj sinks (#4730)
1 parent eed9106 commit 298e880

5 files changed

Lines changed: 43 additions & 172 deletions

File tree

Analysis/Tutorials/src/histogramRegistry.cxx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,23 @@ struct DTask {
199199
}
200200
};
201201

202+
/// Tutorial task that fills one phi and one eta histogram from the tracks
/// table; each histogram is held in an OutputObj<TH1F> member.
struct ETask {
203+
// phi histogram: 100 bins spanning 0 to 2*pi
OutputObj<TH1F> phiH{TH1F("phi", "phi", 100, 0., 2. * M_PI)};
204+
// eta histogram: 102 bins spanning -2.01 to 2.01
OutputObj<TH1F> etaH{TH1F("eta", "eta", 102, -2.01, 2.01)};
205+
206+
/// Fill both histograms with one entry per track in @a tracks.
void process(aod::Tracks const& tracks)
207+
{
208+
for (auto& track : tracks) {
209+
phiH->Fill(track.phi());
210+
etaH->Fill(track.eta());
211+
}
212+
}
213+
};
214+
202215
WorkflowSpec defineDataProcessing(ConfigContext const&)
203216
{
204217
return WorkflowSpec{
218+
adaptAnalysisTask<ETask>("output-obj-test"),
205219
adaptAnalysisTask<ATask>("eta-and-phi-histograms"),
206220
adaptAnalysisTask<BTask>("filtered-histograms"),
207221
adaptAnalysisTask<CTask>("dimension-test"),

Framework/Core/include/Framework/CommonDataProcessors.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,9 @@ using outputObjects = std::vector<std::pair<uint32_t, std::vector<std::string>>>
2424

2525
/// Helpers to create a few general data processors
2626
struct CommonDataProcessors {
27-
/// Match all inputs of kind HIST and write them to a ROOT file,
28-
/// one root file per originating task.
29-
static DataProcessorSpec getHistogramRegistrySink(outputObjects const& objmap, const outputTasks& tskmap);
3027
/// Match all inputs of kind ATSK and write them to a ROOT file,
3128
/// one ROOT file per output-object handling policy, with a directory per route inside it.
32-
static DataProcessorSpec getOutputObjSink(outputObjects const& objmap, const outputTasks& tskmap);
29+
static DataProcessorSpec getOutputObjHistSink(outputObjects const& objmap, const outputTasks& tskmap);
3330
/// Given the list of @a danglingInputs @return a DataProcessor which does
3431
/// a binary dump for all the dangling inputs matching the Timeframe
3532
/// lifetime. @a unmatched will be filled with all the InputSpecs which are

Framework/Core/include/Framework/HistogramRegistry.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ class HistogramRegistry
538538
/// @return the associated OutputSpec
539539
OutputSpec const spec()
540540
{
541-
ConcreteDataMatcher matcher{"HIST", "\0", 0};
541+
// The data origin is "ATSK" (it was "HIST" before this change), the origin
// that WorkflowHelpers::injectServiceDevices matches when it collects
// outputs for the merged output-object/histogram sink.
ConcreteDataMatcher matcher{"ATSK", "\0", 0};
542542
// Copy at most 16 characters of the registry name into the description.
// strncpy does not NUL-terminate when the name has 16 or more characters.
// NOTE(review): assumes description.str holds at least 16 bytes - confirm.
strncpy(matcher.description.str, mName.data(), 16);
543543
// The registry name is also used as the output label.
return OutputSpec{OutputLabel{mName}, matcher};
544544
}

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 19 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNa
7272
{OutputObjHandlingPolicy::QAObject, "QAResults.root"}};
7373

7474
// =============================================================================
75-
DataProcessorSpec CommonDataProcessors::getHistogramRegistrySink(outputObjects const& objmap, const outputTasks& tskmap)
75+
DataProcessorSpec CommonDataProcessors::getOutputObjHistSink(outputObjects const& objmap, outputTasks const& tskmap)
7676
{
7777
auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
7878
auto& callbacks = ic.services().get<CallbackService>();
7979
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
8080

8181
auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
82-
LOG(DEBUG) << "Writing merged histograms to file";
82+
LOG(DEBUG) << "Writing merged objects and histograms to file";
8383
if (inputObjects->empty()) {
8484
LOG(ERROR) << "Output object map is empty!";
8585
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
@@ -123,149 +123,25 @@ DataProcessorSpec CommonDataProcessors::getHistogramRegistrySink(outputObjects c
123123
};
124124

125125
TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
126-
TList* outputList = (TList*)entry.obj;
127-
outputList->SetOwner(false);
128-
129-
// if registry should live in dedicated folder a TNamed object is appended to the list
130-
if (outputList->Last()->IsA() == TNamed::Class()) {
131-
delete outputList->Last();
132-
outputList->RemoveLast();
133-
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
134-
}
135-
136-
writeListToFile(outputList, currentDir);
137-
outputList->SetOwner();
138-
delete outputList;
139-
entry.obj = nullptr;
140-
}
141-
}
142-
for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
143-
if (f[i] != nullptr) {
144-
f[i]->Close();
145-
}
146-
}
147-
LOG(INFO) << "All outputs merged in their respective target files";
148-
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
149-
};
150-
151-
callbacks.set(CallbackService::Id::EndOfStream, endofdatacb);
152-
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
153-
auto const& ref = pc.inputs().get("y");
154-
if (!ref.header) {
155-
LOG(ERROR) << "Header not found";
156-
return;
157-
}
158-
if (!ref.payload) {
159-
LOG(ERROR) << "Payload not found";
160-
return;
161-
}
162-
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
163-
if (!datah) {
164-
LOG(ERROR) << "No data header in stack";
165-
return;
166-
}
167-
168-
auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
169-
if (!objh) {
170-
LOG(ERROR) << "No output object header in stack";
171-
return;
172-
}
173-
174-
FairTMessage tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
175-
InputObject obj;
176-
obj.kind = tm.GetClass();
177-
if (obj.kind == nullptr) {
178-
LOG(error) << "Cannot read class info from buffer.";
179-
return;
180-
}
181-
182-
auto policy = objh->mPolicy;
183-
auto hash = objh->mTaskHash;
184-
185-
obj.obj = tm.ReadObjectAny(obj.kind);
186-
TNamed* named = static_cast<TNamed*>(obj.obj);
187-
obj.name = named->GetName();
188-
189-
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.first == hash; });
190-
if (hpos == tskmap.end()) {
191-
LOG(ERROR) << "No task found for hash " << hash;
192-
return;
193-
}
194-
auto taskname = hpos->second;
195-
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.first == hash; });
196-
if (opos == objmap.end()) {
197-
LOG(ERROR) << "No object list found for task " << taskname << " (hash=" << hash << ")";
198-
return;
199-
}
200-
auto objects = opos->second;
201-
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
202-
LOG(ERROR) << "No object " << obj.name << " in map for task " << taskname;
203-
return;
204-
}
205-
auto nameHash = compile_time_hash(obj.name.c_str());
206-
InputObjectRoute key{obj.name, nameHash, taskname, hash, policy};
207-
auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
208-
if (existing == inputObjects->end()) {
209-
inputObjects->push_back(std::make_pair(key, obj));
210-
return;
211-
}
212-
auto merger = existing->second.kind->GetMerge();
213-
if (!merger) {
214-
LOG(ERROR) << "Already one unmergeable object found for " << obj.name;
215-
return;
216-
}
217-
218-
TList coll;
219-
coll.Add(static_cast<TObject*>(obj.obj));
220-
merger(existing->second.obj, &coll, nullptr);
221-
};
222-
};
223-
224-
DataProcessorSpec spec{
225-
"internal-dpl-global-analysis-file-sink",
226-
{InputSpec("y", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"HIST"}))},
227-
Outputs{},
228-
AlgorithmSpec(writerFunction),
229-
{}};
230-
231-
return spec;
232-
}
233-
234-
DataProcessorSpec CommonDataProcessors::getOutputObjSink(outputObjects const& objmap, outputTasks const& tskmap)
235-
{
236-
auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
237-
auto& callbacks = ic.services().get<CallbackService>();
238-
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
239-
240-
auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
241-
LOG(DEBUG) << "Writing merged objects to file";
242-
if (inputObjects->empty()) {
243-
LOG(ERROR) << "Output object map is empty!";
244-
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
245-
return;
246-
}
247-
std::string currentDirectory = "";
248-
std::string currentFile = "";
249-
TFile* f[OutputObjHandlingPolicy::numPolicies];
250-
for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
251-
f[i] = nullptr;
252-
}
253-
for (auto& [route, entry] : *inputObjects) {
254-
auto file = ROOTfileNames.find(route.policy);
255-
if (file != ROOTfileNames.end()) {
256-
auto filename = file->second;
257-
if (f[route.policy] == nullptr) {
258-
f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
259-
}
260-
auto nextDirectory = route.directory;
261-
if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
262-
if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
263-
f[route.policy]->mkdir(nextDirectory.c_str());
126+
TNamed* named = static_cast<TNamed*>(entry.obj);
127+
if (named->InheritsFrom(TList::Class())) {
128+
TList* outputList = (TList*)entry.obj;
129+
outputList->SetOwner(false);
130+
131+
// if registry should live in dedicated folder a TNamed object is appended to the list
132+
if (outputList->Last()->IsA() == TNamed::Class()) {
133+
delete outputList->Last();
134+
outputList->RemoveLast();
135+
currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
264136
}
265-
currentDirectory = nextDirectory;
266-
currentFile = filename;
137+
138+
writeListToFile(outputList, currentDir);
139+
outputList->SetOwner();
140+
delete outputList;
141+
entry.obj = nullptr;
142+
} else {
143+
currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
267144
}
268-
(f[route.policy]->GetDirectory(currentDirectory.c_str()))->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
269145
}
270146
}
271147
for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
267267

268268
std::vector<InputSpec> requestedCCDBs;
269269
std::vector<OutputSpec> providedCCDBs;
270-
std::vector<OutputSpec> providedOutputObj;
271-
std::vector<OutputSpec> providedHist;
270+
std::vector<OutputSpec> providedOutputObjHist;
272271

273272
outputTasks outTskMap;
274-
outputObjects outObjMap;
275-
outputObjects outHistMap;
273+
outputObjects outObjHistMap;
276274

277275
for (size_t wi = 0; wi < workflow.size(); ++wi) {
278276
auto& processor = workflow[wi];
@@ -347,18 +345,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
347345
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"RN2"})) {
348346
providedAODs.emplace_back(output);
349347
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATSK"})) {
350-
providedOutputObj.emplace_back(output);
351-
auto it = std::find_if(outObjMap.begin(), outObjMap.end(), [&](auto&& x) { return x.first == hash; });
352-
if (it == outObjMap.end()) {
353-
outObjMap.push_back({hash, {output.binding.value}});
354-
} else {
355-
it->second.push_back(output.binding.value);
356-
}
357-
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"HIST"})) {
358-
providedHist.emplace_back(output);
359-
auto it = std::find_if(outHistMap.begin(), outHistMap.end(), [&](auto&& x) { return x.first == hash; });
360-
if (it == outHistMap.end()) {
361-
outHistMap.push_back({hash, {output.binding.value}});
348+
providedOutputObjHist.emplace_back(output);
349+
auto it = std::find_if(outObjHistMap.begin(), outObjHistMap.end(), [&](auto&& x) { return x.first == hash; });
350+
if (it == outObjHistMap.end()) {
351+
outObjHistMap.push_back({hash, {output.binding.value}});
362352
} else {
363353
it->second.push_back(output.binding.value);
364354
}
@@ -432,14 +422,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
432422

433423
// This is to inject a file sink so that any dangling ATSK object is written
434424
// to a ROOT file.
435-
if (providedOutputObj.empty() == false) {
436-
auto rootSink = CommonDataProcessors::getOutputObjSink(outObjMap, outTskMap);
437-
extraSpecs.push_back(rootSink);
438-
}
439-
// This is to inject a file sink so that any dangling HIST object is written
440-
// to a ROOT file.
441-
if (providedHist.empty() == false) {
442-
auto rootSink = CommonDataProcessors::getHistogramRegistrySink(outHistMap, outTskMap);
425+
if (providedOutputObjHist.empty() == false) {
426+
auto rootSink = CommonDataProcessors::getOutputObjHistSink(outObjHistMap, outTskMap);
443427
extraSpecs.push_back(rootSink);
444428
}
445429

0 commit comments

Comments
 (0)