Skip to content

Commit 4a04ec6

Browse files
committed
TPC Workflow: Fix check that data is received for all sectors exactly once + cleanup
1 parent b16f8b1 commit 4a04ec6

3 files changed

Lines changed: 75 additions & 144 deletions

File tree

Detectors/TPC/workflow/src/CATrackerSpec.cxx

Lines changed: 71 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -86,23 +86,25 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
8686
constexpr static size_t NEndpoints = 20; //TODO: get from mapper?
8787
using ClusterGroupParser = o2::algorithm::ForwardParser<ClusterGroupHeader>;
8888
struct ProcessAttributes { // per-processor state; one instance is created with std::make_shared and captured by the init lambda
89-
std::bitset<NSectors> validInputs = 0; // (line marked removed in this diff) per-sector bitset the old code OR-ed sector masks into as cluster/digit inputs arrived
90-
std::bitset<NSectors> validMcInputs = 0; // (line marked removed in this diff) same bookkeeping for MC label inputs
9189
std::unique_ptr<ClusterGroupParser> parser; // ClusterGroupParser is the ForwardParser<ClusterGroupHeader> alias declared just above
9290
std::unique_ptr<GPUCATracking> tracker; // initialize(config) is called on it during init; a non-zero result raises std::invalid_argument
9391
std::unique_ptr<GPUDisplayBackend> displayBackend;
9492
std::unique_ptr<TPCFastTransform> fastTransform;
9593
std::unique_ptr<TPCdEdxCalibrationSplines> dEdxSplines;
9694
std::unique_ptr<TPCCFCalibration> tpcCalibration;
97-
int verbosity = 1; // (old position of this member; line marked removed in this diff)
9895
std::vector<int> clusterOutputIds; // when non-empty, the cluster output sector header is filled from tpcSectorMask
96+
unsigned long outputBufferSize = 0; // NOTE(review): usage is not visible in this diff - confirm meaning/units against the full file
97+
unsigned long tpcSectorMask = 0; // bit s is set for every sector s in tpcsectors; each received-sector mask must equal this value or processing throws
98+
int verbosity = 1; // logging gate: non-zero logs the sector mask being tracked, > 1 additionally logs each received input
9999
bool readyToQuit = false;
100100
bool allocateOutputOnTheFly = false;
101-
unsigned long outputBufferSize = 0; // (old position of this member; line marked removed in this diff)
102101
bool suppressOutput = false;
103102
};
104103

105104
auto processAttributes = std::make_shared<ProcessAttributes>();
105+
for (auto s : tpcsectors) {
106+
processAttributes->tpcSectorMask |= (1ul << s);
107+
}
106108
auto initFunction = [processAttributes, specconfig](InitContext& ic) {
107109
GPUO2InterfaceConfiguration config;
108110
GPUSettingsO2 confParam;
@@ -224,8 +226,6 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
224226
if (tracker->initialize(config) != 0) {
225227
throw std::invalid_argument("GPUCATracking initialization failed");
226228
}
227-
processAttributes->validInputs.reset();
228-
processAttributes->validMcInputs.reset();
229229
}
230230

231231
auto& callbacks = ic.services().get<CallbackService>();
@@ -260,10 +260,8 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
260260
}
261261
auto& parser = processAttributes->parser;
262262
auto& tracker = processAttributes->tracker;
263-
uint64_t activeSectors = 0;
264263
auto& verbosity = processAttributes->verbosity;
265264
// FIXME cleanup almost duplicated code
266-
auto& validMcInputs = processAttributes->validMcInputs;
267265
std::vector<ConstMCLabelContainerView> mcInputs;
268266
std::vector<gsl::span<const char>> inputs;
269267
struct InputRef {
@@ -293,6 +291,7 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
293291
{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe},
294292
{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe},
295293
};
294+
unsigned long recvMask = 0;
296295
for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
297296
auto const* sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(ref);
298297
if (sectorHeader == nullptr) {
@@ -304,81 +303,80 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
304303
if (sector < 0) {
305304
continue;
306305
}
307-
std::bitset<NSectors> sectorMask(sectorHeader->sectorBits);
308-
if ((validMcInputs & sectorMask).any()) {
309-
// have already data for this sector, this should not happen in the current
310-
// sequential implementation, for parallel path merged at the tracker stage
311-
// multiple buffers need to be handled
306+
if (recvMask & sectorHeader->sectorBits) {
312307
throw std::runtime_error("can only have one MC data set per sector");
313308
}
309+
recvMask |= sectorHeader->sectorBits;
314310
inputrefs[sector].labels = ref;
315311
if (specconfig.caClusterer) {
316312
inputDigitsMCIndex[sector] = inputDigitsMC.size();
317313
inputDigitsMC.emplace_back(ConstMCLabelContainerView(pc.inputs().get<gsl::span<char>>(ref)));
318314
}
319-
validMcInputs |= sectorMask;
320-
activeSectors |= sectorHeader->activeSectors;
321-
if (verbosity > 1) {
322-
LOG(INFO) << "received " << *(ref.spec) << " MC label containers"
323-
<< " for sectors " << sectorMask //
324-
<< std::endl //
325-
<< " mc input status: " << validMcInputs //
326-
<< std::endl //
327-
<< " active sectors: " << std::bitset<NSectors>(activeSectors); //
328-
}
329315
}
330-
for (unsigned int i = 0; i < NSectors; i++) {
331-
inputDigitsMCPtrs[i] = &inputDigitsMC[inputDigitsMCIndex[i]];
316+
if (recvMask != processAttributes->tpcSectorMask) {
317+
throw std::runtime_error("Incomplete set of MC labels received");
318+
}
319+
if (specconfig.caClusterer) {
320+
for (unsigned int i = 0; i < NSectors; i++) {
321+
LOG(INFO) << "GOT MC LABELS FOR SECTOR " << i << " -> " << inputDigitsMC[inputDigitsMCIndex[i]].getNElements();
322+
inputDigitsMCPtrs[i] = &inputDigitsMC[inputDigitsMCIndex[i]];
323+
}
332324
}
333325
}
334326

335-
auto& validInputs = processAttributes->validInputs;
336-
int operation = 0;
337-
std::vector<InputSpec> filter = {
338-
{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe},
339-
{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe},
340-
};
341-
342-
for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
343-
auto const* sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(ref);
344-
if (sectorHeader == nullptr) {
345-
throw std::runtime_error("sector header missing on header stack");
346-
}
347-
const int sector = sectorHeader->sector();
348-
if (sector < 0) {
349-
continue;
350-
}
351-
std::bitset<NSectors> sectorMask(sectorHeader->sectorBits);
352-
if ((validInputs & sectorMask).any()) {
353-
// have already data for this sector, this should not happen in the current
354-
// sequential implementation, for parallel path merged at the tracker stage
355-
// multiple buffers need to be handled
356-
throw std::runtime_error("can only have one cluster data set per sector");
327+
if (!specconfig.decompressTPC && (!specconfig.caClusterer || ((!specconfig.zsOnTheFly || specconfig.processMC) && !specconfig.zsDecoder))) {
328+
std::vector<InputSpec> filter = {
329+
{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe},
330+
{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe},
331+
};
332+
unsigned long recvMask = 0;
333+
for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
334+
auto const* sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(ref);
335+
if (sectorHeader == nullptr) {
336+
throw std::runtime_error("sector header missing on header stack");
337+
}
338+
const int sector = sectorHeader->sector();
339+
if (sector < 0) {
340+
continue;
341+
}
342+
if (recvMask & sectorHeader->sectorBits) {
343+
throw std::runtime_error("can only have one cluster data set per sector");
344+
}
345+
recvMask |= sectorHeader->sectorBits;
346+
inputrefs[sector].data = ref;
347+
if (specconfig.caClusterer && (!specconfig.zsOnTheFly || specconfig.processMC)) {
348+
inputDigits[sector] = pc.inputs().get<gsl::span<o2::tpc::Digit>>(ref);
349+
LOG(INFO) << "GOT DIGITS SPAN FOR SECTOR " << sector << " -> " << inputDigits[sector].size();
350+
}
357351
}
358-
activeSectors |= sectorHeader->activeSectors;
359-
validInputs |= sectorMask;
360-
inputrefs[sector].data = ref;
361-
if (specconfig.caClusterer && (!specconfig.zsOnTheFly || specconfig.processMC)) {
362-
inputDigits[sector] = pc.inputs().get<gsl::span<o2::tpc::Digit>>(ref);
363-
LOG(INFO) << "GOT DIGITS SPAN FOR SECTOR " << sector << " -> " << inputDigits[sector].size();
352+
if (recvMask != processAttributes->tpcSectorMask) {
353+
throw std::runtime_error("Incomplete set of clusters/digits received");
364354
}
365355
}
356+
366357
if (specconfig.zsOnTheFly) {
367358
tpcZSonTheFlySizes = {0};
368359
// tpcZSonTheFlySizes: #zs pages per endpoint:
369360
std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
361+
bool recv = false, recvsizes = false;
370362
for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
363+
if (recvsizes) {
364+
throw std::runtime_error("Received multiple ZSSIZES data");
365+
}
371366
tpcZSonTheFlySizes = pc.inputs().get<std::array<unsigned int, NEndpoints * NSectors>>(ref);
367+
recvsizes = true;
372368
}
373369
// zs pages
374370
std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
375371
for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
372+
if (recv) {
373+
throw std::runtime_error("Received multiple TPCZS data");
374+
}
376375
inputZS = pc.inputs().get<gsl::span<ZeroSuppressedContainer8kb>>(ref);
376+
recv = true;
377377
}
378-
//set all sectors as active and as valid inputs
379-
for (int s = 0; s < NSectors; s++) {
380-
activeSectors |= 1 << s;
381-
validInputs.set(s);
378+
if (!recv || !recvsizes) {
379+
throw std::runtime_error("TPC ZS data not received");
382380
}
383381
for (unsigned int i = 0; i < GPUTrackingInOutZS::NSLICES; i++) {
384382
for (unsigned int j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
@@ -482,78 +480,25 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
482480
pCompClustersFlat = pc.inputs().get<CompressedClustersFlat*>("input").get();
483481
}
484482
} else if (!specconfig.zsOnTheFly) {
485-
// This code path should run optionally also for the zs decoder version
486-
auto printInputLog = [&verbosity, &validInputs, &activeSectors](auto& r, const char* comment, auto& s) {
487-
if (verbosity > 1) {
488-
LOG(INFO) << comment << " " << *(r.spec) << ", size " << DataRefUtils::getPayloadSize(r) //
489-
<< " for sector " << s //
490-
<< std::endl //
491-
<< " input status: " << validInputs //
492-
<< std::endl //
493-
<< " active sectors: " << std::bitset<NSectors>(activeSectors); //
494-
}
495-
};
496-
// for digits and clusters we always have the sector information, activeSectors being zero
497-
// is thus an error condition. The completion policy makes sure that the data set is complete
498-
if (activeSectors == 0 || (activeSectors & validInputs.to_ulong()) != activeSectors) {
499-
throw std::runtime_error("Incomplete input data, expecting complete data set, buffering has been removed ");
500-
}
501-
// MC label blocks must be in the same multimessage with the corresponding data, the completion
502-
// policy does not check for the MC labels and expects them to be present and thus complete if
503-
// the data is complete
504-
if (specconfig.processMC && (activeSectors & validMcInputs.to_ulong()) != activeSectors) {
505-
throw std::runtime_error("Incomplete mc label input, expecting complete data set, buffering has been removed");
506-
}
507-
assert(specconfig.processMC == false || validMcInputs == validInputs);
508483
for (auto const& refentry : inputrefs) {
509484
auto& sector = refentry.first;
510485
auto& ref = refentry.second.data;
511-
if (ref.payload == nullptr) {
512-
// skip zero-length message
513-
continue;
514-
}
515-
if (refentry.second.labels.header != nullptr && refentry.second.labels.payload != nullptr) {
516-
mcInputs.emplace_back(ConstMCLabelContainerView(pc.inputs().get<gsl::span<char>>(refentry.second.labels)));
517-
}
518-
inputs.emplace_back(gsl::span(ref.payload, DataRefUtils::getPayloadSize(ref)));
519-
printInputLog(ref, "received", sector);
520-
}
521-
assert(mcInputs.size() == 0 || mcInputs.size() == inputs.size());
522-
if (verbosity > 0) {
523-
// make human readable information from the bitfield
524-
std::string bitInfo;
525-
auto nActiveBits = validInputs.count();
526-
if (((uint64_t)0x1 << nActiveBits) == validInputs.to_ulong() + 1) {
527-
// sectors 0 to some upper bound are active
528-
bitInfo = "0-" + std::to_string(nActiveBits - 1);
529-
} else {
530-
int rangeStart = -1;
531-
int rangeEnd = -1;
532-
for (size_t sector = 0; sector < validInputs.size(); sector++) {
533-
if (validInputs.test(sector)) {
534-
if (rangeStart < 0) {
535-
if (rangeEnd >= 0) {
536-
bitInfo += ",";
537-
}
538-
bitInfo += std::to_string(sector);
539-
if (nActiveBits == 1) {
540-
break;
541-
}
542-
rangeStart = sector;
543-
}
544-
rangeEnd = sector;
545-
} else {
546-
if (rangeStart >= 0 && rangeEnd > rangeStart) {
547-
bitInfo += "-" + std::to_string(rangeEnd);
548-
}
549-
rangeStart = -1;
550-
}
486+
if (!specconfig.caClusterer) {
487+
if (ref.payload == nullptr) {
488+
// skip zero-length message
489+
continue;
551490
}
552-
if (rangeStart >= 0 && rangeEnd > rangeStart) {
553-
bitInfo += "-" + std::to_string(rangeEnd);
491+
if (refentry.second.labels.header != nullptr && refentry.second.labels.payload != nullptr) {
492+
mcInputs.emplace_back(ConstMCLabelContainerView(pc.inputs().get<gsl::span<char>>(refentry.second.labels)));
554493
}
494+
inputs.emplace_back(gsl::span(ref.payload, DataRefUtils::getPayloadSize(ref)));
555495
}
556-
LOG(INFO) << "running tracking for sector(s) " << bitInfo;
496+
if (verbosity > 1) {
497+
LOG(INFO) << "received " << *(ref.spec) << ", size " << DataRefUtils::getPayloadSize(ref) << " for sector " << sector;
498+
}
499+
}
500+
if (verbosity) {
501+
LOGF(INFO, "running tracking for sector(s) 0x%09x", processAttributes->tpcSectorMask);
557502
}
558503
}
559504

@@ -592,26 +537,18 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
592537
ptrs.compressedClusters = pCompClustersFlat;
593538
} else {
594539
memset(&clusterIndex, 0, sizeof(clusterIndex));
595-
ClusterNativeHelper::Reader::fillIndex(clusterIndex, clusterBuffer, clustersMCBuffer, inputs, mcInputs, [&validInputs](auto& index) { return validInputs.test(index); });
540+
ClusterNativeHelper::Reader::fillIndex(clusterIndex, clusterBuffer, clustersMCBuffer, inputs, mcInputs, [&processAttributes](auto& index) { return processAttributes->tpcSectorMask & (1ul << index); });
596541
ptrs.clusters = &clusterIndex;
597542
}
598543
// a byte size resizable vector object, the DataAllocator returns reference to internal object
599544
// initialize optional pointer to the vector object
600545
using O2CharVectorOutputType = std::decay_t<decltype(pc.outputs().make<std::vector<char>>(Output{"", "", 0}))>;
601546
TPCSectorHeader clusterOutputSectorHeader{0};
602547
if (processAttributes->clusterOutputIds.size() > 0) {
603-
if (activeSectors == 0) {
604-
// there is no sector header shipped with the ZS raw data and thus we do not have
605-
// a valid activeSector variable, though it will be needed downstream
606-
// FIXME: check if this can be provided upstream
607-
for (auto const& sector : processAttributes->clusterOutputIds) {
608-
activeSectors |= 0x1 << sector;
609-
}
610-
}
611-
clusterOutputSectorHeader.sectorBits = activeSectors;
548+
clusterOutputSectorHeader.sectorBits = processAttributes->tpcSectorMask;
612549
// subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors
613550
// to indicate the full TPC
614-
clusterOutputSectorHeader.activeSectors = activeSectors;
551+
clusterOutputSectorHeader.activeSectors = processAttributes->tpcSectorMask;
615552
}
616553

617554
GPUInterfaceOutputs outputRegions;
@@ -706,11 +643,6 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
706643
pc.outputs().snapshot({gDataOriginTPC, "CLNATIVEMCLBL", subspec, Lifetime::Timeframe, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
707644
}
708645
}
709-
710-
validInputs.reset();
711-
if (specconfig.processMC) {
712-
validMcInputs.reset();
713-
}
714646
};
715647

716648
return processingFct;

Detectors/TPC/workflow/src/RecoWorkflow.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,11 +417,11 @@ framework::WorkflowSpec getWorkflow(std::vector<int> const& tpcSectors, std::vec
417417
}
418418

419419
if (zsOnTheFly) {
420-
specs.emplace_back(o2::tpc::getZSEncoderSpec(laneConfiguration, zs10bit, zsThreshold, outRaw));
420+
specs.emplace_back(o2::tpc::getZSEncoderSpec(tpcSectors, zs10bit, zsThreshold, outRaw));
421421
}
422422

423423
if (zsToDigit) {
424-
specs.emplace_back(o2::tpc::getZStoDigitsSpec(laneConfiguration));
424+
specs.emplace_back(o2::tpc::getZStoDigitsSpec(tpcSectors));
425425
}
426426

427427
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -442,7 +442,7 @@ framework::WorkflowSpec getWorkflow(std::vector<int> const& tpcSectors, std::vec
442442
runClusterEncoder ? ca::Operation::OutputCompClustersFlat : ca::Operation::Noop,
443443
isEnabled(OutputType::Clusters) && (caClusterer || decompressTPC) ? ca::Operation::OutputCAClusters : ca::Operation::Noop,
444444
},
445-
laneConfiguration));
445+
tpcSectors));
446446
}
447447

448448
//////////////////////////////////////////////////////////////////////////////////////////////

Detectors/TPC/workflow/src/tpc-reco-workflow.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,9 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
114114
std::vector<int> tpcSectors(36);
115115
std::iota(tpcSectors.begin(), tpcSectors.end(), 0);
116116
// the lane configuration defines the subspecification ids to be distributed among the lanes.
117-
std::vector<int> laneConfiguration;
117+
std::vector<int> laneConfiguration = tpcSectors; // Currently just a copy of the tpcSectors, why?
118118
auto nLanes = cfgc.options().get<int>("tpc-lanes");
119119
auto inputType = cfgc.options().get<std::string>("input-type");
120-
laneConfiguration = tpcSectors;
121120

122121
// depending on whether to dispatch early (prompt) and on the input type, we
123122
// set the matcher. Note that this has to be in accordance with the OutputSpecs

0 commit comments

Comments
 (0)