Skip to content

Commit 8345ea3

Browse files
committed
TPC: Support callback in SectorCompletionPolicy that defines an ordering
1 parent 1ab7698 commit 8345ea3

1 file changed

Lines changed: 23 additions & 5 deletions

File tree

Detectors/TPC/workflow/readers/include/TPCReaderWorkflow/TPCSectorCompletionPolicy.h

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "Framework/InputSpec.h"
2121
#include "Framework/InputSpan.h"
2222
#include "Framework/DeviceSpec.h"
23+
#include "Framework/DataProcessingHeader.h"
2324
#include "DataFormatsTPC/TPCSectorHeader.h"
2425
#include "Headers/DataHeaderHelpers.h"
2526
#include "TPCBase/Sector.h"
@@ -29,6 +30,7 @@
2930
#include <stdexcept>
3031
#include <sstream>
3132
#include <regex>
33+
#include <functional>
3234

3335
namespace o2
3436
{
@@ -89,7 +91,7 @@ class TPCSectorCompletionPolicy
8991
return std::regex_match(device.name.begin(), device.name.end(), std::regex(expression.c_str()));
9092
};
9193

92-
auto callback = [bRequireAll = mRequireAll, inputMatchers = mInputMatchers, externalInputMatchers = mExternalInputMatchers, pTpcSectorMask = mTpcSectorMask](framework::InputSpan const& inputs) -> framework::CompletionPolicy::CompletionOp {
94+
auto callback = [bRequireAll = mRequireAll, inputMatchers = mInputMatchers, externalInputMatchers = mExternalInputMatchers, pTpcSectorMask = mTpcSectorMask, orderCheck = mOrderCheck](framework::InputSpan const& inputs) -> framework::CompletionPolicy::CompletionOp {
9395
unsigned long tpcSectorMask = pTpcSectorMask ? *pTpcSectorMask : 0xFFFFFFFFF;
9496
std::bitset<NSectors> validSectors = 0;
9597
bool haveMatchedInput = false;
@@ -165,6 +167,7 @@ class TPCSectorCompletionPolicy
165167
}
166168
}
167169

170+
o2::framework::CompletionPolicy::CompletionOp retVal = framework::CompletionPolicy::CompletionOp::Wait;
168171
// If the flag Config::RequireAll is set in the constructor arguments we require
169172
// data from all inputs in addition to the sector matching condition
170173
// To be fully correct we would need to require data from all inputs not going
@@ -175,7 +178,7 @@ class TPCSectorCompletionPolicy
175178
(!bRequireAll || nActiveInputRoutes == inputs.size())) {
176179
// we can process if there is input for all sectors, the required sectors are
177180
// transported as part of the sector header
178-
return framework::CompletionPolicy::CompletionOp::Consume;
181+
retVal = framework::CompletionPolicy::CompletionOp::Consume;
179182
} else if (activeSectors == 0 && nActiveInputRoutes == inputs.size()) {
180183
// no sector header is transmitted, this is the case for e.g. the ZS raw data
181184
// we simply require input on all routes, this is also the default of DPL DataRelayer
@@ -184,13 +187,25 @@ class TPCSectorCompletionPolicy
184187
// Currently, the workflow has multiple O2 messages per input route, but they all come in
185188
// a single multipart message. So it works fine, and we disable the warning below, but there
186189
// is a potential problem. Need to fix this on the level of the workflow.
187-
//if (nMaxPartsPerRoute > 1) {
190+
// if (nMaxPartsPerRoute ) {
188191
// LOG(warning) << "No sector information is provided with the data, data set is complete with data on all input routes. But there are multiple parts on at least one route and this policy might not be complete, no check possible if other parts on some routes are still missing. It is adviced to add a custom policy.";
189192
//}
190-
return framework::CompletionPolicy::CompletionOp::Consume;
193+
retVal = framework::CompletionPolicy::CompletionOp::Consume;
191194
}
192195

193-
return framework::CompletionPolicy::CompletionOp::Wait;
196+
if (retVal != framework::CompletionPolicy::CompletionOp::Wait && orderCheck && *orderCheck && **orderCheck) {
197+
for (auto& input : inputs) {
198+
auto* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
199+
if (!dph) {
200+
continue;
201+
}
202+
if (!(**orderCheck)(dph->startTime)) {
203+
retVal = framework::CompletionPolicy::CompletionOp::Wait;
204+
}
205+
break;
206+
}
207+
}
208+
return retVal;
194209
};
195210
return framework::CompletionPolicy{"TPCSectorCompletionPolicy", matcher, callback};
196211
}
@@ -211,6 +226,8 @@ class TPCSectorCompletionPolicy
211226
}
212227
} else if constexpr (std::is_same_v<Type, std::vector<o2::framework::InputSpec>*>) {
213228
mExternalInputMatchers = arg;
229+
} else if constexpr (std::is_same_v<Type, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>**>) {
230+
mOrderCheck = arg;
214231
} else if constexpr (std::is_same_v<Type, unsigned long*> || std::is_same_v<Type, const unsigned long*>) {
215232
mTpcSectorMask = arg;
216233
} else {
@@ -223,6 +240,7 @@ class TPCSectorCompletionPolicy
223240

224241
std::string mProcessorName;
225242
std::vector<framework::InputSpec> mInputMatchers;
243+
std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** mOrderCheck = nullptr;
226244
// The external input matchers behave as the internal ones with the following differences:
227245
// - They are controlled externally and the external entity can modify them, e.g. after parsing command line arguments.
228246
// - They are all matched independently, it is not sufficient that one of them is present for all sectors

0 commit comments

Comments
 (0)