Skip to content

Commit 86f09d7

Browse files
committed
TPC shared cl. map producer device for TPC file-based input
TPC reco workflow will produce a shared cluster maps which is not saved in the clusters root file. For the workflows which need this map but reading TPC tracks and clusters from the root trees, this device will build the map on-the-fly
1 parent 1d9d408 commit 86f09d7

3 files changed

Lines changed: 195 additions & 0 deletions

File tree

Detectors/TPC/workflow/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ o2_add_library(TPCWorkflow
2424
src/TPCSectorCompletionPolicy.cxx
2525
src/ZSSpec.cxx
2626
src/CalibProcessingHelper.cxx
27+
src/ClusterSharingMapSpec.cxx
2728
TARGETVARNAME targetName
2829
PUBLIC_LINK_LIBRARIES O2::Framework O2::DataFormatsTPC
2930
O2::DPLUtils O2::TPCReconstruction
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
/// @file ClusterSharingMapSpec.h
12+
/// @brief Device to produce TPC clusters sharing map
13+
/// \author ruben.shahoyan@cern.ch
14+
15+
#ifndef O2_TPC_CLUSTERSHARINGMAP_SPEC
16+
#define O2_TPC_CLUSTERSHARINGMAP_SPEC
17+
18+
#include "Framework/DataProcessorSpec.h"
19+
#include "Framework/Task.h"
20+
21+
namespace o2
22+
{
23+
namespace tpc
24+
{
25+
26+
class ClusterSharingMapSpec : public o2::framework::Task
27+
{
28+
public:
29+
~ClusterSharingMapSpec() override = default;
30+
void run(framework::ProcessingContext& pc) final;
31+
};
32+
33+
o2::framework::DataProcessorSpec getClusterSharingMapSpec()
34+
{
35+
36+
std::vector<o2::framework::InputSpec> inputs;
37+
std::vector<o2::framework::OutputSpec> outputs;
38+
inputs.emplace_back("trackTPC", "TPC", "TRACKS", 0, o2::framework::Lifetime::Timeframe);
39+
inputs.emplace_back("trackTPCClRefs", "TPC", "CLUSREFS", 0, o2::framework::Lifetime::Timeframe);
40+
inputs.emplace_back("clusTPC", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}, o2::framework::Lifetime::Timeframe);
41+
outputs.emplace_back("TPC", "CLSHAREDMAP", 0, o2::framework::Lifetime::Timeframe);
42+
43+
return o2::framework::DataProcessorSpec{
44+
"tpc-clusters-sharing-map-producer",
45+
inputs,
46+
outputs,
47+
o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<ClusterSharingMapSpec>()},
48+
o2::framework::Options{}};
49+
}
50+
51+
} // namespace tpc
52+
} // namespace o2
53+
54+
#endif
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
/// @file ClusterSharingMapSpec.cxx
12+
/// @brief Device to produce TPC clusters sharing map
13+
/// \author ruben.shahoyan@cern.ch
14+
15+
#include <gsl/span>
16+
#include <TStopwatch.h>
17+
#include <vector>
18+
#include "Framework/InputRecordWalker.h"
19+
#include "DataFormatsTPC/ClusterNativeHelper.h"
20+
#include "DataFormatsTPC/TrackTPC.h"
21+
#include "DataFormatsTPC/TPCSectorHeader.h"
22+
#include "GPUO2InterfaceRefit.h"
23+
#include "TPCWorkflow/ClusterSharingMapSpec.h"
24+
25+
using namespace o2::framework;
26+
using namespace o2::tpc;
27+
28+
void ClusterSharingMapSpec::run(ProcessingContext& pc)
29+
{
30+
TStopwatch timer;
31+
32+
const auto tracksTPC = pc.inputs().get<gsl::span<o2::tpc::TrackTPC>>("trackTPC");
33+
const auto tracksTPCClRefs = pc.inputs().get<gsl::span<o2::tpc::TPCClRefElem>>("trackTPCClRefs");
34+
35+
//---------------------------->> TPC Clusters loading >>------------------------------------------
36+
// FIXME: this is a copy-paste from TPCITSMatchingSpec version of TPC cluster reading, later to be organized in a better way
37+
int operation = 0;
38+
uint64_t activeSectors = 0;
39+
std::bitset<o2::tpc::constants::MAXSECTOR> validSectors = 0;
40+
std::map<int, DataRef> datarefs;
41+
std::vector<InputSpec> filter = {
42+
{"check", ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}, Lifetime::Timeframe},
43+
};
44+
for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
45+
auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
46+
if (sectorHeader == nullptr) {
47+
// FIXME: think about error policy
48+
LOG(ERROR) << "sector header missing on header stack";
49+
throw std::runtime_error("sector header missing on header stack");
50+
}
51+
int sector = sectorHeader->sector();
52+
std::bitset<o2::tpc::constants::MAXSECTOR> sectorMask(sectorHeader->sectorBits);
53+
LOG(INFO) << "Reading TPC cluster data, sector mask is " << sectorMask;
54+
if ((validSectors & sectorMask).any()) {
55+
// have already data for this sector, this should not happen in the current
56+
// sequential implementation, for parallel path merged at the tracker stage
57+
// multiple buffers need to be handled
58+
throw std::runtime_error("can only have one data set per sector");
59+
}
60+
activeSectors |= sectorHeader->activeSectors;
61+
validSectors |= sectorMask;
62+
datarefs[sector] = ref;
63+
}
64+
65+
auto printInputLog = [&validSectors, &activeSectors](auto& r, const char* comment, auto& s) {
66+
LOG(INFO) << comment << " " << *(r.spec) << ", size " << DataRefUtils::getPayloadSize(r) //
67+
<< " for sector " << s //
68+
<< std::endl //
69+
<< " input status: " << validSectors //
70+
<< std::endl //
71+
<< " active sectors: " << std::bitset<o2::tpc::constants::MAXSECTOR>(activeSectors);
72+
};
73+
74+
if (activeSectors == 0 || (activeSectors & validSectors.to_ulong()) != activeSectors) {
75+
// not all sectors available
76+
// Since we expect complete input, this should not happen (why does the bufferization considered for TPC CA tracker? Ask Matthias)
77+
throw std::runtime_error("Did not receive TPC clusters data for all sectors");
78+
}
79+
//------------------------------------------------------------------------------
80+
std::vector<gsl::span<const char>> clustersTPC;
81+
82+
for (auto const& refentry : datarefs) {
83+
auto& sector = refentry.first;
84+
auto& ref = refentry.second;
85+
clustersTPC.emplace_back(ref.payload, DataRefUtils::getPayloadSize(ref));
86+
printInputLog(ref, "received", sector);
87+
}
88+
89+
// Just print TPC clusters status
90+
{
91+
// make human readable information from the bitfield
92+
std::string bitInfo;
93+
auto nActiveBits = validSectors.count();
94+
if (((uint64_t)0x1 << nActiveBits) == validSectors.to_ulong() + 1) {
95+
// sectors 0 to some upper bound are active
96+
bitInfo = "0-" + std::to_string(nActiveBits - 1);
97+
} else {
98+
int rangeStart = -1;
99+
int rangeEnd = -1;
100+
for (size_t sector = 0; sector < validSectors.size(); sector++) {
101+
if (validSectors.test(sector)) {
102+
if (rangeStart < 0) {
103+
if (rangeEnd >= 0) {
104+
bitInfo += ",";
105+
}
106+
bitInfo += std::to_string(sector);
107+
if (nActiveBits == 1) {
108+
break;
109+
}
110+
rangeStart = sector;
111+
}
112+
rangeEnd = sector;
113+
} else {
114+
if (rangeStart >= 0 && rangeEnd > rangeStart) {
115+
bitInfo += "-" + std::to_string(rangeEnd);
116+
}
117+
rangeStart = -1;
118+
}
119+
}
120+
if (rangeStart >= 0 && rangeEnd > rangeStart) {
121+
bitInfo += "-" + std::to_string(rangeEnd);
122+
}
123+
}
124+
LOG(INFO) << "running matching for sector(s) " << bitInfo;
125+
}
126+
127+
o2::tpc::ClusterNativeAccess clusterIndex;
128+
std::unique_ptr<o2::tpc::ClusterNative[]> clusterBuffer;
129+
memset(&clusterIndex, 0, sizeof(clusterIndex));
130+
o2::tpc::ClusterNativeHelper::ConstMCLabelContainerViewWithBuffer dummyMCOutput;
131+
std::vector<o2::tpc::ClusterNativeHelper::ConstMCLabelContainerView> dummyMCInput;
132+
o2::tpc::ClusterNativeHelper::Reader::fillIndex(clusterIndex, clusterBuffer, dummyMCOutput, clustersTPC, dummyMCInput);
133+
//----------------------------<< TPC Clusters loading <<------------------------------------------
134+
135+
auto& bufVec = pc.outputs().make<std::vector<unsigned char>>(Output{o2::header::gDataOriginTPC, "CLSHAREDMAP", 0}, clusterIndex.nClustersTotal);
136+
o2::gpu::GPUTPCO2InterfaceRefit::fillSharedClustersMap(&clusterIndex, tracksTPC, tracksTPCClRefs.data(), bufVec.data());
137+
138+
timer.Stop();
139+
LOGF(INFO, "Timing for TPC clusters sharing map creation: Cpu: %.3e Real: %.3e s", timer.CpuTime(), timer.RealTime());
140+
}

0 commit comments

Comments
 (0)