2020#include " Framework/InputSpec.h"
2121#include " Framework/InputSpan.h"
2222#include " Framework/DeviceSpec.h"
23+ #include " Framework/DataProcessingHeader.h"
2324#include " DataFormatsTPC/TPCSectorHeader.h"
2425#include " Headers/DataHeaderHelpers.h"
2526#include " TPCBase/Sector.h"
2930#include < stdexcept>
3031#include < sstream>
3132#include < regex>
33+ #include < functional>
3234
3335namespace o2
3436{
@@ -89,7 +91,7 @@ class TPCSectorCompletionPolicy
8991 return std::regex_match (device.name .begin (), device.name .end (), std::regex (expression.c_str ()));
9092 };
9193
92- auto callback = [bRequireAll = mRequireAll , inputMatchers = mInputMatchers , externalInputMatchers = mExternalInputMatchers , pTpcSectorMask = mTpcSectorMask ](framework::InputSpan const & inputs) -> framework::CompletionPolicy::CompletionOp {
94+ auto callback = [bRequireAll = mRequireAll , inputMatchers = mInputMatchers , externalInputMatchers = mExternalInputMatchers , pTpcSectorMask = mTpcSectorMask , orderCheck = mOrderCheck ](framework::InputSpan const & inputs) -> framework::CompletionPolicy::CompletionOp {
9395 unsigned long tpcSectorMask = pTpcSectorMask ? *pTpcSectorMask : 0xFFFFFFFFF ;
9496 std::bitset<NSectors> validSectors = 0 ;
9597 bool haveMatchedInput = false ;
@@ -165,6 +167,7 @@ class TPCSectorCompletionPolicy
165167 }
166168 }
167169
170+ o2::framework::CompletionPolicy::CompletionOp retVal = framework::CompletionPolicy::CompletionOp::Wait;
168171 // If the flag Config::RequireAll is set in the constructor arguments we require
169172 // data from all inputs in addition to the sector matching condition
170173 // To be fully correct we would need to require data from all inputs not going
@@ -175,7 +178,7 @@ class TPCSectorCompletionPolicy
175178 (!bRequireAll || nActiveInputRoutes == inputs.size ())) {
176179 // we can process if there is input for all sectors, the required sectors are
177180 // transported as part of the sector header
178- return framework::CompletionPolicy::CompletionOp::Consume;
181+ retVal = framework::CompletionPolicy::CompletionOp::Consume;
179182 } else if (activeSectors == 0 && nActiveInputRoutes == inputs.size ()) {
180183 // no sector header is transmitted, this is the case for e.g. the ZS raw data
181184 // we simply require input on all routes, this is also the default of DPL DataRelayer
@@ -184,13 +187,25 @@ class TPCSectorCompletionPolicy
184187 // Currently, the workflow has multiple O2 messages per input route, but they all come in
185188 // a single multipart message. So it works fine, and we disable the warning below, but there
186189 // is a potential problem. Need to fix this on the level of the workflow.
187- // if (nMaxPartsPerRoute > 1 ) {
190+ // if (nMaxPartsPerRoute ) {
188191 // LOG(warning) << "No sector information is provided with the data, data set is complete with data on all input routes. But there are multiple parts on at least one route and this policy might not be complete, no check possible if other parts on some routes are still missing. It is adviced to add a custom policy.";
189192 // }
190- return framework::CompletionPolicy::CompletionOp::Consume;
193+ retVal = framework::CompletionPolicy::CompletionOp::Consume;
191194 }
192195
193- return framework::CompletionPolicy::CompletionOp::Wait;
196+ if (retVal != framework::CompletionPolicy::CompletionOp::Wait && orderCheck && *orderCheck && **orderCheck) {
197+ for (auto & input : inputs) {
198+ auto * dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
199+ if (!dph) {
200+ continue ;
201+ }
202+ if (!(**orderCheck)(dph->startTime )) {
203+ retVal = framework::CompletionPolicy::CompletionOp::Wait;
204+ }
205+ break ;
206+ }
207+ }
208+ return retVal;
194209 };
195210 return framework::CompletionPolicy{" TPCSectorCompletionPolicy" , matcher, callback};
196211 }
@@ -211,6 +226,8 @@ class TPCSectorCompletionPolicy
211226 }
212227 } else if constexpr (std::is_same_v<Type, std::vector<o2::framework::InputSpec>*>) {
213228 mExternalInputMatchers = arg;
229+ } else if constexpr (std::is_same_v<Type, std::function<bool (o2::framework::DataProcessingHeader::StartTime)>**>) {
230+ mOrderCheck = arg;
214231 } else if constexpr (std::is_same_v<Type, unsigned long *> || std::is_same_v<Type, const unsigned long *>) {
215232 mTpcSectorMask = arg;
216233 } else {
@@ -223,6 +240,7 @@ class TPCSectorCompletionPolicy
223240
224241 std::string mProcessorName ;
225242 std::vector<framework::InputSpec> mInputMatchers ;
243+ std::function<bool (o2::framework::DataProcessingHeader::StartTime)>** mOrderCheck = nullptr ;
226244 // The external input matchers behave as the internal ones with the following differences:
227245 // - They are controlled externally and the external entity can modify them, e.g. after parsing command line arguments.
228246 // - They are all matched independently, it is not sufficient that one of them is present for all sectors
0 commit comments