Skip to content

Commit 811056e

Browse files
committed
DPL: fix route duplication due to wildcards
There are cases in which using a wildcard matcher in a consumer would result in duplicated input routes, while the wanted behavior is to have a single input route processing one matching message after the other. This makes sure that there is no duplication in such case. For example consider producer PRODUCER producing A/1 and A/2, and CONSUMER matching on A/*, this will now invoke the processing callback when either A/1 or A/2 is received.
1 parent c6b4a12 commit 811056e

14 files changed

Lines changed: 321 additions & 23 deletions

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ foreach(t
177177
PtrHelpers
178178
Root2ArrowTable
179179
Services
180+
SimpleWildcard
180181
StringHelpers
181182
SuppressionGenerator
182183
TMessageSerializer

Framework/Core/include/Framework/DataSpecUtils.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ struct DataSpecUtils {
144144
/// and we can add corner cases as we go.
145145
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(InputSpec const& spec);
146146

147+
/// If possible extract the DataOrigin from an InputSpec.
148+
/// This will not always be possible, depending on how complex of
149+
/// a query the InputSpec does, however in most cases it should be ok
150+
/// and we can add corner cases as we go.
151+
static header::DataOrigin asConcreteOrigin(InputSpec const& spec);
152+
147153
/// Create an InputSpec which is able to match all the outputs of the given
148154
/// OutputSpec
149155
static InputSpec matchingInput(OutputSpec const& spec);
@@ -156,6 +162,9 @@ struct DataSpecUtils {
156162

157163
/// Build a DataDescriptMatcher which does not care about the subSpec.
158164
static data_matcher::DataDescriptorMatcher dataDescriptorMatcherFrom(ConcreteDataTypeMatcher const& dataType);
165+
166+
/// Build a DataDescriptMatcher which does not care about the subSpec and description.
167+
static data_matcher::DataDescriptorMatcher dataDescriptorMatcherFrom(header::DataOrigin const& origin);
159168
};
160169

161170
} // namespace framework

Framework/Core/include/Framework/InputRoute.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ struct InputRoute {
3838
using DanglingConfigurator = std::function<ExpirationHandler::Checker(ConfigParamRegistry const&)>;
3939
using ExpirationConfigurator = std::function<ExpirationHandler::Handler(ConfigParamRegistry const&)>;
4040

41+
// FIXME: This should really go away and we should make sure that
42+
// whenever we pass the input routes we also have the associated
43+
// input specs available.
4144
InputSpec matcher;
45+
size_t inputSpecIndex;
4246
std::string sourceChannel;
4347
size_t timeslice;
4448
CreationConfigurator creatorConfigurator = nullptr;

Framework/Core/src/DataRelayer.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ namespace framework
3636

3737
namespace
3838
{
39+
/// Calculate how many input routes there are, doublecounting different
40+
/// timeslices.
3941
std::vector<size_t> createDistinctRouteIndex(std::vector<InputRoute> const& routes)
4042
{
4143
std::vector<size_t> result;

Framework/Core/src/DataSpecUtils.cxx

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,7 @@ bool DataSpecUtils::partialMatch(OutputSpec const& output, header::DataOrigin co
261261

262262
bool DataSpecUtils::partialMatch(InputSpec const& input, header::DataOrigin const& origin)
263263
{
264-
auto dataType = DataSpecUtils::asConcreteDataTypeMatcher(input);
265-
return dataType.origin == origin;
264+
return DataSpecUtils::asConcreteOrigin(input) == origin;
266265
}
267266

268267
ConcreteDataMatcher DataSpecUtils::asConcreteDataMatcher(InputSpec const& spec)
@@ -387,6 +386,22 @@ ConcreteDataTypeMatcher DataSpecUtils::asConcreteDataTypeMatcher(InputSpec const
387386
spec.matcher);
388387
}
389388

389+
header::DataOrigin DataSpecUtils::asConcreteOrigin(InputSpec const& spec)
390+
{
391+
return std::visit(overloaded{
392+
[](ConcreteDataMatcher const& concrete) {
393+
return concrete.origin;
394+
},
395+
[](DataDescriptorMatcher const& matcher) {
396+
auto state = extractMatcherInfo(matcher);
397+
if (state.hasUniqueOrigin) {
398+
return state.origin;
399+
}
400+
throw std::runtime_error("Could not extract data type from query");
401+
}},
402+
spec.matcher);
403+
}
404+
390405
DataDescriptorMatcher DataSpecUtils::dataDescriptorMatcherFrom(ConcreteDataTypeMatcher const& dataType)
391406
{
392407
auto timeDescriptionMatcher = std::make_unique<DataDescriptorMatcher>(
@@ -399,6 +414,24 @@ DataDescriptorMatcher DataSpecUtils::dataDescriptorMatcherFrom(ConcreteDataTypeM
399414
std::move(timeDescriptionMatcher)));
400415
}
401416

417+
DataDescriptorMatcher DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin const& origin)
418+
{
419+
char buf[5] = {0, 0, 0, 0, 0};
420+
strncpy(buf, origin.str, 4);
421+
DataDescriptorMatcher matchOnlyOrigin{
422+
DataDescriptorMatcher::Op::And,
423+
OriginValueMatcher{buf},
424+
std::make_unique<DataDescriptorMatcher>(
425+
DataDescriptorMatcher::Op::And,
426+
DescriptionValueMatcher{ContextRef{1}},
427+
std::make_unique<DataDescriptorMatcher>(
428+
DataDescriptorMatcher::Op::And,
429+
SubSpecificationTypeValueMatcher{ContextRef{2}},
430+
std::make_unique<DataDescriptorMatcher>(DataDescriptorMatcher::Op::Just,
431+
StartTimeValueMatcher{ContextRef{0}})))};
432+
return std::move(matchOnlyOrigin);
433+
}
434+
402435
InputSpec DataSpecUtils::matchingInput(OutputSpec const& spec)
403436
{
404437
return std::visit(overloaded{

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,11 +517,25 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
517517

518518
InputRoute route{
519519
inputSpec,
520+
edge.consumerInputIndex,
520521
sourceChannel,
521522
edge.producerTimeIndex,
522523
creationConfigurator,
523524
danglingConfigurator,
524525
expirationConfigurator};
526+
// In case we have wildcards, we must make sure that some other edge
527+
// produced the same route, i.e. has the same matcher. Without this,
528+
// otherwise, we would end up with as many input routes as the outputs that
529+
// can be matched by the wildcard.
530+
for (size_t iri = 0; iri < consumerDevice.inputs.size(); ++iri) {
531+
auto& existingRoute = consumerDevice.inputs[iri];
532+
if (existingRoute.timeslice != edge.producerTimeIndex) {
533+
continue;
534+
}
535+
if (existingRoute.inputSpecIndex == edge.consumerInputIndex) {
536+
return;
537+
}
538+
}
525539

526540
consumerDevice.inputs.push_back(route);
527541
};

Framework/Core/test/benchmark_DataRelayer.cxx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ static void BM_RelayMessageCreation(benchmark::State& state)
3131
InputSpec spec{"clusters", "TPC", "CLUSTERS"};
3232

3333
std::vector<InputRoute> inputs = {
34-
InputRoute{spec, "Fake", 0}};
34+
InputRoute{spec, 0, "Fake", 0}};
3535

3636
std::vector<ForwardRoute> forwards;
3737
TimesliceIndex index;
@@ -72,7 +72,7 @@ static void BM_RelaySingleSlot(benchmark::State& state)
7272
InputSpec spec{"clusters", "TPC", "CLUSTERS"};
7373

7474
std::vector<InputRoute> inputs = {
75-
InputRoute{spec, "Fake", 0}};
75+
InputRoute{spec, 0, "Fake", 0}};
7676

7777
std::vector<ForwardRoute> forwards;
7878
TimesliceIndex index;
@@ -120,7 +120,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state)
120120
InputSpec spec{"clusters", "TPC", "CLUSTERS"};
121121

122122
std::vector<InputRoute> inputs = {
123-
InputRoute{spec, "Fake", 0}};
123+
InputRoute{spec, 0, "Fake", 0}};
124124

125125
std::vector<ForwardRoute> forwards;
126126
TimesliceIndex index;
@@ -171,8 +171,8 @@ static void BM_RelayMultipleRoutes(benchmark::State& state)
171171
InputSpec spec2{"tracks", "TPC", "TRACKS"};
172172

173173
std::vector<InputRoute> inputs = {
174-
InputRoute{spec1, "Fake1", 0},
175-
InputRoute{spec2, "Fake2", 0}};
174+
InputRoute{spec1, 0, "Fake1", 0},
175+
InputRoute{spec2, 1, "Fake2", 0}};
176176

177177
std::vector<ForwardRoute> forwards;
178178
TimesliceIndex index;

Framework/Core/test/benchmark_InputRecord.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ static void BM_InputRecordGenericGetters(benchmark::State& state)
3131
InputSpec spec2{"y", "ITS", "CLUSTERS", 0, Lifetime::Timeframe};
3232
InputSpec spec3{"z", "TST", "EMPTY", 0, Lifetime::Timeframe};
3333

34-
auto createRoute = [](const char* source, InputSpec& spec) {
34+
size_t i = 0;
35+
auto createRoute = [&i](const char* source, InputSpec& spec) {
3536
return InputRoute{
3637
spec,
38+
i++,
3739
source};
3840
};
3941

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ BOOST_AUTO_TEST_CASE(TestNoWait)
3636
InputSpec spec{"clusters", "TPC", "CLUSTERS"};
3737

3838
std::vector<InputRoute> inputs = {
39-
InputRoute{spec, "Fake", 0}};
39+
InputRoute{spec, 0, "Fake", 0}};
4040

4141
std::vector<ForwardRoute> forwards;
4242
TimesliceIndex index;
@@ -75,7 +75,7 @@ BOOST_AUTO_TEST_CASE(TestNoWaitMatcher)
7575
auto specs = o2::framework::select("clusters:TPC/CLUSTERS");
7676

7777
std::vector<InputRoute> inputs = {
78-
InputRoute{specs[0], "Fake", 0}};
78+
InputRoute{specs[0], 0, "Fake", 0}};
7979

8080
std::vector<ForwardRoute> forwards;
8181
TimesliceIndex index;
@@ -124,8 +124,8 @@ BOOST_AUTO_TEST_CASE(TestRelay)
124124
};
125125

126126
std::vector<InputRoute> inputs = {
127-
InputRoute{spec1, "Fake1", 0},
128-
InputRoute{spec2, "Fake2", 0}};
127+
InputRoute{spec1, 0, "Fake1", 0},
128+
InputRoute{spec2, 1, "Fake2", 0}};
129129

130130
std::vector<ForwardRoute> forwards;
131131

@@ -191,8 +191,8 @@ BOOST_AUTO_TEST_CASE(TestRelayBug)
191191
};
192192

193193
std::vector<InputRoute> inputs = {
194-
InputRoute{spec1, "Fake1", 0},
195-
InputRoute{spec2, "Fake2", 0}};
194+
InputRoute{spec1, 0, "Fake1", 0},
195+
InputRoute{spec2, 1, "Fake2", 0}};
196196

197197
std::vector<ForwardRoute> forwards;
198198

@@ -261,7 +261,7 @@ BOOST_AUTO_TEST_CASE(TestCache)
261261
InputSpec spec{"clusters", "TPC", "CLUSTERS"};
262262

263263
std::vector<InputRoute> inputs = {
264-
InputRoute{spec, "Fake", 0}};
264+
InputRoute{spec, 0, "Fake", 0}};
265265
std::vector<ForwardRoute> forwards;
266266

267267
auto policy = CompletionPolicyHelpers::consumeWhenAll();
@@ -325,8 +325,8 @@ BOOST_AUTO_TEST_CASE(TestPolicies)
325325
InputSpec spec2{"tracks", "TPC", "TRACKS"};
326326

327327
std::vector<InputRoute> inputs = {
328-
InputRoute{spec1, "Fake1", 0},
329-
InputRoute{spec2, "Fake2", 0},
328+
InputRoute{spec1, 0, "Fake1", 0},
329+
InputRoute{spec2, 1, "Fake2", 0},
330330
};
331331

332332
std::vector<ForwardRoute> forwards;

Framework/Core/test/test_DeviceSpec.cxx

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,3 +656,90 @@ BOOST_AUTO_TEST_CASE(TestTopologyLayeredTimePipeline)
656656
BOOST_CHECK_EQUAL(devices[5].inputChannels[2].port, 22008);
657657
BOOST_REQUIRE_EQUAL(devices[5].outputChannels.size(), 0);
658658
}
659+
660+
// Test the case in which we have one source with two
661+
// description and a wildcard for both description and
662+
// subspec on the receiving side:
663+
//
664+
// A/1
665+
// \ B
666+
// /
667+
// A/2
668+
WorkflowSpec defineDataProcessing8()
669+
{
670+
return {
671+
{"A", Inputs{InputSpec{"timer", "DPL", "TIMER", 0, Lifetime::Timer}}, {OutputSpec{"A", "1"}, OutputSpec{"A", "2"}}},
672+
{"B", {InputSpec{"x", DataSpecUtils::dataDescriptorMatcherFrom(o2::header::DataOrigin{"A"})}}},
673+
{"internal-dpl-timer", {}, {OutputSpec{"DPL", "TIMER", 0, Lifetime::Timer}}}};
674+
}
675+
BOOST_AUTO_TEST_CASE(TestSimpleWildcard)
676+
{
677+
auto workflow = defineDataProcessing8();
678+
SimpleResourceManager rm(22000, 1000);
679+
auto resources = rm.getAvailableResources();
680+
auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies();
681+
682+
std::vector<DeviceSpec> devices;
683+
std::vector<DeviceId> deviceIndex;
684+
std::vector<DeviceConnectionId> connections;
685+
std::vector<LogicalForwardInfo> availableForwardsInfo;
686+
687+
std::vector<OutputSpec> globalOutputs = {OutputSpec{"A", "1"},
688+
OutputSpec{"A", "2"},
689+
OutputSpec{"DPL", "TIMER", 0, Lifetime::Timer}};
690+
691+
// See values in test_WorkflowHelpers.cxx
692+
std::vector<size_t> edgeOutIndex{1, 2, 0};
693+
std::vector<size_t> edgeInIndex{0, 1, 2};
694+
std::vector<DeviceConnectionEdge> logicalEdges = {
695+
{2, 0, 0, 0, 2, 0, false, ConnectionKind::Out},
696+
{0, 1, 0, 0, 0, 0, false, ConnectionKind::Out},
697+
{0, 1, 0, 0, 1, 0, false, ConnectionKind::Out},
698+
};
699+
700+
// See values in test_WorkflowHelpers.cxx
701+
std::vector<EdgeAction> outActions{
702+
EdgeAction{true, true},
703+
EdgeAction{true, true},
704+
EdgeAction{false, false},
705+
};
706+
707+
// See values in test_WorkflowHelpers.cxx
708+
std::vector<EdgeAction> inActions{
709+
EdgeAction{true, true},
710+
EdgeAction{true, true},
711+
EdgeAction{false, false},
712+
};
713+
714+
DeviceSpecHelpers::processOutEdgeActions(devices, deviceIndex, connections, resources, edgeOutIndex, logicalEdges,
715+
outActions, workflow, globalOutputs, channelPolicies);
716+
717+
BOOST_REQUIRE_EQUAL(devices.size(), 2); // Two devices have outputs: A and Timer
718+
BOOST_CHECK_EQUAL(devices[0].name, "A");
719+
BOOST_CHECK_EQUAL(devices[1].name, "internal-dpl-timer");
720+
BOOST_REQUIRE_EQUAL(deviceIndex.size(), 2);
721+
BOOST_CHECK_EQUAL(deviceIndex[0].processorIndex, 0); // A is the first processor in the workflow
722+
BOOST_CHECK_EQUAL(deviceIndex[0].timeslice, 0); // There is no time pipelining
723+
BOOST_CHECK_EQUAL(deviceIndex[0].deviceIndex, 0); // It's also the first device created
724+
BOOST_CHECK_EQUAL(deviceIndex[1].processorIndex, 2); // TIMER is added only at the end
725+
BOOST_CHECK_EQUAL(deviceIndex[1].timeslice, 0); // There is no time pipelining
726+
BOOST_CHECK_EQUAL(deviceIndex[1].deviceIndex, 1); // It's the second device created
727+
728+
std::sort(connections.begin(), connections.end());
729+
730+
DeviceSpecHelpers::processInEdgeActions(devices, deviceIndex, resources, connections, edgeInIndex, logicalEdges,
731+
inActions, workflow, availableForwardsInfo, channelPolicies);
732+
733+
BOOST_REQUIRE_EQUAL(devices.size(), 3); // Now we also have B
734+
BOOST_CHECK_EQUAL(devices[0].name, "A");
735+
BOOST_CHECK_EQUAL(devices[1].name, "internal-dpl-timer");
736+
BOOST_CHECK_EQUAL(devices[2].name, "B");
737+
BOOST_REQUIRE_EQUAL(deviceIndex.size(), 3);
738+
BOOST_CHECK_EQUAL(deviceIndex[1].processorIndex, 1); // B is the second processor in the workflow
739+
BOOST_CHECK_EQUAL(deviceIndex[1].timeslice, 0); // There is no time pipelining
740+
BOOST_CHECK_EQUAL(deviceIndex[1].deviceIndex, 2); // It's the last device created because it's a sink
741+
742+
// We should have only one input, because the two outputs of A can
743+
// be captured by the generic matcher in B
744+
BOOST_REQUIRE_EQUAL(devices[2].inputs.size(), 1);
745+
}

0 commit comments

Comments
 (0)