2525#include " Framework/WorkflowSpec.h"
2626#include " Framework/Logger.h"
2727#include " DetectorsRaw/RDHUtils.h"
28+ #include " Framework/InputRecordWalker.h"
2829
2930using namespace o2 ::framework;
3031
@@ -127,23 +128,7 @@ void CompressedDecodingTask::run(ProcessingContext& pc)
127128 mDecoder .setFirstIR ({0 , mInitOrbit });
128129 }
129130
130- /* * loop over inputs routes **/
131- for (auto iit = pc.inputs ().begin (), iend = pc.inputs ().end (); iit != iend; ++iit) {
132- if (!iit.isValid ()) {
133- continue ;
134- }
135-
136- /* * loop over input parts **/
137- for (auto const & ref : iit) {
138- const auto * headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
139- auto payloadIn = ref.payload ;
140- auto payloadInSize = headerIn->payloadSize ;
141-
142- DecoderBase::setDecoderBuffer (payloadIn);
143- DecoderBase::setDecoderBufferSize (payloadInSize);
144- DecoderBase::run ();
145- }
146- }
131+ decodeTF (pc);
147132
148133 if (!mConetMode ) {
149134 mHasToBePosted = true ;
@@ -163,6 +148,43 @@ void CompressedDecodingTask::endOfStream(EndOfStreamContext& ec)
163148 mTimer .CpuTime (), mTimer .RealTime (), mTimer .Counter () - 1 );
164149}
165150
151+ void CompressedDecodingTask::decodeTF (ProcessingContext& pc)
152+ {
153+ auto & inputs = pc.inputs ();
154+
155+ // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
156+ // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
157+ {
158+ std::vector<InputSpec> dummy{InputSpec{" dummy" , ConcreteDataMatcher{" TOF" , mDataDesc , 0xDEADBEEF }}};
159+ for (const auto & ref : InputRecordWalker (inputs, dummy)) {
160+ const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
161+ if (dh->payloadSize == 0 ) {
162+ LOGP (WARNING, " Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF" ,
163+ dh->dataOrigin .str , dh->dataDescription .str , dh->subSpecification , dh->tfCounter , dh->firstTForbit , dh->payloadSize );
164+ return ;
165+ }
166+ }
167+ }
168+
169+ /* * loop over inputs routes **/
170+ for (auto iit = pc.inputs ().begin (), iend = pc.inputs ().end (); iit != iend; ++iit) {
171+ if (!iit.isValid ()) {
172+ continue ;
173+ }
174+
175+ /* * loop over input parts **/
176+ for (auto const & ref : iit) {
177+ const auto * headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
178+ auto payloadIn = ref.payload ;
179+ auto payloadInSize = headerIn->payloadSize ;
180+
181+ DecoderBase::setDecoderBuffer (payloadIn);
182+ DecoderBase::setDecoderBufferSize (payloadInSize);
183+ DecoderBase::run ();
184+ }
185+ }
186+ }
187+
166188void CompressedDecodingTask::headerHandler (const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit)
167189{
168190 if (mConetMode ) {
@@ -362,6 +384,12 @@ void CompressedDecodingTask::frameHandler(const CrateHeader_t* crateHeader, cons
362384
363385DataProcessorSpec getCompressedDecodingSpec (const std::string& inputDesc, bool conet)
364386{
387+ std::vector<InputSpec> inputs;
388+ // inputs.emplace_back(std::string("x:TOF/" + inputDesc).c_str(), 0, Lifetime::Optional);
389+ o2::header::DataDescription dataDesc;
390+ dataDesc.runtimeInit (inputDesc.c_str ());
391+ inputs.emplace_back (" x" , ConcreteDataTypeMatcher{o2::header::gDataOriginTOF , dataDesc}, Lifetime::Optional);
392+
365393 std::vector<OutputSpec> outputs;
366394 outputs.emplace_back (o2::header::gDataOriginTOF , " DIGITHEADER" , 0 , Lifetime::Timeframe);
367395 outputs.emplace_back (o2::header::gDataOriginTOF , " DIGITS" , 0 , Lifetime::Timeframe);
@@ -371,9 +399,10 @@ DataProcessorSpec getCompressedDecodingSpec(const std::string& inputDesc, bool c
371399
372400 return DataProcessorSpec{
373401 " tof-compressed-decoder" ,
374- select (std::string (" x:TOF/" + inputDesc).c_str ()),
402+ inputs,
403+ // select(std::string("x:TOF/" + inputDesc).c_str()),
375404 outputs,
376- AlgorithmSpec{adaptFromTask<CompressedDecodingTask>(conet)},
405+ AlgorithmSpec{adaptFromTask<CompressedDecodingTask>(conet, dataDesc )},
377406 Options{
378407 {" row-filter" , VariantType::Bool, false , {" Filter empty row" }},
379408 {" mask-noise" , VariantType::Bool, false , {" Flag to mask noisy digits" }},
0 commit comments