Skip to content

Commit 4e21fa5

Browse files
matthiasrichterktf
authored andcommitted
Simplifying creation of writers in the TPC reco workflow
- removing remnants of custom end-of-stream handling The checkReady callback is not necessary anymore to steer termination of the workflow, eos is handled inside DPL - MC label branches are optionally disabled at runtime using a new feature of the RootTreeWriter. The definition of branches can now be done completely at compile time, separate code branches for the different can be avoided
1 parent bb93f8a commit 4e21fa5

1 file changed

Lines changed: 26 additions & 56 deletions

File tree

Detectors/TPC/workflow/src/RecoWorkflow.cxx

Lines changed: 26 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
#include <unordered_map>
4242
#include <stdexcept>
4343
#include <algorithm> // std::find
44-
#include <tuple> // make_tuple
4544

4645
namespace o2
4746
{
@@ -272,32 +271,13 @@ framework::WorkflowSpec getWorkflow(std::vector<int> const& tpcSectors, std::vec
272271
return base + "_" + std::to_string(tpcSectors.at(index));
273272
};
274273

275-
// check if the process is ready to quit
276-
// this is decided upon the meta information in the TPC sector header, the operation is set as
277-
// a negative number in the sector member, -2 indicates no-operation, -1 indicates end-of-data
278-
// see also PublisherSpec.cxx
279-
// in this workflow, the EOD is sent after the last real data, and all inputs will receive EOD,
280-
// so it is enough to check on the first occurence
281-
// FIXME: this will be changed once DPL can propagate control events like EOD
282-
auto checkReady = [](o2::framework::DataRef const& ref) {
283-
auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
284-
// sector number -1 indicates end-of-data
285-
if (tpcSectorHeader != nullptr) {
286-
if (tpcSectorHeader->sector == -1) {
287-
// indicate normal processing if not ready and skip if ready
288-
return std::make_tuple(MakeRootTreeWriterSpec::TerminationCondition::Action::SkipProcessing, true);
289-
}
290-
}
291-
return std::make_tuple(MakeRootTreeWriterSpec::TerminationCondition::Action::DoProcessing, false);
292-
};
293-
294274
// -------------------------------------------------------------------------------------------
295275
// helper to create writer specs for different types of output
296-
auto makeWriterSpec = [tpcSectors, laneConfiguration, propagateMC, getIndex, getName, checkReady](const char* processName,
297-
const char* defaultFileName,
298-
const char* defaultTreeName,
299-
auto&& databranch,
300-
auto&& mcbranch) {
276+
auto makeWriterSpec = [tpcSectors, laneConfiguration, propagateMC, getIndex, getName](const char* processName,
277+
const char* defaultFileName,
278+
const char* defaultTreeName,
279+
auto&& databranch,
280+
auto&& mcbranch) {
301281
if (tpcSectors.size() == 0) {
302282
throw std::invalid_argument(std::string("writer process configuration needs list of TPC sectors"));
303283
}
@@ -306,25 +286,18 @@ framework::WorkflowSpec getWorkflow(std::vector<int> const& tpcSectors, std::vec
306286
input.binding += std::to_string(laneConfiguration[index]);
307287
DataSpecUtils::updateMatchingSubspec(input, laneConfiguration[index]);
308288
};
309-
auto amendBranchDef = [laneConfiguration, propagateMC, amendInput, tpcSectors, getIndex, getName](auto&& def) {
289+
auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex, getName](auto&& def, bool enable = true) {
310290
def.keys = mergeInputs(def.keys, laneConfiguration.size(), amendInput);
311-
def.nofBranches = tpcSectors.size();
291+
// the branch is disabled if set to 0
292+
def.nofBranches = enable ? tpcSectors.size() : 0;
312293
def.getIndex = getIndex;
313294
def.getName = getName;
314295
return std::move(def);
315296
};
316297

317-
// depending on the MC propagation flag, the RootTreeWriter spec is created with two
318-
// or one branch definition
319-
if (propagateMC) {
320-
return std::move(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
321-
MakeRootTreeWriterSpec::TerminationCondition{checkReady},
322-
std::move(amendBranchDef(databranch)),
323-
std::move(amendBranchDef(mcbranch)))());
324-
}
325298
return std::move(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
326-
MakeRootTreeWriterSpec::TerminationCondition{checkReady},
327-
std::move(amendBranchDef(databranch)))());
299+
std::move(amendBranchDef(databranch)),
300+
std::move(amendBranchDef(mcbranch, propagateMC)))());
328301
};
329302

330303
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -428,27 +401,26 @@ framework::WorkflowSpec getWorkflow(std::vector<int> const& tpcSectors, std::vec
428401
using ClusRefsOutputType = std::vector<o2::tpc::TPCClRefElem>;
429402

430403
using MCLabelContainer = o2::dataformats::MCTruthContainer<o2::MCCompLabel>;
404+
// a spectator callback which will be invoked by the tree writer with the extracted object
405+
// we are using it for printing a log message
406+
auto logger = BranchDefinition<TrackOutputType>::Spectator([](TrackOutputType const& tracks) {
407+
LOG(INFO) << "writing " << tracks.size() << " track(s)";
408+
});
431409
auto tracksdef = BranchDefinition<TrackOutputType>{InputSpec{"inputTracks", "TPC", "TRACKS"}, //
432-
"TPCTracks", "track-branch-name"}; //
410+
"TPCTracks", "track-branch-name", //
411+
1, //
412+
logger}; //
433413
auto clrefdef = BranchDefinition<ClusRefsOutputType>{InputSpec{"inputClusRef", "TPC", "CLUSREFS"}, //
434414
"ClusRefs", "trackclusref-branch-name"}; //
435415
auto mcdef = BranchDefinition<MCLabelContainer>{InputSpec{"mcinput", "TPC", "TRACKSMCLBL"}, //
436-
"TPCTracksMCTruth", "trackmc-branch-name"}; //
437-
438-
// depending on the MC propagation flag, the RootTreeWriter spec is created with 3 or 2
439-
// branch definition
440-
if (propagateMC) {
441-
specs.push_back(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName, //
442-
MakeRootTreeWriterSpec::TerminationPolicy::Process, //
443-
MakeRootTreeWriterSpec::TerminationCondition{checkReady}, //
444-
std::move(tracksdef), std::move(clrefdef), //
445-
std::move(mcdef))()); //
446-
} else { //
447-
specs.push_back(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName, //
448-
MakeRootTreeWriterSpec::TerminationPolicy::Process, //
449-
MakeRootTreeWriterSpec::TerminationCondition{checkReady}, //
450-
std::move(tracksdef), std::move(clrefdef))()); //
451-
}
416+
"TPCTracksMCTruth", //
417+
(propagateMC ? 1 : 0), //
418+
"trackmc-branch-name"}; //
419+
420+
// depending on the MC propagation flag, branch definition for MC labels is disabled
421+
specs.push_back(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
422+
std::move(tracksdef), std::move(clrefdef),
423+
std::move(mcdef))());
452424
}
453425

454426
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -470,8 +442,6 @@ framework::WorkflowSpec getWorkflow(std::vector<int> const& tpcSectors, std::vec
470442
"TPCCompClusters_0", "compcluster-branch-name"}; //
471443

472444
specs.push_back(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName, //
473-
MakeRootTreeWriterSpec::TerminationPolicy::Process, //
474-
MakeRootTreeWriterSpec::TerminationCondition{checkReady}, //
475445
std::move(ccldef))()); //
476446
}
477447

0 commit comments

Comments
 (0)