@@ -86,23 +86,25 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
8686 constexpr static size_t NEndpoints = 20 ; // TODO: get from mapper?
8787 using ClusterGroupParser = o2::algorithm::ForwardParser<ClusterGroupHeader>;
8888 struct ProcessAttributes {
89- std::bitset<NSectors> validInputs = 0 ;
90- std::bitset<NSectors> validMcInputs = 0 ;
9189 std::unique_ptr<ClusterGroupParser> parser;
9290 std::unique_ptr<GPUCATracking> tracker;
9391 std::unique_ptr<GPUDisplayBackend> displayBackend;
9492 std::unique_ptr<TPCFastTransform> fastTransform;
9593 std::unique_ptr<TPCdEdxCalibrationSplines> dEdxSplines;
9694 std::unique_ptr<TPCCFCalibration> tpcCalibration;
97- int verbosity = 1 ;
9895 std::vector<int > clusterOutputIds;
96+ unsigned long outputBufferSize = 0 ;
97+ unsigned long tpcSectorMask = 0 ;
98+ int verbosity = 1 ;
9999 bool readyToQuit = false ;
100100 bool allocateOutputOnTheFly = false ;
101- unsigned long outputBufferSize = 0 ;
102101 bool suppressOutput = false ;
103102 };
104103
105104 auto processAttributes = std::make_shared<ProcessAttributes>();
105+ for (auto s : tpcsectors) {
106+ processAttributes->tpcSectorMask |= (1ul << s);
107+ }
106108 auto initFunction = [processAttributes, specconfig](InitContext& ic) {
107109 GPUO2InterfaceConfiguration config;
108110 GPUSettingsO2 confParam;
@@ -224,8 +226,6 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
224226 if (tracker->initialize (config) != 0 ) {
225227 throw std::invalid_argument (" GPUCATracking initialization failed" );
226228 }
227- processAttributes->validInputs .reset ();
228- processAttributes->validMcInputs .reset ();
229229 }
230230
231231 auto & callbacks = ic.services ().get <CallbackService>();
@@ -260,10 +260,8 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
260260 }
261261 auto & parser = processAttributes->parser ;
262262 auto & tracker = processAttributes->tracker ;
263- uint64_t activeSectors = 0 ;
264263 auto & verbosity = processAttributes->verbosity ;
265264 // FIXME cleanup almost duplicated code
266- auto & validMcInputs = processAttributes->validMcInputs ;
267265 std::vector<ConstMCLabelContainerView> mcInputs;
268266 std::vector<gsl::span<const char >> inputs;
269267 struct InputRef {
@@ -293,6 +291,7 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
293291 {" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " DIGITSMCTR" }, Lifetime::Timeframe},
294292 {" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " CLNATIVEMCLBL" }, Lifetime::Timeframe},
295293 };
294+ unsigned long recvMask = 0 ;
296295 for (auto const & ref : InputRecordWalker (pc.inputs (), filter)) {
297296 auto const * sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(ref);
298297 if (sectorHeader == nullptr ) {
@@ -304,81 +303,80 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
304303 if (sector < 0 ) {
305304 continue ;
306305 }
307- std::bitset<NSectors> sectorMask (sectorHeader->sectorBits );
308- if ((validMcInputs & sectorMask).any ()) {
309- // have already data for this sector, this should not happen in the current
310- // sequential implementation, for parallel path merged at the tracker stage
311- // multiple buffers need to be handled
306+ if (recvMask & sectorHeader->sectorBits ) {
312307 throw std::runtime_error (" can only have one MC data set per sector" );
313308 }
309+ recvMask |= sectorHeader->sectorBits ;
314310 inputrefs[sector].labels = ref;
315311 if (specconfig.caClusterer ) {
316312 inputDigitsMCIndex[sector] = inputDigitsMC.size ();
317313 inputDigitsMC.emplace_back (ConstMCLabelContainerView (pc.inputs ().get <gsl::span<char >>(ref)));
318314 }
319- validMcInputs |= sectorMask;
320- activeSectors |= sectorHeader->activeSectors ;
321- if (verbosity > 1 ) {
322- LOG (INFO) << " received " << *(ref.spec ) << " MC label containers"
323- << " for sectors " << sectorMask //
324- << std::endl //
325- << " mc input status: " << validMcInputs //
326- << std::endl //
327- << " active sectors: " << std::bitset<NSectors>(activeSectors); //
328- }
329315 }
330- for (unsigned int i = 0 ; i < NSectors; i++) {
331- inputDigitsMCPtrs[i] = &inputDigitsMC[inputDigitsMCIndex[i]];
316+ if (recvMask != processAttributes->tpcSectorMask ) {
317+ throw std::runtime_error (" Incomplete set of MC labels received" );
318+ }
319+ if (specconfig.caClusterer ) {
320+ for (unsigned int i = 0 ; i < NSectors; i++) {
321+ LOG (INFO) << " GOT MC LABELS FOR SECTOR " << i << " -> " << inputDigitsMC[inputDigitsMCIndex[i]].getNElements ();
322+ inputDigitsMCPtrs[i] = &inputDigitsMC[inputDigitsMCIndex[i]];
323+ }
332324 }
333325 }
334326
335- auto & validInputs = processAttributes->validInputs ;
336- int operation = 0 ;
337- std::vector<InputSpec> filter = {
338- {" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " DIGITS" }, Lifetime::Timeframe},
339- {" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " CLUSTERNATIVE" }, Lifetime::Timeframe},
340- };
341-
342- for (auto const & ref : InputRecordWalker (pc.inputs (), filter)) {
343- auto const * sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(ref);
344- if (sectorHeader == nullptr ) {
345- throw std::runtime_error (" sector header missing on header stack" );
346- }
347- const int sector = sectorHeader->sector ();
348- if (sector < 0 ) {
349- continue ;
350- }
351- std::bitset<NSectors> sectorMask (sectorHeader->sectorBits );
352- if ((validInputs & sectorMask).any ()) {
353- // have already data for this sector, this should not happen in the current
354- // sequential implementation, for parallel path merged at the tracker stage
355- // multiple buffers need to be handled
356- throw std::runtime_error (" can only have one cluster data set per sector" );
327+ if (!specconfig.decompressTPC && (!specconfig.caClusterer || ((!specconfig.zsOnTheFly || specconfig.processMC ) && !specconfig.zsDecoder ))) {
328+ std::vector<InputSpec> filter = {
329+ {" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " DIGITS" }, Lifetime::Timeframe},
330+ {" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " CLUSTERNATIVE" }, Lifetime::Timeframe},
331+ };
332+ unsigned long recvMask = 0 ;
333+ for (auto const & ref : InputRecordWalker (pc.inputs (), filter)) {
334+ auto const * sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(ref);
335+ if (sectorHeader == nullptr ) {
336+ throw std::runtime_error (" sector header missing on header stack" );
337+ }
338+ const int sector = sectorHeader->sector ();
339+ if (sector < 0 ) {
340+ continue ;
341+ }
342+ if (recvMask & sectorHeader->sectorBits ) {
343+ throw std::runtime_error (" can only have one cluster data set per sector" );
344+ }
345+ recvMask |= sectorHeader->sectorBits ;
346+ inputrefs[sector].data = ref;
347+ if (specconfig.caClusterer && (!specconfig.zsOnTheFly || specconfig.processMC )) {
348+ inputDigits[sector] = pc.inputs ().get <gsl::span<o2::tpc::Digit>>(ref);
349+ LOG (INFO) << " GOT DIGITS SPAN FOR SECTOR " << sector << " -> " << inputDigits[sector].size ();
350+ }
357351 }
358- activeSectors |= sectorHeader->activeSectors ;
359- validInputs |= sectorMask;
360- inputrefs[sector].data = ref;
361- if (specconfig.caClusterer && (!specconfig.zsOnTheFly || specconfig.processMC )) {
362- inputDigits[sector] = pc.inputs ().get <gsl::span<o2::tpc::Digit>>(ref);
363- LOG (INFO) << " GOT DIGITS SPAN FOR SECTOR " << sector << " -> " << inputDigits[sector].size ();
352+ if (recvMask != processAttributes->tpcSectorMask ) {
353+ throw std::runtime_error (" Incomplete set of clusters/digits received" );
364354 }
365355 }
356+
366357 if (specconfig.zsOnTheFly ) {
367358 tpcZSonTheFlySizes = {0 };
368359 // tpcZSonTheFlySizes: #zs pages per endpoint:
369360 std::vector<InputSpec> filter = {{" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " ZSSIZES" }, Lifetime::Timeframe}};
361+ bool recv = false , recvsizes = false ;
370362 for (auto const & ref : InputRecordWalker (pc.inputs (), filter)) {
363+ if (recvsizes) {
364+ throw std::runtime_error (" Received multiple ZSSIZES data" );
365+ }
371366 tpcZSonTheFlySizes = pc.inputs ().get <std::array<unsigned int , NEndpoints * NSectors>>(ref);
367+ recvsizes = true ;
372368 }
373369 // zs pages
374370 std::vector<InputSpec> filter2 = {{" check" , ConcreteDataTypeMatcher{gDataOriginTPC , " TPCZS" }, Lifetime::Timeframe}};
375371 for (auto const & ref : InputRecordWalker (pc.inputs (), filter2)) {
372+ if (recv) {
373+ throw std::runtime_error (" Received multiple TPCZS data" );
374+ }
376375 inputZS = pc.inputs ().get <gsl::span<ZeroSuppressedContainer8kb>>(ref);
376+ recv = true ;
377377 }
378- // set all sectors as active and as valid inputs
379- for (int s = 0 ; s < NSectors; s++) {
380- activeSectors |= 1 << s;
381- validInputs.set (s);
378+ if (!recv || !recvsizes) {
379+ throw std::runtime_error (" TPC ZS data not received" );
382380 }
383381 for (unsigned int i = 0 ; i < GPUTrackingInOutZS::NSLICES; i++) {
384382 for (unsigned int j = 0 ; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
@@ -482,78 +480,25 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
482480 pCompClustersFlat = pc.inputs ().get <CompressedClustersFlat*>(" input" ).get ();
483481 }
484482 } else if (!specconfig.zsOnTheFly ) {
485- // This code path should run optionally also for the zs decoder version
486- auto printInputLog = [&verbosity, &validInputs, &activeSectors](auto & r, const char * comment, auto & s) {
487- if (verbosity > 1 ) {
488- LOG (INFO) << comment << " " << *(r.spec ) << " , size " << DataRefUtils::getPayloadSize (r) //
489- << " for sector " << s //
490- << std::endl //
491- << " input status: " << validInputs //
492- << std::endl //
493- << " active sectors: " << std::bitset<NSectors>(activeSectors); //
494- }
495- };
496- // for digits and clusters we always have the sector information, activeSectors being zero
497- // is thus an error condition. The completion policy makes sure that the data set is complete
498- if (activeSectors == 0 || (activeSectors & validInputs.to_ulong ()) != activeSectors) {
499- throw std::runtime_error (" Incomplete input data, expecting complete data set, buffering has been removed " );
500- }
501- // MC label blocks must be in the same multimessage with the corresponding data, the completion
502- // policy does not check for the MC labels and expects them to be present and thus complete if
503- // the data is complete
504- if (specconfig.processMC && (activeSectors & validMcInputs.to_ulong ()) != activeSectors) {
505- throw std::runtime_error (" Incomplete mc label input, expecting complete data set, buffering has been removed" );
506- }
507- assert (specconfig.processMC == false || validMcInputs == validInputs);
508483 for (auto const & refentry : inputrefs) {
509484 auto & sector = refentry.first ;
510485 auto & ref = refentry.second .data ;
511- if (ref.payload == nullptr ) {
512- // skip zero-length message
513- continue ;
514- }
515- if (refentry.second .labels .header != nullptr && refentry.second .labels .payload != nullptr ) {
516- mcInputs.emplace_back (ConstMCLabelContainerView (pc.inputs ().get <gsl::span<char >>(refentry.second .labels )));
517- }
518- inputs.emplace_back (gsl::span (ref.payload , DataRefUtils::getPayloadSize (ref)));
519- printInputLog (ref, " received" , sector);
520- }
521- assert (mcInputs.size () == 0 || mcInputs.size () == inputs.size ());
522- if (verbosity > 0 ) {
523- // make human readable information from the bitfield
524- std::string bitInfo;
525- auto nActiveBits = validInputs.count ();
526- if (((uint64_t )0x1 << nActiveBits) == validInputs.to_ulong () + 1 ) {
527- // sectors 0 to some upper bound are active
528- bitInfo = " 0-" + std::to_string (nActiveBits - 1 );
529- } else {
530- int rangeStart = -1 ;
531- int rangeEnd = -1 ;
532- for (size_t sector = 0 ; sector < validInputs.size (); sector++) {
533- if (validInputs.test (sector)) {
534- if (rangeStart < 0 ) {
535- if (rangeEnd >= 0 ) {
536- bitInfo += " ," ;
537- }
538- bitInfo += std::to_string (sector);
539- if (nActiveBits == 1 ) {
540- break ;
541- }
542- rangeStart = sector;
543- }
544- rangeEnd = sector;
545- } else {
546- if (rangeStart >= 0 && rangeEnd > rangeStart) {
547- bitInfo += " -" + std::to_string (rangeEnd);
548- }
549- rangeStart = -1 ;
550- }
486+ if (!specconfig.caClusterer ) {
487+ if (ref.payload == nullptr ) {
488+ // skip zero-length message
489+ continue ;
551490 }
552- if (rangeStart >= 0 && rangeEnd > rangeStart ) {
553- bitInfo += " - " + std::to_string (rangeEnd );
491+ if (refentry. second . labels . header != nullptr && refentry. second . labels . payload != nullptr ) {
492+ mcInputs. emplace_back ( ConstMCLabelContainerView (pc. inputs (). get <gsl::span< char >>(refentry. second . labels )) );
554493 }
494+ inputs.emplace_back (gsl::span (ref.payload , DataRefUtils::getPayloadSize (ref)));
555495 }
556- LOG (INFO) << " running tracking for sector(s) " << bitInfo;
496+ if (verbosity > 1 ) {
497+ LOG (INFO) << " received " << *(ref.spec ) << " , size " << DataRefUtils::getPayloadSize (ref) << " for sector " << sector;
498+ }
499+ }
500+ if (verbosity) {
501+ LOGF (INFO, " running tracking for sector(s) 0x%09x" , processAttributes->tpcSectorMask );
557502 }
558503 }
559504
@@ -592,26 +537,18 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
592537 ptrs.compressedClusters = pCompClustersFlat;
593538 } else {
594539 memset (&clusterIndex, 0 , sizeof (clusterIndex));
595- ClusterNativeHelper::Reader::fillIndex (clusterIndex, clusterBuffer, clustersMCBuffer, inputs, mcInputs, [&validInputs ](auto & index) { return validInputs. test ( index); });
540+ ClusterNativeHelper::Reader::fillIndex (clusterIndex, clusterBuffer, clustersMCBuffer, inputs, mcInputs, [&processAttributes ](auto & index) { return processAttributes-> tpcSectorMask & ( 1ul << index); });
596541 ptrs.clusters = &clusterIndex;
597542 }
598543 // a byte size resizable vector object, the DataAllocator returns reference to internal object
599544 // initialize optional pointer to the vector object
600545 using O2CharVectorOutputType = std::decay_t <decltype (pc.outputs ().make <std::vector<char >>(Output{" " , " " , 0 }))>;
601546 TPCSectorHeader clusterOutputSectorHeader{0 };
602547 if (processAttributes->clusterOutputIds .size () > 0 ) {
603- if (activeSectors == 0 ) {
604- // there is no sector header shipped with the ZS raw data and thus we do not have
605- // a valid activeSector variable, though it will be needed downstream
606- // FIXME: check if this can be provided upstream
607- for (auto const & sector : processAttributes->clusterOutputIds ) {
608- activeSectors |= 0x1 << sector;
609- }
610- }
611- clusterOutputSectorHeader.sectorBits = activeSectors;
548+ clusterOutputSectorHeader.sectorBits = processAttributes->tpcSectorMask ;
612549 // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors
613550 // to indicate the full TPC
614- clusterOutputSectorHeader.activeSectors = activeSectors ;
551+ clusterOutputSectorHeader.activeSectors = processAttributes-> tpcSectorMask ;
615552 }
616553
617554 GPUInterfaceOutputs outputRegions;
@@ -706,11 +643,6 @@ DataProcessorSpec getCATrackerSpec(ca::Config const& specconfig, std::vector<int
706643 pc.outputs ().snapshot ({gDataOriginTPC , " CLNATIVEMCLBL" , subspec, Lifetime::Timeframe, {clusterOutputSectorHeader}}, clustersMCBuffer.first );
707644 }
708645 }
709-
710- validInputs.reset ();
711- if (specconfig.processMC ) {
712- validMcInputs.reset ();
713- }
714646 };
715647
716648 return processingFct;
0 commit comments